You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/10/26 21:11:25 UTC
[3/5] flink git commit: [FLINK-4691] [table] Add group-windows for
streaming tables to Table API.
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
index a692c9e..56b5b5e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
@@ -32,7 +32,7 @@ import org.apache.flink.api.table.validate._
case class CharLength(child: Expression) extends UnaryExpression {
override private[flink] def resultType: TypeInformation[_] = INT_TYPE_INFO
- override private[flink] def validateInput(): ExprValidationResult = {
+ override private[flink] def validateInput(): ValidationResult = {
if (child.resultType == STRING_TYPE_INFO) {
ValidationSuccess
} else {
@@ -55,7 +55,7 @@ case class CharLength(child: Expression) extends UnaryExpression {
case class InitCap(child: Expression) extends UnaryExpression {
override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
- override private[flink] def validateInput(): ExprValidationResult = {
+ override private[flink] def validateInput(): ValidationResult = {
if (child.resultType == STRING_TYPE_INFO) {
ValidationSuccess
} else {
@@ -80,7 +80,7 @@ case class Like(str: Expression, pattern: Expression) extends BinaryExpression {
override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
- override private[flink] def validateInput(): ExprValidationResult = {
+ override private[flink] def validateInput(): ValidationResult = {
if (str.resultType == STRING_TYPE_INFO && pattern.resultType == STRING_TYPE_INFO) {
ValidationSuccess
} else {
@@ -102,7 +102,7 @@ case class Like(str: Expression, pattern: Expression) extends BinaryExpression {
case class Lower(child: Expression) extends UnaryExpression {
override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
- override private[flink] def validateInput(): ExprValidationResult = {
+ override private[flink] def validateInput(): ValidationResult = {
if (child.resultType == STRING_TYPE_INFO) {
ValidationSuccess
} else {
@@ -127,7 +127,7 @@ case class Similar(str: Expression, pattern: Expression) extends BinaryExpressio
override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
- override private[flink] def validateInput(): ExprValidationResult = {
+ override private[flink] def validateInput(): ValidationResult = {
if (str.resultType == STRING_TYPE_INFO && pattern.resultType == STRING_TYPE_INFO) {
ValidationSuccess
} else {
@@ -179,7 +179,7 @@ case class Trim(
override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
- override private[flink] def validateInput(): ExprValidationResult = {
+ override private[flink] def validateInput(): ValidationResult = {
trimMode match {
case SymbolExpression(_: TrimMode) =>
if (trimString.resultType != STRING_TYPE_INFO) {
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
index 488fd33..cd5ca0a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
@@ -29,8 +29,8 @@ import org.apache.flink.api.table.FlinkRelBuilder
import org.apache.flink.api.table.expressions.ExpressionUtils.{divide, getFactor, mod}
import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit
import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, TypeCheckUtils}
-import org.apache.flink.api.table.validate.{ExprValidationResult, ValidationFailure, ValidationSuccess}
+import org.apache.flink.api.table.typeutils.{TimeIntervalTypeInfo, TypeCheckUtils}
+import org.apache.flink.api.table.validate.{ValidationResult, ValidationFailure, ValidationSuccess}
import scala.collection.JavaConversions._
@@ -40,7 +40,7 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
- override private[flink] def validateInput(): ExprValidationResult = {
+ override private[flink] def validateInput(): ValidationResult = {
if (!TypeCheckUtils.isTemporal(temporal.resultType)) {
return ValidationFailure(s"Extract operator requires Temporal input, " +
s"but $temporal is of type ${temporal.resultType}")
@@ -52,8 +52,8 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
| SymbolExpression(TimeIntervalUnit.DAY)
if temporal.resultType == SqlTimeTypeInfo.DATE
|| temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
- || temporal.resultType == IntervalTypeInfo.INTERVAL_MILLIS
- || temporal.resultType == IntervalTypeInfo.INTERVAL_MONTHS =>
+ || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS
+ || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MONTHS =>
ValidationSuccess
case SymbolExpression(TimeIntervalUnit.HOUR)
@@ -61,7 +61,7 @@ case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends E
| SymbolExpression(TimeIntervalUnit.SECOND)
if temporal.resultType == SqlTimeTypeInfo.TIME
|| temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
- || temporal.resultType == IntervalTypeInfo.INTERVAL_MILLIS =>
+ || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS =>
ValidationSuccess
case _ =>
@@ -146,7 +146,7 @@ abstract class TemporalCeilFloor(
override private[flink] def resultType: TypeInformation[_] = temporal.resultType
- override private[flink] def validateInput(): ExprValidationResult = {
+ override private[flink] def validateInput(): ValidationResult = {
if (!TypeCheckUtils.isTimePoint(temporal.resultType)) {
return ValidationFailure(s"Temporal ceil/floor operator requires Time Point input, " +
s"but $temporal is of type ${temporal.resultType}")
@@ -211,7 +211,7 @@ abstract class CurrentTimePoint(
override private[flink] def resultType: TypeInformation[_] = targetType
- override private[flink] def validateInput(): ExprValidationResult = {
+ override private[flink] def validateInput(): ValidationResult = {
if (!TypeCheckUtils.isTimePoint(targetType)) {
ValidationFailure(s"CurrentTimePoint operator requires Time Point target type, " +
s"but get $targetType.")
@@ -293,7 +293,7 @@ case class TemporalOverlaps(
override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
- override private[flink] def validateInput(): ExprValidationResult = {
+ override private[flink] def validateInput(): ValidationResult = {
if (!TypeCheckUtils.isTimePoint(leftTimePoint.resultType)) {
return ValidationFailure(s"TemporalOverlaps operator requires leftTimePoint to be of type " +
s"Time Point, but get ${leftTimePoint.resultType}.")
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/windowProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/windowProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/windowProperties.scala
new file mode 100644
index 0000000..8386c46
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/windowProperties.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.api.table.validate.{ValidationFailure, ValidationSuccess}
+
+abstract class WindowProperty(child: Expression) extends UnaryExpression {
+
+ override def toString = s"WindowProperty($child)"
+
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+ throw new UnsupportedOperationException("WindowProperty cannot be transformed to RexNode.")
+
+ override private[flink] def validateInput() =
+ if (child.isInstanceOf[WindowReference]) {
+ ValidationSuccess
+ } else {
+ ValidationFailure("Child must be a window reference.")
+ }
+
+ private[flink] def toNamedWindowProperty(name: String)(implicit relBuilder: RelBuilder)
+ : NamedWindowProperty = NamedWindowProperty(name, this)
+}
+
+case class WindowStart(child: Expression) extends WindowProperty(child) {
+
+ override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP
+
+ override def toString: String = s"start($child)"
+}
+
+case class WindowEnd(child: Expression) extends WindowProperty(child) {
+
+ override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP
+
+ override def toString: String = s"end($child)"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
new file mode 100644
index 0000000..2299bd1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.plan.logical.LogicalNode
+
+import scala.collection.mutable.ListBuffer
+
+object ProjectionTranslator {
+
+ /**
+ * Extracts all aggregation and property expressions (zero, one, or more) from an expression,
+ * and replaces the original expressions with field access expressions.
+ */
+ def extractAggregationsAndProperties(
+ exp: Expression,
+ tableEnv: TableEnvironment)
+ : (Expression, List[NamedExpression], List[NamedExpression]) = {
+
+ exp match {
+ case agg: Aggregation =>
+ val name = tableEnv.createUniqueAttributeName()
+ val aggCall = Alias(agg, name)
+ val fieldExp = UnresolvedFieldReference(name)
+ (fieldExp, List(aggCall), Nil)
+ case prop: WindowProperty =>
+ val name = tableEnv.createUniqueAttributeName()
+ val propCall = Alias(prop, name)
+ val fieldExp = UnresolvedFieldReference(name)
+ (fieldExp, Nil, List(propCall))
+ case n @ Alias(agg: Aggregation, name) =>
+ val fieldExp = UnresolvedFieldReference(name)
+ (fieldExp, List(n), Nil)
+ case n @ Alias(prop: WindowProperty, name) =>
+ val fieldExp = UnresolvedFieldReference(name)
+ (fieldExp, Nil, List(n))
+ case l: LeafExpression =>
+ (l, Nil, Nil)
+ case u: UnaryExpression =>
+ val c = extractAggregationsAndProperties(u.child, tableEnv)
+ (u.makeCopy(Array(c._1)), c._2, c._3)
+ case b: BinaryExpression =>
+ val l = extractAggregationsAndProperties(b.left, tableEnv)
+ val r = extractAggregationsAndProperties(b.right, tableEnv)
+ (b.makeCopy(Array(l._1, r._1)),
+ l._2 ::: r._2,
+ l._3 ::: r._3)
+
+ // Function calls
+ case c @ Call(name, args) =>
+ val newArgs = args.map(extractAggregationsAndProperties(_, tableEnv))
+ (c.makeCopy((name :: newArgs.map(_._1) :: Nil).toArray),
+ newArgs.flatMap(_._2).toList,
+ newArgs.flatMap(_._3).toList)
+
+ case sfc @ ScalarFunctionCall(clazz, args) =>
+ val newArgs = args.map(extractAggregationsAndProperties(_, tableEnv))
+ (sfc.makeCopy((clazz :: newArgs.map(_._1) :: Nil).toArray),
+ newArgs.flatMap(_._2).toList,
+ newArgs.flatMap(_._3).toList)
+
+ // General expression
+ case e: Expression =>
+ val newArgs = e.productIterator.map {
+ case arg: Expression =>
+ extractAggregationsAndProperties(arg, tableEnv)
+ }
+ (e.makeCopy(newArgs.map(_._1).toArray),
+ newArgs.flatMap(_._2).toList,
+ newArgs.flatMap(_._3).toList)
+ }
+ }
+
+ /**
+ * Wraps all input expressions in [[UnresolvedAlias]]
+ * and expands a star (`*`) reference to the parent's full project list.
+ */
+ def expandProjectList(exprs: Seq[Expression], parent: LogicalNode): Seq[NamedExpression] = {
+ val projectList = new ListBuffer[NamedExpression]
+ exprs.foreach {
+ case n: UnresolvedFieldReference if n.name == "*" =>
+ projectList ++= parent.output.map(UnresolvedAlias(_))
+ case e: Expression => projectList += UnresolvedAlias(e)
+ }
+ projectList
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
deleted file mode 100644
index eb40bba..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan
-
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.plan.logical.LogicalNode
-
-import scala.collection.mutable.ListBuffer
-
-object RexNodeTranslator {
-
- /**
- * Extracts all aggregation expressions (zero, one, or more) from an expression,
- * and replaces the original aggregation expressions by field accesses expressions.
- */
- def extractAggregations(
- exp: Expression,
- tableEnv: TableEnvironment): Pair[Expression, List[NamedExpression]] = {
-
- exp match {
- case agg: Aggregation =>
- val name = tableEnv.createUniqueAttributeName()
- val aggCall = Alias(agg, name)
- val fieldExp = UnresolvedFieldReference(name)
- (fieldExp, List(aggCall))
- case n @ Alias(agg: Aggregation, name) =>
- val fieldExp = UnresolvedFieldReference(name)
- (fieldExp, List(n))
- case l: LeafExpression =>
- (l, Nil)
- case u: UnaryExpression =>
- val c = extractAggregations(u.child, tableEnv)
- (u.makeCopy(Array(c._1)), c._2)
- case b: BinaryExpression =>
- val l = extractAggregations(b.left, tableEnv)
- val r = extractAggregations(b.right, tableEnv)
- (b.makeCopy(Array(l._1, r._1)), l._2 ::: r._2)
-
- // Functions calls
- case c @ Call(name, args) =>
- val newArgs = args.map(extractAggregations(_, tableEnv))
- (c.makeCopy((name :: newArgs.map(_._1) :: Nil).toArray), newArgs.flatMap(_._2).toList)
-
- case sfc @ ScalarFunctionCall(clazz, args) =>
- val newArgs = args.map(extractAggregations(_, tableEnv))
- (sfc.makeCopy((clazz :: newArgs.map(_._1) :: Nil).toArray), newArgs.flatMap(_._2).toList)
-
- // General expression
- case e: Expression =>
- val newArgs = e.productIterator.map {
- case arg: Expression =>
- extractAggregations(arg, tableEnv)
- }
- (e.makeCopy(newArgs.map(_._1).toArray), newArgs.flatMap(_._2).toList)
- }
- }
-
- /**
- * Parses all input expressions to [[UnresolvedAlias]].
- * And expands star to parent's full project list.
- */
- def expandProjectList(exprs: Seq[Expression], parent: LogicalNode): Seq[NamedExpression] = {
- val projectList = new ListBuffer[NamedExpression]
- exprs.foreach {
- case n: UnresolvedFieldReference if n.name == "*" =>
- projectList ++= parent.output.map(UnresolvedAlias(_))
- case e: Expression => projectList += UnresolvedAlias(e)
- }
- projectList
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
index dae02bd..55fba07 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
@@ -19,8 +19,7 @@ package org.apache.flink.api.table.plan.logical
import org.apache.calcite.rel.RelNode
import org.apache.calcite.tools.RelBuilder
-
-import org.apache.flink.api.table.{TableEnvironment, ValidationException}
+import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment, ValidationException}
import org.apache.flink.api.table.expressions._
import org.apache.flink.api.table.trees.TreeNode
import org.apache.flink.api.table.typeutils.TypeCoercion
@@ -54,7 +53,7 @@ abstract class LogicalNode extends TreeNode[LogicalNode] {
// resolve references and function calls
val exprResolved = expressionPostOrderTransform {
case u @ UnresolvedFieldReference(name) =>
- resolveReference(name).getOrElse(u)
+ resolveReference(tableEnv, name).getOrElse(u)
case c @ Call(name, children) if c.childrenValid =>
tableEnv.getFunctionCatalog.lookupFunction(name, children)
}
@@ -84,7 +83,7 @@ abstract class LogicalNode extends TreeNode[LogicalNode] {
resolvedNode.expressionPostOrderTransform {
case a: Attribute if !a.valid =>
val from = children.flatMap(_.output).map(_.name).mkString(", ")
- failValidation(s"cannot resolve [${a.name}] given input [$from]")
+ failValidation(s"Cannot resolve [${a.name}] given input [$from].")
case e: Expression if e.validateInput().isFailure =>
failValidation(s"Expression $e failed on input check: " +
@@ -96,12 +95,12 @@ abstract class LogicalNode extends TreeNode[LogicalNode] {
* Resolves the given strings to a [[NamedExpression]] using the input from all child
* nodes of this LogicalPlan.
*/
- def resolveReference(name: String): Option[NamedExpression] = {
+ def resolveReference(tableEnv: TableEnvironment, name: String): Option[NamedExpression] = {
val childrenOutput = children.flatMap(_.output)
val candidates = childrenOutput.filter(_.name.equalsIgnoreCase(name))
if (candidates.length > 1) {
- failValidation(s"Reference $name is ambiguous")
- } else if (candidates.length == 0) {
+ failValidation(s"Reference $name is ambiguous.")
+ } else if (candidates.isEmpty) {
None
} else {
Some(candidates.head.withName(name))
@@ -133,6 +132,7 @@ abstract class LogicalNode extends TreeNode[LogicalNode] {
case e: Expression => expressionPostOrderTransform(e)
case other => other
}
+ case r: Resolvable[_] => r.resolveExpressions(e => expressionPostOrderTransform(e))
case other: AnyRef => other
}.toArray
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalWindow.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalWindow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalWindow.scala
new file mode 100644
index 0000000..19fd603
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalWindow.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions.{Expression, WindowReference}
+import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+abstract class LogicalWindow(val alias: Option[Expression]) extends Resolvable[LogicalWindow] {
+
+ def resolveExpressions(resolver: (Expression) => Expression): LogicalWindow = this
+
+ def validate(tableEnv: TableEnvironment): ValidationResult = alias match {
+ case Some(WindowReference(_)) => ValidationSuccess
+ case Some(_) => ValidationFailure("Window reference for window expected.")
+ case None => ValidationSuccess
+ }
+
+ override def toString: String = getClass.getSimpleName
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/Resolvable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/Resolvable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/Resolvable.scala
new file mode 100644
index 0000000..7540d43
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/Resolvable.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.flink.api.table.expressions.Expression
+
+/**
+ * A class implementing this interface can resolve the expressions of its parameters and
+ * return a new instance with resolved parameters. This is necessary if expressions are nested in
+ * an unsupported structure. By default, the validation of a logical node can resolve common
+ * structures like `Expression`, `Option[Expression]`, `Traversable[Expression]`.
+ *
+ * See also [[LogicalNode.expressionPostOrderTransform(scala.PartialFunction)]].
+ *
+ * @tparam T class whose expression parameters need to be resolved
+ */
+trait Resolvable[T <: AnyRef] {
+
+ /**
+ * An implementing class can resolve its expressions by applying the given resolver
+ * function on its parameters.
+ *
+ * @param resolver function that can resolve an expression
+ * @return class with resolved expression parameters
+ */
+ def resolveExpressions(resolver: (Expression) => Expression): T
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala
new file mode 100644
index 0000000..aeb9676
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.table.{BatchTableEnvironment, StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeCoercion}
+import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+abstract class EventTimeGroupWindow(
+ name: Option[Expression],
+ time: Expression)
+ extends LogicalWindow(name) {
+
+ override def validate(tableEnv: TableEnvironment): ValidationResult = {
+ val valid = super.validate(tableEnv)
+ if (valid.isFailure) {
+ return valid
+ }
+
+ tableEnv match {
+ case _: StreamTableEnvironment =>
+ time match {
+ case RowtimeAttribute() =>
+ ValidationSuccess
+ case _ =>
+ ValidationFailure("Event-time window expects a 'rowtime' time field.")
+ }
+ case _: BatchTableEnvironment =>
+ if (!TypeCoercion.canCast(time.resultType, BasicTypeInfo.LONG_TYPE_INFO)) {
+ ValidationFailure(s"Event-time window expects a time field that can be safely cast " +
+ s"to Long, but is ${time.resultType}")
+ } else {
+ ValidationSuccess
+ }
+ }
+
+ }
+}
+
+abstract class ProcessingTimeGroupWindow(name: Option[Expression]) extends LogicalWindow(name)
+
+// ------------------------------------------------------------------------------------------------
+// Tumbling group windows
+// ------------------------------------------------------------------------------------------------
+
+object TumblingGroupWindow {
+ def validate(tableEnv: TableEnvironment, size: Expression): ValidationResult = size match {
+ case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+ ValidationSuccess
+ case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
+ ValidationSuccess
+ case _ =>
+ ValidationFailure("Tumbling window expects size literal of type Interval of Milliseconds " +
+ "or Interval of Rows.")
+ }
+}
+
+case class ProcessingTimeTumblingGroupWindow(
+ name: Option[Expression],
+ size: Expression)
+ extends ProcessingTimeGroupWindow(name) {
+
+ override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
+ ProcessingTimeTumblingGroupWindow(
+ name.map(resolve),
+ resolve(size))
+
+ override def validate(tableEnv: TableEnvironment): ValidationResult =
+ super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv, size))
+
+ override def toString: String = s"ProcessingTimeTumblingGroupWindow($name, $size)"
+}
+
+case class EventTimeTumblingGroupWindow(
+ name: Option[Expression],
+ timeField: Expression,
+ size: Expression)
+ extends EventTimeGroupWindow(
+ name,
+ timeField) {
+
+ override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
+ EventTimeTumblingGroupWindow(
+ name.map(resolve),
+ resolve(timeField),
+ resolve(size))
+
+ override def validate(tableEnv: TableEnvironment): ValidationResult =
+ super.validate(tableEnv)
+ .orElse(TumblingGroupWindow.validate(tableEnv, size))
+ .orElse(size match {
+ case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
+ ValidationFailure(
+ "Event-time grouping windows on row intervals are currently not supported.")
+ case _ =>
+ ValidationSuccess
+ })
+
+ override def toString: String = s"EventTimeTumblingGroupWindow($name, $timeField, $size)"
+}
+
+// ------------------------------------------------------------------------------------------------
+// Sliding group windows
+// ------------------------------------------------------------------------------------------------
+
+/** Validation logic used by both sliding group window variants in this file. */
+object SlidingGroupWindow {
+
+  /**
+    * Checks that `size` and `slide` are interval literals (milliseconds or rows) and that
+    * both expressions have the same result type.
+    *
+    * @param tableEnv table environment; not read by the checks in this method
+    * @param size expression describing the window size
+    * @param slide expression describing the window slide
+    * @return the three checks (size, slide, equal types) chained with `orElse`
+    */
+  def validate(
+      tableEnv: TableEnvironment,
+      size: Expression,
+      slide: Expression)
+    : ValidationResult = {
+
+    // `role` ("size" or "slide") only ends up in the failure message
+    def checkIntervalLiteral(expr: Expression, role: String): ValidationResult = expr match {
+      case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
+        ValidationSuccess
+      case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
+        ValidationSuccess
+      case _ =>
+        ValidationFailure(s"Sliding window expects $role literal of type Interval of " +
+          "Milliseconds or Interval of Rows.")
+    }
+
+    val sizeCheck = checkIntervalLiteral(size, "size")
+    val slideCheck = checkIntervalLiteral(slide, "slide")
+
+    sizeCheck
+      .orElse(slideCheck)
+      .orElse {
+        // size and slide must both be time intervals or both be row intervals
+        if (size.resultType == slide.resultType) {
+          ValidationSuccess
+        } else {
+          ValidationFailure("Sliding window expects same type of size and slide.")
+        }
+      }
+  }
+}
+
+/**
+  * Sliding group window that is evaluated on processing time.
+  *
+  * @param name optional name expression of the window
+  * @param size expression describing the window size
+  * @param slide expression describing the slide of the window
+  */
+case class ProcessingTimeSlidingGroupWindow(
+    name: Option[Expression],
+    size: Expression,
+    slide: Expression)
+  extends ProcessingTimeGroupWindow(name) {
+
+  /** Returns a copy of this window with `resolve` applied to name, size and slide. */
+  override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow = {
+    val resolvedName = name.map(resolve)
+    val resolvedSize = resolve(size)
+    val resolvedSlide = resolve(slide)
+    ProcessingTimeSlidingGroupWindow(resolvedName, resolvedSize, resolvedSlide)
+  }
+
+  /** Runs the parent validation and chains the shared sliding-window validation to it. */
+  override def validate(tableEnv: TableEnvironment): ValidationResult = {
+    val parentResult = super.validate(tableEnv)
+    parentResult.orElse(SlidingGroupWindow.validate(tableEnv, size, slide))
+  }
+
+  override def toString: String = s"ProcessingTimeSlidingGroupWindow($name, $size, $slide)"
+}
+
+/**
+  * Sliding group window that is evaluated on event time.
+  *
+  * @param name optional name expression of the window
+  * @param timeField expression passed to [[EventTimeGroupWindow]] as its time field
+  * @param size expression describing the window size
+  * @param slide expression describing the slide of the window
+  */
+case class EventTimeSlidingGroupWindow(
+    name: Option[Expression],
+    timeField: Expression,
+    size: Expression,
+    slide: Expression)
+  extends EventTimeGroupWindow(name, timeField) {
+
+  /** Returns a copy of this window with `resolve` applied to every contained expression. */
+  override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow = {
+    val resolvedName = name.map(resolve)
+    val resolvedTimeField = resolve(timeField)
+    val resolvedSize = resolve(size)
+    val resolvedSlide = resolve(slide)
+    EventTimeSlidingGroupWindow(resolvedName, resolvedTimeField, resolvedSize, resolvedSlide)
+  }
+
+  /**
+    * Chains the parent validation, the shared sliding-window validation and a final check
+    * that rejects sizes given as row intervals.
+    */
+  override def validate(tableEnv: TableEnvironment): ValidationResult = {
+    // sizes given as row-interval literals are rejected for event-time windows
+    def rejectRowInterval: ValidationResult = size match {
+      case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
+        ValidationFailure(
+          "Event-time grouping windows on row intervals are currently not supported.")
+      case _ =>
+        ValidationSuccess
+    }
+
+    super.validate(tableEnv)
+      .orElse(SlidingGroupWindow.validate(tableEnv, size, slide))
+      .orElse(rejectRowInterval)
+  }
+
+  override def toString: String = s"EventTimeSlidingGroupWindow($name, $timeField, $size, $slide)"
+}
+
+// ------------------------------------------------------------------------------------------------
+// Session group windows
+// ------------------------------------------------------------------------------------------------
+
+/** Validation logic used by both session group window variants in this file. */
+object SessionGroupWindow {
+
+  /**
+    * Accepts `gap` only if it is a literal holding a `Long` value typed as an interval
+    * of milliseconds.
+    *
+    * @param tableEnv table environment; not read by this check
+    * @param gap expression describing the session gap
+    */
+  def validate(tableEnv: TableEnvironment, gap: Expression): ValidationResult = {
+    val isMillisIntervalLiteral = gap match {
+      case Literal(_: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => true
+      case _ => false
+    }
+
+    if (isMillisIntervalLiteral) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        "Session window expects gap literal of type Interval of Milliseconds.")
+    }
+  }
+}
+
+/**
+  * Session group window that is evaluated on processing time.
+  *
+  * @param name optional name expression of the window
+  * @param gap expression describing the session gap
+  */
+case class ProcessingTimeSessionGroupWindow(
+    name: Option[Expression],
+    gap: Expression)
+  extends ProcessingTimeGroupWindow(name) {
+
+  /** Returns a copy of this window with `resolve` applied to name and gap. */
+  override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow = {
+    val resolvedName = name.map(resolve)
+    val resolvedGap = resolve(gap)
+    ProcessingTimeSessionGroupWindow(resolvedName, resolvedGap)
+  }
+
+  /** Runs the parent validation and chains the shared session-window validation to it. */
+  override def validate(tableEnv: TableEnvironment): ValidationResult = {
+    val parentResult = super.validate(tableEnv)
+    parentResult.orElse(SessionGroupWindow.validate(tableEnv, gap))
+  }
+
+  override def toString: String = s"ProcessingTimeSessionGroupWindow($name, $gap)"
+}
+
+/**
+  * Session group window that is evaluated on event time.
+  *
+  * @param name optional name expression of the window
+  * @param timeField expression passed to [[EventTimeGroupWindow]] as its time field
+  * @param gap expression describing the session gap
+  */
+case class EventTimeSessionGroupWindow(
+    name: Option[Expression],
+    timeField: Expression,
+    gap: Expression)
+  extends EventTimeGroupWindow(name, timeField) {
+
+  /** Returns a copy of this window with `resolve` applied to name, time field and gap. */
+  override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow = {
+    val resolvedName = name.map(resolve)
+    val resolvedTimeField = resolve(timeField)
+    val resolvedGap = resolve(gap)
+    EventTimeSessionGroupWindow(resolvedName, resolvedTimeField, resolvedGap)
+  }
+
+  /** Runs the parent validation and chains the shared session-window validation to it. */
+  override def validate(tableEnv: TableEnvironment): ValidationResult = {
+    val parentResult = super.validate(tableEnv)
+    parentResult.orElse(SessionGroupWindow.validate(tableEnv, gap))
+  }
+
+  override def toString: String = s"EventTimeSessionGroupWindow($name, $timeField, $gap)"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index 066e9d6..1d7ed5f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.operators.join.JoinType
import org.apache.flink.api.table._
import org.apache.flink.api.table.expressions._
import org.apache.flink.api.table.typeutils.TypeConverter
+import org.apache.flink.api.table.validate.{ValidationFailure, ValidationSuccess}
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -55,28 +56,27 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
override def validate(tableEnv: TableEnvironment): LogicalNode = {
val resolvedProject = super.validate(tableEnv).asInstanceOf[Project]
+ val names: mutable.Set[String] = mutable.Set()
- def checkUniqueNames(exprs: Seq[Expression]): Unit = {
- val names: mutable.Set[String] = mutable.Set()
- exprs.foreach {
- case n: Alias =>
- // explicit name
- if (names.contains(n.name)) {
- throw ValidationException(s"Duplicate field name ${n.name}.")
- } else {
- names.add(n.name)
- }
- case r: ResolvedFieldReference =>
- // simple field forwarding
- if (names.contains(r.name)) {
- throw ValidationException(s"Duplicate field name ${r.name}.")
- } else {
- names.add(r.name)
- }
- case _ => // Do nothing
+ def checkName(name: String): Unit = {
+ if (names.contains(name)) {
+ failValidation(s"Duplicate field name $name.")
+ } else if (tableEnv.isInstanceOf[StreamTableEnvironment] && name == "rowtime") {
+ failValidation("'rowtime' cannot be used as field name in a streaming environment.")
+ } else {
+ names.add(name)
}
}
- checkUniqueNames(resolvedProject.projectList)
+
+ resolvedProject.projectList.foreach {
+ case n: Alias =>
+ // explicit name
+ checkName(n.name)
+ case r: ResolvedFieldReference =>
+ // simple field forwarding
+ checkName(r.name)
+ case _ => // Do nothing
+ }
resolvedProject
}
@@ -112,6 +112,10 @@ case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends Una
failValidation("Alias only accept name expressions as arguments")
} else if (!aliasList.forall(_.asInstanceOf[UnresolvedFieldReference].name != "*")) {
failValidation("Alias can not accept '*' as name")
+ } else if (tableEnv.isInstanceOf[StreamTableEnvironment] && !aliasList.forall {
+ case UnresolvedFieldReference(name) => name != "rowtime"
+ }) {
+ failValidation("'rowtime' cannot be used as field name in a streaming environment.")
} else {
val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
val input = child.output
@@ -498,3 +502,105 @@ case class LogicalRelNode(
override def validate(tableEnv: TableEnvironment): LogicalNode = this
}
+
+/**
+  * Logical node of an aggregation that is grouped by a [[LogicalWindow]] in addition to
+  * its grouping expressions.
+  *
+  * @param groupingExpressions expressions the input is grouped by
+  * @param window the group window
+  * @param propertyExpressions named window properties that are part of the output
+  * @param aggregateExpressions named aggregations that are part of the output
+  * @param child input node
+  */
+case class WindowAggregate(
+    groupingExpressions: Seq[Expression],
+    window: LogicalWindow,
+    propertyExpressions: Seq[NamedExpression],
+    aggregateExpressions: Seq[NamedExpression],
+    child: LogicalNode)
+  extends UnaryNode {
+
+  /** Output attributes in the order: grouping fields, aggregates, window properties. */
+  override def output: Seq[Attribute] = {
+    val outputExpressions = groupingExpressions ++ aggregateExpressions ++ propertyExpressions
+    outputExpressions.map {
+      case named: NamedExpression => named.toAttribute
+      // expressions without a name are aliased with their string representation
+      case unnamed => Alias(unnamed, unnamed.toString).toAttribute
+    }
+  }
+
+  /**
+    * Resolves a reference used by this operator's parameters.
+    *
+    * "rowtime" resolves to a [[RowtimeAttribute]] in a streaming environment. A name equal
+    * to the window alias resolves to a [[WindowReference]], unless the regular resolution
+    * also finds it, in which case validation fails. Everything else is resolved by the
+    * parent implementation.
+    */
+  override def resolveReference(
+      tableEnv: TableEnvironment,
+      name: String)
+    : Option[NamedExpression] = tableEnv match {
+
+    case _: StreamTableEnvironment if name == "rowtime" =>
+      Some(RowtimeAttribute())
+
+    case _ =>
+      window.alias match {
+        case Some(UnresolvedFieldReference(alias)) if name == alias =>
+          // the alias must not clash with a reference that resolves the usual way
+          super.resolveReference(tableEnv, name) match {
+            case Some(_) => failValidation(s"Reference $name is ambiguous.")
+            case None => Some(WindowReference(name))
+          }
+        case _ =>
+          super.resolveReference(tableEnv, name)
+      }
+  }
+
+  /** Constructs the child and then adds a windowed aggregate via the [[FlinkRelBuilder]]. */
+  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
+    val flinkRelBuilder = relBuilder.asInstanceOf[FlinkRelBuilder]
+    child.construct(flinkRelBuilder)
+    flinkRelBuilder.aggregate(
+      window,
+      relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava),
+      propertyExpressions.map {
+        case Alias(property: WindowProperty, propertyName) =>
+          property.toNamedWindowProperty(propertyName)(relBuilder)
+        case _ =>
+          throw new RuntimeException("This should never happen.")
+      },
+      aggregateExpressions.map {
+        case Alias(aggregation: Aggregation, aggregationName) =>
+          aggregation.toAggCall(aggregationName)(relBuilder)
+        case _ =>
+          throw new RuntimeException("This should never happen.")
+      }.asJava)
+  }
+
+  /**
+    * Validates, in this order, the aggregate expressions, the grouping expressions and the
+    * window of the resolved node, and returns the resolved node.
+    */
+  override def validate(tableEnv: TableEnvironment): LogicalNode = {
+    val resolved = super.validate(tableEnv).asInstanceOf[WindowAggregate]
+    val groupingExprs = resolved.groupingExpressions
+
+    def isGroupingExpression(expr: Expression): Boolean =
+      groupingExprs.exists(_.checkEquals(expr))
+
+    def validateAggregateExpression(expr: Expression): Unit = expr match {
+      // an aggregation must not contain another aggregation anywhere below it
+      case aggExpr: Aggregation =>
+        aggExpr.children.foreach { child =>
+          child.preOrderVisit {
+            case _: Aggregation =>
+              failValidation(
+                "It's not allowed to use an aggregate function as " +
+                  "input of another aggregate function")
+            case _ => // ok
+          }
+        }
+      // plain attributes are only allowed if they are grouped on
+      case a: Attribute if !isGroupingExpression(a) =>
+        failValidation(
+          s"Expression '$a' is invalid because it is neither" +
+            " present in group by nor an aggregate function")
+      case e if isGroupingExpression(e) => // ok
+      case e => e.children.foreach(validateAggregateExpression)
+    }
+
+    def validateGroupingExpression(expr: Expression): Unit = {
+      if (!expr.resultType.isKeyType) {
+        failValidation(
+          s"Expression $expr cannot be used as a grouping expression " +
+            "because it's not a valid key type which must be hashable and comparable")
+      }
+    }
+
+    resolved.aggregateExpressions.foreach(validateAggregateExpression)
+    groupingExprs.foreach(validateGroupingExpression)
+
+    // validate window
+    resolved.window.validate(tableEnv) match {
+      case ValidationFailure(msg) =>
+        failValidation(s"$window is invalid: $msg")
+      case ValidationSuccess => // ok
+    }
+
+    resolved
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/rel/LogicalWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/rel/LogicalWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/rel/LogicalWindowAggregate.scala
new file mode 100644
index 0000000..9615168
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/rel/LogicalWindowAggregate.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.logical.rel
+
+import java.util
+
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
+import org.apache.calcite.rel.{RelNode, RelShuttle}
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.api.table.FlinkTypeFactory
+import org.apache.flink.api.table.plan.logical.LogicalWindow
+
+/**
+  * Calcite [[Aggregate]] that additionally carries a [[LogicalWindow]] and a list of named
+  * window properties. The properties are appended to the aggregate's row type.
+  */
+class LogicalWindowAggregate(
+    window: LogicalWindow,
+    namedProperties: Seq[NamedWindowProperty],
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    child: RelNode,
+    indicator: Boolean,
+    groupSet: ImmutableBitSet,
+    groupSets: util.List[ImmutableBitSet],
+    aggCalls: util.List[AggregateCall])
+  extends Aggregate(cluster, traitSet, child, indicator, groupSet, groupSets, aggCalls) {
+
+  /** The window this aggregate was created with. */
+  def getWindow: LogicalWindow = window
+
+  /** The named window properties this aggregate was created with. */
+  def getNamedProperties: Seq[NamedWindowProperty] = namedProperties
+
+  /** Creates a copy with the given aggregate parameters; window and properties are kept. */
+  override def copy(
+      traitSet: RelTraitSet,
+      input: RelNode,
+      indicator: Boolean,
+      groupSet: ImmutableBitSet,
+      groupSets: util.List[ImmutableBitSet],
+      aggCalls: util.List[AggregateCall])
+    : Aggregate = {
+
+    new LogicalWindowAggregate(
+      window, namedProperties, cluster, traitSet, input, indicator, groupSet, groupSets, aggCalls)
+  }
+
+  override def accept(shuttle: RelShuttle): RelNode = shuttle.visit(this)
+
+  /** Row type of the plain aggregate followed by one field per named window property. */
+  override def deriveRowType(): RelDataType = {
+    val aggregateRowType = super.deriveRowType()
+    val typeFactory = getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    val rowTypeBuilder = typeFactory.builder
+    rowTypeBuilder.addAll(aggregateRowType.getFieldList)
+
+    for (namedProp <- namedProperties) {
+      val propertyType = typeFactory.createTypeFromTypeInfo(namedProp.property.resultType)
+      rowTypeBuilder.add(namedProp.name, propertyType)
+    }
+
+    rowTypeBuilder.build()
+  }
+}
+
+object LogicalWindowAggregate {
+
+  /**
+    * Builds a [[LogicalWindowAggregate]] from an existing [[Aggregate]].
+    *
+    * Cluster, input, indicator, group sets and aggregate calls are taken from `aggregate`;
+    * the trait set is created from the cluster with `Convention.NONE`.
+    *
+    * @param window window of the new node
+    * @param namedProperties named window properties of the new node
+    * @param aggregate aggregate to take all remaining parameters from
+    */
+  def create(
+      window: LogicalWindow,
+      namedProperties: Seq[NamedWindowProperty],
+      aggregate: Aggregate)
+    : LogicalWindowAggregate = {
+
+    val aggregateCluster: RelOptCluster = aggregate.getCluster
+    val logicalTraits: RelTraitSet = aggregateCluster.traitSetOf(Convention.NONE)
+
+    new LogicalWindowAggregate(
+      window,
+      namedProperties,
+      aggregateCluster,
+      logicalTraits,
+      aggregate.getInput,
+      aggregate.indicator,
+      aggregate.getGroupSet,
+      aggregate.getGroupSets,
+      aggregate.getAggCallList)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkAggregate.scala
new file mode 100644
index 0000000..85129c4
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkAggregate.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
+
+import scala.collection.JavaConverters._
+
+/** String helpers for describing the grouping and the selection of aggregate nodes. */
+trait FlinkAggregate {
+
+  /** Comma-separated names of the input fields at the `grouping` positions. */
+  private[flink] def groupingToString(inputType: RelDataType, grouping: Array[Int]): String = {
+    val inputFieldNames = inputType.getFieldNames.asScala
+    grouping.map(index => inputFieldNames(index)).mkString(", ")
+  }
+
+  /**
+    * Describes the output of an aggregation as a comma-separated list.
+    *
+    * Grouping fields, aggregate calls and window properties are rendered in this order and
+    * paired with the field names of `rowType`. An entry is rendered as `expr AS name` when
+    * its rendered expression differs from the output field name.
+    *
+    * @param inputType row type of the input, used to look up field names
+    * @param grouping positions of the grouping fields in the input
+    * @param rowType row type of the output
+    * @param namedAggregates aggregate calls; only the call (the pair's key) is used
+    * @param namedProperties window properties; rendered via the property's `toString`
+    */
+  private[flink] def aggregationToString(
+      inputType: RelDataType,
+      grouping: Array[Int],
+      rowType: RelDataType,
+      namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+      namedProperties: Seq[NamedWindowProperty])
+    : String = {
+
+    val inFields = inputType.getFieldNames.asScala
+    val outFields = rowType.getFieldNames.asScala
+
+    val groupStrings = grouping.map(index => inFields(index))
+
+    val aggStrings = namedAggregates.map { namedAggregate =>
+      val call = namedAggregate.getKey
+      val function = call.getAggregation
+      // only the first argument is shown; calls without arguments are shown with "*"
+      val argument = if (call.getArgList.isEmpty) "*" else inFields(call.getArgList.get(0))
+      s"$function($argument)"
+    }
+
+    val propStrings = namedProperties.map(_.property.toString)
+
+    (groupStrings ++ aggStrings ++ propStrings).zip(outFields).map {
+      case (expression, outputName) =>
+        if (expression == outputName) expression else s"$expression AS $outputName"
+    }.mkString(", ")
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
index dad50a3..a4c7589 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkRel.scala
@@ -19,6 +19,12 @@
package org.apache.flink.api.table.plan.nodes
import org.apache.calcite.rex._
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.CodeGenerator
+import org.apache.flink.api.table.runtime.MapRunner
+
import scala.collection.JavaConversions._
trait FlinkRel {
@@ -44,4 +50,41 @@ trait FlinkRel {
case _ => throw new IllegalArgumentException("Unknown expression type: " + expr)
}
}
+
+  /**
+    * Generates a [[MapFunction]] that converts records of `inputType` into `expectedType`.
+    *
+    * The conversion code is produced by a [[CodeGenerator]] and wrapped into a
+    * [[MapRunner]] that receives the generated function's name, code and return type.
+    *
+    * @param config table configuration passed to the code generator
+    * @param nullableInput passed to the code generator as its nullable-input flag
+    * @param inputType type of the records to convert
+    * @param expectedType type the records are converted to
+    * @param conversionOperatorName name given to the generated function
+    * @param fieldNames field names used when generating the result expression
+    * @param inputPojoFieldMapping optional field mapping passed to the code generator
+    */
+  private[flink] def getConversionMapper(
+      config: TableConfig,
+      nullableInput: Boolean,
+      inputType: TypeInformation[Any],
+      expectedType: TypeInformation[Any],
+      conversionOperatorName: String,
+      fieldNames: Seq[String],
+      inputPojoFieldMapping: Option[Array[Int]] = None)
+    : MapFunction[Any, Any] = {
+
+    val codeGenerator =
+      new CodeGenerator(config, nullableInput, inputType, None, inputPojoFieldMapping)
+    val converted = codeGenerator.generateConverterResultExpression(expectedType, fieldNames)
+
+    // function body: run the conversion code and return its result term
+    val functionBody =
+      s"""
+        |${converted.code}
+        |return ${converted.resultTerm};
+        |""".stripMargin
+
+    val generated = codeGenerator.generateFunction(
+      conversionOperatorName,
+      classOf[MapFunction[Any, Any]],
+      functionBody,
+      expectedType)
+
+    new MapRunner[Any, Any](generated.name, generated.code, generated.returnType)
+  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
index c826d83..c73d781 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -25,10 +25,11 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.DataSet
+import org.apache.flink.api.table.plan.nodes.FlinkAggregate
import org.apache.flink.api.table.runtime.aggregate.AggregateUtil
import org.apache.flink.api.table.runtime.aggregate.AggregateUtil.CalcitePair
import org.apache.flink.api.table.typeutils.{RowTypeInfo, TypeConverter}
-import org.apache.flink.api.table.{FlinkTypeFactory, BatchTableEnvironment, Row}
+import org.apache.flink.api.table.{BatchTableEnvironment, FlinkTypeFactory, Row}
import scala.collection.JavaConverters._
@@ -38,12 +39,13 @@ import scala.collection.JavaConverters._
class DataSetAggregate(
cluster: RelOptCluster,
traitSet: RelTraitSet,
- input: RelNode,
+ inputNode: RelNode,
namedAggregates: Seq[CalcitePair[AggregateCall, String]],
rowRelDataType: RelDataType,
inputType: RelDataType,
grouping: Array[Int])
- extends SingleRel(cluster, traitSet, input)
+ extends SingleRel(cluster, traitSet, inputNode)
+ with FlinkAggregate
with DataSetRel {
override def deriveRowType() = rowRelDataType
@@ -61,16 +63,16 @@ class DataSetAggregate(
override def toString: String = {
s"Aggregate(${ if (!grouping.isEmpty) {
- s"groupBy: ($groupingToString), "
+ s"groupBy: (${groupingToString(inputType, grouping)}), "
} else {
""
- }}}select:($aggregationToString))"
+ }}select: (${aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)}))"
}
override def explainTerms(pw: RelWriter): RelWriter = {
super.explainTerms(pw)
- .itemIf("groupBy",groupingToString, !grouping.isEmpty)
- .item("select", aggregationToString)
+ .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
+ .item("select", aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil))
}
override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -90,8 +92,11 @@ class DataSetAggregate(
val groupingKeys = grouping.indices.toArray
// add grouping fields, position keys in the input, and input type
- val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
- inputType, getRowType, grouping, config)
+ val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(
+ namedAggregates,
+ inputType,
+ getRowType,
+ grouping)
val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
tableEnv,
@@ -103,7 +108,7 @@ class DataSetAggregate(
.map(field => FlinkTypeFactory.toTypeInfo(field.getType))
.toArray
- val aggString = aggregationToString
+ val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
val prepareOpName = s"prepare select: ($aggString)"
val mappedInput = inputDS
.map(aggregateResult._1)
@@ -115,7 +120,8 @@ class DataSetAggregate(
val result = {
if (groupingKeys.length > 0) {
// grouped aggregation
- val aggOpName = s"groupBy: ($groupingToString), select:($aggString)"
+ val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
+ s"select: ($aggString)"
mappedInput.asInstanceOf[DataSet[Row]]
.groupBy(groupingKeys: _*)
@@ -151,36 +157,4 @@ class DataSetAggregate(
case _ => result
}
}
-
- private def groupingToString: String = {
-
- val inFields = inputType.getFieldNames.asScala
- grouping.map( inFields(_) ).mkString(", ")
- }
-
- private def aggregationToString: String = {
-
- val inFields = inputType.getFieldNames.asScala
- val outFields = getRowType.getFieldNames.asScala
-
- val groupStrings = grouping.map( inFields(_) )
-
- val aggs = namedAggregates.map(_.getKey)
- val aggStrings = aggs.map( a => s"${a.getAggregation}(${
- if (a.getArgList.size() > 0) {
- inFields(a.getArgList.get(0))
- } else {
- "*"
- }
- })")
-
- (groupStrings ++ aggStrings).zip(outFields).map {
- case (f, o) => if (f == o) {
- f
- } else {
- s"$f AS $o"
- }
- }.mkString(", ")
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
index 39532f0..82c75e1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala
@@ -71,40 +71,4 @@ trait DataSetRel extends RelNode with FlinkRel {
}
- private[dataset] def getConversionMapper(
- config: TableConfig,
- nullableInput: Boolean,
- inputType: TypeInformation[Any],
- expectedType: TypeInformation[Any],
- conversionOperatorName: String,
- fieldNames: Seq[String],
- inputPojoFieldMapping: Option[Array[Int]] = None): MapFunction[Any, Any] = {
-
- val generator = new CodeGenerator(
- config,
- nullableInput,
- inputType,
- None,
- inputPojoFieldMapping)
- val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
-
- val body =
- s"""
- |${conversion.code}
- |return ${conversion.resultTerm};
- |""".stripMargin
-
- val genFunction = generator.generateFunction(
- conversionOperatorName,
- classOf[MapFunction[Any, Any]],
- body,
- expectedType)
-
- new MapRunner[Any, Any](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
new file mode 100644
index 0000000..b9b4561
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.plan.logical._
+import org.apache.flink.api.table.plan.nodes.FlinkAggregate
+import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate.{createKeyedWindowedStream, createNonKeyedWindowedStream, transformToPropertyReads}
+import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.api.table.runtime.aggregate._
+import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, RowTypeInfo, TimeIntervalTypeInfo, TypeConverter}
+import org.apache.flink.api.table.{FlinkTypeFactory, Row, StreamTableEnvironment}
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+
+import scala.collection.JavaConverters._
+
+/**
+  * DataStream node for an aggregation over a group window.
+  *
+  * @param window the group window the aggregation is computed on
+  * @param namedProperties named window properties that are part of the output
+  * @param cluster Calcite cluster passed to [[SingleRel]]
+  * @param traitSet Calcite trait set passed to [[SingleRel]]
+  * @param inputNode input node
+  * @param namedAggregates aggregate calls paired with a name
+  * @param rowRelDataType row type returned by `deriveRowType`
+  * @param inputType row type of the input
+  * @param grouping positions of the grouping fields in the input
+  */
+class DataStreamAggregate(
+    window: LogicalWindow,
+    namedProperties: Seq[NamedWindowProperty],
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    rowRelDataType: RelDataType,
+    inputType: RelDataType,
+    grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkAggregate
+  with DataStreamRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  /** Creates a copy with the given trait set and the first of the given inputs. */
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    val newInput = inputs.get(0)
+    new DataStreamAggregate(
+      window,
+      namedProperties,
+      cluster,
+      traitSet,
+      newInput,
+      namedAggregates,
+      getRowType,
+      inputType,
+      grouping)
+  }
+
+  override def toString: String = {
+    // the groupBy part is omitted when there are no grouping fields
+    val groupByPart =
+      if (grouping.nonEmpty) {
+        s"groupBy: (${groupingToString(inputType, grouping)}), "
+      } else {
+        ""
+      }
+    val selectPart =
+      aggregationToString(inputType, grouping, getRowType, namedAggregates, namedProperties)
+
+    s"Aggregate(${groupByPart}window: ($window), select: ($selectPart))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
+      .item("window", window)
+      .item(
+        "select",
+        aggregationToString(inputType, grouping, getRowType, namedAggregates, namedProperties))
+  }
+
+  /**
+    * Translates this node into a DataStream program.
+    *
+    * The input is requested as rows, mapped with the prepare function, windowed (keyed
+    * when grouping fields exist, non-keyed otherwise) and aggregated. If a non-Row
+    * `expectedType` is requested, a conversion mapper is appended.
+    */
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      expectedType: Option[TypeInformation[Any]])
+    : DataStream[Any] = {
+
+    val config = tableEnv.getConfig
+
+    val groupingKeys = grouping.indices.toArray
+    // add grouping fields, position keys in the input, and input type
+    val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(
+      namedAggregates,
+      inputType,
+      getRowType,
+      grouping)
+
+    val propertyReads = transformToPropertyReads(namedProperties.map(_.property))
+
+    // tell the input operator that this operator currently only supports Rows as input
+    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(
+      tableEnv,
+      Some(TypeConverter.DEFAULT_ROW_TYPE))
+
+    // get the output types
+    val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala
+      .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
+      .toArray
+
+    val aggString =
+      aggregationToString(inputType, grouping, getRowType, namedAggregates, namedProperties)
+
+    val prepareOpName = s"prepare select: ($aggString)"
+    val mappedInput = inputDS
+      .map(aggregateResult._1)
+      .name(prepareOpName)
+
+    val groupReduceFunction = aggregateResult._2
+    val rowTypeInfo = new RowTypeInfo(fieldTypes)
+
+    val result =
+      if (groupingKeys.nonEmpty) {
+        // grouped / keyed aggregation
+        val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
+          s"window: ($window), " +
+          s"select: ($aggString)"
+        val aggregateFunction = new AggregateWindowFunction(propertyReads, groupReduceFunction)
+
+        val keyedStream = mappedInput.keyBy(groupingKeys: _*)
+
+        val windowedStream = createKeyedWindowedStream(window, keyedStream)
+          .asInstanceOf[WindowedStream[Row, Tuple, DataStreamWindow]]
+
+        windowedStream
+          .apply(aggregateFunction)
+          .returns(rowTypeInfo)
+          .name(aggOpName)
+          .asInstanceOf[DataStream[Any]]
+      } else {
+        // global / non-keyed aggregation
+        val aggOpName = s"window: ($window), select: ($aggString)"
+        val aggregateFunction = new AggregateAllWindowFunction(propertyReads, groupReduceFunction)
+
+        val windowedStream = createNonKeyedWindowedStream(window, mappedInput)
+          .asInstanceOf[AllWindowedStream[Row, DataStreamWindow]]
+
+        windowedStream
+          .apply(aggregateFunction)
+          .returns(rowTypeInfo)
+          .name(aggOpName)
+          .asInstanceOf[DataStream[Any]]
+      }
+
+    // if the expected type is not a Row, inject a mapper to convert to the expected type
+    expectedType match {
+      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
+        val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
+        val conversionMapper = getConversionMapper(
+          config = config,
+          nullableInput = false,
+          inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
+          expectedType = typeInfo,
+          conversionOperatorName = "DataStreamAggregateConversion",
+          fieldNames = getRowType.getFieldNames.asScala)
+        result.map(conversionMapper).name(mapName)
+      case _ => result
+    }
+  }
+}
+
+/**
+  * Helpers for translating a logical group window and its window properties into the
+  * corresponding DataStream windowing operations.
+  */
+object DataStreamAggregate {
+
+  /**
+    * Creates one [[WindowPropertyRead]] per requested window property.
+    *
+    * Only [[WindowStart]] and [[WindowEnd]] are matched; there is no fallback case.
+    *
+    * @param namedProperties the window properties to read
+    * @return the property readers, in the order of the given properties
+    */
+  private def transformToPropertyReads(namedProperties: Seq[WindowProperty])
+    : Array[WindowPropertyRead[_ <: Any]] = namedProperties.map {
+      case WindowStart(_) => new WindowStartRead()
+      case WindowEnd(_) => new WindowEndRead()
+    }.toArray
+
+  /**
+    * Applies the window described by `groupWindow` to a keyed stream.
+    *
+    * Tumbling and sliding windows whose size is a time interval become time windows. On
+    * processing time, any other size is read as a row count. Session windows always read their
+    * gap as a time interval.
+    *
+    * @param groupWindow the logical window to translate
+    * @param stream the keyed input stream
+    * @return the windowed stream
+    * @throws UnsupportedOperationException for event-time tumbling or sliding windows whose
+    *                                       size is not a time interval
+    * @throws IllegalArgumentException if a size, slide, or gap is not a literal of the
+    *                                  expected interval type
+    */
+  private def createKeyedWindowedStream(
+      groupWindow: LogicalWindow,
+      stream: KeyedStream[Row, Tuple])
+    : WindowedStream[Row, Tuple, _ <: DataStreamWindow] = groupWindow match {
+
+    case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
+      stream.window(TumblingProcessingTimeWindows.of(asTime(size)))
+
+    case ProcessingTimeTumblingGroupWindow(_, size) =>
+      stream.countWindow(asCount(size))
+
+    case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
+      stream.window(TumblingEventTimeWindows.of(asTime(size)))
+
+    case EventTimeTumblingGroupWindow(_, _, size) =>
+      // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+      // before applying the windowing logic. Otherwise, this would be the same as a
+      // ProcessingTimeTumblingGroupWindow
+      throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
+        "currently not supported.")
+
+    case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
+      stream.window(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
+
+    case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
+      stream.countWindow(asCount(size), asCount(slide))
+
+    case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+      stream.window(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
+
+    case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+      // TODO: EventTimeSlidingGroupWindow should sort the stream on event time
+      // before applying the windowing logic. Otherwise, this would be the same as a
+      // ProcessingTimeSlidingGroupWindow
+      throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
+        "currently not supported.")
+
+    case ProcessingTimeSessionGroupWindow(_, gap: Expression) =>
+      stream.window(ProcessingTimeSessionWindows.withGap(asTime(gap)))
+
+    case EventTimeSessionGroupWindow(_, _, gap) =>
+      stream.window(EventTimeSessionWindows.withGap(asTime(gap)))
+  }
+
+  /**
+    * Applies the window described by `groupWindow` to a non-keyed stream.
+    *
+    * The translation mirrors the keyed variant, using the `windowAll` / `countWindowAll`
+    * operations of the stream instead of `window` / `countWindow`.
+    *
+    * @param groupWindow the logical window to translate
+    * @param stream the non-keyed input stream
+    * @return the all-windowed stream
+    * @throws UnsupportedOperationException for event-time tumbling or sliding windows whose
+    *                                       size is not a time interval
+    * @throws IllegalArgumentException if a size, slide, or gap is not a literal of the
+    *                                  expected interval type
+    */
+  private def createNonKeyedWindowedStream(groupWindow: LogicalWindow, stream: DataStream[Row])
+    : AllWindowedStream[Row, _ <: DataStreamWindow] = groupWindow match {
+
+    case ProcessingTimeTumblingGroupWindow(_, size) if isTimeInterval(size.resultType) =>
+      stream.windowAll(TumblingProcessingTimeWindows.of(asTime(size)))
+
+    case ProcessingTimeTumblingGroupWindow(_, size) =>
+      stream.countWindowAll(asCount(size))
+
+    case EventTimeTumblingGroupWindow(_, _, size) if isTimeInterval(size.resultType) =>
+      stream.windowAll(TumblingEventTimeWindows.of(asTime(size)))
+
+    case EventTimeTumblingGroupWindow(_, _, size) =>
+      // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+      // before applying the windowing logic. Otherwise, this would be the same as a
+      // ProcessingTimeTumblingGroupWindow
+      throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
+        "currently not supported.")
+
+    case ProcessingTimeSlidingGroupWindow(_, size, slide) if isTimeInterval(size.resultType) =>
+      stream.windowAll(SlidingProcessingTimeWindows.of(asTime(size), asTime(slide)))
+
+    case ProcessingTimeSlidingGroupWindow(_, size, slide) =>
+      stream.countWindowAll(asCount(size), asCount(slide))
+
+    case EventTimeSlidingGroupWindow(_, _, size, slide) if isTimeInterval(size.resultType) =>
+      stream.windowAll(SlidingEventTimeWindows.of(asTime(size), asTime(slide)))
+
+    case EventTimeSlidingGroupWindow(_, _, size, slide) =>
+      // TODO: EventTimeSlidingGroupWindow should sort the stream on event time
+      // before applying the windowing logic. Otherwise, this would be the same as a
+      // ProcessingTimeSlidingGroupWindow
+      throw new UnsupportedOperationException("Event-time grouping windows on row intervals are " +
+        "currently not supported.")
+
+    case ProcessingTimeSessionGroupWindow(_, gap) =>
+      stream.windowAll(ProcessingTimeSessionWindows.withGap(asTime(gap)))
+
+    case EventTimeSessionGroupWindow(_, _, gap) =>
+      stream.windowAll(EventTimeSessionWindows.withGap(asTime(gap)))
+  }
+
+  /**
+    * Converts a millisecond time-interval literal into a [[Time]].
+    *
+    * @param expr the expression to convert
+    * @return the interval as a [[Time]] in milliseconds
+    * @throws IllegalArgumentException if `expr` is not a `Long` literal of type
+    *                                  `TimeIntervalTypeInfo.INTERVAL_MILLIS`
+    */
+  def asTime(expr: Expression): Time = expr match {
+    case Literal(value: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) => Time.milliseconds(value)
+    case _ => throw new IllegalArgumentException(
+      s"Expected a time interval literal in milliseconds, but got: $expr")
+  }
+
+  /**
+    * Converts a row-interval literal into a row count.
+    *
+    * @param expr the expression to convert
+    * @return the number of rows
+    * @throws IllegalArgumentException if `expr` is not a `Long` literal of type
+    *                                  `RowIntervalTypeInfo.INTERVAL_ROWS`
+    */
+  def asCount(expr: Expression): Long = expr match {
+    case Literal(value: Long, RowIntervalTypeInfo.INTERVAL_ROWS) => value
+    case _ => throw new IllegalArgumentException(
+      s"Expected a row interval literal, but got: $expr")
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
index 3ed4385..638deac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala
@@ -146,6 +146,7 @@ object FlinkRuleSets {
UnionEliminatorRule.INSTANCE,
// translate to DataStream nodes
+ DataStreamAggregateRule.INSTANCE,
DataStreamCalcRule.INSTANCE,
DataStreamScanRule.INSTANCE,
DataStreamUnionRule.INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
index 9f78adb..72ed27e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetAggregateRule.scala
@@ -40,13 +40,13 @@ class DataSetAggregateRule
// check if we have distinct aggregates
val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
if (distinctAggs) {
- throw new TableException("DISTINCT aggregates are currently not supported.")
+ throw TableException("DISTINCT aggregates are currently not supported.")
}
// check if we have grouping sets
val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
if (groupSets || agg.indicator) {
- throw new TableException("GROUPING SETS are currently not supported.")
+ throw TableException("GROUPING SETS are currently not supported.")
}
!distinctAggs && !groupSets && !agg.indicator
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamAggregateRule.scala
new file mode 100644
index 0000000..dff2adc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamAggregateRule.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet}
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.flink.api.table.TableException
+import org.apache.flink.api.table.expressions.Alias
+import org.apache.flink.api.table.plan.logical.rel.LogicalWindowAggregate
+import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamAggregate, DataStreamConvention}
+
+import scala.collection.JavaConversions._
+
+/**
+  * Converter rule that replaces a [[LogicalWindowAggregate]] in [[Convention.NONE]] with a
+  * [[DataStreamAggregate]] in the [[DataStreamConvention]].
+  */
+class DataStreamAggregateRule
+  extends ConverterRule(
+    classOf[LogicalWindowAggregate],
+    Convention.NONE,
+    DataStreamConvention.INSTANCE,
+    "DataStreamAggregateRule") {
+
+  /**
+    * Checks that the aggregate can be translated.
+    *
+    * Unsupported aggregates are rejected by throwing a `TableException` rather than by
+    * returning `false`, so every call that returns at all returns `true`.
+    */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val windowAgg: LogicalWindowAggregate = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+    // DISTINCT aggregate calls cannot be translated
+    if (windowAgg.getAggCallList.exists(_.isDistinct)) {
+      throw TableException("DISTINCT aggregates are currently not supported.")
+    }
+
+    // anything other than exactly one group set equal to the group set means GROUPING SETS
+    val declaredGroupSets = windowAgg.getGroupSets
+    val usesGroupingSets =
+      declaredGroupSets.size() != 1 || declaredGroupSets.get(0) != windowAgg.getGroupSet
+    if (usesGroupingSets || windowAgg.indicator) {
+      throw TableException("GROUPING SETS are currently not supported.")
+    }
+
+    // both unsupported cases threw above, so reaching this point is always a match
+    true
+  }
+
+  /**
+    * Builds the [[DataStreamAggregate]] node for the given [[LogicalWindowAggregate]], with its
+    * input converted to the [[DataStreamConvention]].
+    */
+  override def convert(rel: RelNode): RelNode = {
+    val windowAgg = rel.asInstanceOf[LogicalWindowAggregate]
+    val dataStreamTraits = rel.getTraitSet.replace(DataStreamConvention.INSTANCE)
+    val dataStreamInput = RelOptRule.convert(windowAgg.getInput, DataStreamConvention.INSTANCE)
+
+    new DataStreamAggregate(
+      windowAgg.getWindow,
+      windowAgg.getNamedProperties,
+      rel.getCluster,
+      dataStreamTraits,
+      dataStreamInput,
+      windowAgg.getNamedAggCalls,
+      rel.getRowType,
+      windowAgg.getInput.getRowType,
+      windowAgg.getGroupSet.toArray)
+  }
+}
+
+/** Companion exposing the single shared instance of [[DataStreamAggregateRule]]. */
+object DataStreamAggregateRule {
+  val INSTANCE: RelOptRule = new DataStreamAggregateRule()
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
new file mode 100644
index 0000000..86f8a20
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateAllWindowFunction.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+/**
+  * All-window function that runs a wrapped [[RichGroupReduceFunction]] over the rows of a
+  * window and emits its results through a [[PropertyCollector]].
+  *
+  * @param propertyReads readers that extract properties from the window on every invocation
+  * @param groupReduceFunction the wrapped function that computes the aggregated rows
+  */
+class AggregateAllWindowFunction(
+    propertyReads: Array[WindowPropertyRead[_ <: Any]],
+    groupReduceFunction: RichGroupReduceFunction[Row, Row])
+  extends RichAllWindowFunction[Row, Row, Window] {
+
+  private var propertyCollector: PropertyCollector = _
+
+  /** Opens the wrapped function and creates the collector for the window properties. */
+  override def open(parameters: Configuration): Unit = {
+    groupReduceFunction.open(parameters)
+    propertyCollector = new PropertyCollector(propertyReads)
+  }
+
+  /**
+    * Closes the wrapped function. `open()` is forwarded to it, so its teardown has to be
+    * forwarded as well; otherwise anything it set up in `open()` would never be released.
+    */
+  override def close(): Unit = {
+    groupReduceFunction.close()
+  }
+
+  /**
+    * Evaluates the wrapped reduce function on the rows of one window.
+    *
+    * @param window the window being evaluated
+    * @param input the rows assigned to the window
+    * @param out the collector that finally receives the emitted rows
+    */
+  override def apply(window: Window, input: Iterable[Row], out: Collector[Row]): Unit = {
+
+    // extract the properties from window
+    propertyReads.foreach(_.extract(window))
+
+    // set final collector
+    propertyCollector.finalCollector = out
+
+    // call wrapped reduce function with property collector
+    groupReduceFunction.reduce(input, propertyCollector)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/44f3977e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
new file mode 100644
index 0000000..ca074cc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction}
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+
+
+/**
+  * Holds the aggregate logic used inside of
+  * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
+  * [[org.apache.flink.api.java.operators.GroupCombineOperator]].
+  *
+  * @param aggregates           The aggregate functions.
+  * @param groupKeysMapping     The index mapping of group keys between intermediate aggregate Row
+  *                             and output Row.
+  * @param aggregateMapping     The index mapping between aggregate function list and aggregated
+  *                             value index in output Row.
+  * @param intermediateRowArity The arity of the aggregate buffer Row created in `open`.
+  * @param finalRowArity        The arity of the output Row created in `open`.
+  */
+class AggregateReduceCombineFunction(
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val aggregateMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int,
+    private val finalRowArity: Int)
+  extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+
+  /** Verifies the constructor arguments and allocates the reused buffer and output Rows. */
+  override def open(config: Configuration): Unit = {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(groupKeysMapping)
+    aggregateBuffer = new Row(intermediateRowArity)
+    output = new Row(finalRowArity)
+  }
+
+  /**
+    * Merges all intermediate aggregate Rows of a group into the aggregate buffer, evaluates the
+    * final aggregate values from the buffer, and writes group keys and aggregate values into
+    * the output Row according to the two index mappings.
+    *
+    * @param records Grouped intermediate aggregate Rows iterator.
+    * @param out     The collector to hand results to.
+    */
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+    val lastRecord = mergeIntoBuffer(records)
+
+    // group keys are taken from the last merged record: (output index, intermediate index)
+    groupKeysMapping.foreach { mapping =>
+      output.setField(mapping._1, lastRecord.productElement(mapping._2))
+    }
+
+    // final aggregate values: (output index, index into the aggregate function list)
+    aggregateMapping.foreach { mapping =>
+      output.setField(mapping._1, aggregates(mapping._2).evaluate(aggregateBuffer))
+    }
+
+    out.collect(output)
+  }
+
+  /**
+    * Merges all intermediate aggregate Rows of a sub-group into the aggregate buffer and copies
+    * the leading group key fields of the last merged record into it.
+    *
+    * @param records Sub-grouped intermediate aggregate Rows iterator.
+    * @return Combined intermediate aggregate Row.
+    */
+  override def combine(records: Iterable[Row]): Row = {
+    val lastRecord = mergeIntoBuffer(records)
+
+    // the group keys occupy the first groupKeysMapping.length positions of the buffer
+    groupKeysMapping.indices.foreach { keyIndex =>
+      aggregateBuffer.setField(keyIndex, lastRecord.productElement(keyIndex))
+    }
+
+    aggregateBuffer
+  }
+
+  /**
+    * Re-initiates the aggregate buffer and merges every record into it.
+    *
+    * @return the last record that was merged, or `null` if `records` was empty
+    */
+  private def mergeIntoBuffer(records: Iterable[Row]): Row = {
+    aggregates.foreach(_.initiate(aggregateBuffer))
+
+    var lastSeen: Row = null
+    val remaining = records.iterator()
+    while (remaining.hasNext) {
+      val current = remaining.next()
+      aggregates.foreach(_.merge(current, aggregateBuffer))
+      lastSeen = current
+    }
+    lastSeen
+  }
+}