Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/05 23:52:41 UTC
[07/15] flink git commit: [FLINK-5884] [table] Integrate time indicators for Table API & SQL. Continued
[FLINK-5884] [table] Integrate time indicators for Table API & SQL. Continued
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24bf61ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24bf61ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24bf61ce
Branch: refs/heads/master
Commit: 24bf61ceb332f2db2dc4bab624b73beffae1160a
Parents: 495f104
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu May 4 18:05:27 2017 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat May 6 01:51:54 2017 +0200
----------------------------------------------------------------------
.../flink/table/api/BatchTableEnvironment.scala | 25 +--
.../table/api/StreamTableEnvironment.scala | 86 ++++++++--
.../flink/table/api/TableEnvironment.scala | 64 +-------
.../flink/table/calcite/FlinkTypeFactory.scala | 17 +-
.../table/expressions/ExpressionParser.scala | 18 +-
.../table/plan/logical/LogicalWindow.scala | 2 +-
.../flink/table/plan/logical/groupWindows.scala | 11 +-
.../flink/table/plan/nodes/CommonCalc.scala | 10 +-
.../plan/nodes/PhysicalTableSourceScan.scala | 2 +-
.../plan/nodes/dataset/DataSetAggregate.scala | 2 +-
.../table/plan/nodes/dataset/DataSetCalc.scala | 5 +-
.../datastream/DataStreamOverAggregate.scala | 5 +-
.../datastream/StreamTableSourceScan.scala | 28 +---
.../logical/FlinkLogicalTableSourceScan.scala | 32 +++-
.../plan/schema/StreamTableSourceTable.scala | 65 ++++++++
.../table/runtime/aggregate/AggregateUtil.scala | 3 +-
.../table/sources/DefinedTimeAttributes.scala | 47 ++++--
.../flink/table/TableEnvironmentTest.scala | 52 ++----
.../api/scala/batch/table/GroupWindowTest.scala | 4 +-
.../stream/StreamTableEnvironmentTest.scala | 164 +++++++++++++++++++
.../api/scala/stream/TableSourceTest.scala | 154 +++++++++++++++++
.../scala/stream/sql/WindowAggregateTest.scala | 59 ++++---
22 files changed, 628 insertions(+), 227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 3eb2ffc..02c6063 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -31,7 +31,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.table.explain.PlanJsonParser
-import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.expressions.{Expression, RowtimeAttribute, TimeAttribute}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
import org.apache.flink.table.plan.rules.FlinkRuleSets
@@ -196,26 +196,11 @@ abstract class BatchTableEnvironment(
val (fieldNames, fieldIndexes) = getFieldInfo[T](
dataSet.getType,
- fields,
- ignoreTimeAttributes = true)
+ fields)
- // validate and extract time attributes
- val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields)
-
- // don't allow proctime on batch
- proctime match {
- case Some(_) =>
- throw new ValidationException(
- "A proctime attribute is not allowed in a batch environment. " +
- "Working with processing-time on batch would lead to non-deterministic results.")
- case _ => // ok
- }
- // rowtime must not extend the schema of a batch table
- rowtime match {
- case Some((idx, _)) if idx >= dataSet.getType.getArity =>
- throw new ValidationException(
- "A rowtime attribute must be defined on an existing field in a batch environment.")
- case _ => // ok
+ if (fields.exists(_.isInstanceOf[TimeAttribute])) {
+ throw new ValidationException(
+ ".rowtime and .proctime time indicators are not allowed in a batch environment.")
}
val dataSetTable = new DataSetTable[T](dataSet, fieldIndexes, fieldNames)
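
As a usage sketch (not part of the commit; environment setup and field names are illustrative), the batch path now rejects time indicators outright instead of validating them case by case:

  import org.apache.flink.api.scala._
  import org.apache.flink.table.api.TableEnvironment
  import org.apache.flink.table.api.scala._

  val env = ExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  val ds = env.fromElements((1L, 42, "hello"))

  // OK: plain field references
  tEnv.fromDataSet(ds, 'ts, 'amount, 'name)

  // throws ValidationException: ".rowtime and .proctime time indicators
  // are not allowed in a batch environment."
  tEnv.fromDataSet(ds, 'ts.rowtime, 'amount, 'name)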
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index d1f2fb5..dd2c09d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -26,19 +26,21 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{RuleSet, RuleSets}
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.calcite.RelTimeIndicatorConverter
import org.apache.flink.table.explain.PlanJsonParser
-import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference}
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
import org.apache.flink.table.plan.rules.FlinkRuleSets
-import org.apache.flink.table.plan.schema.{DataStreamTable, TableSourceTable}
+import org.apache.flink.table.plan.schema.{DataStreamTable, StreamTableSourceTable, TableSourceTable}
import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
import org.apache.flink.table.sources.{StreamTableSource, TableSource}
+import org.apache.flink.table.typeutils.TypeCheckUtils
import org.apache.flink.types.Row
import _root_.scala.collection.JavaConverters._
@@ -99,7 +101,7 @@ abstract class StreamTableEnvironment(
tableSource match {
case streamTableSource: StreamTableSource[_] =>
- registerTableInternal(name, new TableSourceTable(streamTableSource))
+ registerTableInternal(name, new StreamTableSourceTable(streamTableSource))
case _ =>
throw new TableException("Only StreamTableSource can be registered in " +
"StreamTableEnvironment")
@@ -168,14 +170,13 @@ abstract class StreamTableEnvironment(
fields: Array[Expression])
: Unit = {
- val (fieldNames, fieldIndexes) = getFieldInfo[T](
- dataStream.getType,
- fields,
- ignoreTimeAttributes = false)
+ val streamType = dataStream.getType
- // validate and extract time attributes
- val (rowtime, proctime) = validateAndExtractTimeAttributes(fieldNames, fieldIndexes, fields)
+ // get field names and types for all non-replaced fields
+ val (fieldNames, fieldIndexes) = getFieldInfo[T](streamType, fields)
+ // validate and extract time attributes
+ val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, fields)
val dataStreamTable = new DataStreamTable[T](
dataStream,
@@ -188,6 +189,71 @@ abstract class StreamTableEnvironment(
}
/**
+ * Checks that at most one rowtime and one proctime attribute are defined.
+ * Returns the time attributes.
+ *
+ * @return rowtime attribute and proctime attribute
+ */
+ private def validateAndExtractTimeAttributes(
+ streamType: TypeInformation[_],
+ exprs: Array[Expression])
+ : (Option[(Int, String)], Option[(Int, String)]) = {
+
+ val fieldTypes: Array[TypeInformation[_]] = streamType match {
+ case c: CompositeType[_] => (0 until c.getArity).map(i => c.getTypeAt(i)).toArray
+ case a: AtomicType[_] => Array(a)
+ }
+
+ var fieldNames: List[String] = Nil
+ var rowtime: Option[(Int, String)] = None
+ var proctime: Option[(Int, String)] = None
+
+ exprs.zipWithIndex.foreach {
+ case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
+ if (rowtime.isDefined) {
+ throw new TableException(
+ "The rowtime attribute can only be defined once in a table schema.")
+ } else {
+ // check type of field that is replaced
+ if (idx < fieldTypes.length &&
+ !(TypeCheckUtils.isLong(fieldTypes(idx)) ||
+ TypeCheckUtils.isTimePoint(fieldTypes(idx)))) {
+ throw new TableException(
+ "The rowtime attribute can only be replace a field with a valid time type, such as " +
+ "Timestamp or Long.")
+ }
+ rowtime = Some(idx, name)
+ }
+ case (ProctimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
+ if (proctime.isDefined) {
+ throw new TableException(
+ "The proctime attribute can only be defined once in a table schema.")
+ } else {
+ // check that proctime is only appended
+ if (idx < fieldTypes.length) {
+ throw new TableException(
+ "The proctime attribute can only be appended to the table schema and not replace " +
+ "an existing field. Please move it to the end of the schema.")
+ }
+ proctime = Some(idx, name)
+ }
+ case (u: UnresolvedFieldReference, _) => fieldNames = u.name :: fieldNames
+ }
+
+ if (rowtime.isDefined && fieldNames.contains(rowtime.get._2)) {
+ throw new TableException(
+ "The rowtime attribute may not have the same name as an another field.")
+ }
+
+ if (proctime.isDefined && fieldNames.contains(proctime.get._2)) {
+ throw new TableException(
+ "The proctime attribute may not have the same name as an another field.")
+ }
+
+ (rowtime, proctime)
+ }
+
+ /**
* Returns the decoration rule set for this environment
* including a custom RuleSet configuration.
*/
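
A sketch of the schema rules that validateAndExtractTimeAttributes enforces (illustrative only; assumes a stream of (Long, Int, String)): a rowtime attribute may replace an existing Long or Timestamp field or be appended, while a proctime attribute may only be appended at the end of the schema:

  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.table.api.TableEnvironment
  import org.apache.flink.table.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  val stream = env.fromElements((1L, 42, "hello"))

  tEnv.fromDataStream(stream, 'ts.rowtime, 'amount, 'name)       // OK: replaces the Long field
  tEnv.fromDataStream(stream, 'ts, 'amount, 'name, 'rt.rowtime)  // OK: appended
  tEnv.fromDataStream(stream, 'ts, 'amount, 'name, 'pt.proctime) // OK: appended
  tEnv.fromDataStream(stream, 'ts, 'pt.proctime, 'name)          // TableException: proctime must be appended
  tEnv.fromDataStream(stream, 'ts, 'amount.rowtime, 'name)       // TableException: Int is not a valid time type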
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 4c72e8f..9ed5000 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -601,50 +601,36 @@ abstract class TableEnvironment(val config: TableConfig) {
/**
* Returns field names and field positions for a given [[TypeInformation]] and [[Array]] of
- * [[Expression]]. It does not handle time attributes but considers them in indices, if
- * ignore flag is not false.
+ * [[Expression]]. It does not handle time attributes but considers them in indices.
*
* @param inputType The [[TypeInformation]] against which the [[Expression]]s are evaluated.
- * @param exprs The expressions that define the field names.
- * @param ignoreTimeAttributes ignore time attributes and handle them as regular expressions.
+ * @param exprs The expressions that define the field names.
* @tparam A The type of the TypeInformation.
* @return A tuple of two arrays holding the field names and corresponding field positions.
*/
protected[flink] def getFieldInfo[A](
inputType: TypeInformation[A],
- exprs: Array[Expression],
- ignoreTimeAttributes: Boolean)
+ exprs: Array[Expression])
: (Array[String], Array[Int]) = {
TableEnvironment.validateType(inputType)
- val filteredExprs = if (ignoreTimeAttributes) {
- exprs.map {
- case ta: TimeAttribute => ta.expression
- case e@_ => e
- }
- } else {
- exprs
- }
-
val indexedNames: Array[(Int, String)] = inputType match {
case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
throw new TableException(
"An input of GenericTypeInfo<Row> cannot be converted to Table. " +
"Please specify the type of the input with a RowTypeInfo.")
case a: AtomicType[A] =>
- filteredExprs.zipWithIndex flatMap {
+ exprs.zipWithIndex flatMap {
case (UnresolvedFieldReference(name), idx) =>
if (idx > 0) {
throw new TableException("Table of atomic type can only have a single field.")
}
Some((0, name))
- case (_: TimeAttribute, _) if ignoreTimeAttributes =>
- None
case _ => throw new TableException("Field reference expression requested.")
}
case t: TupleTypeInfo[A] =>
- filteredExprs.zipWithIndex flatMap {
+ exprs.zipWithIndex flatMap {
case (UnresolvedFieldReference(name), idx) =>
Some((idx, name))
case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
@@ -659,7 +645,7 @@ abstract class TableEnvironment(val config: TableConfig) {
"Field reference expression or alias on field expression expected.")
}
case c: CaseClassTypeInfo[A] =>
- filteredExprs.zipWithIndex flatMap {
+ exprs.zipWithIndex flatMap {
case (UnresolvedFieldReference(name), idx) =>
Some((idx, name))
case (Alias(UnresolvedFieldReference(origName), name, _), _) =>
@@ -674,7 +660,7 @@ abstract class TableEnvironment(val config: TableConfig) {
"Field reference expression or alias on field expression expected.")
}
case p: PojoTypeInfo[A] =>
- filteredExprs flatMap {
+ exprs flatMap {
case (UnresolvedFieldReference(name)) =>
val idx = p.getFieldIndex(name)
if (idx < 0) {
@@ -822,42 +808,6 @@ abstract class TableEnvironment(val config: TableConfig) {
Some(mapFunction)
}
- /**
- * Checks for at most one rowtime and proctime attribute.
- * Returns the time attributes.
- *
- * @return rowtime attribute and proctime attribute
- */
- protected def validateAndExtractTimeAttributes(
- fieldNames: Seq[String],
- fieldIndices: Seq[Int],
- exprs: Array[Expression])
- : (Option[(Int, String)], Option[(Int, String)]) = {
-
- var rowtime: Option[(Int, String)] = None
- var proctime: Option[(Int, String)] = None
-
- exprs.zipWithIndex.foreach {
- case (RowtimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
- if (rowtime.isDefined) {
- throw new TableException(
- "The rowtime attribute can only be defined once in a table schema.")
- } else {
- rowtime = Some(idx, name)
- }
- case (ProctimeAttribute(reference@UnresolvedFieldReference(name)), idx) =>
- if (proctime.isDefined) {
- throw new TableException(
- "The proctime attribute can only be defined once in a table schema.")
- } else {
- proctime = Some(idx, name)
- }
- case _ =>
- // do nothing
- }
-
- (rowtime, proctime)
- }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 001011b..9281ad8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -65,11 +65,12 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
createSqlIntervalType(
new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
- case TimeIndicatorTypeInfo.ROWTIME_INDICATOR =>
- createRowtimeIndicatorType()
-
- case TimeIndicatorTypeInfo.PROCTIME_INDICATOR =>
- createProctimeIndicatorType()
+ case TIMESTAMP if typeInfo.isInstanceOf[TimeIndicatorTypeInfo] =>
+ if (typeInfo.asInstanceOf[TimeIndicatorTypeInfo].isEventTime) {
+ createRowtimeIndicatorType()
+ } else {
+ createProctimeIndicatorType()
+ }
case _ =>
createSqlType(sqlType)
@@ -114,9 +115,11 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
* @param fieldNames field names
* @param fieldTypes field types, every element is Flink's [[TypeInformation]]
* @param rowtime optional system field to indicate event-time; the index determines the index
- * in the final record and might replace an existing field
+ * in the final record. If the index is smaller than the number of specified
+ * fields, it shifts all following fields.
* @param proctime optional system field to indicate processing-time; the index determines the
- * index in the final record and might replace an existing field
+ * index in the final record. If the index is smaller than the number of
+ * specified fields, it shifts all following fields.
* @return a struct type with the input fieldNames, input fieldTypes, and system fields
*/
def buildLogicalRowType(
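
To illustrate the revised index semantics (a sketch; the call site and values are made up, with the signature inferred from the scaladoc above): time attribute indices now point at positions in the final record and shift, rather than replace, the physical fields:

  // two physical fields; rowtime appended at index 2, proctime at index 3
  val rowType = flinkTypeFactory.buildLogicalRowType(
    Seq("id", "name"),
    Seq(Types.LONG, Types.STRING),
    Some((2, "rt")),  // event-time attribute, appended after the physical fields
    Some((3, "pt")))  // processing-time attribute, appended after the rowtime field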
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
index c33f8fc..98580ba 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -98,11 +98,14 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val UNBOUNDED_ROW: Keyword = Keyword("unbounded_row")
lazy val UNBOUNDED_RANGE: Keyword = Keyword("unbounded_range")
lazy val ASIN: Keyword = Keyword("asin")
+ lazy val ROWTIME: Keyword = Keyword("rowtime")
+ lazy val PROCTIME: Keyword = Keyword("proctime")
def functionIdent: ExpressionParser.Parser[String] =
not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
not(SUM0) ~ not(STDDEV_POP) ~ not(STDDEV_SAMP) ~ not(VAR_POP) ~ not(VAR_SAMP) ~
not(SUM) ~ not(START) ~ not(END)~ not(CAST) ~ not(NULL) ~ not(IF) ~
+ not(ROWTIME) ~ not(PROCTIME) ~
not(CURRENT_ROW) ~ not(UNBOUNDED_ROW) ~ not(CURRENT_RANGE) ~ not(UNBOUNDED_RANGE) ~>
super.ident
@@ -532,12 +535,25 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
// alias
- lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
+ lazy val alias: PackratParser[Expression] = timeIndicator |
+ logic ~ AS ~ fieldReference ^^ {
case e ~ _ ~ name => Alias(e, name.name)
} | logic ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
case e ~ _ ~ _ ~ names ~ _ => Alias(e, names.head.name, names.tail.map(_.name))
} | logic
+ // time indicators
+
+ lazy val timeIndicator: PackratParser[Expression] = procTime | rowTime
+
+ lazy val procTime: PackratParser[Expression] = fieldReference ~ "." ~ PROCTIME ^^ {
+ case f ~ _ ~ _ => ProctimeAttribute(f)
+ }
+
+ lazy val rowTime: PackratParser[Expression] = fieldReference ~ "." ~ ROWTIME ^^ {
+ case f ~ _ ~ _ => RowtimeAttribute(f)
+ }
+
lazy val expression: PackratParser[Expression] = alias |
failure("Invalid expression.")
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
index 92dc501..6161ef0 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
@@ -23,7 +23,7 @@ import org.apache.flink.table.expressions.{Expression, WindowReference}
import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
/**
- * Logical super class for all types of windows (group-windows and row-windows).
+ * Logical super class for group windows.
*
* @param aliasAttribute window alias
* @param timeAttribute time field indicating event-time or processing-time
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
index 3e5de28..4a8fb52 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/groupWindows.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.plan.logical
import org.apache.flink.table.api.{BatchTableEnvironment, StreamTableEnvironment, TableEnvironment}
import org.apache.flink.table.expressions.ExpressionUtils.{isRowCountLiteral, isRowtimeAttribute, isTimeAttribute, isTimeIntervalLiteral}
import org.apache.flink.table.expressions._
-import org.apache.flink.table.typeutils.TypeCheckUtils.isTimePoint
+import org.apache.flink.table.typeutils.TypeCheckUtils.{isTimePoint, isLong}
import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
// ------------------------------------------------------------------------------------------------
@@ -56,7 +56,8 @@ case class TumblingGroupWindow(
case _: StreamTableEnvironment if !isTimeAttribute(timeField) =>
ValidationFailure(
"Tumbling window expects a time attribute for grouping in a stream environment.")
- case _: BatchTableEnvironment if isTimePoint(size.resultType) =>
+ case _: BatchTableEnvironment
+ if !(isTimePoint(timeField.resultType) || isLong(timeField.resultType)) =>
ValidationFailure(
"Tumbling window expects a time attribute for grouping in a stream environment.")
@@ -119,7 +120,8 @@ case class SlidingGroupWindow(
case _: StreamTableEnvironment if !isTimeAttribute(timeField) =>
ValidationFailure(
"Sliding window expects a time attribute for grouping in a stream environment.")
- case _: BatchTableEnvironment if isTimePoint(size.resultType) =>
+ case _: BatchTableEnvironment
+ if !(isTimePoint(timeField.resultType) || isLong(timeField.resultType)) =>
ValidationFailure(
"Sliding window expects a time attribute for grouping in a stream environment.")
@@ -169,7 +171,8 @@ case class SessionGroupWindow(
case _: StreamTableEnvironment if !isTimeAttribute(timeField) =>
ValidationFailure(
"Session window expects a time attribute for grouping in a stream environment.")
- case _: BatchTableEnvironment if isTimePoint(gap.resultType) =>
+ case _: BatchTableEnvironment
+ if !(isTimePoint(timeField.resultType) || isLong(timeField.resultType)) =>
ValidationFailure(
"Session window expects a time attribute for grouping in a stream environment.")
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
index 5c35129..ff5ffb2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala
@@ -34,11 +34,11 @@ import scala.collection.JavaConverters._
trait CommonCalc {
private[flink] def functionBody(
- generator: CodeGenerator,
- inputSchema: RowSchema,
- returnSchema: RowSchema,
- calcProgram: RexProgram,
- config: TableConfig)
+ generator: CodeGenerator,
+ inputSchema: RowSchema,
+ returnSchema: RowSchema,
+ calcProgram: RexProgram,
+ config: TableConfig)
: String = {
val expandedExpressions = calcProgram
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
index c18c3d1..dc7a0d6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/PhysicalTableSourceScan.scala
@@ -46,7 +46,7 @@ abstract class PhysicalTableSourceScan(
override def explainTerms(pw: RelWriter): RelWriter = {
val terms = super.explainTerms(pw)
- .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
+ .item("fields", deriveRowType().getFieldNames.asScala.mkString(", "))
val sourceDesc = tableSource.explainSource()
if (sourceDesc.nonEmpty) {
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
index c22dc54..b53081c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -90,8 +90,8 @@ class DataSetAggregate(
override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
- val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val input = inputNode.asInstanceOf[DataSetRel]
+ val inputDS = input.translateToPlan(tableEnv)
val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
index 9e18082..5274fa1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetCalc.scala
@@ -103,10 +103,7 @@ class DataSetCalc(
body,
rowTypeInfo)
- val runner = new FlatMapRunner[Row, Row](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
+ val runner = calcMapFunction(genFunction)
inputDS.flatMap(runner).name(calcOpName(calcProgram, getExpressionString))
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 8eb9d40..db31f32 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -241,10 +241,7 @@ class DataStreamOverAggregate(
}
val precedingOffset =
- getLowerBoundary(
- logicWindow,
- overWindow,
- input) + (if (isRowsClause) 1 else 0)
+ getLowerBoundary(logicWindow, overWindow, input) + (if (isRowsClause) 1 else 0)
val processFunction = AggregateUtil.createBoundedOverProcessFunction(
generator,
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
index 5dc3da8..e34e416 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/StreamTableSourceScan.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.PhysicalTableSourceScan
import org.apache.flink.table.plan.schema.{RowSchema, TableSourceTable}
-import org.apache.flink.table.sources.{DefinedTimeAttributes, StreamTableSource, TableSource}
+import org.apache.flink.table.sources._
import org.apache.flink.types.Row
/** Flink RelNode to read data from an external source defined by a [[StreamTableSource]]. */
@@ -41,35 +41,23 @@ class StreamTableSourceScan(
override def deriveRowType() = {
val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
- def removeIndex[T](idx: Int, l: List[T]): List[T] = {
- if (l.size < idx) {
- l
- } else {
- l.take(idx) ++ l.drop(idx + 1)
- }
- }
+ val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
+ val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
- var fieldNames = TableEnvironment.getFieldNames(tableSource).toList
- var fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
+ val fieldCnt = fieldNames.length
val rowtime = tableSource match {
- case timeSource: DefinedTimeAttributes if timeSource.getRowtimeAttribute != null =>
+ case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null =>
val rowtimeAttribute = timeSource.getRowtimeAttribute
- // remove physical field if it is overwritten by time attribute
- fieldNames = removeIndex(rowtimeAttribute.f0, fieldNames)
- fieldTypes = removeIndex(rowtimeAttribute.f0, fieldTypes)
- Some((rowtimeAttribute.f0, rowtimeAttribute.f1))
+ Some((fieldCnt, rowtimeAttribute))
case _ =>
None
}
val proctime = tableSource match {
- case timeSource: DefinedTimeAttributes if timeSource.getProctimeAttribute != null =>
+ case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null =>
val proctimeAttribute = timeSource.getProctimeAttribute
- // remove physical field if it is overwritten by time attribute
- fieldNames = removeIndex(proctimeAttribute.f0, fieldNames)
- fieldTypes = removeIndex(proctimeAttribute.f0, fieldTypes)
- Some((proctimeAttribute.f0, proctimeAttribute.f1))
+ Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
case _ =>
None
}
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
index 53e7b31..a2777ec 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableSourceScan.scala
@@ -29,7 +29,7 @@ import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.schema.TableSourceTable
-import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, TableSource}
import scala.collection.JavaConverters._
@@ -47,11 +47,33 @@ class FlinkLogicalTableSourceScan(
override def deriveRowType(): RelDataType = {
val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+
+ val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
+ val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
+
+ val fieldCnt = fieldNames.length
+
+ val rowtime = tableSource match {
+ case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null =>
+ val rowtimeAttribute = timeSource.getRowtimeAttribute
+ Some((fieldCnt, rowtimeAttribute))
+ case _ =>
+ None
+ }
+
+ val proctime = tableSource match {
+ case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null =>
+ val proctimeAttribute = timeSource.getProctimeAttribute
+ Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
+ case _ =>
+ None
+ }
+
flinkTypeFactory.buildLogicalRowType(
- TableEnvironment.getFieldNames(tableSource),
- TableEnvironment.getFieldTypes(tableSource.getReturnType),
- None,
- None)
+ fieldNames,
+ fieldTypes,
+ rowtime,
+ proctime)
}
override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
new file mode 100644
index 0000000..75deca5
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/StreamTableSourceTable.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.schema
+
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, TableSource}
+
+class StreamTableSourceTable[T](
+ override val tableSource: TableSource[T],
+ override val statistic: FlinkStatistic = FlinkStatistic.UNKNOWN)
+ extends TableSourceTable[T](tableSource, statistic) {
+
+
+ override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = {
+ val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+
+ val fieldNames = TableEnvironment.getFieldNames(tableSource).toList
+ val fieldTypes = TableEnvironment.getFieldTypes(tableSource.getReturnType).toList
+
+ val fieldCnt = fieldNames.length
+
+ val rowtime = tableSource match {
+ case timeSource: DefinedRowTimeAttribute if timeSource.getRowtimeAttribute != null =>
+ val rowtimeAttribute = timeSource.getRowtimeAttribute
+ Some((fieldCnt, rowtimeAttribute))
+ case _ =>
+ None
+ }
+
+ val proctime = tableSource match {
+ case timeSource: DefinedProcTimeAttribute if timeSource.getProctimeAttribute != null =>
+ val proctimeAttribute = timeSource.getProctimeAttribute
+ Some((fieldCnt + (if (rowtime.isDefined) 1 else 0), proctimeAttribute))
+ case _ =>
+ None
+ }
+
+ flinkTypeFactory.buildLogicalRowType(
+ fieldNames,
+ fieldTypes,
+ rowtime,
+ proctime)
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index 07992cd..dfed34a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -81,7 +81,6 @@ object AggregateUtil {
isRowsClause: Boolean)
: ProcessFunction[Row, Row] = {
- val needRetract = false
val (aggFields, aggregates) =
transformToAggregateFunctions(
namedAggregates.map(_.getKey),
@@ -107,7 +106,7 @@ object AggregateUtil {
None,
None,
outputArity,
- needRetract,
+ needRetract = false,
needMerge = false,
needReset = false
)
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
index 8466cdf..6d87663 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/DefinedTimeAttributes.scala
@@ -18,30 +18,43 @@
package org.apache.flink.table.sources
-import org.apache.flink.api.java.tuple.Tuple2
-
/**
- * Defines logical time attributes for a [[TableSource]]. Time attributes can be used for
- * indicating, accessing, and working with Flink's event-time or processing-time. A
- * [[TableSource]] that implements this interface can define names and positions of rowtime
- * and proctime attributes in the rows it produces.
+ * Defines a logical event-time attribute for a [[TableSource]].
+ * The event-time attribute can be used for indicating, accessing, and working with Flink's
+ * event-time.
+ *
+ * A [[TableSource]] that implements this interface defines the name of
+ * the event-time attribute. The attribute will be added to the schema of the
+ * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
*/
-trait DefinedTimeAttributes {
+trait DefinedRowTimeAttribute {
/**
- * Defines a name and position (starting at 0) of rowtime attribute that represents Flink's
- * event-time. Null if no rowtime should be available. If the position is within the arity of
- * the result row, the logical attribute will overwrite the physical attribute. If the position
- * is higher than the result row, the time attribute will be appended logically.
+ * Defines the name of the event-time attribute that represents Flink's
+ * event-time. Null if no rowtime attribute should be available.
+ *
+ * The field will be appended to the schema provided by the [[TableSource]].
*/
- def getRowtimeAttribute: Tuple2[Int, String]
+ def getRowtimeAttribute: String
+}
+
+/**
+ * Defines a logical processing-time attribute for a [[TableSource]].
+ * The processing-time attribute can be used for indicating, accessing, and working with Flink's
+ * processing-time.
+ *
+ * A [[TableSource]] that implements this interface defines the name of
+ * the processing-time attribute. The attribute will be added to the schema of the
+ * [[org.apache.flink.table.api.Table]] produced by the [[TableSource]].
+ */
+trait DefinedProcTimeAttribute {
/**
- * Defines a name and position (starting at 0) of proctime attribute that represents Flink's
- * processing-time. Null if no proctime should be available. If the position is within the arity
- * of the result row, the logical attribute will overwrite the physical attribute. If the
- * position is higher than the result row, the time attribute will be appended logically.
+ * Defines the name of the processing-time attribute that represents Flink's
+ * processing-time. Null if no proctime attribute should be available.
+ *
+ * The field will be appended to the schema provided by the [[TableSource]].
*/
- def getProctimeAttribute: Tuple2[Int, String]
+ def getProctimeAttribute: String
}
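
A minimal sketch of a source implementing the reworked trait (class name and schema are illustrative; getDataStream is left unimplemented): the returned attribute name is appended to the physical schema instead of being positioned by index:

  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.api.java.typeutils.RowTypeInfo
  import org.apache.flink.streaming.api.datastream.DataStream
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
  import org.apache.flink.table.api.Types
  import org.apache.flink.types.Row

  class ExampleRowTimeSource extends StreamTableSource[Row] with DefinedRowTimeAttribute {

    // appended as rowtime attribute after the physical fields (id, name)
    override def getRowtimeAttribute: String = "rowtimeAttr"

    override def getReturnType: TypeInformation[Row] =
      new RowTypeInfo(
        Array[TypeInformation[_]](Types.LONG, Types.STRING),
        Array("id", "name"))

    override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
  }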
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index faacc54..5247685 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -93,8 +93,7 @@ class TableEnvironmentTest extends TableTestBase {
UnresolvedFieldReference("name1"),
UnresolvedFieldReference("name2"),
UnresolvedFieldReference("name3")
- ),
- ignoreTimeAttributes = true)
+ ))
fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -108,8 +107,7 @@ class TableEnvironmentTest extends TableTestBase {
UnresolvedFieldReference("name1"),
UnresolvedFieldReference("name2"),
UnresolvedFieldReference("name3")
- ),
- ignoreTimeAttributes = true)
+ ))
fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -123,8 +121,7 @@ class TableEnvironmentTest extends TableTestBase {
UnresolvedFieldReference("name1"),
UnresolvedFieldReference("name2"),
UnresolvedFieldReference("name3")
- ),
- ignoreTimeAttributes = true)
+ ))
}
@Test
@@ -135,8 +132,7 @@ class TableEnvironmentTest extends TableTestBase {
UnresolvedFieldReference("pf3"),
UnresolvedFieldReference("pf1"),
UnresolvedFieldReference("pf2")
- ),
- ignoreTimeAttributes = true)
+ ))
fieldInfo._1.zip(Array("pf3", "pf1", "pf2")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
@@ -146,9 +142,7 @@ class TableEnvironmentTest extends TableTestBase {
def testGetFieldInfoAtomicName1(): Unit = {
val fieldInfo = tEnv.getFieldInfo(
atomicType,
- Array(UnresolvedFieldReference("name")),
- ignoreTimeAttributes = true
- )
+ Array(UnresolvedFieldReference("name")))
fieldInfo._1.zip(Array("name")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
@@ -161,8 +155,7 @@ class TableEnvironmentTest extends TableTestBase {
Array(
UnresolvedFieldReference("name1"),
UnresolvedFieldReference("name2")
- ),
- ignoreTimeAttributes = true)
+ ))
}
@Test
@@ -173,8 +166,7 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("f0"), "name1"),
Alias(UnresolvedFieldReference("f1"), "name2"),
Alias(UnresolvedFieldReference("f2"), "name3")
- ),
- ignoreTimeAttributes = true)
+ ))
fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -188,8 +180,7 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("f2"), "name1"),
Alias(UnresolvedFieldReference("f0"), "name2"),
Alias(UnresolvedFieldReference("f1"), "name3")
- ),
- ignoreTimeAttributes = true)
+ ))
fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
@@ -203,8 +194,7 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("xxx"), "name1"),
Alias(UnresolvedFieldReference("yyy"), "name2"),
Alias(UnresolvedFieldReference("zzz"), "name3")
- ),
- ignoreTimeAttributes = true)
+ ))
}
@Test
@@ -215,8 +205,7 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("cf1"), "name1"),
Alias(UnresolvedFieldReference("cf2"), "name2"),
Alias(UnresolvedFieldReference("cf3"), "name3")
- ),
- ignoreTimeAttributes = true)
+ ))
fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -230,8 +219,7 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("cf3"), "name1"),
Alias(UnresolvedFieldReference("cf1"), "name2"),
Alias(UnresolvedFieldReference("cf2"), "name3")
- ),
- ignoreTimeAttributes = true)
+ ))
fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
@@ -245,8 +233,7 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("xxx"), "name1"),
Alias(UnresolvedFieldReference("yyy"), "name2"),
Alias(UnresolvedFieldReference("zzz"), "name3")
- ),
- ignoreTimeAttributes = true)
+ ))
}
@Test
@@ -257,8 +244,7 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("pf1"), "name1"),
Alias(UnresolvedFieldReference("pf2"), "name2"),
Alias(UnresolvedFieldReference("pf3"), "name3")
- ),
- ignoreTimeAttributes = true)
+ ))
fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -272,8 +258,7 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("pf3"), "name1"),
Alias(UnresolvedFieldReference("pf1"), "name2"),
Alias(UnresolvedFieldReference("pf2"), "name3")
- ),
- ignoreTimeAttributes = true)
+ ))
fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1))
fieldInfo._2.zip(Array(2, 0, 1)).foreach(x => assertEquals(x._2, x._1))
@@ -287,8 +272,7 @@ class TableEnvironmentTest extends TableTestBase {
Alias(UnresolvedFieldReference("xxx"), "name1"),
Alias(UnresolvedFieldReference("yyy"), "name2"),
Alias(UnresolvedFieldReference("zzz"), "name3")
- ),
- ignoreTimeAttributes = true)
+ ))
}
@Test(expected = classOf[TableException])
@@ -297,16 +281,14 @@ class TableEnvironmentTest extends TableTestBase {
atomicType,
Array(
Alias(UnresolvedFieldReference("name1"), "name2")
- ),
- ignoreTimeAttributes = true)
+ ))
}
@Test(expected = classOf[TableException])
def testGetFieldInfoGenericRowAlias(): Unit = {
tEnv.getFieldInfo(
genericRowType,
- Array(UnresolvedFieldReference("first")),
- ignoreTimeAttributes = true)
+ Array(UnresolvedFieldReference("first")))
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
index c481105..aa6edd3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
@@ -100,7 +100,7 @@ class GroupWindowTest extends TableTestBase {
@Test
def testEventTimeTumblingGroupWindowOverCount(): Unit = {
val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
val windowedTable = table
.window(Tumble over 2.rows on 'long as 'w)
@@ -144,7 +144,7 @@ class GroupWindowTest extends TableTestBase {
@Test
def testEventTimeTumblingGroupWindowOverTime(): Unit = {
val util = batchTestUtil()
- val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
val windowedTable = table
.window(Tumble over 5.milli on 'long as 'w)
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
new file mode 100644
index 0000000..e9384c7
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import java.lang.{Integer => JInt, Long => JLong}
+import java.util.Collections
+import java.util.{List => JList}
+
+import org.apache.flink.api.java.tuple.{Tuple5 => JTuple5}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecEnv}
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
+import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+import org.mockito.Mockito.{mock, when}
+
+class StreamTableEnvironmentTest extends TableTestBase {
+
+ @Test(expected = classOf[TableException])
+ def testInvalidProctimeAttribute(): Unit = {
+ val util = streamTestUtil()
+ // cannot replace an attribute with proctime
+ util.addTable[(Long, Int, String, Int, Long)]('a, 'b.proctime, 'c, 'd, 'e)
+ }
+
+ @Test
+ def testProctimeAttribute(): Unit = {
+ val util = streamTestUtil()
+ // proctime attribute is appended to the schema, which is valid
+ util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'pt.proctime)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testRowtimeAttributeReplaceFieldOfInvalidType(): Unit = {
+ val util = streamTestUtil()
+ // cannot replace a non-time attribute with rowtime
+ util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c.rowtime, 'd, 'e)
+ }
+
+ @Test
+ def testReplacedRowtimeAttribute(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd, 'e)
+ }
+
+ @Test
+ def testAppendedRowtimeAttribute(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rt.rowtime)
+ }
+
+ @Test
+ def testRowtimeAndProctimeAttribute1(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rt.rowtime, 'pt.proctime)
+ }
+
+ @Test
+ def testRowtimeAndProctimeAttribute2(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'pt.proctime, 'rt.rowtime)
+ }
+
+ @Test
+ def testRowtimeAndProctimeAttribute3(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd, 'e, 'pt.proctime)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testRowtimeAndInvalidProctimeAttribute(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Int, String, Int, Long)]('rt.rowtime, 'b, 'c, 'd, 'pt.proctime)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testOnlyOneRowtimeAttribute1(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Int, String, Int, Long)]('a.rowtime, 'b, 'c, 'd, 'e, 'rt.rowtime)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testOnlyOneProctimeAttribute1(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'pt1.proctime, 'pt2.proctime)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testRowtimeAttributeUsedName(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'a.rowtime)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testProctimeAttributeUsedName(): Unit = {
+ val util = streamTestUtil()
+ util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'b.proctime)
+ }
+
+ @Test
+ def testProctimeAttributeParsed(): Unit = {
+ val (jTEnv, ds) = prepareSchemaExpressionParser
+ jTEnv.fromDataStream(ds, "a, b, c, d, e, pt.proctime")
+ }
+
+ @Test
+ def testReplacingRowtimeAttributeParsed(): Unit = {
+ val (jTEnv, ds) = prepareSchemaExpressionParser
+ jTEnv.fromDataStream(ds, "a.rowtime, b, c, d, e")
+ }
+
+ @Test
+ def testAppendingRowtimeAttributeParsed(): Unit = {
+ val (jTEnv, ds) = prepareSchemaExpressionParser
+ jTEnv.fromDataStream(ds, "a, b, c, d, e, rt.rowtime")
+ }
+
+ @Test
+ def testRowtimeAndProctimeAttributeParsed1(): Unit = {
+ val (jTEnv, ds) = prepareSchemaExpressionParser
+ jTEnv.fromDataStream(ds, "a, b, c, d, e, pt.proctime, rt.rowtime")
+ }
+
+ @Test
+ def testRowtimeAndProctimeAttributeParsed2(): Unit = {
+ val (jTEnv, ds) = prepareSchemaExpressionParser
+ jTEnv.fromDataStream(ds, "rt.rowtime, b, c, d, e, pt.proctime")
+ }
+
+ private def prepareSchemaExpressionParser:
+ (JStreamTableEnv, DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]) = {
+
+ val jTEnv = TableEnvironment.getTableEnvironment(mock(classOf[JStreamExecEnv]))
+
+ val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
+ .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
+ val ds = mock(classOf[DataStream[JTuple5[JLong, JInt, String, JInt, JLong]]])
+ when(ds.getType).thenReturn(sType)
+
+ (jTEnv, ds)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
new file mode 100644
index 0000000..7673266
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.{DefinedProcTimeAttribute, DefinedRowTimeAttribute, StreamTableSource}
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil.{term, unaryNode}
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class TableSourceTest extends TableTestBase {
+
+ @Test
+ def testRowTimeTableSourceSimple(): Unit = {
+ val util = streamTestUtil()
+ util.tEnv.registerTableSource("rowTimeT", new TestRowTimeSource("addTime"))
+
+ val t = util.tEnv.scan("rowTimeT").select("addTime, id, name, val")
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, addTime])",
+ term("select", "addTime", "id", "name", "val")
+ )
+ util.verifyTable(t, expected)
+ }
+
+ @Test
+ def testRowTimeTableSourceGroupWindow(): Unit = {
+ val util = streamTestUtil()
+ util.tEnv.registerTableSource("rowTimeT", new TestRowTimeSource("addTime"))
+
+ val t = util.tEnv.scan("rowTimeT")
+ .filter("val > 100")
+ .window(Tumble over 10.minutes on 'addTime as 'w)
+ .groupBy('name, 'w)
+ .select('name, 'w.end, 'val.avg)
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ "StreamTableSourceScan(table=[[rowTimeT]], fields=[id, val, name, addTime])",
+ term("select", "name", "val", "addTime"),
+ term("where", ">(val, 100)")
+ ),
+ term("groupBy", "name"),
+ term("window", "TumblingGroupWindow(WindowReference(w), 'addTime, 600000.millis)"),
+ term("select", "name", "AVG(val) AS TMP_1", "end(WindowReference(w)) AS TMP_0")
+ ),
+ term("select", "name", "TMP_0", "TMP_1")
+ )
+ util.verifyTable(t, expected)
+ }
+
+ @Test
+ def testProcTimeTableSourceSimple(): Unit = {
+ val util = streamTestUtil()
+ util.tEnv.registerTableSource("procTimeT", new TestProcTimeSource("pTime"))
+
+ val t = util.tEnv.scan("procTimeT").select("pTime, id, name, val")
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
+ term("select", "pTime", "id", "name", "val")
+ )
+ util.verifyTable(t, expected)
+ }
+
+ @Test
+ def testProcTimeTableSourceOverWindow(): Unit = {
+ val util = streamTestUtil()
+ util.tEnv.registerTableSource("procTimeT", new TestProcTimeSource("pTime"))
+
+ val t = util.tEnv.scan("procTimeT")
+ .window(Over partitionBy 'id orderBy 'pTime preceding 2.hours as 'w)
+ .select('id, 'name, 'val.sum over 'w as 'valSum)
+ .filter('valSum > 100)
+
+ val expected =
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamOverAggregate",
+ "StreamTableSourceScan(table=[[procTimeT]], fields=[id, val, name, pTime])",
+ term("partitionBy", "id"),
+ term("orderBy", "pTime"),
+ term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
+ term("select", "id", "val", "name", "pTime", "SUM(val) AS w0$o0")
+ ),
+ term("select", "id", "name", "w0$o0 AS valSum"),
+ term("where", ">(w0$o0, 100)")
+ )
+ util.verifyTable(t, expected)
+ }
+}
+
+class TestRowTimeSource(timeField: String)
+ extends StreamTableSource[Row] with DefinedRowTimeAttribute {
+
+ override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
+
+ override def getRowtimeAttribute: String = timeField
+
+ override def getReturnType: TypeInformation[Row] = {
+ new RowTypeInfo(
+ Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "val", "name"))
+ }
+}
+
+class TestProcTimeSource(timeField: String)
+ extends StreamTableSource[Row] with DefinedProcTimeAttribute {
+
+ override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = ???
+
+ override def getProctimeAttribute: String = timeField
+
+ override def getReturnType: TypeInformation[Row] = {
+ new RowTypeInfo(
+ Array(Types.INT, Types.LONG, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
+ Array("id", "val", "name"))
+ }
+}
+
+
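For context, once a source implementing DefinedRowTimeAttribute is registered, the declared attribute is appended to the table schema and can be queried like any other column. A minimal sketch reusing the TestRowTimeSource fixture above (only plan translation is meaningful here, since getDataStream is unimplemented):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    tEnv.registerTableSource("rowTimeT", new TestRowTimeSource("addTime"))

    // "addTime" is not part of getReturnType; it is added to the schema
    // because the source implements DefinedRowTimeAttribute.
    val windowed = tEnv.scan("rowTimeT")
      .window(Tumble over 10.minutes on 'addTime as 'w)
      .groupBy('w, 'name)
      .select('name, 'w.end, 'val.avg)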
http://git-wip-us.apache.org/repos/asf/flink/blob/24bf61ce/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index edf7b1d..f84ae3d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.scala._
import org.apache.flink.table.plan.logical._
import org.apache.flink.table.utils.TableTestUtil._
import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
-import org.junit.{Ignore, Test}
+import org.junit.Test
class WindowAggregateTest extends TableTestBase {
private val streamUtil: StreamTableTestUtil = streamTestUtil()
@@ -85,7 +85,6 @@ class WindowAggregateTest extends TableTestBase {
}
@Test
- @Ignore // TODO enable once CALCITE-1761 is fixed
def testTumbleFunction() = {
streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
@@ -98,24 +97,23 @@ class WindowAggregateTest extends TableTestBase {
"GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
val expected =
unaryNode(
- "DataStreamCalc",
+ "DataStreamAggregate",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamCalc",
streamTableNode(0),
- term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
- term("select",
- "COUNT(*) AS EXPR$0, " +
- "weightedAvg(c, a) AS wAvg, " +
- "start('w$) AS w$start, " +
- "end('w$) AS w$end")
+ term("select", "1970-01-01 00:00:00 AS $f0", "c", "a")
),
- term("select", "EXPR$0, wAvg, CAST(w$start) AS w$start, CAST(w$end) AS w$end")
+ term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+ term("select",
+ "COUNT(*) AS EXPR$0, " +
+ "weightedAvg(c, a) AS wAvg, " +
+ "start('w$) AS w$start, " +
+ "end('w$) AS w$end")
)
streamUtil.verifySql(sql, expected)
}
@Test
- @Ignore // TODO enable once CALCITE-1761 is fixed
def testHoppingFunction() = {
streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
@@ -127,24 +125,23 @@ class WindowAggregateTest extends TableTestBase {
"GROUP BY HOP(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"
val expected =
unaryNode(
- "DataStreamCalc",
+ "DataStreamAggregate",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamCalc",
streamTableNode(0),
- term("window", SlidingGroupWindow('w$, 'rowtime, 3600000.millis, 900000.millis)),
- term("select",
- "COUNT(*) AS EXPR$0, " +
- "weightedAvg(c, a) AS wAvg, " +
- "start('w$) AS w$start, " +
- "end('w$) AS w$end")
+ term("select", "1970-01-01 00:00:00 AS $f0", "c", "a")
),
- term("select", "EXPR$0, wAvg, CAST(w$start) AS w$start, CAST(w$end) AS w$end")
+ term("window", SlidingGroupWindow('w$, 'proctime, 3600000.millis, 900000.millis)),
+ term("select",
+ "COUNT(*) AS EXPR$0, " +
+ "weightedAvg(c, a) AS wAvg, " +
+ "start('w$) AS w$start, " +
+ "end('w$) AS w$end")
)
streamUtil.verifySql(sql, expected)
}
@Test
- @Ignore // TODO enable once CALCITE-1761 is fixed
def testSessionFunction() = {
streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
@@ -157,18 +154,18 @@ class WindowAggregateTest extends TableTestBase {
"GROUP BY SESSION(proctime, INTERVAL '15' MINUTE)"
val expected =
unaryNode(
- "DataStreamCalc",
+ "DataStreamAggregate",
unaryNode(
- "DataStreamAggregate",
+ "DataStreamCalc",
streamTableNode(0),
- term("window", SessionGroupWindow('w$, 'rowtime, 900000.millis)),
- term("select",
- "COUNT(*) AS EXPR$0, " +
- "weightedAvg(c, a) AS wAvg, " +
- "start('w$) AS w$start, " +
- "end('w$) AS w$end")
+ term("select", "1970-01-01 00:00:00 AS $f0", "c", "a")
),
- term("select", "EXPR$0, wAvg, CAST(w$start) AS w$start, CAST(w$end) AS w$end")
+ term("window", SessionGroupWindow('w$, 'proctime, 900000.millis)),
+ term("select",
+ "COUNT(*) AS EXPR$0, " +
+ "weightedAvg(c, a) AS wAvg, " +
+ "start('w$) AS w$start, " +
+ "end('w$) AS w$end")
)
streamUtil.verifySql(sql, expected)
}
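For reference, the expected-plan changes above move the projection (DataStreamCalc) below the aggregation and make the processing-time windows reference 'proctime rather than 'rowtime. The re-enabled queries follow the shape sketched below; this is a reconstruction based only on the GROUP BY clauses visible in the hunks, as the full SELECT lists sit above the diff context:

    // Hypothetical reconstruction of the test query shape; only the
    // GROUP BY clause is visible in the diff context above.
    val sql =
      "SELECT COUNT(*), weightedAvg(c, a) AS wAvg " +
        "FROM MyTable " +
        "GROUP BY HOP(proctime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR)"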