You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:04 UTC
[35/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
deleted file mode 100644
index a25c402..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan
-
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.plan.logical.{LogicalNode, Project}
-
-import scala.collection.mutable.ListBuffer
-
-object ProjectionTranslator {
-
- /**
- * Extracts and deduplicates all aggregation and window property expressions (zero, one, or more)
- * from the given expressions.
- *
- * @param exprs a list of expressions to extract
- * @param tableEnv the TableEnvironment
- * @return a Tuple2, the first field contains the extracted and deduplicated aggregations,
- * and the second field contains the extracted and deduplicated window properties.
- */
- def extractAggregationsAndProperties(
- exprs: Seq[Expression],
- tableEnv: TableEnvironment): (Map[Expression, String], Map[Expression, String]) = {
- exprs.foldLeft((Map[Expression, String](), Map[Expression, String]())) {
- (x, y) => identifyAggregationsAndProperties(y, tableEnv, x._1, x._2)
- }
- }
-
- /** Identifies and deduplicates aggregation functions and window properties. */
- private def identifyAggregationsAndProperties(
- exp: Expression,
- tableEnv: TableEnvironment,
- aggNames: Map[Expression, String],
- propNames: Map[Expression, String]) : (Map[Expression, String], Map[Expression, String]) = {
-
- exp match {
- case agg: Aggregation =>
- if (aggNames contains agg) {
- (aggNames, propNames)
- } else {
- (aggNames + (agg -> tableEnv.createUniqueAttributeName()), propNames)
- }
- case prop: WindowProperty =>
- if (propNames contains prop) {
- (aggNames, propNames)
- } else {
- (aggNames, propNames + (prop -> tableEnv.createUniqueAttributeName()))
- }
- case l: LeafExpression =>
- (aggNames, propNames)
- case u: UnaryExpression =>
- identifyAggregationsAndProperties(u.child, tableEnv, aggNames, propNames)
- case b: BinaryExpression =>
- val l = identifyAggregationsAndProperties(b.left, tableEnv, aggNames, propNames)
- identifyAggregationsAndProperties(b.right, tableEnv, l._1, l._2)
-
- // Functions calls
- case c @ Call(name, args) =>
- args.foldLeft((aggNames, propNames)){
- (x, y) => identifyAggregationsAndProperties(y, tableEnv, x._1, x._2)
- }
-
- case sfc @ ScalarFunctionCall(clazz, args) =>
- args.foldLeft((aggNames, propNames)){
- (x, y) => identifyAggregationsAndProperties(y, tableEnv, x._1, x._2)
- }
-
- // General expression
- case e: Expression =>
- e.productIterator.foldLeft((aggNames, propNames)){
- (x, y) => y match {
- case e: Expression => identifyAggregationsAndProperties(e, tableEnv, x._1, x._2)
- case _ => (x._1, x._2)
- }
- }
- }
- }
-
- /**
- * Replaces expressions with deduplicated aggregations and properties.
- *
- * @param exprs a list of expressions to replace
- * @param tableEnv the TableEnvironment
- * @param aggNames the deduplicated aggregations
- * @param propNames the deduplicated properties
- * @return a list of replaced expressions
- */
- def replaceAggregationsAndProperties(
- exprs: Seq[Expression],
- tableEnv: TableEnvironment,
- aggNames: Map[Expression, String],
- propNames: Map[Expression, String]): Seq[NamedExpression] = {
- exprs.map(replaceAggregationsAndProperties(_, tableEnv, aggNames, propNames))
- .map(UnresolvedAlias)
- }
-
- private def replaceAggregationsAndProperties(
- exp: Expression,
- tableEnv: TableEnvironment,
- aggNames: Map[Expression, String],
- propNames: Map[Expression, String]) : Expression = {
-
- exp match {
- case agg: Aggregation =>
- val name = aggNames(agg)
- Alias(UnresolvedFieldReference(name), tableEnv.createUniqueAttributeName())
- case prop: WindowProperty =>
- val name = propNames(prop)
- Alias(UnresolvedFieldReference(name), tableEnv.createUniqueAttributeName())
- case n @ Alias(agg: Aggregation, name, _) =>
- val aName = aggNames(agg)
- Alias(UnresolvedFieldReference(aName), name)
- case n @ Alias(prop: WindowProperty, name, _) =>
- val pName = propNames(prop)
- Alias(UnresolvedFieldReference(pName), name)
- case l: LeafExpression => l
- case u: UnaryExpression =>
- val c = replaceAggregationsAndProperties(u.child, tableEnv, aggNames, propNames)
- u.makeCopy(Array(c))
- case b: BinaryExpression =>
- val l = replaceAggregationsAndProperties(b.left, tableEnv, aggNames, propNames)
- val r = replaceAggregationsAndProperties(b.right, tableEnv, aggNames, propNames)
- b.makeCopy(Array(l, r))
-
- // Functions calls
- case c @ Call(name, args) =>
- val newArgs = args.map(replaceAggregationsAndProperties(_, tableEnv, aggNames, propNames))
- c.makeCopy(Array(name, newArgs))
-
- case sfc @ ScalarFunctionCall(clazz, args) =>
- val newArgs: Seq[Expression] = args
- .map(replaceAggregationsAndProperties(_, tableEnv, aggNames, propNames))
- sfc.makeCopy(Array(clazz, newArgs))
-
- // array constructor
- case c @ ArrayConstructor(args) =>
- val newArgs = c.elements
- .map(replaceAggregationsAndProperties(_, tableEnv, aggNames, propNames))
- c.makeCopy(Array(newArgs))
-
- // General expression
- case e: Expression =>
- val newArgs = e.productIterator.map {
- case arg: Expression =>
- replaceAggregationsAndProperties(arg, tableEnv, aggNames, propNames)
- }
- e.makeCopy(newArgs.toArray)
- }
- }
-
- /**
- * Expands an UnresolvedFieldReference("*") to parent's full project list.
- */
- def expandProjectList(
- exprs: Seq[Expression],
- parent: LogicalNode,
- tableEnv: TableEnvironment)
- : Seq[Expression] = {
-
- val projectList = new ListBuffer[Expression]
-
- exprs.foreach {
- case n: UnresolvedFieldReference if n.name == "*" =>
- projectList ++= parent.output.map(a => UnresolvedFieldReference(a.name))
-
- case Flattening(unresolved) =>
- // simulate a simple project to resolve fields using current parent
- val project = Project(Seq(UnresolvedAlias(unresolved)), parent).validate(tableEnv)
- val resolvedExpr = project
- .output
- .headOption
- .getOrElse(throw new RuntimeException("Could not find resolved composite."))
- resolvedExpr.validateInput()
- val newProjects = resolvedExpr.resultType match {
- case ct: CompositeType[_] =>
- (0 until ct.getArity).map { idx =>
- projectList += GetCompositeField(unresolved, ct.getFieldNames()(idx))
- }
- case _ =>
- projectList += unresolved
- }
-
- case e: Expression => projectList += e
- }
- projectList
- }
-
- /**
- * Extract all field references from the given expressions.
- *
- * @param exprs a list of expressions to extract
- * @return a list of field references extracted from the given expressions
- */
- def extractFieldReferences(exprs: Seq[Expression]): Seq[NamedExpression] = {
- exprs.foldLeft(Set[NamedExpression]()) {
- (fieldReferences, expr) => identifyFieldReferences(expr, fieldReferences)
- }.toSeq
- }
-
- private def identifyFieldReferences(
- expr: Expression,
- fieldReferences: Set[NamedExpression]): Set[NamedExpression] = expr match {
-
- case f: UnresolvedFieldReference =>
- fieldReferences + UnresolvedAlias(f)
-
- case b: BinaryExpression =>
- val l = identifyFieldReferences(b.left, fieldReferences)
- identifyFieldReferences(b.right, l)
-
- // Functions calls
- case c @ Call(name, args) =>
- args.foldLeft(fieldReferences) {
- (fieldReferences, expr) => identifyFieldReferences(expr, fieldReferences)
- }
- case sfc @ ScalarFunctionCall(clazz, args) =>
- args.foldLeft(fieldReferences) {
- (fieldReferences, expr) => identifyFieldReferences(expr, fieldReferences)
- }
-
- // array constructor
- case c @ ArrayConstructor(args) =>
- args.foldLeft(fieldReferences) {
- (fieldReferences, expr) => identifyFieldReferences(expr, fieldReferences)
- }
-
- // ignore fields from window property
- case w : WindowProperty =>
- fieldReferences
-
- // keep this case after all unwanted unary expressions
- case u: UnaryExpression =>
- identifyFieldReferences(u.child, fieldReferences)
-
- // General expression
- case e: Expression =>
- e.productIterator.foldLeft(fieldReferences) {
- (fieldReferences, expr) => expr match {
- case e: Expression => identifyFieldReferences(e, fieldReferences)
- case _ => fieldReferences
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/cost/DataSetCost.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/cost/DataSetCost.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/cost/DataSetCost.scala
deleted file mode 100644
index 58537dd..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/cost/DataSetCost.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.cost
-
-import org.apache.calcite.plan.{RelOptUtil, RelOptCostFactory, RelOptCost}
-import org.apache.calcite.util.Util
-
-/**
- * This class is based on Apache Calcite's `org.apache.calcite.plan.volcano.VolcanoCost` and has
- * an adapted cost comparison method `isLe(other: RelOptCost)` that takes io and cpu into account.
- */
-class DataSetCost(val rowCount: Double, val cpu: Double, val io: Double) extends RelOptCost {
-
- def getCpu: Double = cpu
-
- def isInfinite: Boolean = {
- (this eq DataSetCost.Infinity) ||
- (this.rowCount == Double.PositiveInfinity) ||
- (this.cpu == Double.PositiveInfinity) ||
- (this.io == Double.PositiveInfinity)
- }
-
- def getIo: Double = io
-
- def isLe(other: RelOptCost): Boolean = {
- val that: DataSetCost = other.asInstanceOf[DataSetCost]
- (this eq that) ||
- (this.io < that.io) ||
- (this.io == that.io && this.cpu < that.cpu) ||
- (this.io == that.io && this.cpu == that.cpu && this.rowCount < that.rowCount)
- }
-
- def isLt(other: RelOptCost): Boolean = {
- isLe(other) && !(this == other)
- }
-
- def getRows: Double = rowCount
-
- override def hashCode: Int = Util.hashCode(rowCount) + Util.hashCode(cpu) + Util.hashCode(io)
-
- def equals(other: RelOptCost): Boolean = {
- (this eq other) ||
- other.isInstanceOf[DataSetCost] &&
- (this.rowCount == other.asInstanceOf[DataSetCost].rowCount) &&
- (this.cpu == other.asInstanceOf[DataSetCost].cpu) &&
- (this.io == other.asInstanceOf[DataSetCost].io)
- }
-
- def isEqWithEpsilon(other: RelOptCost): Boolean = {
- if (!other.isInstanceOf[DataSetCost]) {
- return false
- }
- val that: DataSetCost = other.asInstanceOf[DataSetCost]
- (this eq that) ||
- ((Math.abs(this.rowCount - that.rowCount) < RelOptUtil.EPSILON) &&
- (Math.abs(this.cpu - that.cpu) < RelOptUtil.EPSILON) &&
- (Math.abs(this.io - that.io) < RelOptUtil.EPSILON))
- }
-
- def minus(other: RelOptCost): RelOptCost = {
- if (this eq DataSetCost.Infinity) {
- return this
- }
- val that: DataSetCost = other.asInstanceOf[DataSetCost]
- new DataSetCost(this.rowCount - that.rowCount, this.cpu - that.cpu, this.io - that.io)
- }
-
- def multiplyBy(factor: Double): RelOptCost = {
- if (this eq DataSetCost.Infinity) {
- return this
- }
- new DataSetCost(rowCount * factor, cpu * factor, io * factor)
- }
-
- def divideBy(cost: RelOptCost): Double = {
- val that: DataSetCost = cost.asInstanceOf[DataSetCost]
- var d: Double = 1
- var n: Double = 0
- if ((this.rowCount != 0) && !this.rowCount.isInfinite &&
- (that.rowCount != 0) && !that.rowCount.isInfinite)
- {
- d *= this.rowCount / that.rowCount
- n += 1
- }
- if ((this.cpu != 0) && !this.cpu.isInfinite && (that.cpu != 0) && !that.cpu.isInfinite) {
- d *= this.cpu / that.cpu
- n += 1
- }
- if ((this.io != 0) && !this.io.isInfinite && (that.io != 0) && !that.io.isInfinite) {
- d *= this.io / that.io
- n += 1
- }
- if (n == 0) {
- return 1.0
- }
- Math.pow(d, 1 / n)
- }
-
- def plus(other: RelOptCost): RelOptCost = {
- val that: DataSetCost = other.asInstanceOf[DataSetCost]
- if ((this eq DataSetCost.Infinity) || (that eq DataSetCost.Infinity)) {
- return DataSetCost.Infinity
- }
- new DataSetCost(this.rowCount + that.rowCount, this.cpu + that.cpu, this.io + that.io)
- }
-
- override def toString: String = s"{$rowCount rows, $cpu cpu, $io io}"
-
-}
-
-object DataSetCost {
-
- private[flink] val Infinity = new DataSetCost(
- Double.PositiveInfinity,
- Double.PositiveInfinity,
- Double.PositiveInfinity)
- {
- override def toString: String = "{inf}"
- }
-
- private[flink] val Huge = new DataSetCost(Double.MaxValue, Double.MaxValue, Double.MaxValue) {
- override def toString: String = "{huge}"
- }
-
- private[flink] val Zero = new DataSetCost(0.0, 0.0, 0.0) {
- override def toString: String = "{0}"
- }
-
- private[flink] val Tiny = new DataSetCost(1.0, 1.0, 0.0) {
- override def toString = "{tiny}"
- }
-
- val FACTORY: RelOptCostFactory = new DataSetCostFactory
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/cost/DataSetCostFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/cost/DataSetCostFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/cost/DataSetCostFactory.scala
deleted file mode 100644
index 87d57d6..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/cost/DataSetCostFactory.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.cost
-
-import org.apache.calcite.plan.{RelOptCost, RelOptCostFactory}
-
-/**
- * This class is based on Apache Calcite's `org.apache.calcite.plan.volcano.VolcanoCost#Factory`.
- */
-class DataSetCostFactory extends RelOptCostFactory {
-
- override def makeCost(dRows: Double, dCpu: Double, dIo: Double): RelOptCost = {
- new DataSetCost(dRows, dCpu, dIo)
- }
-
- override def makeHugeCost: RelOptCost = {
- DataSetCost.Huge
- }
-
- override def makeInfiniteCost: RelOptCost = {
- DataSetCost.Infinity
- }
-
- override def makeTinyCost: RelOptCost = {
- DataSetCost.Tiny
- }
-
- override def makeZeroCost: RelOptCost = {
- DataSetCost.Zero
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
deleted file mode 100644
index 21290d4..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalNode.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.logical
-
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.table.{TableEnvironment, ValidationException}
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.trees.TreeNode
-import org.apache.flink.api.table.typeutils.TypeCoercion
-import org.apache.flink.api.table.validate._
-
-/**
- * LogicalNode is created and validated as we construct query plan using Table API.
- *
- * The main validation procedure is separated into two phases:
- *
- * Expressions' resolution and transformation ([[resolveExpressions]]):
- *
- * - translate [[UnresolvedFieldReference]] into [[ResolvedFieldReference]]
- * using child operator's output
- * - translate [[Call]](UnresolvedFunction) into solid Expression
- * - generate alias names for query output
- * - ....
- *
- * LogicalNode validation ([[validate]]):
- *
- * - check no [[UnresolvedFieldReference]] exists any more
- * - check if all expressions have children of needed type
- * - check each logical operator have desired input
- *
- * Once we pass the validation phase, we can safely convert LogicalNode into Calcite's RelNode.
- */
-abstract class LogicalNode extends TreeNode[LogicalNode] {
- def output: Seq[Attribute]
-
- def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
- // resolve references and function calls
- val exprResolved = expressionPostOrderTransform {
- case u @ UnresolvedFieldReference(name) =>
- resolveReference(tableEnv, name).getOrElse(u)
- case c @ Call(name, children) if c.childrenValid =>
- tableEnv.getFunctionCatalog.lookupFunction(name, children)
- }
-
- exprResolved.expressionPostOrderTransform {
- case ips: InputTypeSpec if ips.childrenValid =>
- var changed: Boolean = false
- val newChildren = ips.expectedTypes.zip(ips.children).map { case (tpe, child) =>
- val childType = child.resultType
- if (childType != tpe && TypeCoercion.canSafelyCast(childType, tpe)) {
- changed = true
- Cast(child, tpe)
- } else {
- child
- }
- }.toArray[AnyRef]
- if (changed) ips.makeCopy(newChildren) else ips
- }
- }
-
- final def toRelNode(relBuilder: RelBuilder): RelNode = construct(relBuilder).build()
-
- protected[logical] def construct(relBuilder: RelBuilder): RelBuilder
-
- def validate(tableEnv: TableEnvironment): LogicalNode = {
- val resolvedNode = resolveExpressions(tableEnv)
- resolvedNode.expressionPostOrderTransform {
- case a: Attribute if !a.valid =>
- val from = children.flatMap(_.output).map(_.name).mkString(", ")
- failValidation(s"Cannot resolve [${a.name}] given input [$from].")
-
- case e: Expression if e.validateInput().isFailure =>
- failValidation(s"Expression $e failed on input check: " +
- s"${e.validateInput().asInstanceOf[ValidationFailure].message}")
- }
- }
-
- /**
- * Resolves the given strings to a [[NamedExpression]] using the input from all child
- * nodes of this LogicalPlan.
- */
- def resolveReference(tableEnv: TableEnvironment, name: String): Option[NamedExpression] = {
- val childrenOutput = children.flatMap(_.output)
- val candidates = childrenOutput.filter(_.name.equalsIgnoreCase(name))
- if (candidates.length > 1) {
- failValidation(s"Reference $name is ambiguous.")
- } else if (candidates.isEmpty) {
- None
- } else {
- Some(candidates.head.withName(name))
- }
- }
-
- /**
- * Runs [[postOrderTransform]] with `rule` on all expressions present in this logical node.
- *
- * @param rule the rule to be applied to every expression in this logical node.
- */
- def expressionPostOrderTransform(rule: PartialFunction[Expression, Expression]): LogicalNode = {
- var changed = false
-
- def expressionPostOrderTransform(e: Expression): Expression = {
- val newExpr = e.postOrderTransform(rule)
- if (newExpr.fastEquals(e)) {
- e
- } else {
- changed = true
- newExpr
- }
- }
-
- val newArgs = productIterator.map {
- case e: Expression => expressionPostOrderTransform(e)
- case Some(e: Expression) => Some(expressionPostOrderTransform(e))
- case seq: Traversable[_] => seq.map {
- case e: Expression => expressionPostOrderTransform(e)
- case other => other
- }
- case r: Resolvable[_] => r.resolveExpressions(e => expressionPostOrderTransform(e))
- case other: AnyRef => other
- }.toArray
-
- if (changed) makeCopy(newArgs) else this
- }
-
- protected def failValidation(msg: String): Nothing = {
- throw new ValidationException(msg)
- }
-}
-
-abstract class LeafNode extends LogicalNode {
- override def children: Seq[LogicalNode] = Nil
-}
-
-abstract class UnaryNode extends LogicalNode {
- def child: LogicalNode
-
- override def children: Seq[LogicalNode] = child :: Nil
-}
-
-abstract class BinaryNode extends LogicalNode {
- def left: LogicalNode
- def right: LogicalNode
-
- override def children: Seq[LogicalNode] = left :: right :: Nil
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalWindow.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalWindow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalWindow.scala
deleted file mode 100644
index 19fd603..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalWindow.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.logical
-
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.api.table.expressions.{Expression, WindowReference}
-import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
-
-abstract class LogicalWindow(val alias: Option[Expression]) extends Resolvable[LogicalWindow] {
-
- def resolveExpressions(resolver: (Expression) => Expression): LogicalWindow = this
-
- def validate(tableEnv: TableEnvironment): ValidationResult = alias match {
- case Some(WindowReference(_)) => ValidationSuccess
- case Some(_) => ValidationFailure("Window reference for window expected.")
- case None => ValidationSuccess
- }
-
- override def toString: String = getClass.getSimpleName
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/Resolvable.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/Resolvable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/Resolvable.scala
deleted file mode 100644
index 7540d43..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/Resolvable.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.logical
-
-import org.apache.flink.api.table.expressions.Expression
-
-/**
- * A class implementing this interface can resolve the expressions of its parameters and
- * return a new instance with resolved parameters. This is necessary if expression are nested in
- * a not supported structure. By default, the validation of a logical node can resolve common
- * structures like `Expression`, `Option[Expression]`, `Traversable[Expression]`.
- *
- * See also [[LogicalNode.expressionPostOrderTransform(scala.PartialFunction)]].
- *
- * @tparam T class which expression parameters need to be resolved
- */
-trait Resolvable[T <: AnyRef] {
-
- /**
- * An implementing class can resolve its expressions by applying the given resolver
- * function on its parameters.
- *
- * @param resolver function that can resolve an expression
- * @return class with resolved expression parameters
- */
- def resolveExpressions(resolver: (Expression) => Expression): T
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala
deleted file mode 100644
index aeb9676..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.logical
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.{BatchTableEnvironment, StreamTableEnvironment, TableEnvironment}
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo, TypeCoercion}
-import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
-
-abstract class EventTimeGroupWindow(
- name: Option[Expression],
- time: Expression)
- extends LogicalWindow(name) {
-
- override def validate(tableEnv: TableEnvironment): ValidationResult = {
- val valid = super.validate(tableEnv)
- if (valid.isFailure) {
- return valid
- }
-
- tableEnv match {
- case _: StreamTableEnvironment =>
- time match {
- case RowtimeAttribute() =>
- ValidationSuccess
- case _ =>
- ValidationFailure("Event-time window expects a 'rowtime' time field.")
- }
- case _: BatchTableEnvironment =>
- if (!TypeCoercion.canCast(time.resultType, BasicTypeInfo.LONG_TYPE_INFO)) {
- ValidationFailure(s"Event-time window expects a time field that can be safely cast " +
- s"to Long, but is ${time.resultType}")
- } else {
- ValidationSuccess
- }
- }
-
- }
-}
-
-abstract class ProcessingTimeGroupWindow(name: Option[Expression]) extends LogicalWindow(name)
-
-// ------------------------------------------------------------------------------------------------
-// Tumbling group windows
-// ------------------------------------------------------------------------------------------------
-
-object TumblingGroupWindow {
- def validate(tableEnv: TableEnvironment, size: Expression): ValidationResult = size match {
- case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
- ValidationSuccess
- case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
- ValidationSuccess
- case _ =>
- ValidationFailure("Tumbling window expects size literal of type Interval of Milliseconds " +
- "or Interval of Rows.")
- }
-}
-
-case class ProcessingTimeTumblingGroupWindow(
- name: Option[Expression],
- size: Expression)
- extends ProcessingTimeGroupWindow(name) {
-
- override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
- ProcessingTimeTumblingGroupWindow(
- name.map(resolve),
- resolve(size))
-
- override def validate(tableEnv: TableEnvironment): ValidationResult =
- super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv, size))
-
- override def toString: String = s"ProcessingTimeTumblingGroupWindow($name, $size)"
-}
-
-case class EventTimeTumblingGroupWindow(
- name: Option[Expression],
- timeField: Expression,
- size: Expression)
- extends EventTimeGroupWindow(
- name,
- timeField) {
-
- override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
- EventTimeTumblingGroupWindow(
- name.map(resolve),
- resolve(timeField),
- resolve(size))
-
- override def validate(tableEnv: TableEnvironment): ValidationResult =
- super.validate(tableEnv)
- .orElse(TumblingGroupWindow.validate(tableEnv, size))
- .orElse(size match {
- case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
- ValidationFailure(
- "Event-time grouping windows on row intervals are currently not supported.")
- case _ =>
- ValidationSuccess
- })
-
- override def toString: String = s"EventTimeTumblingGroupWindow($name, $timeField, $size)"
-}
-
-// ------------------------------------------------------------------------------------------------
-// Sliding group windows
-// ------------------------------------------------------------------------------------------------
-
-object SlidingGroupWindow {
- def validate(
- tableEnv: TableEnvironment,
- size: Expression,
- slide: Expression)
- : ValidationResult = {
-
- val checkedSize = size match {
- case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
- ValidationSuccess
- case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
- ValidationSuccess
- case _ =>
- ValidationFailure("Sliding window expects size literal of type Interval of " +
- "Milliseconds or Interval of Rows.")
- }
-
- val checkedSlide = slide match {
- case Literal(_, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
- ValidationSuccess
- case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
- ValidationSuccess
- case _ =>
- ValidationFailure("Sliding window expects slide literal of type Interval of " +
- "Milliseconds or Interval of Rows.")
- }
-
- checkedSize
- .orElse(checkedSlide)
- .orElse {
- if (size.resultType != slide.resultType) {
- ValidationFailure("Sliding window expects same type of size and slide.")
- } else {
- ValidationSuccess
- }
- }
- }
-}
-
-case class ProcessingTimeSlidingGroupWindow(
- name: Option[Expression],
- size: Expression,
- slide: Expression)
- extends ProcessingTimeGroupWindow(name) {
-
- override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
- ProcessingTimeSlidingGroupWindow(
- name.map(resolve),
- resolve(size),
- resolve(slide))
-
- override def validate(tableEnv: TableEnvironment): ValidationResult =
- super.validate(tableEnv).orElse(SlidingGroupWindow.validate(tableEnv, size, slide))
-
- override def toString: String = s"ProcessingTimeSlidingGroupWindow($name, $size, $slide)"
-}
-
-case class EventTimeSlidingGroupWindow(
- name: Option[Expression],
- timeField: Expression,
- size: Expression,
- slide: Expression)
- extends EventTimeGroupWindow(name, timeField) {
-
- override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
- EventTimeSlidingGroupWindow(
- name.map(resolve),
- resolve(timeField),
- resolve(size),
- resolve(slide))
-
- override def validate(tableEnv: TableEnvironment): ValidationResult =
- super.validate(tableEnv)
- .orElse(SlidingGroupWindow.validate(tableEnv, size, slide))
- .orElse(size match {
- case Literal(_, RowIntervalTypeInfo.INTERVAL_ROWS) =>
- ValidationFailure(
- "Event-time grouping windows on row intervals are currently not supported.")
- case _ =>
- ValidationSuccess
- })
-
- override def toString: String = s"EventTimeSlidingGroupWindow($name, $timeField, $size, $slide)"
-}
-
-// ------------------------------------------------------------------------------------------------
-// Session group windows
-// ------------------------------------------------------------------------------------------------
-
-object SessionGroupWindow {
-
- def validate(tableEnv: TableEnvironment, gap: Expression): ValidationResult = gap match {
- case Literal(timeInterval: Long, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
- ValidationSuccess
- case _ =>
- ValidationFailure(
- "Session window expects gap literal of type Interval of Milliseconds.")
- }
-}
-
-case class ProcessingTimeSessionGroupWindow(
- name: Option[Expression],
- gap: Expression)
- extends ProcessingTimeGroupWindow(name) {
-
- override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
- ProcessingTimeSessionGroupWindow(
- name.map(resolve),
- resolve(gap))
-
- override def validate(tableEnv: TableEnvironment): ValidationResult =
- super.validate(tableEnv).orElse(SessionGroupWindow.validate(tableEnv, gap))
-
- override def toString: String = s"ProcessingTimeSessionGroupWindow($name, $gap)"
-}
-
-case class EventTimeSessionGroupWindow(
- name: Option[Expression],
- timeField: Expression,
- gap: Expression)
- extends EventTimeGroupWindow(
- name,
- timeField) {
-
- override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
- EventTimeSessionGroupWindow(
- name.map(resolve),
- resolve(timeField),
- resolve(gap))
-
- override def validate(tableEnv: TableEnvironment): ValidationResult =
- super.validate(tableEnv).orElse(SessionGroupWindow.validate(tableEnv, gap))
-
- override def toString: String = s"EventTimeSessionGroupWindow($name, $timeField, $gap)"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
deleted file mode 100644
index 438698a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ /dev/null
@@ -1,692 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.logical
-
-import java.lang.reflect.Method
-import java.util
-
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.CorrelationId
-import org.apache.calcite.rel.logical.{LogicalProject, LogicalTableFunctionScan}
-import org.apache.calcite.rex.{RexInputRef, RexNode}
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.api.table._
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.table.functions.utils.TableSqlFunction
-import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._
-import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl
-import org.apache.flink.api.table.typeutils.TypeConverter
-import org.apache.flink.api.table.validate.{ValidationFailure, ValidationSuccess}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extends UnaryNode {
- override def output: Seq[Attribute] = projectList.map(_.toAttribute)
-
- override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
- val afterResolve = super.resolveExpressions(tableEnv).asInstanceOf[Project]
- val newProjectList =
- afterResolve.projectList.zipWithIndex.map { case (e, i) =>
- e match {
- case u @ UnresolvedAlias(c) => c match {
- case ne: NamedExpression => ne
- case expr if !expr.valid => u
- case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
- case gcf: GetCompositeField => Alias(gcf, gcf.aliasName().getOrElse(s"_c$i"))
- case other => Alias(other, s"_c$i")
- }
- case _ =>
- throw new RuntimeException("This should never be called and probably points to a bug.")
- }
- }
- Project(newProjectList, child)
- }
-
- override def validate(tableEnv: TableEnvironment): LogicalNode = {
- val resolvedProject = super.validate(tableEnv).asInstanceOf[Project]
- val names: mutable.Set[String] = mutable.Set()
-
- def checkName(name: String): Unit = {
- if (names.contains(name)) {
- failValidation(s"Duplicate field name $name.")
- } else if (tableEnv.isInstanceOf[StreamTableEnvironment] && name == "rowtime") {
- failValidation("'rowtime' cannot be used as field name in a streaming environment.")
- } else {
- names.add(name)
- }
- }
-
- resolvedProject.projectList.foreach {
- case n: Alias =>
- // explicit name
- checkName(n.name)
- case r: ResolvedFieldReference =>
- // simple field forwarding
- checkName(r.name)
- case _ => // Do nothing
- }
- resolvedProject
- }
-
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- val allAlias = projectList.forall(_.isInstanceOf[Alias])
- child.construct(relBuilder)
- if (allAlias) {
- // Calcite's RelBuilder does not translate identity projects even if they rename fields.
- // Add a projection ourselves (will be automatically removed by translation rules).
- val project = LogicalProject.create(relBuilder.peek(),
- // avoid AS call
- projectList.map(_.asInstanceOf[Alias].child.toRexNode(relBuilder)).asJava,
- projectList.map(_.name).asJava)
- relBuilder.build() // pop previous relNode
- relBuilder.push(project)
- } else {
- relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*)
- }
- }
-}
-
-case class AliasNode(aliasList: Seq[Expression], child: LogicalNode) extends UnaryNode {
- override def output: Seq[Attribute] =
- throw UnresolvedException("Invalid call to output on AliasNode")
-
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder =
- throw UnresolvedException("Invalid call to toRelNode on AliasNode")
-
- override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
- if (aliasList.length > child.output.length) {
- failValidation("Aliasing more fields than we actually have")
- } else if (!aliasList.forall(_.isInstanceOf[UnresolvedFieldReference])) {
- failValidation("Alias only accept name expressions as arguments")
- } else if (!aliasList.forall(_.asInstanceOf[UnresolvedFieldReference].name != "*")) {
- failValidation("Alias can not accept '*' as name")
- } else if (tableEnv.isInstanceOf[StreamTableEnvironment] && !aliasList.forall {
- case UnresolvedFieldReference(name) => name != "rowtime"
- }) {
- failValidation("'rowtime' cannot be used as field name in a streaming environment.")
- } else {
- val names = aliasList.map(_.asInstanceOf[UnresolvedFieldReference].name)
- val input = child.output
- Project(
- names.zip(input).map { case (name, attr) =>
- Alias(attr, name)} ++ input.drop(names.length), child)
- }
- }
-}
-
-case class Distinct(child: LogicalNode) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
-
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- child.construct(relBuilder)
- relBuilder.distinct()
- }
-
- override def validate(tableEnv: TableEnvironment): LogicalNode = {
- if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
- failValidation(s"Distinct on stream tables is currently not supported.")
- }
- this
- }
-}
-
-case class Sort(order: Seq[Ordering], child: LogicalNode) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
-
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- child.construct(relBuilder)
- relBuilder.sort(order.map(_.toRexNode(relBuilder)).asJava)
- }
-
- override def validate(tableEnv: TableEnvironment): LogicalNode = {
- if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
- failValidation(s"Sort on stream tables is currently not supported.")
- }
- super.validate(tableEnv)
- }
-}
-
-case class Limit(offset: Int, fetch: Int = -1, child: LogicalNode) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
-
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- child.construct(relBuilder)
- relBuilder.limit(offset, fetch)
- }
-
- override def validate(tableEnv: TableEnvironment): LogicalNode = {
- if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
- failValidation(s"Limit on stream tables is currently not supported.")
- }
- if (!child.validate(tableEnv).isInstanceOf[Sort]) {
- failValidation(s"Limit operator must be preceded by an OrderBy operator.")
- }
- if (offset < 0) {
- failValidation(s"Offset should be greater than or equal to zero.")
- }
- super.validate(tableEnv)
- }
-}
-
-case class Filter(condition: Expression, child: LogicalNode) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
-
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- child.construct(relBuilder)
- relBuilder.filter(condition.toRexNode(relBuilder))
- }
-
- override def validate(tableEnv: TableEnvironment): LogicalNode = {
- val resolvedFilter = super.validate(tableEnv).asInstanceOf[Filter]
- if (resolvedFilter.condition.resultType != BOOLEAN_TYPE_INFO) {
- failValidation(s"Filter operator requires a boolean expression as input," +
- s" but ${resolvedFilter.condition} is of type ${resolvedFilter.condition.resultType}")
- }
- resolvedFilter
- }
-}
-
-case class Aggregate(
- groupingExpressions: Seq[Expression],
- aggregateExpressions: Seq[NamedExpression],
- child: LogicalNode) extends UnaryNode {
-
- override def output: Seq[Attribute] = {
- (groupingExpressions ++ aggregateExpressions) map {
- case ne: NamedExpression => ne.toAttribute
- case e => Alias(e, e.toString).toAttribute
- }
- }
-
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- child.construct(relBuilder)
- relBuilder.aggregate(
- relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava),
- aggregateExpressions.map {
- case Alias(agg: Aggregation, name, _) => agg.toAggCall(name)(relBuilder)
- case _ => throw new RuntimeException("This should never happen.")
- }.asJava)
- }
-
- override def validate(tableEnv: TableEnvironment): LogicalNode = {
- if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
- failValidation(s"Aggregate on stream tables is currently not supported.")
- }
-
- val resolvedAggregate = super.validate(tableEnv).asInstanceOf[Aggregate]
- val groupingExprs = resolvedAggregate.groupingExpressions
- val aggregateExprs = resolvedAggregate.aggregateExpressions
- aggregateExprs.foreach(validateAggregateExpression)
- groupingExprs.foreach(validateGroupingExpression)
-
- def validateAggregateExpression(expr: Expression): Unit = expr match {
- // check no nested aggregation exists.
- case aggExpr: Aggregation =>
- aggExpr.children.foreach { child =>
- child.preOrderVisit {
- case agg: Aggregation =>
- failValidation(
- "It's not allowed to use an aggregate function as " +
- "input of another aggregate function")
- case _ => // OK
- }
- }
- case a: Attribute if !groupingExprs.exists(_.checkEquals(a)) =>
- failValidation(
- s"expression '$a' is invalid because it is neither" +
- " present in group by nor an aggregate function")
- case e if groupingExprs.exists(_.checkEquals(e)) => // OK
- case e => e.children.foreach(validateAggregateExpression)
- }
-
- def validateGroupingExpression(expr: Expression): Unit = {
- if (!expr.resultType.isKeyType) {
- failValidation(
- s"expression $expr cannot be used as a grouping expression " +
- "because it's not a valid key type which must be hashable and comparable")
- }
- }
- resolvedAggregate
- }
-}
-
-case class Minus(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
- override def output: Seq[Attribute] = left.output
-
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- left.construct(relBuilder)
- right.construct(relBuilder)
- relBuilder.minus(all)
- }
-
- override def validate(tableEnv: TableEnvironment): LogicalNode = {
- if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
- failValidation(s"Minus on stream tables is currently not supported.")
- }
-
- val resolvedMinus = super.validate(tableEnv).asInstanceOf[Minus]
- if (left.output.length != right.output.length) {
- failValidation(s"Minus two table of different column sizes:" +
- s" ${left.output.size} and ${right.output.size}")
- }
- val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
- l.resultType == r.resultType
- }
- if (!sameSchema) {
- failValidation(s"Minus two table of different schema:" +
- s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" +
- s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
- }
- resolvedMinus
- }
-}
-
-case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
- override def output: Seq[Attribute] = left.output
-
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- left.construct(relBuilder)
- right.construct(relBuilder)
- relBuilder.union(all)
- }
-
- override def validate(tableEnv: TableEnvironment): LogicalNode = {
- if (tableEnv.isInstanceOf[StreamTableEnvironment] && !all) {
- failValidation(s"Union on stream tables is currently not supported.")
- }
-
- val resolvedUnion = super.validate(tableEnv).asInstanceOf[Union]
- if (left.output.length != right.output.length) {
- failValidation(s"Union two tables of different column sizes:" +
- s" ${left.output.size} and ${right.output.size}")
- }
- val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
- l.resultType == r.resultType
- }
- if (!sameSchema) {
- failValidation(s"Union two tables of different schema:" +
- s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" +
- s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
- }
- resolvedUnion
- }
-}
-
-case class Intersect(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
- override def output: Seq[Attribute] = left.output
-
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- left.construct(relBuilder)
- right.construct(relBuilder)
- relBuilder.intersect(all)
- }
-
- override def validate(tableEnv: TableEnvironment): LogicalNode = {
- if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
- failValidation(s"Intersect on stream tables is currently not supported.")
- }
-
- val resolvedIntersect = super.validate(tableEnv).asInstanceOf[Intersect]
- if (left.output.length != right.output.length) {
- failValidation(s"Intersect two tables of different column sizes:" +
- s" ${left.output.size} and ${right.output.size}")
- }
- // allow different column names between tables
- val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
- l.resultType == r.resultType
- }
- if (!sameSchema) {
- failValidation(s"Intersect two tables of different schema:" +
- s" [${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" +
- s" [${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
- }
- resolvedIntersect
- }
-}
-
-case class Join(
- left: LogicalNode,
- right: LogicalNode,
- joinType: JoinType,
- condition: Option[Expression],
- correlated: Boolean) extends BinaryNode {
-
- override def output: Seq[Attribute] = {
- left.output ++ right.output
- }
-
- private case class JoinFieldReference(
- name: String,
- resultType: TypeInformation[_],
- left: LogicalNode,
- right: LogicalNode) extends Attribute {
-
- val isFromLeftInput = left.output.map(_.name).contains(name)
-
- val (indexInInput, indexInJoin) = if (isFromLeftInput) {
- val indexInLeft = left.output.map(_.name).indexOf(name)
- (indexInLeft, indexInLeft)
- } else {
- val indexInRight = right.output.map(_.name).indexOf(name)
- (indexInRight, indexInRight + left.output.length)
- }
-
- override def toString = s"'$name"
-
- override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- // look up type of field
- val fieldType = relBuilder.field(2, if (isFromLeftInput) 0 else 1, name).getType
- // create a new RexInputRef with index offset
- new RexInputRef(indexInJoin, fieldType)
- }
-
- override def withName(newName: String): Attribute = {
- if (newName == name) {
- this
- } else {
- JoinFieldReference(newName, resultType, left, right)
- }
- }
- }
-
- override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
- val node = super.resolveExpressions(tableEnv).asInstanceOf[Join]
- val partialFunction: PartialFunction[Expression, Expression] = {
- case field: ResolvedFieldReference => JoinFieldReference(
- field.name,
- field.resultType,
- left,
- right)
- }
- val resolvedCondition = node.condition.map(_.postOrderTransform(partialFunction))
- Join(node.left, node.right, node.joinType, resolvedCondition, correlated)
- }
-
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- left.construct(relBuilder)
- right.construct(relBuilder)
-
- val corSet = mutable.Set[CorrelationId]()
- if (correlated) {
- corSet += relBuilder.peek().getCluster.createCorrel()
- }
-
- relBuilder.join(
- TypeConverter.flinkJoinTypeToRelType(joinType),
- condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)),
- corSet.asJava)
- }
-
- private def ambiguousName: Set[String] =
- left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet)
-
- override def validate(tableEnv: TableEnvironment): LogicalNode = {
- if (tableEnv.isInstanceOf[StreamTableEnvironment]
- && !right.isInstanceOf[LogicalTableFunctionCall]) {
- failValidation(s"Join on stream tables is currently not supported.")
- }
-
- val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
- if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
- failValidation(s"Filter operator requires a boolean expression as input, " +
- s"but ${resolvedJoin.condition} is of type ${resolvedJoin.joinType}")
- } else if (ambiguousName.nonEmpty) {
- failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
- }
-
- resolvedJoin.condition.foreach(testJoinCondition)
- resolvedJoin
- }
-
- private def testJoinCondition(expression: Expression): Unit = {
-
- def checkIfJoinCondition(exp : BinaryComparison) = exp.children match {
- case (x : JoinFieldReference) :: (y : JoinFieldReference) :: Nil
- if x.isFromLeftInput != y.isFromLeftInput => Unit
- case x => failValidation(
- s"Invalid non-join predicate $exp. For non-join predicates use Table#where.")
- }
-
- var equiJoinFound = false
- def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = exp match {
- case x: And => x.children.foreach(validateConditions(_, isAndBranch))
- case x: Or => x.children.foreach(validateConditions(_, isAndBranch = false))
- case x: EqualTo =>
- if (isAndBranch) {
- equiJoinFound = true
- }
- checkIfJoinCondition(x)
- case x: BinaryComparison => checkIfJoinCondition(x)
- case x => failValidation(
- s"Unsupported condition type: ${x.getClass.getSimpleName}. Condition: $x")
- }
-
- validateConditions(expression, isAndBranch = true)
- if (!equiJoinFound) {
- failValidation(s"Invalid join condition: $expression. At least one equi-join required.")
- }
- }
-}
-
-case class CatalogNode(
- tableName: String,
- rowType: RelDataType) extends LeafNode {
-
- val output: Seq[Attribute] = rowType.getFieldList.asScala.map { field =>
- ResolvedFieldReference(field.getName, FlinkTypeFactory.toTypeInfo(field.getType))
- }
-
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- relBuilder.scan(tableName)
- }
-
- override def validate(tableEnv: TableEnvironment): LogicalNode = this
-}
-
-/**
- * Wrapper for valid logical plans generated from SQL String.
- */
-case class LogicalRelNode(
- relNode: RelNode) extends LeafNode {
-
- val output: Seq[Attribute] = relNode.getRowType.getFieldList.asScala.map { field =>
- ResolvedFieldReference(field.getName, FlinkTypeFactory.toTypeInfo(field.getType))
- }
-
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- relBuilder.push(relNode)
- }
-
- override def validate(tableEnv: TableEnvironment): LogicalNode = this
-}
-
-case class WindowAggregate(
- groupingExpressions: Seq[Expression],
- window: LogicalWindow,
- propertyExpressions: Seq[NamedExpression],
- aggregateExpressions: Seq[NamedExpression],
- child: LogicalNode)
- extends UnaryNode {
-
- override def output: Seq[Attribute] = {
- (groupingExpressions ++ aggregateExpressions ++ propertyExpressions) map {
- case ne: NamedExpression => ne.toAttribute
- case e => Alias(e, e.toString).toAttribute
- }
- }
-
- // resolve references of this operator's parameters
- override def resolveReference(
- tableEnv: TableEnvironment,
- name: String)
- : Option[NamedExpression] = tableEnv match {
- // resolve reference to rowtime attribute in a streaming environment
- case _: StreamTableEnvironment if name == "rowtime" =>
- Some(RowtimeAttribute())
- case _ =>
- window.alias match {
- // resolve reference to this window's alias
- case Some(UnresolvedFieldReference(alias)) if name == alias =>
- // check if reference can already be resolved by input fields
- val found = super.resolveReference(tableEnv, name)
- if (found.isDefined) {
- failValidation(s"Reference $name is ambiguous.")
- } else {
- Some(WindowReference(name))
- }
- case _ =>
- // resolve references as usual
- super.resolveReference(tableEnv, name)
- }
- }
-
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- val flinkRelBuilder = relBuilder.asInstanceOf[FlinkRelBuilder]
- child.construct(flinkRelBuilder)
- flinkRelBuilder.aggregate(
- window,
- relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava),
- propertyExpressions.map {
- case Alias(prop: WindowProperty, name, _) => prop.toNamedWindowProperty(name)(relBuilder)
- case _ => throw new RuntimeException("This should never happen.")
- },
- aggregateExpressions.map {
- case Alias(agg: Aggregation, name, _) => agg.toAggCall(name)(relBuilder)
- case _ => throw new RuntimeException("This should never happen.")
- }.asJava)
- }
-
- override def validate(tableEnv: TableEnvironment): LogicalNode = {
- val resolvedWindowAggregate = super.validate(tableEnv).asInstanceOf[WindowAggregate]
- val groupingExprs = resolvedWindowAggregate.groupingExpressions
- val aggregateExprs = resolvedWindowAggregate.aggregateExpressions
- aggregateExprs.foreach(validateAggregateExpression)
- groupingExprs.foreach(validateGroupingExpression)
-
- def validateAggregateExpression(expr: Expression): Unit = expr match {
- // check no nested aggregation exists.
- case aggExpr: Aggregation =>
- aggExpr.children.foreach { child =>
- child.preOrderVisit {
- case agg: Aggregation =>
- failValidation(
- "It's not allowed to use an aggregate function as " +
- "input of another aggregate function")
- case _ => // ok
- }
- }
- case a: Attribute if !groupingExprs.exists(_.checkEquals(a)) =>
- failValidation(
- s"Expression '$a' is invalid because it is neither" +
- " present in group by nor an aggregate function")
- case e if groupingExprs.exists(_.checkEquals(e)) => // ok
- case e => e.children.foreach(validateAggregateExpression)
- }
-
- def validateGroupingExpression(expr: Expression): Unit = {
- if (!expr.resultType.isKeyType) {
- failValidation(
- s"Expression $expr cannot be used as a grouping expression " +
- "because it's not a valid key type which must be hashable and comparable")
- }
- }
-
- // validate window
- resolvedWindowAggregate.window.validate(tableEnv) match {
- case ValidationFailure(msg) =>
- failValidation(s"$window is invalid: $msg")
- case ValidationSuccess => // ok
- }
-
- resolvedWindowAggregate
- }
-}
-
-/**
- * LogicalNode for calling a user-defined table functions.
- *
- * @param functionName function name
- * @param tableFunction table function to be called (might be overloaded)
- * @param parameters actual parameters
- * @param fieldNames output field names
- * @param child child logical node
- */
-case class LogicalTableFunctionCall(
- functionName: String,
- tableFunction: TableFunction[_],
- parameters: Seq[Expression],
- resultType: TypeInformation[_],
- fieldNames: Array[String],
- child: LogicalNode)
- extends UnaryNode {
-
- private val (_, fieldIndexes, fieldTypes) = getFieldInfo(resultType)
- private var evalMethod: Method = _
-
- override def output: Seq[Attribute] = fieldNames.zip(fieldTypes).map {
- case (n, t) => ResolvedFieldReference(n, t)
- }
-
- override def validate(tableEnv: TableEnvironment): LogicalNode = {
- val node = super.validate(tableEnv).asInstanceOf[LogicalTableFunctionCall]
- // check if not Scala object
- checkNotSingleton(tableFunction.getClass)
- // check if class could be instantiated
- checkForInstantiation(tableFunction.getClass)
- // look for a signature that matches the input types
- val signature = node.parameters.map(_.resultType)
- val foundMethod = getEvalMethod(tableFunction, signature)
- if (foundMethod.isEmpty) {
- failValidation(
- s"Given parameters of function '$functionName' do not match any signature. \n" +
- s"Actual: ${signatureToString(signature)} \n" +
- s"Expected: ${signaturesToString(tableFunction)}")
- } else {
- node.evalMethod = foundMethod.get
- }
- node
- }
-
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- val fieldIndexes = getFieldInfo(resultType)._2
- val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames, evalMethod)
- val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
- val sqlFunction = TableSqlFunction(
- tableFunction.toString,
- tableFunction,
- resultType,
- typeFactory,
- function)
-
- val scan = LogicalTableFunctionScan.create(
- relBuilder.peek().getCluster,
- new util.ArrayList[RelNode](),
- relBuilder.call(sqlFunction, parameters.map(_.toRexNode(relBuilder)).asJava),
- function.getElementType(null),
- function.getRowType(relBuilder.getTypeFactory, null),
- null)
-
- relBuilder.push(scan)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/rel/LogicalWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/rel/LogicalWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/rel/LogicalWindowAggregate.scala
deleted file mode 100644
index 9615168..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/rel/LogicalWindowAggregate.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.logical.rel
-
-import java.util
-
-import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.{Aggregate, AggregateCall}
-import org.apache.calcite.rel.{RelNode, RelShuttle}
-import org.apache.calcite.util.ImmutableBitSet
-import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.api.table.FlinkTypeFactory
-import org.apache.flink.api.table.plan.logical.LogicalWindow
-
-class LogicalWindowAggregate(
- window: LogicalWindow,
- namedProperties: Seq[NamedWindowProperty],
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- child: RelNode,
- indicator: Boolean,
- groupSet: ImmutableBitSet,
- groupSets: util.List[ImmutableBitSet],
- aggCalls: util.List[AggregateCall])
- extends Aggregate(
- cluster,
- traitSet,
- child,
- indicator,
- groupSet,
- groupSets,
- aggCalls) {
-
- def getWindow = window
-
- def getNamedProperties = namedProperties
-
- override def copy(
- traitSet: RelTraitSet,
- input: RelNode,
- indicator: Boolean,
- groupSet: ImmutableBitSet,
- groupSets: util.List[ImmutableBitSet],
- aggCalls: util.List[AggregateCall])
- : Aggregate = {
-
- new LogicalWindowAggregate(
- window,
- namedProperties,
- cluster,
- traitSet,
- input,
- indicator,
- groupSet,
- groupSets,
- aggCalls)
- }
-
- override def accept(shuttle: RelShuttle): RelNode = shuttle.visit(this)
-
- override def deriveRowType(): RelDataType = {
- val aggregateRowType = super.deriveRowType()
- val typeFactory = getCluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
- val builder = typeFactory.builder
- builder.addAll(aggregateRowType.getFieldList)
- namedProperties.foreach { namedProp =>
- builder.add(
- namedProp.name,
- typeFactory.createTypeFromTypeInfo(namedProp.property.resultType)
- )
- }
- builder.build()
- }
-}
-
-object LogicalWindowAggregate {
-
- def create(
- window: LogicalWindow,
- namedProperties: Seq[NamedWindowProperty],
- aggregate: Aggregate)
- : LogicalWindowAggregate = {
-
- val cluster: RelOptCluster = aggregate.getCluster
- val traitSet: RelTraitSet = cluster.traitSetOf(Convention.NONE)
- new LogicalWindowAggregate(
- window,
- namedProperties,
- cluster,
- traitSet,
- aggregate.getInput,
- aggregate.indicator,
- aggregate.getGroupSet,
- aggregate.getGroupSets,
- aggregate.getAggCallList)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkAggregate.scala
deleted file mode 100644
index 85129c4..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkAggregate.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.AggregateCall
-import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
-
-import scala.collection.JavaConverters._
-
-trait FlinkAggregate {
-
- private[flink] def groupingToString(inputType: RelDataType, grouping: Array[Int]): String = {
-
- val inFields = inputType.getFieldNames.asScala
- grouping.map( inFields(_) ).mkString(", ")
- }
-
- private[flink] def aggregationToString(
- inputType: RelDataType,
- grouping: Array[Int],
- rowType: RelDataType,
- namedAggregates: Seq[CalcitePair[AggregateCall, String]],
- namedProperties: Seq[NamedWindowProperty])
- : String = {
-
- val inFields = inputType.getFieldNames.asScala
- val outFields = rowType.getFieldNames.asScala
-
- val groupStrings = grouping.map( inFields(_) )
-
- val aggs = namedAggregates.map(_.getKey)
- val aggStrings = aggs.map( a => s"${a.getAggregation}(${
- if (a.getArgList.size() > 0) {
- inFields(a.getArgList.get(0))
- } else {
- "*"
- }
- })")
-
- val propStrings = namedProperties.map(_.property.toString)
-
- (groupStrings ++ aggStrings ++ propStrings).zip(outFields).map {
- case (f, o) => if (f == o) {
- f
- } else {
- s"$f AS $o"
- }
- }.mkString(", ")
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
deleted file mode 100644
index d5f8010..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexNode, RexProgram}
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.{GeneratedFunction, CodeGenerator}
-import org.apache.flink.api.table.runtime.FlatMapRunner
-import org.apache.flink.api.table.typeutils.TypeConverter._
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-trait FlinkCalc {
-
- private[flink] def functionBody(
- generator: CodeGenerator,
- inputType: TypeInformation[Any],
- rowType: RelDataType,
- calcProgram: RexProgram,
- config: TableConfig,
- expectedType: Option[TypeInformation[Any]]): String = {
-
- val returnType = determineReturnType(
- rowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- val condition = calcProgram.getCondition
- val expandedExpressions = calcProgram.getProjectList.map(
- expr => calcProgram.expandLocalRef(expr))
- val projection = generator.generateResultExpression(
- returnType,
- rowType.getFieldNames,
- expandedExpressions)
-
- // only projection
- if (condition == null) {
- s"""
- |${projection.code}
- |${generator.collectorTerm}.collect(${projection.resultTerm});
- |""".stripMargin
- }
- else {
- val filterCondition = generator.generateExpression(
- calcProgram.expandLocalRef(calcProgram.getCondition))
- // only filter
- if (projection == null) {
- // conversion
- if (inputType != returnType) {
- val conversion = generator.generateConverterResultExpression(
- returnType,
- rowType.getFieldNames)
-
- s"""
- |${filterCondition.code}
- |if (${filterCondition.resultTerm}) {
- | ${conversion.code}
- | ${generator.collectorTerm}.collect(${conversion.resultTerm});
- |}
- |""".stripMargin
- }
- // no conversion
- else {
- s"""
- |${filterCondition.code}
- |if (${filterCondition.resultTerm}) {
- | ${generator.collectorTerm}.collect(${generator.input1Term});
- |}
- |""".stripMargin
- }
- }
- // both filter and projection
- else {
- s"""
- |${filterCondition.code}
- |if (${filterCondition.resultTerm}) {
- | ${projection.code}
- | ${generator.collectorTerm}.collect(${projection.resultTerm});
- |}
- |""".stripMargin
- }
- }
- }
-
- private[flink] def calcMapFunction(
- genFunction: GeneratedFunction[FlatMapFunction[Any, Any]]): RichFlatMapFunction[Any, Any] = {
-
- new FlatMapRunner[Any, Any](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
- }
-
- private[flink] def conditionToString(
- calcProgram: RexProgram,
- expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
-
- val cond = calcProgram.getCondition
- val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
- val localExprs = calcProgram.getExprList.asScala.toList
-
- if (cond != null) {
- expression(cond, inFields, Some(localExprs))
- } else {
- ""
- }
- }
-
- private[flink] def selectionToString(
- calcProgram: RexProgram,
- expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
-
- val proj = calcProgram.getProjectList.asScala.toList
- val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
- val localExprs = calcProgram.getExprList.asScala.toList
- val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList
-
- proj
- .map(expression(_, inFields, Some(localExprs)))
- .zip(outFields).map { case (e, o) => {
- if (e != o) {
- e + " AS " + o
- } else {
- e
- }
- }
- }.mkString(", ")
- }
-
- private[flink] def calcOpName(
- calcProgram: RexProgram,
- expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
-
- val conditionStr = conditionToString(calcProgram, expression)
- val selectionStr = selectionToString(calcProgram, expression)
-
- s"${if (calcProgram.getCondition != null) {
- s"where: ($conditionStr), "
- } else {
- ""
- }}select: ($selectionStr)"
- }
-
- private[flink] def calcToString(
- calcProgram: RexProgram,
- expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
-
- val name = calcOpName(calcProgram, expression)
- s"Calc($name)"
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
deleted file mode 100644
index c058265..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexCall, RexNode}
-import org.apache.calcite.sql.SemiJoinType
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression, GeneratedFunction}
-import org.apache.flink.api.table.codegen.CodeGenUtils.primitiveDefaultValue
-import org.apache.flink.api.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
-import org.apache.flink.api.table.functions.utils.TableSqlFunction
-import org.apache.flink.api.table.runtime.FlatMapRunner
-import org.apache.flink.api.table.typeutils.TypeConverter._
-import org.apache.flink.api.table.{TableConfig, TableException}
-
-import scala.collection.JavaConverters._
-
-/**
- * Join a user-defined table function
- */
-trait FlinkCorrelate {
-
- private[flink] def functionBody(
- generator: CodeGenerator,
- udtfTypeInfo: TypeInformation[Any],
- rowType: RelDataType,
- rexCall: RexCall,
- condition: Option[RexNode],
- config: TableConfig,
- joinType: SemiJoinType,
- expectedType: Option[TypeInformation[Any]]): String = {
-
- val returnType = determineReturnType(
- rowType,
- expectedType,
- config.getNullCheck,
- config.getEfficientTypeUsage)
-
- val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
-
- val call = generator.generateExpression(rexCall)
- var body =
- s"""
- |${call.code}
- |java.util.Iterator iter = ${call.resultTerm}.getRowsIterator();
- """.stripMargin
-
- if (joinType == SemiJoinType.INNER) {
- // cross join
- body +=
- s"""
- |if (!iter.hasNext()) {
- | return;
- |}
- """.stripMargin
- } else if (joinType == SemiJoinType.LEFT) {
- // left outer join
-
- // in case of left outer join and the returned row of table function is empty,
- // fill all fields of row with null
- val input2NullExprs = input2AccessExprs.map { x =>
- GeneratedExpression(
- primitiveDefaultValue(x.resultType),
- ALWAYS_NULL,
- NO_CODE,
- x.resultType)
- }
- val outerResultExpr = generator.generateResultExpression(
- input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames.asScala)
- body +=
- s"""
- |if (!iter.hasNext()) {
- | ${outerResultExpr.code}
- | ${generator.collectorTerm}.collect(${outerResultExpr.resultTerm});
- | return;
- |}
- """.stripMargin
- } else {
- throw TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
- }
-
- val crossResultExpr = generator.generateResultExpression(
- input1AccessExprs ++ input2AccessExprs,
- returnType,
- rowType.getFieldNames.asScala)
-
- val projection = if (condition.isEmpty) {
- s"""
- |${crossResultExpr.code}
- |${generator.collectorTerm}.collect(${crossResultExpr.resultTerm});
- """.stripMargin
- } else {
- val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo)
- filterGenerator.input1Term = filterGenerator.input2Term
- val filterCondition = filterGenerator.generateExpression(condition.get)
- s"""
- |${filterGenerator.reuseInputUnboxingCode()}
- |${filterCondition.code}
- |if (${filterCondition.resultTerm}) {
- | ${crossResultExpr.code}
- | ${generator.collectorTerm}.collect(${crossResultExpr.resultTerm});
- |}
- |""".stripMargin
- }
-
- val outputTypeClass = udtfTypeInfo.getTypeClass.getCanonicalName
- body +=
- s"""
- |while (iter.hasNext()) {
- | $outputTypeClass ${generator.input2Term} = ($outputTypeClass) iter.next();
- | $projection
- |}
- """.stripMargin
- body
- }
-
- private[flink] def correlateMapFunction(
- genFunction: GeneratedFunction[FlatMapFunction[Any, Any]])
- : FlatMapRunner[Any, Any] = {
-
- new FlatMapRunner[Any, Any](
- genFunction.name,
- genFunction.code,
- genFunction.returnType)
- }
-
- private[flink] def selectToString(rowType: RelDataType): String = {
- rowType.getFieldNames.asScala.mkString(",")
- }
-
- private[flink] def correlateOpName(
- rexCall: RexCall,
- sqlFunction: TableSqlFunction,
- rowType: RelDataType)
- : String = {
-
- s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: ${selectToString(rowType)}"
- }
-
- private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = {
- val udtfName = sqlFunction.getName
- val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",")
- s"table($udtfName($operands))"
- }
-
-}