You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/04/03 10:34:57 UTC
[3/4] flink git commit: [FLINK-1788] [table] Make logical plans
transformable
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala
new file mode 100644
index 0000000..797de55
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.common.typeutils.CompositeType
+
+import scala.collection.mutable
+
+/**
+ * Equi-join field extractor for Join Predicates and CoGroup predicates. The result is a modified
+ * expression without the equi-join predicates together with indices of the join fields
+ * from both the left and right input.
+ */
+object ExtractEquiJoinFields {
+ def apply(leftType: CompositeType[_], rightType: CompositeType[_], predicate: Expression) = {
+
+ val joinFieldsLeft = mutable.MutableList[Int]()
+ val joinFieldsRight = mutable.MutableList[Int]()
+
+ val equiJoinExprs = mutable.MutableList[EqualTo]()
+ // First get all `===` expressions that are not below an `Or`
+ predicate.transformPre {
+ case or@Or(_, _) => NopExpression()
+ case eq@EqualTo(le: ResolvedFieldReference, re: ResolvedFieldReference) =>
+ if (leftType.hasField(le.name) && rightType.hasField(re.name)) {
+ joinFieldsLeft += leftType.getFieldIndex(le.name)
+ joinFieldsRight += rightType.getFieldIndex(re.name)
+ } else if (leftType.hasField(re.name) && rightType.hasField(le.name)) {
+ joinFieldsLeft += leftType.getFieldIndex(re.name)
+ joinFieldsRight += rightType.getFieldIndex(le.name)
+ } else {
+ // not an equi-join predicate
+ }
+ equiJoinExprs += eq
+ eq
+ }
+
+ // then remove the equi join expressions from the predicate
+ val resultExpr = predicate.transformPost {
+ // For OR, we can eliminate the OR since the equi join
+ // predicate is evaluated before the expression is evaluated
+ case or@Or(NopExpression(), _) => NopExpression()
+ case or@Or(_, NopExpression()) => NopExpression()
+ // For AND we replace it with the other expression, since the
+ // equi join predicate will always be true
+ case and@And(NopExpression(), other) => other
+ case and@And(other, NopExpression()) => other
+ case eq : EqualTo if equiJoinExprs.contains(eq) =>
+ NopExpression()
+ }
+
+ (resultExpr, joinFieldsLeft.toArray, joinFieldsRight.toArray)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala
new file mode 100644
index 0000000..6c7ecb2
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.expressions.{ResolvedFieldReference, Expression}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+import scala.collection.mutable
+
+import org.apache.flink.api.table.trees.{Rule, Analyzer}
+
+
+/**
+ * Analyzer for grouping expressions. Only field expressions are allowed as grouping expressions.
+ */
+class GroupByAnalyzer(inputFields: Seq[(String, TypeInformation[_])])
+ extends Analyzer[Expression] {
+
+ def rules = Seq(new ResolveFieldReferences(inputFields), CheckGroupExpression)
+
+ object CheckGroupExpression extends Rule[Expression] {
+
+ def apply(expr: Expression) = {
+ val errors = mutable.MutableList[String]()
+
+ expr match {
+ case f: ResolvedFieldReference => // this is OK
+ case other =>
+ throw new ExpressionException(
+ s"""Invalid grouping expression "$expr". Only field references are allowed.""")
+ }
+ expr
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
new file mode 100644
index 0000000..af8de38
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.common.typeinfo.{IntegerTypeInfo, BasicTypeInfo}
+import org.apache.flink.api.table.trees.Rule
+
+/**
+ * [[Rule]] that adds casts in arithmetic operations.
+ */
+class InsertAutoCasts extends Rule[Expression] {
+
+ def apply(expr: Expression) = {
+ val result = expr.transformPost {
+
+ case plus@Plus(o1, o2) =>
+ // Plus is special case since we can cast anything to String for String concat
+ if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
+ if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+ o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+ Plus(Cast(o1, o2.typeInfo), o2)
+ } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+ o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+ Plus(o1, Cast(o2, o1.typeInfo))
+ } else if (o1.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
+ Plus(o1, Cast(o2, BasicTypeInfo.STRING_TYPE_INFO))
+ } else if (o2.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
+ Plus(Cast(o1, BasicTypeInfo.STRING_TYPE_INFO), o2)
+ } else {
+ plus
+ }
+ } else {
+ plus
+ }
+
+ case ba: BinaryExpression if ba.isInstanceOf[BinaryArithmetic] ||
+ ba.isInstanceOf[BinaryComparison] =>
+ val o1 = ba.left
+ val o2 = ba.right
+ if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
+ if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+ o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+ ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
+ } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+ o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+ ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
+ } else {
+ ba
+ }
+ } else {
+ ba
+ }
+
+ case ba: BinaryExpression if ba.isInstanceOf[BitwiseBinaryArithmetic] =>
+ val o1 = ba.left
+ val o2 = ba.right
+ if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isInstanceOf[IntegerTypeInfo[_]] &&
+ o2.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+ if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+ o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+ ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
+ } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+ o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+ ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
+ } else {
+ ba
+ }
+ } else {
+ ba
+ }
+ }
+
+ result
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala
new file mode 100644
index 0000000..e9236f7
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.trees.Analyzer
+
+/**
+ * Analyzer for predicates, i.e. filter operations and where clauses of joins.
+ */
+class PredicateAnalyzer(inputFields: Seq[(String, TypeInformation[_])])
+ extends Analyzer[Expression] {
+ def rules = Seq(
+ new ResolveFieldReferences(inputFields),
+ new InsertAutoCasts,
+ new TypeCheck,
+ new VerifyNoAggregates,
+ new VerifyBoolean)
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala
new file mode 100644
index 0000000..db7ea6c
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.expressions.{ResolvedFieldReference,
+UnresolvedFieldReference, Expression}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table._
+
+import scala.collection.mutable
+
+import org.apache.flink.api.table.trees.Rule
+
+/**
+ * Rule that resolved field references. This rule verifies that field references point to existing
+ * fields of the input operation and creates [[ResolvedFieldReference]]s that hold the field
+ * [[TypeInformation]] in addition to the field name.
+ */
+class ResolveFieldReferences(inputFields: Seq[(String, TypeInformation[_])])
+ extends Rule[Expression] {
+
+ def apply(expr: Expression) = {
+ val errors = mutable.MutableList[String]()
+
+ val result = expr.transformPost {
+ case fe@UnresolvedFieldReference(fieldName) =>
+ inputFields.find { _._1 == fieldName } match {
+ case Some((_, tpe)) => ResolvedFieldReference(fieldName, tpe)
+
+ case None =>
+ errors +=
+ s"Field '$fieldName' is not valid for input fields ${inputFields.mkString(",")}"
+ fe
+ }
+ }
+
+ if (errors.length > 0) {
+ throw new ExpressionException(
+ s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+ }
+
+ result
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
new file mode 100644
index 0000000..625fdbf
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.trees.Analyzer
+
+/**
+ * This analyzes selection expressions.
+ */
+class SelectionAnalyzer(inputFields: Seq[(String, TypeInformation[_])])
+ extends Analyzer[Expression] {
+
+ def rules = Seq(
+ new ResolveFieldReferences(inputFields),
+ new VerifyNoNestedAggregates,
+ new InsertAutoCasts,
+ new TypeCheck)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
new file mode 100644
index 0000000..b724561
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.trees.Rule
+import org.apache.flink.api.table.{_}
+
+import scala.collection.mutable
+
+/**
+ * Rule that makes sure we call [[Expression.typeInfo]] on each [[Expression]] at least once.
+ * Expressions are expected to perform type verification in this method.
+ */
+class TypeCheck extends Rule[Expression] {
+
+ def apply(expr: Expression) = {
+ val errors = mutable.MutableList[String]()
+
+ val result = expr.transformPre {
+ case expr: Expression=> {
+ // simply get the typeInfo from the expression. this will perform type analysis
+ try {
+ expr.typeInfo
+ } catch {
+ case e: ExpressionException =>
+ errors += e.getMessage
+ }
+ expr
+ }
+ }
+
+ if (errors.length > 0) {
+ throw new ExpressionException(
+ s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+ }
+
+ result
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
new file mode 100644
index 0000000..e75dd20
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.expressions.{NopExpression, Expression}
+import org.apache.flink.api.table.trees.Rule
+import org.apache.flink.api.table.{_}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+import scala.collection.mutable
+
+/**
+ * [[Rule]] that verifies that the result type of an [[Expression]] is Boolean. This is required
+ * for filter/join predicates.
+ */
+class VerifyBoolean extends Rule[Expression] {
+
+ def apply(expr: Expression) = {
+ if (!expr.isInstanceOf[NopExpression] && expr.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+ throw new ExpressionException(s"Expression $expr of type ${expr.typeInfo} is not boolean.")
+ }
+
+ expr
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
new file mode 100644
index 0000000..09dbf88
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.table.expressions.{Aggregation, Expression}
+
+import scala.collection.mutable
+
+import org.apache.flink.api.table.trees.Rule
+
+/**
+ * Rule that verifies that an expression does not contain aggregate operations. Right now, join
+ * predicates and filter predicates cannot contain aggregates.
+ */
+class VerifyNoAggregates extends Rule[Expression] {
+
+ def apply(expr: Expression) = {
+ val errors = mutable.MutableList[String]()
+
+ val result = expr.transformPre {
+ case agg: Aggregation=> {
+ errors +=
+ s"""Aggregations are not allowed in join/filter predicates."""
+ agg
+ }
+ }
+
+ if (errors.length > 0) {
+ throw new ExpressionException(
+ s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+ }
+
+ result
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
new file mode 100644
index 0000000..07acf1e
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.table.expressions.{Expression, Aggregation}
+
+import scala.collection.mutable
+
+import org.apache.flink.api.table.trees.Rule
+
+/**
+ * Rule that verifies that an expression does not contain aggregate operations
+ * as children of aggregate operations.
+ */
+class VerifyNoNestedAggregates extends Rule[Expression] {
+
+ def apply(expr: Expression) = {
+ val errors = mutable.MutableList[String]()
+
+ val result = expr.transformPre {
+ case agg: Aggregation=> {
+ if (agg.child.exists(_.isInstanceOf[Aggregation])) {
+ errors += s"""Found nested aggregation inside "$agg"."""
+ }
+ agg
+ }
+ }
+
+ if (errors.length > 0) {
+ throw new ExpressionException(
+ s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+ }
+
+ result
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
new file mode 100644
index 0000000..e866ea0
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, NumericTypeInfo, TypeInformation}
+
+abstract class BinaryArithmetic extends BinaryExpression { self: Product =>
+ def typeInfo = {
+ if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+ throw new ExpressionException(
+ s"""Non-numeric operand ${left} of type ${left.typeInfo} in $this""")
+ }
+ if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+ throw new ExpressionException(
+ s"""Non-numeric operand "${right}" of type ${right.typeInfo} in $this""")
+ }
+ if (left.typeInfo != right.typeInfo) {
+ throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+ s"${right.typeInfo} in $this")
+ }
+ left.typeInfo
+ }
+}
+
+case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
+ override def typeInfo = {
+ if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
+ !(left.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
+ throw new ExpressionException(s"Non-numeric operand type ${left.typeInfo} in $this")
+ }
+ if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
+ !(right.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
+ throw new ExpressionException(s"Non-numeric operand type ${right.typeInfo} in $this")
+ }
+ if (left.typeInfo != right.typeInfo) {
+ throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+ s"${right.typeInfo} in $this")
+ }
+ left.typeInfo
+ }
+
+ override def toString = s"($left + $right)"
+}
+
+case class UnaryMinus(child: Expression) extends UnaryExpression {
+ def typeInfo = {
+ if (!child.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+ throw new ExpressionException(
+ s"""Non-numeric operand ${child} of type ${child.typeInfo} in $this""")
+ }
+ child.typeInfo
+ }
+
+ override def toString = s"-($child)"
+}
+
+case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
+ override def toString = s"($left - $right)"
+}
+
+case class Div(left: Expression, right: Expression) extends BinaryArithmetic {
+ override def toString = s"($left / $right)"
+}
+
+case class Mul(left: Expression, right: Expression) extends BinaryArithmetic {
+ override def toString = s"($left * $right)"
+}
+
+case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
+ override def toString = s"($left * $right)"
+}
+
+case class Abs(child: Expression) extends UnaryExpression {
+ def typeInfo = child.typeInfo
+
+ override def toString = s"abs($child)"
+}
+
+abstract class BitwiseBinaryArithmetic extends BinaryExpression { self: Product =>
+ def typeInfo: TypeInformation[_] = {
+ if (!left.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+ throw new ExpressionException(
+ s"""Non-integer operand ${left} of type ${left.typeInfo} in $this""")
+ }
+ if (!right.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+ throw new ExpressionException(
+ s"""Non-integer operand "${right}" of type ${right.typeInfo} in $this""")
+ }
+ if (left.typeInfo != right.typeInfo) {
+ throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+ s"${right.typeInfo} in $this")
+ }
+ if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
+ left.typeInfo
+ } else {
+ BasicTypeInfo.INT_TYPE_INFO
+ }
+ }
+}
+
+case class BitwiseAnd(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
+ override def toString = s"($left & $right)"
+}
+
+case class BitwiseOr(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
+ override def toString = s"($left | $right)"
+}
+
+
+case class BitwiseXor(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
+ override def toString = s"($left ^ $right)"
+}
+
+case class BitwiseNot(child: Expression) extends UnaryExpression {
+ def typeInfo: TypeInformation[_] = {
+ if (!child.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+ throw new ExpressionException(
+ s"""Non-integer operand ${child} of type ${child.typeInfo} in $this""")
+ }
+ if (child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
+ child.typeInfo
+ } else {
+ BasicTypeInfo.INT_TYPE_INFO
+ }
+ }
+
+ override def toString = s"~($child)"
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
new file mode 100644
index 0000000..31dfdb6
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression {
+ def typeInfo = tpe
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
new file mode 100644
index 0000000..687ea7a
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo}
+
+abstract class BinaryComparison extends BinaryExpression { self: Product =>
+ def typeInfo = {
+ if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+ throw new ExpressionException(s"Non-numeric operand ${left} in $this")
+ }
+ if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+ throw new ExpressionException(s"Non-numeric operand ${right} in $this")
+ }
+ if (left.typeInfo != right.typeInfo) {
+ throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+ s"${right.typeInfo} in $this")
+ }
+ BasicTypeInfo.BOOLEAN_TYPE_INFO
+ }
+}
+
+case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
+ override def typeInfo = {
+ if (left.typeInfo != right.typeInfo) {
+ throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+ s"${right.typeInfo} in $this")
+ }
+ BasicTypeInfo.BOOLEAN_TYPE_INFO
+ }
+
+ override def toString = s"$left === $right"
+}
+
+case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
+ override def typeInfo = {
+ if (left.typeInfo != right.typeInfo) {
+ throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+ s"${right.typeInfo} in $this")
+ }
+ BasicTypeInfo.BOOLEAN_TYPE_INFO
+ }
+
+ override def toString = s"$left !== $right"
+}
+
+case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
+ override def toString = s"$left > $right"
+}
+
+case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+ override def toString = s"$left >= $right"
+}
+
+case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
+ override def toString = s"$left < $right"
+}
+
+case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+ override def toString = s"$left <= $right"
+}
+
+case class IsNull(child: Expression) extends UnaryExpression {
+ def typeInfo = {
+ BasicTypeInfo.BOOLEAN_TYPE_INFO
+ }
+
+ override def toString = s"($child).isNull"
+}
+
+case class IsNotNull(child: Expression) extends UnaryExpression {
+ def typeInfo = {
+ BasicTypeInfo.BOOLEAN_TYPE_INFO
+ }
+
+ override def toString = s"($child).isNotNull"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
new file mode 100644
index 0000000..a649aed
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+case class UnresolvedFieldReference(override val name: String) extends LeafExpression {
+ def typeInfo = throw new ExpressionException(s"Unresolved field reference: $this")
+
+ override def toString = "\"" + name
+}
+
+case class ResolvedFieldReference(
+ override val name: String,
+ tpe: TypeInformation[_]) extends LeafExpression {
+ def typeInfo = tpe
+
+ override def toString = s"'$name"
+}
+
+case class Naming(child: Expression, override val name: String) extends UnaryExpression {
+ def typeInfo = child.typeInfo
+
+ override def toString = s"$child as '$name"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
new file mode 100644
index 0000000..5654649
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.scala.table.ImplicitExpressionOperations
+
+object Literal {
+ def apply(l: Any): Literal = l match {
+ case i:Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO)
+ case l:Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO)
+ case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO)
+ case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
+ case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
+ case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
+ }
+}
+
+case class Literal(value: Any, tpe: TypeInformation[_])
+ extends LeafExpression with ImplicitExpressionOperations {
+ def expr = this
+ def typeInfo = tpe
+
+ override def toString = s"$value"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
new file mode 100644
index 0000000..eaf0463
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+abstract class BinaryPredicate extends BinaryExpression { self: Product =>
+ def typeInfo = {
+ if (left.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO ||
+ right.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+ throw new ExpressionException(s"Non-boolean operand types ${left.typeInfo} and " +
+ s"${right.typeInfo} in $this")
+ }
+ BasicTypeInfo.BOOLEAN_TYPE_INFO
+ }
+}
+
+case class Not(child: Expression) extends UnaryExpression {
+ def typeInfo = {
+ if (child.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+ throw new ExpressionException(s"Non-boolean operand type ${child.typeInfo} in $this")
+ }
+ BasicTypeInfo.BOOLEAN_TYPE_INFO
+ }
+
+ override val name = Expression.freshName("not-" + child.name)
+
+ override def toString = s"!($child)"
+}
+
+case class And(left: Expression, right: Expression) extends BinaryPredicate {
+ override def toString = s"$left && $right"
+
+ override val name = Expression.freshName(left.name + "-and-" + right.name)
+}
+
+case class Or(left: Expression, right: Expression) extends BinaryPredicate {
+ override def toString = s"$left || $right"
+
+ override val name = Expression.freshName(left.name + "-or-" + right.name)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
new file mode 100644
index 0000000..c5c8c94
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * This package contains the base class of AST nodes and all the expression language AST classes.
+ * Expression trees should not be manually constructed by users. They are implicitly constructed
+ * from the implicit DSL conversions in
+ * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]]. For the Java API,
+ * expression trees should be generated from a string parser that parses expressions and creates
+ * AST nodes.
+ */
+package object expressions
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
new file mode 100644
index 0000000..a39d601
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo}
+
+case class Substring(
+ str: Expression,
+ beginIndex: Expression,
+ endIndex: Expression) extends Expression {
+ def typeInfo = {
+ if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) {
+ throw new ExpressionException(
+ s"""Operand must be of type String in $this, is ${str.typeInfo}.""")
+ }
+ if (!beginIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+ throw new ExpressionException(
+ s"""Begin index must be an integer type in $this, is ${beginIndex.typeInfo}.""")
+ }
+ if (!endIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+ throw new ExpressionException(
+ s"""End index must be an integer type in $this, is ${endIndex.typeInfo}.""")
+ }
+
+ BasicTypeInfo.STRING_TYPE_INFO
+ }
+
+ override def children: Seq[Expression] = Seq(str, beginIndex, endIndex)
+ override def toString = s"($str).substring($beginIndex, $endIndex)"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/ExpandAggregations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/ExpandAggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/ExpandAggregations.scala
deleted file mode 100644
index 894dd22..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/ExpandAggregations.scala
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.operations
-
-import org.apache.flink.api.table.analysis.SelectionAnalyzer
-import org.apache.flink.api.table.tree._
-import org.apache.flink.api.java.aggregation.Aggregations
-
-import scala.collection.mutable
-
-/**
- * This is used to expand a [[Select]] that contains aggregations. If it is called on a [[Select]]
- * without aggregations it is simply returned.
- *
- * This select:
- * {{{
- * in.select('key, 'value.avg)
- * }}}
- *
- * is transformed to this expansion:
- * {{{
- * in
- * .select('key, 'value, Literal(1) as 'intermediate.1)
- * .aggregate('value.sum, 'intermediate.1.sum)
- * .select('key, 'value / 'intermediate.1)
- * }}}
- *
- * If the input of the [[Select]] is a [[GroupBy]] this is preserved before the aggregation.
- */
-object ExpandAggregations {
- def apply(select: Select): Operation = select match {
- case Select(input, selection) =>
-
- val aggregations = mutable.HashMap[(Expression, Aggregations), String]()
- val intermediateFields = mutable.HashSet[Expression]()
- val aggregationIntermediates = mutable.HashMap[Aggregation, Seq[Expression]]()
-
- var intermediateCount = 0
- selection foreach { f =>
- f.transformPre {
- case agg: Aggregation =>
- val intermediateReferences = agg.getIntermediateFields.zip(agg.getAggregations) map {
- case (expr, basicAgg) =>
- aggregations.get((expr, basicAgg)) match {
- case Some(intermediateName) =>
- ResolvedFieldReference(intermediateName, expr.typeInfo)
- case None =>
- intermediateCount = intermediateCount + 1
- val intermediateName = s"intermediate.$intermediateCount"
- intermediateFields += Naming(expr, intermediateName)
- aggregations((expr, basicAgg)) = intermediateName
- ResolvedFieldReference(intermediateName, expr.typeInfo)
- }
- }
-
- aggregationIntermediates(agg) = intermediateReferences
- // Return a NOP so that we don't add the children of the aggregation
- // to intermediate fields. We already added the necessary fields to the list
- // of intermediate fields.
- NopExpression()
-
- case fa: ResolvedFieldReference =>
- if (!fa.name.startsWith("intermediate")) {
- intermediateFields += Naming(fa, fa.name)
- }
- fa
- }
- }
-
- if (aggregations.isEmpty) {
- // no aggregations, just return
- return select
- }
-
- // also add the grouping keys to the set of intermediate fields, because we use a Set,
- // they are only added when not already present
- input match {
- case GroupBy(_, groupingFields) =>
- groupingFields foreach {
- case fa: ResolvedFieldReference =>
- intermediateFields += Naming(fa, fa.name)
- }
- case _ => // Nothing to add
- }
-
- val basicAggregations = aggregations.map {
- case ((expr, basicAgg), fieldName) =>
- (fieldName, basicAgg)
- }
-
- val finalFields = selection.map { f =>
- f.transformPre {
- case agg: Aggregation =>
- val intermediates = aggregationIntermediates(agg)
- agg.getFinalField(intermediates)
- }
- }
-
- val intermediateAnalyzer = new SelectionAnalyzer(input.outputFields)
- val analyzedIntermediates = intermediateFields.toSeq.map(intermediateAnalyzer.analyze)
-
- val finalAnalyzer =
- new SelectionAnalyzer(analyzedIntermediates.map(e => (e.name, e.typeInfo)))
- val analyzedFinals = finalFields.map(finalAnalyzer.analyze)
-
- val result = input match {
- case GroupBy(groupByInput, groupingFields) =>
- Select(
- Aggregate(
- GroupBy(
- Select(groupByInput, analyzedIntermediates),
- groupingFields),
- basicAggregations.toSeq),
- analyzedFinals)
-
- case _ =>
- Select(
- Aggregate(
- Select(input, analyzedIntermediates),
- basicAggregations.toSeq),
- analyzedFinals)
-
- }
-
- result
-
- case _ => select
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/TableTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/TableTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/TableTranslator.scala
deleted file mode 100644
index 194edda..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/TableTranslator.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.operations
-
-import java.lang.reflect.Modifier
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.parser.ExpressionParser
-import org.apache.flink.api.table.tree.{Expression, Naming, ResolvedFieldReference, UnresolvedFieldReference}
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
-import org.apache.flink.api.table.{ExpressionException, Table}
-
-import scala.language.reflectiveCalls
-
-/**
- * When an [[org.apache.flink.api.table.Table]] is created a [[TableTranslator]] corresponding to
- * the underlying representation (either [[org.apache.flink.api.scala.DataSet]] or
- * [[org.apache.flink.streaming.api.scala.DataStream]] is created. This way, the Table API can be
- * completely agnostic while translation back to the correct API is handled by the API specific
- * [[TableTranslator]].
- */
-abstract class TableTranslator {
-
- type Representation[A] <: { def getType(): TypeInformation[A] }
-
- /**
- * Translates the given Table API [[Operation]] back to the underlying representation, i.e,
- * a DataSet or a DataStream.
- */
- def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): Representation[A]
-
- /**
- * Creates a [[Table]] from a DataSet or a DataStream (the underlying representation).
- */
- def createTable[A](
- repr: Representation[A],
- inputType: CompositeType[A],
- expressions: Array[Expression],
- resultFields: Seq[(String, TypeInformation[_])]): Table[this.type]
-
- /**
- * Creates a [[Table]] from the given DataSet or DataStream.
- */
- def createTable[A](repr: Representation[A]): Table[this.type] = {
-
- val fields = repr.getType() match {
- case c: CompositeType[A] => c.getFieldNames.map(UnresolvedFieldReference)
-
- case tpe => Array() // createTable will throw an exception for this later
- }
- createTable(
- repr,
- fields.toArray.asInstanceOf[Array[Expression]],
- checkDeterministicFields = false)
- }
-
- /**
- * Creates a [[Table]] from the given DataSet or DataStream while only taking those
- * fields mentioned in the field expression.
- */
- def createTable[A](repr: Representation[A], expression: String): Table[this.type] = {
-
- val fields = ExpressionParser.parseExpressionList(expression)
-
- createTable(repr, fields.toArray, checkDeterministicFields = true)
- }
-
- /**
- * Creates a [[Table]] from the given DataSet or DataStream while only taking those
- * fields mentioned in the fields parameter.
- *
- * When checkDeterministicFields is true check whether the fields of the underlying
- * [[TypeInformation]] have a deterministic ordering. This is only the case for Tuples
- * and Case classes. For a POJO, the field order is not obvious, this can lead to problems
- * when a user renames fields and assumes a certain ordering.
- */
- def createTable[A](
- repr: Representation[A],
- fields: Array[Expression],
- checkDeterministicFields: Boolean = true): Table[this.type] = {
-
- // shortcut for DataSet[Row] or DataStream[Row]
- repr.getType() match {
- case rowTypeInfo: RowTypeInfo =>
- val expressions = rowTypeInfo.getFieldNames map {
- name => (name, rowTypeInfo.getTypeAt(name))
- }
- new Table(
- Root(repr, expressions), this)
-
- case c: CompositeType[A] => // us ok
-
- case tpe => throw new ExpressionException("Only DataSets or DataStreams of composite type" +
- "can be transformed to a Table. These would be tuples, case classes and " +
- "POJOs. Type is: " + tpe)
-
- }
-
- val clazz = repr.getType().getTypeClass
- if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
- throw new ExpressionException("Cannot create Table from DataSet or DataStream of type " +
- clazz.getName + ". Only top-level classes or static members classes " +
- " are supported.")
- }
-
- val inputType = repr.getType().asInstanceOf[CompositeType[A]]
-
- if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) {
- throw new ExpressionException(s"You cannot rename fields upon Table creaton: " +
- s"Field order of input type $inputType is not deterministic." )
- }
-
- if (fields.length != inputType.getFieldNames.length) {
- throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") +
- "' and number of fields in input type " + inputType + " do not match.")
- }
-
- val newFieldNames = fields map {
- case UnresolvedFieldReference(name) => name
- case e =>
- throw new ExpressionException("Only field references allowed in 'as' operation, " +
- " offending expression: " + e)
- }
-
- if (newFieldNames.toSet.size != newFieldNames.size) {
- throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}")
- }
-
- val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map {
- case (name, index) => (name, inputType.getTypeAt(index))
- }
-
- val inputFields = inputType.getFieldNames
- val fieldMappings = inputFields.zip(resultFields)
- val expressions: Array[Expression] = fieldMappings map {
- case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName)
- }
-
- createTable(repr, inputType, expressions, resultFields)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/operations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/operations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/operations.scala
deleted file mode 100644
index 5b80570..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/operations.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.operations
-
-import org.apache.flink.api.table.tree.Expression
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.aggregation.Aggregations
-
-/**
- * Base class for all Table API operations.
- */
-sealed abstract class Operation {
- def outputFields: Seq[(String, TypeInformation[_])]
-}
-
-/**
- * Operation that transforms a [[org.apache.flink.api.scala.DataSet]] or
- * [[org.apache.flink.streaming.api.scala.DataStream]] into a [[org.apache.flink.api.table.Table]].
- */
-case class Root[T](input: T, outputFields: Seq[(String, TypeInformation[_])]) extends Operation
-
-/**
- * Operation that joins two [[org.apache.flink.api.table.Table]]s. A "filter" and a "select"
- * should be applied after a join operation.
- */
-case class Join(left: Operation, right: Operation) extends Operation {
- def outputFields = left.outputFields ++ right.outputFields
-
- override def toString = s"Join($left, $right)"
-}
-
-/**
- * Operation that filters out elements that do not match the predicate expression.
- */
-case class Filter(input: Operation, predicate: Expression) extends Operation {
- def outputFields = input.outputFields
-
- override def toString = s"Filter($input, $predicate)"
-}
-
-/**
- * Selection expression. Similar to an SQL SELECT statement. The expressions can select fields
- * and perform arithmetic or logic operations. The expressions can also perform aggregates
- * on fields.
- */
-case class Select(input: Operation, selection: Seq[Expression]) extends Operation {
- def outputFields = selection.toSeq map { e => (e.name, e.typeInfo) }
-
- override def toString = s"Select($input, ${selection.mkString(",")})"
-}
-
-/**
- * Operation that gives new names to fields. Use this to disambiguate fields before a join
- * operation.
- */
-case class As(input: Operation, names: Seq[String]) extends Operation {
- val outputFields = input.outputFields.zip(names) map {
- case ((_, tpe), newName) => (newName, tpe)
- }
-
- override def toString = s"As($input, ${names.mkString(",")})"
-}
-
-/**
- * Grouping operation. Keys are specified using field references. A group by operation os only
- * useful when performing a select with aggregates afterwards.
- * @param input
- * @param fields
- */
-case class GroupBy(input: Operation, fields: Seq[Expression]) extends Operation {
- def outputFields = input.outputFields
-
- override def toString = s"GroupBy($input, ${fields.mkString(",")})"
-}
-
-/**
- * Internal operation. Selection operations containing aggregates are expanded to an [[Aggregate]]
- * and a simple [[Select]].
- */
-case class Aggregate(
- input: Operation,
- aggregations: Seq[(String, Aggregations)]) extends Operation {
- def outputFields = input.outputFields
-
- override def toString = s"Aggregate($input, ${aggregations.mkString(",")})"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/package.scala
deleted file mode 100644
index 0f75424..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * The operations in this package are created by calling methods on [[Table]] they
- * should not be manually created by users of the API.
- */
-package object operations
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
index a0bc2b9..500f39f 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
@@ -18,8 +18,8 @@
package org.apache.flink.api.table.parser
import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.table.operations.As
-import org.apache.flink.api.table.tree._
+import org.apache.flink.api.table.plan.As
+import org.apache.flink.api.table.expressions._
import scala.util.parsing.combinator.{PackratParsers, JavaTokenParsers}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala
new file mode 100644
index 0000000..65728c2
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan
+
+import org.apache.flink.api.table.expressions.analysis.SelectionAnalyzer
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.java.aggregation.Aggregations
+
+import scala.collection.mutable
+
+/**
+ * This is used to expand a [[Select]] that contains aggregations. If it is called on a [[Select]]
+ * without aggregations it is simply returned.
+ *
+ * This select:
+ * {{{
+ * in.select('key, 'value.avg)
+ * }}}
+ *
+ * is transformed to this expansion:
+ * {{{
+ * in
+ * .select('key, 'value, Literal(1) as 'intermediate.1)
+ * .aggregate('value.sum, 'intermediate.1.sum)
+ * .select('key, 'value / 'intermediate.1)
+ * }}}
+ *
+ * If the input of the [[Select]] is a [[GroupBy]] this is preserved before the aggregation.
+ */
+object ExpandAggregations {
+ def apply(select: Select): PlanNode = select match {
+ case Select(input, selection) =>
+
+ val aggregations = mutable.HashMap[(Expression, Aggregations), String]()
+ val intermediateFields = mutable.HashSet[Expression]()
+ val aggregationIntermediates = mutable.HashMap[Aggregation, Seq[Expression]]()
+
+ var intermediateCount = 0
+ selection foreach { f =>
+ f.transformPre {
+ case agg: Aggregation =>
+ val intermediateReferences = agg.getIntermediateFields.zip(agg.getAggregations) map {
+ case (expr, basicAgg) =>
+ aggregations.get((expr, basicAgg)) match {
+ case Some(intermediateName) =>
+ ResolvedFieldReference(intermediateName, expr.typeInfo)
+ case None =>
+ intermediateCount = intermediateCount + 1
+ val intermediateName = s"intermediate.$intermediateCount"
+ intermediateFields += Naming(expr, intermediateName)
+ aggregations((expr, basicAgg)) = intermediateName
+ ResolvedFieldReference(intermediateName, expr.typeInfo)
+ }
+ }
+
+ aggregationIntermediates(agg) = intermediateReferences
+ // Return a NOP so that we don't add the children of the aggregation
+ // to intermediate fields. We already added the necessary fields to the list
+ // of intermediate fields.
+ NopExpression()
+
+ case fa: ResolvedFieldReference =>
+ if (!fa.name.startsWith("intermediate")) {
+ intermediateFields += Naming(fa, fa.name)
+ }
+ fa
+ }
+ }
+
+ if (aggregations.isEmpty) {
+ // no aggregations, just return
+ return select
+ }
+
+ // also add the grouping keys to the set of intermediate fields, because we use a Set,
+ // they are only added when not already present
+ input match {
+ case GroupBy(_, groupingFields) =>
+ groupingFields foreach {
+ case fa: ResolvedFieldReference =>
+ intermediateFields += Naming(fa, fa.name)
+ }
+ case _ => // Nothing to add
+ }
+
+ val basicAggregations = aggregations.map {
+ case ((expr, basicAgg), fieldName) =>
+ (fieldName, basicAgg)
+ }
+
+ val finalFields = selection.map { f =>
+ f.transformPre {
+ case agg: Aggregation =>
+ val intermediates = aggregationIntermediates(agg)
+ agg.getFinalField(intermediates)
+ }
+ }
+
+ val intermediateAnalyzer = new SelectionAnalyzer(input.outputFields)
+ val analyzedIntermediates = intermediateFields.toSeq.map(intermediateAnalyzer.analyze)
+
+ val finalAnalyzer =
+ new SelectionAnalyzer(analyzedIntermediates.map(e => (e.name, e.typeInfo)))
+ val analyzedFinals = finalFields.map(finalAnalyzer.analyze)
+
+ val result = input match {
+ case GroupBy(groupByInput, groupingFields) =>
+ Select(
+ Aggregate(
+ GroupBy(
+ Select(groupByInput, analyzedIntermediates),
+ groupingFields),
+ basicAggregations.toSeq),
+ analyzedFinals)
+
+ case _ =>
+ Select(
+ Aggregate(
+ Select(input, analyzedIntermediates),
+ basicAggregations.toSeq),
+ analyzedFinals)
+
+ }
+
+ result
+
+ case _ => select
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
new file mode 100644
index 0000000..354c7d4
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan
+
+import java.lang.reflect.Modifier
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.parser.ExpressionParser
+import org.apache.flink.api.table.expressions.{Expression, Naming, ResolvedFieldReference, UnresolvedFieldReference}
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.apache.flink.api.table.{ExpressionException, Table}
+
+import scala.language.reflectiveCalls
+
+/**
+ * Base class for translators that transform the logical plan in a [[Table]] to an executable
+ * Flink plan and also for creating a [[Table]] from a DataSet or DataStream.
+ */
+abstract class PlanTranslator {
+
+ type Representation[A] <: { def getType(): TypeInformation[A] }
+
+ /**
+ * Translates the given Table API [[PlanNode]] back to the underlying representation, i.e,
+ * a DataSet or a DataStream.
+ */
+ def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): Representation[A]
+
+ /**
+ * Creates a [[Table]] from a DataSet or a DataStream (the underlying representation).
+ */
+ def createTable[A](
+ repr: Representation[A],
+ inputType: CompositeType[A],
+ expressions: Array[Expression],
+ resultFields: Seq[(String, TypeInformation[_])]): Table
+
+ /**
+ * Creates a [[Table]] from the given DataSet or DataStream.
+ */
+ def createTable[A](repr: Representation[A]): Table = {
+
+ val fields = repr.getType() match {
+ case c: CompositeType[A] => c.getFieldNames.map(UnresolvedFieldReference)
+
+ case tpe => Array() // createTable will throw an exception for this later
+ }
+ createTable(
+ repr,
+ fields.toArray.asInstanceOf[Array[Expression]],
+ checkDeterministicFields = false)
+ }
+
+ /**
+ * Creates a [[Table]] from the given DataSet or DataStream while only taking those
+ * fields mentioned in the field expression.
+ */
+ def createTable[A](repr: Representation[A], expression: String): Table = {
+
+ val fields = ExpressionParser.parseExpressionList(expression)
+
+ createTable(repr, fields.toArray, checkDeterministicFields = true)
+ }
+
+ /**
+ * Creates a [[Table]] from the given DataSet or DataStream while only taking those
+ * fields mentioned in the fields parameter.
+ *
+ * When checkDeterministicFields is true check whether the fields of the underlying
+ * [[TypeInformation]] have a deterministic ordering. This is only the case for Tuples
+ * and Case classes. For a POJO, the field order is not obvious, this can lead to problems
+ * when a user renames fields and assumes a certain ordering.
+ */
+ def createTable[A](
+ repr: Representation[A],
+ fields: Array[Expression],
+ checkDeterministicFields: Boolean = true): Table = {
+
+ // shortcut for DataSet[Row] or DataStream[Row]
+ repr.getType() match {
+ case rowTypeInfo: RowTypeInfo =>
+ val expressions = rowTypeInfo.getFieldNames map {
+ name => (name, rowTypeInfo.getTypeAt(name))
+ }
+ new Table(
+ Root(repr, expressions))
+
+ case c: CompositeType[A] => // us ok
+
+ case tpe => throw new ExpressionException("Only DataSets or DataStreams of composite type" +
+ "can be transformed to a Table. These would be tuples, case classes and " +
+ "POJOs. Type is: " + tpe)
+
+ }
+
+ val clazz = repr.getType().getTypeClass
+ if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
+ throw new ExpressionException("Cannot create Table from DataSet or DataStream of type " +
+ clazz.getName + ". Only top-level classes or static members classes " +
+ " are supported.")
+ }
+
+ val inputType = repr.getType().asInstanceOf[CompositeType[A]]
+
+ if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) {
+ throw new ExpressionException(s"You cannot rename fields upon Table creation: " +
+ s"Field order of input type $inputType is not deterministic." )
+ }
+
+ if (fields.length != inputType.getFieldNames.length) {
+ throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") +
+ "' and number of fields in input type " + inputType + " do not match.")
+ }
+
+ val newFieldNames = fields map {
+ case UnresolvedFieldReference(name) => name
+ case e =>
+ throw new ExpressionException("Only field references allowed in 'as' operation, " +
+ " offending expression: " + e)
+ }
+
+ if (newFieldNames.toSet.size != newFieldNames.size) {
+ throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}")
+ }
+
+ val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map {
+ case (name, index) => (name, inputType.getTypeAt(index))
+ }
+
+ val inputFields = inputType.getFieldNames
+ val fieldMappings = inputFields.zip(resultFields)
+ val expressions: Array[Expression] = fieldMappings map {
+ case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName)
+ }
+
+ createTable(repr, inputType, expressions, resultFields)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
new file mode 100644
index 0000000..453acd7
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.aggregation.Aggregations
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.trees.TreeNode
+
+/**
+ * Base class for all Table API operations.
+ */
+sealed abstract class PlanNode extends TreeNode[PlanNode] { self: Product =>
+ def outputFields: Seq[(String, TypeInformation[_])]
+}
+
+/**
+ * Operation that transforms a [[org.apache.flink.api.scala.DataSet]] or
+ * [[org.apache.flink.streaming.api.scala.DataStream]] into a [[org.apache.flink.api.table.Table]].
+ */
+case class Root[T](input: T, outputFields: Seq[(String, TypeInformation[_])]) extends PlanNode {
+ val children = Nil
+}
+
+/**
+ * Operation that joins two [[org.apache.flink.api.table.Table]]s. A "filter" and a "select"
+ * should be applied after a join operation.
+ */
+case class Join(left: PlanNode, right: PlanNode) extends PlanNode {
+
+ val children = Seq(left, right)
+
+ def outputFields = left.outputFields ++ right.outputFields
+
+ override def toString = s"Join($left, $right)"
+}
+
+/**
+ * Operation that filters out elements that do not match the predicate expression.
+ */
+case class Filter(input: PlanNode, predicate: Expression) extends PlanNode {
+
+ val children = Seq(input)
+
+ def outputFields = input.outputFields
+
+ override def toString = s"Filter($input, $predicate)"
+}
+
+/**
+ * Selection expression. Similar to an SQL SELECT statement. The expressions can select fields
+ * and perform arithmetic or logic operations. The expressions can also perform aggregates
+ * on fields.
+ */
+case class Select(input: PlanNode, selection: Seq[Expression]) extends PlanNode {
+
+ val children = Seq(input)
+
+ def outputFields = selection.toSeq map { e => (e.name, e.typeInfo) }
+
+ override def toString = s"Select($input, ${selection.mkString(",")})"
+}
+
+/**
+ * Operation that gives new names to fields. Use this to disambiguate fields before a join
+ * operation.
+ */
+case class As(input: PlanNode, names: Seq[String]) extends PlanNode {
+
+ val children = Seq(input)
+
+ val outputFields = input.outputFields.zip(names) map {
+ case ((_, tpe), newName) => (newName, tpe)
+ }
+
+ override def toString = s"As($input, ${names.mkString(",")})"
+}
+
+/**
+ * Grouping operation. Keys are specified using field references. A group by operation os only
+ * useful when performing a select with aggregates afterwards.
+ * @param input
+ * @param fields
+ */
+case class GroupBy(input: PlanNode, fields: Seq[Expression]) extends PlanNode {
+
+ val children = Seq(input)
+
+ def outputFields = input.outputFields
+
+ override def toString = s"GroupBy($input, ${fields.mkString(",")})"
+}
+
+/**
+ * Internal operation. Selection operations containing aggregates are expanded to an [[Aggregate]]
+ * and a simple [[Select]].
+ */
+case class Aggregate(
+ input: PlanNode,
+ aggregations: Seq[(String, Aggregations)]) extends PlanNode {
+
+ val children = Seq(input)
+
+ def outputFields = input.outputFields
+
+ override def toString = s"Aggregate($input, ${aggregations.mkString(",")})"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
new file mode 100644
index 0000000..a598483
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * The operations in this package are created by calling methods on [[Table]] they
+ * should not be manually created by users of the API.
+ */
+package object plan