You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/04/03 10:34:57 UTC

[3/4] flink git commit: [FLINK-1788] [table] Make logical plans transformable

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala
new file mode 100644
index 0000000..797de55
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ExtractEquiJoinFields.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.common.typeutils.CompositeType
+
+import scala.collection.mutable
+
+/**
+ * Equi-join field extractor for Join Predicates and CoGroup predicates. The result is a modified
+ * expression without the equi-join predicates together with indices of the join fields
+ * from both the left and right input.
+ */
+object ExtractEquiJoinFields {
+  def apply(leftType: CompositeType[_], rightType: CompositeType[_], predicate: Expression) = {
+
+    val joinFieldsLeft = mutable.MutableList[Int]()
+    val joinFieldsRight = mutable.MutableList[Int]()
+
+    val equiJoinExprs = mutable.MutableList[EqualTo]()
+    // First get all `===` expressions that are not below an `Or`
+    predicate.transformPre {
+      case or@Or(_, _) => NopExpression()
+      case eq@EqualTo(le: ResolvedFieldReference, re: ResolvedFieldReference) =>
+        if (leftType.hasField(le.name) && rightType.hasField(re.name)) {
+          joinFieldsLeft += leftType.getFieldIndex(le.name)
+          joinFieldsRight += rightType.getFieldIndex(re.name)
+        } else if (leftType.hasField(re.name) && rightType.hasField(le.name)) {
+          joinFieldsLeft += leftType.getFieldIndex(re.name)
+          joinFieldsRight += rightType.getFieldIndex(le.name)
+        } else {
+          // not an equi-join predicate
+        }
+        equiJoinExprs += eq
+        eq
+    }
+
+    // then remove the equi join expressions from the predicate
+    val resultExpr = predicate.transformPost {
+      // For OR, we can eliminate the OR since the equi join
+      // predicate is evaluated before the expression is evaluated
+      case or@Or(NopExpression(), _) => NopExpression()
+      case or@Or(_, NopExpression()) => NopExpression()
+      // For AND we replace it with the other expression, since the
+      // equi join predicate will always be true
+      case and@And(NopExpression(), other) => other
+      case and@And(other, NopExpression()) => other
+      case eq : EqualTo if equiJoinExprs.contains(eq) =>
+        NopExpression()
+    }
+
+    (resultExpr, joinFieldsLeft.toArray, joinFieldsRight.toArray)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala
new file mode 100644
index 0000000..6c7ecb2
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/GroupByAnalyzer.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table._
+import org.apache.flink.api.table.expressions.{ResolvedFieldReference, Expression}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+import scala.collection.mutable
+
+import org.apache.flink.api.table.trees.{Rule, Analyzer}
+
+
+/**
+ * Analyzer for grouping expressions. Only field expressions are allowed as grouping expressions.
+ */
+class GroupByAnalyzer(inputFields: Seq[(String, TypeInformation[_])])
+  extends Analyzer[Expression] {
+
+  def rules = Seq(new ResolveFieldReferences(inputFields), CheckGroupExpression)
+
+  object CheckGroupExpression extends Rule[Expression] {
+
+    def apply(expr: Expression) = {
+      val errors = mutable.MutableList[String]()
+
+      expr match {
+        case f: ResolvedFieldReference => // this is OK
+        case other =>
+          throw new ExpressionException(
+            s"""Invalid grouping expression "$expr". Only field references are allowed.""")
+      }
+      expr
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
new file mode 100644
index 0000000..af8de38
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/InsertAutoCasts.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.common.typeinfo.{IntegerTypeInfo, BasicTypeInfo}
+import org.apache.flink.api.table.trees.Rule
+
+/**
+ * [[Rule]] that adds casts in arithmetic operations.
+ */
+class InsertAutoCasts extends Rule[Expression] {
+
+  def apply(expr: Expression) = {
+    val result = expr.transformPost {
+
+      case plus@Plus(o1, o2) =>
+        // Plus is special case since we can cast anything to String for String concat
+        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
+          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            Plus(Cast(o1, o2.typeInfo), o2)
+          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            Plus(o1, Cast(o2, o1.typeInfo))
+          } else if (o1.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
+            Plus(o1, Cast(o2, BasicTypeInfo.STRING_TYPE_INFO))
+          } else if (o2.typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
+            Plus(Cast(o1, BasicTypeInfo.STRING_TYPE_INFO), o2)
+          } else {
+            plus
+          }
+        } else {
+          plus
+        }
+
+      case ba: BinaryExpression if ba.isInstanceOf[BinaryArithmetic] ||
+        ba.isInstanceOf[BinaryComparison] =>
+        val o1 = ba.left
+        val o2 = ba.right
+        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isBasicType && o2.typeInfo.isBasicType) {
+          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
+          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
+          } else {
+            ba
+          }
+        } else {
+          ba
+        }
+
+      case ba: BinaryExpression if ba.isInstanceOf[BitwiseBinaryArithmetic] =>
+        val o1 = ba.left
+        val o2 = ba.right
+        if (o1.typeInfo != o2.typeInfo && o1.typeInfo.isInstanceOf[IntegerTypeInfo[_]] &&
+          o2.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+          if (o1.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o2.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            ba.makeCopy(Seq(Cast(o1, o2.typeInfo), o2))
+          } else if (o2.typeInfo.asInstanceOf[BasicTypeInfo[_]].canCastTo(
+            o1.typeInfo.asInstanceOf[BasicTypeInfo[_]])) {
+            ba.makeCopy(Seq(o1, Cast(o2, o1.typeInfo)))
+          } else {
+            ba
+          }
+        } else {
+          ba
+        }
+    }
+
+    result
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala
new file mode 100644
index 0000000..e9236f7
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateAnalyzer.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.trees.Analyzer
+
+/**
+ * Analyzer for predicates, i.e. filter operations and where clauses of joins.
+ */
+class PredicateAnalyzer(inputFields: Seq[(String, TypeInformation[_])])
+  extends Analyzer[Expression] {
+  def rules = Seq(
+    new ResolveFieldReferences(inputFields),
+    new InsertAutoCasts,
+    new TypeCheck,
+    new VerifyNoAggregates,
+    new VerifyBoolean)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala
new file mode 100644
index 0000000..db7ea6c
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.expressions.{ResolvedFieldReference,
+UnresolvedFieldReference, Expression}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table._
+
+import scala.collection.mutable
+
+import org.apache.flink.api.table.trees.Rule
+
+/**
+ * Rule that resolved field references. This rule verifies that field references point to existing
+ * fields of the input operation and creates [[ResolvedFieldReference]]s that hold the field
+ * [[TypeInformation]] in addition to the field name.
+ */
+class ResolveFieldReferences(inputFields: Seq[(String, TypeInformation[_])])
+  extends Rule[Expression] {
+
+  def apply(expr: Expression) = {
+    val errors = mutable.MutableList[String]()
+
+    val result = expr.transformPost {
+      case fe@UnresolvedFieldReference(fieldName) =>
+        inputFields.find { _._1 == fieldName } match {
+          case Some((_, tpe)) => ResolvedFieldReference(fieldName, tpe)
+
+          case None =>
+            errors +=
+              s"Field '$fieldName' is not valid for input fields ${inputFields.mkString(",")}"
+            fe
+        }
+    }
+
+    if (errors.length > 0) {
+      throw new ExpressionException(
+        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+    }
+
+    result
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
new file mode 100644
index 0000000..625fdbf
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/SelectionAnalyzer.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.trees.Analyzer
+
+/**
+ * This analyzes selection expressions.
+ */
+class SelectionAnalyzer(inputFields: Seq[(String, TypeInformation[_])])
+  extends Analyzer[Expression] {
+
+  def rules = Seq(
+    new ResolveFieldReferences(inputFields),
+    new VerifyNoNestedAggregates,
+    new InsertAutoCasts,
+    new TypeCheck)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
new file mode 100644
index 0000000..b724561
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/TypeCheck.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.trees.Rule
+import org.apache.flink.api.table.{_}
+
+import scala.collection.mutable
+
+/**
+ * Rule that makes sure we call [[Expression.typeInfo]] on each [[Expression]] at least once.
+ * Expressions are expected to perform type verification in this method.
+ */
+class TypeCheck extends Rule[Expression] {
+
+  def apply(expr: Expression) = {
+    val errors = mutable.MutableList[String]()
+
+    val result = expr.transformPre {
+      case expr: Expression=> {
+        // simply get the typeInfo from the expression. this will perform type analysis
+        try {
+          expr.typeInfo
+        } catch {
+          case e: ExpressionException =>
+            errors += e.getMessage
+        }
+        expr
+      }
+    }
+
+    if (errors.length > 0) {
+      throw new ExpressionException(
+        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+    }
+
+    result
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
new file mode 100644
index 0000000..e75dd20
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyBoolean.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.expressions.{NopExpression, Expression}
+import org.apache.flink.api.table.trees.Rule
+import org.apache.flink.api.table.{_}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+import scala.collection.mutable
+
+/**
+ * [[Rule]] that verifies that the result type of an [[Expression]] is Boolean. This is required
+ * for filter/join predicates.
+ */
+class VerifyBoolean extends Rule[Expression] {
+
+  def apply(expr: Expression) = {
+    if (!expr.isInstanceOf[NopExpression] && expr.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      throw new ExpressionException(s"Expression $expr of type ${expr.typeInfo} is not boolean.")
+    }
+
+    expr
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
new file mode 100644
index 0000000..09dbf88
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoAggregates.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.table.expressions.{Aggregation, Expression}
+
+import scala.collection.mutable
+
+import org.apache.flink.api.table.trees.Rule
+
+/**
+ * Rule that verifies that an expression does not contain aggregate operations. Right now, join
+ * predicates and filter predicates cannot contain aggregates.
+ */
+class VerifyNoAggregates extends Rule[Expression] {
+
+  def apply(expr: Expression) = {
+    val errors = mutable.MutableList[String]()
+
+    val result = expr.transformPre {
+      case agg: Aggregation=> {
+        errors +=
+          s"""Aggregations are not allowed in join/filter predicates."""
+        agg
+      }
+    }
+
+    if (errors.length > 0) {
+      throw new ExpressionException(
+        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+    }
+
+    result
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
new file mode 100644
index 0000000..07acf1e
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/VerifyNoNestedAggregates.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions.analysis
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.table.expressions.{Expression, Aggregation}
+
+import scala.collection.mutable
+
+import org.apache.flink.api.table.trees.Rule
+
+/**
+ * Rule that verifies that an expression does not contain aggregate operations
+ * as children of aggregate operations.
+ */
+class VerifyNoNestedAggregates extends Rule[Expression] {
+
+  def apply(expr: Expression) = {
+    val errors = mutable.MutableList[String]()
+
+    val result = expr.transformPre {
+      case agg: Aggregation=> {
+        if (agg.child.exists(_.isInstanceOf[Aggregation])) {
+          errors += s"""Found nested aggregation inside "$agg"."""
+        }
+        agg
+      }
+    }
+
+    if (errors.length > 0) {
+      throw new ExpressionException(
+        s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
+    }
+
+    result
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
new file mode 100644
index 0000000..e866ea0
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, NumericTypeInfo, TypeInformation}
+
+abstract class BinaryArithmetic extends BinaryExpression { self: Product =>
+  def typeInfo = {
+    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-numeric operand ${left} of type ${left.typeInfo} in $this""")
+    }
+    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-numeric operand "${right}" of type ${right.typeInfo} in $this""")
+    }
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    left.typeInfo
+  }
+}
+
+case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def typeInfo = {
+    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
+      !(left.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
+      throw new ExpressionException(s"Non-numeric operand type ${left.typeInfo} in $this")
+    }
+    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
+      !(right.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
+      throw new ExpressionException(s"Non-numeric operand type ${right.typeInfo} in $this")
+    }
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    left.typeInfo
+  }
+
+  override def toString = s"($left + $right)"
+}
+
+case class UnaryMinus(child: Expression) extends UnaryExpression {
+  def typeInfo = {
+    if (!child.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-numeric operand ${child} of type ${child.typeInfo} in $this""")
+    }
+    child.typeInfo
+  }
+
+  override def toString = s"-($child)"
+}
+
+case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString = s"($left - $right)"
+}
+
+case class Div(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString = s"($left / $right)"
+}
+
+case class Mul(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString = s"($left * $right)"
+}
+
+case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString = s"($left * $right)"
+}
+
+case class Abs(child: Expression) extends UnaryExpression {
+  def typeInfo = child.typeInfo
+
+  override def toString = s"abs($child)"
+}
+
+abstract class BitwiseBinaryArithmetic extends BinaryExpression { self: Product =>
+  def typeInfo: TypeInformation[_] = {
+    if (!left.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-integer operand ${left} of type ${left.typeInfo} in $this""")
+    }
+    if (!right.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-integer operand "${right}" of type ${right.typeInfo} in $this""")
+    }
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
+      left.typeInfo
+    } else {
+      BasicTypeInfo.INT_TYPE_INFO
+    }
+  }
+}
+
+case class BitwiseAnd(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
+  override def toString = s"($left & $right)"
+}
+
+case class BitwiseOr(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
+  override def toString = s"($left | $right)"
+}
+
+
+case class BitwiseXor(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
+  override def toString = s"($left ^ $right)"
+}
+
+case class BitwiseNot(child: Expression) extends UnaryExpression {
+  def typeInfo: TypeInformation[_] = {
+    if (!child.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Non-integer operand ${child} of type ${child.typeInfo} in $this""")
+    }
+    if (child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
+      child.typeInfo
+    } else {
+      BasicTypeInfo.INT_TYPE_INFO
+    }
+  }
+
+  override def toString = s"~($child)"
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
new file mode 100644
index 0000000..31dfdb6
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression {
+  def typeInfo = tpe
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
new file mode 100644
index 0000000..687ea7a
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo}
+
+abstract class BinaryComparison extends BinaryExpression { self: Product =>
+  def typeInfo = {
+    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+      throw new ExpressionException(s"Non-numeric operand ${left} in $this")
+    }
+    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
+      throw new ExpressionException(s"Non-numeric operand ${right} in $this")
+    }
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+}
+
+case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
+  override def typeInfo = {
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+
+  override def toString = s"$left === $right"
+}
+
+case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
+  override def typeInfo = {
+    if (left.typeInfo != right.typeInfo) {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+
+  override def toString = s"$left !== $right"
+}
+
+case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"$left > $right"
+}
+
+case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"$left >= $right"
+}
+
+case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"$left < $right"
+}
+
+case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"$left <= $right"
+}
+
+case class IsNull(child: Expression) extends UnaryExpression {
+  def typeInfo = {
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+
+  override def toString = s"($child).isNull"
+}
+
+case class IsNotNull(child: Expression) extends UnaryExpression {
+  def typeInfo = {
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+
+  override def toString = s"($child).isNotNull"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
new file mode 100644
index 0000000..a649aed
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+case class UnresolvedFieldReference(override val name: String) extends LeafExpression {
+  def typeInfo = throw new ExpressionException(s"Unresolved field reference: $this")
+
+  override def toString = "\"" + name
+}
+
+case class ResolvedFieldReference(
+    override val name: String,
+    tpe: TypeInformation[_]) extends LeafExpression {
+  def typeInfo = tpe
+
+  override def toString = s"'$name"
+}
+
+case class Naming(child: Expression, override val name: String) extends UnaryExpression {
+  def typeInfo = child.typeInfo
+
+  override def toString = s"$child as '$name"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
new file mode 100644
index 0000000..5654649
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.scala.table.ImplicitExpressionOperations
+
+object Literal {
+  def apply(l: Any): Literal = l match {
+    case i:Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO)
+    case l:Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO)
+    case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO)
+    case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
+    case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
+    case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
+  }
+}
+
+case class Literal(value: Any, tpe: TypeInformation[_])
+  extends LeafExpression with ImplicitExpressionOperations {
+  def expr = this
+  def typeInfo = tpe
+
+  override def toString = s"$value"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
new file mode 100644
index 0000000..eaf0463
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+abstract class BinaryPredicate extends BinaryExpression { self: Product =>
+  def typeInfo = {
+    if (left.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO ||
+      right.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      throw new ExpressionException(s"Non-boolean operand types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+}
+
+case class Not(child: Expression) extends UnaryExpression {
+  def typeInfo = {
+    if (child.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      throw new ExpressionException(s"Non-boolean operand type ${child.typeInfo} in $this")
+    }
+    BasicTypeInfo.BOOLEAN_TYPE_INFO
+  }
+
+  override val name = Expression.freshName("not-" + child.name)
+
+  override def toString = s"!($child)"
+}
+
+case class And(left: Expression, right: Expression) extends BinaryPredicate {
+  override def toString = s"$left && $right"
+
+  override val name = Expression.freshName(left.name + "-and-" + right.name)
+}
+
+case class Or(left: Expression, right: Expression) extends BinaryPredicate {
+  override def toString = s"$left || $right"
+
+  override val name = Expression.freshName(left.name + "-or-" + right.name)
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
new file mode 100644
index 0000000..c5c8c94
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * This package contains the base class of AST nodes and all the expression language AST classes.
+ * Expression trees should not be manually constructed by users. They are implicitly constructed
+ * from the implicit DSL conversions in
+ * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]]. For the Java API,
+ * expression trees should be generated from a string parser that parses expressions and creates
+ * AST nodes.
+ */
+package object expressions

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
new file mode 100644
index 0000000..a39d601
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo}
+
+case class Substring(
+    str: Expression,
+    beginIndex: Expression,
+    endIndex: Expression) extends Expression {
+  def typeInfo = {
+    if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) {
+      throw new ExpressionException(
+        s"""Operand must be of type String in $this, is ${str.typeInfo}.""")
+    }
+    if (!beginIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""Begin index must be an integer type in $this, is ${beginIndex.typeInfo}.""")
+    }
+    if (!endIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
+      throw new ExpressionException(
+        s"""End index must be an integer type in $this, is ${endIndex.typeInfo}.""")
+    }
+
+    BasicTypeInfo.STRING_TYPE_INFO
+  }
+
+  override def children: Seq[Expression] = Seq(str, beginIndex, endIndex)
+  override def toString = s"($str).substring($beginIndex, $endIndex)"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/ExpandAggregations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/ExpandAggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/ExpandAggregations.scala
deleted file mode 100644
index 894dd22..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/ExpandAggregations.scala
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.operations
-
-import org.apache.flink.api.table.analysis.SelectionAnalyzer
-import org.apache.flink.api.table.tree._
-import org.apache.flink.api.java.aggregation.Aggregations
-
-import scala.collection.mutable
-
-/**
- * This is used to expand a [[Select]] that contains aggregations. If it is called on a [[Select]]
- * without aggregations it is simply returned.
- *
- * This select:
- * {{{
- *   in.select('key, 'value.avg)
- * }}}
- *
- * is transformed to this expansion:
- * {{{
- *   in
- *     .select('key, 'value, Literal(1) as 'intermediate.1)
- *     .aggregate('value.sum, 'intermediate.1.sum)
- *     .select('key, 'value / 'intermediate.1)
- * }}}
- *
- * If the input of the [[Select]] is a [[GroupBy]] this is preserved before the aggregation.
- */
-object ExpandAggregations {
-  def apply(select: Select): Operation = select match {
-    case Select(input, selection) =>
-
-      val aggregations = mutable.HashMap[(Expression, Aggregations), String]()
-      val intermediateFields = mutable.HashSet[Expression]()
-      val aggregationIntermediates = mutable.HashMap[Aggregation, Seq[Expression]]()
-
-      var intermediateCount = 0
-      selection foreach {  f =>
-        f.transformPre {
-          case agg: Aggregation =>
-            val intermediateReferences = agg.getIntermediateFields.zip(agg.getAggregations) map {
-              case (expr, basicAgg) =>
-                aggregations.get((expr, basicAgg)) match {
-                  case Some(intermediateName) =>
-                    ResolvedFieldReference(intermediateName, expr.typeInfo)
-                  case None =>
-                    intermediateCount = intermediateCount + 1
-                    val intermediateName = s"intermediate.$intermediateCount"
-                    intermediateFields += Naming(expr, intermediateName)
-                    aggregations((expr, basicAgg)) = intermediateName
-                    ResolvedFieldReference(intermediateName, expr.typeInfo)
-                }
-            }
-
-            aggregationIntermediates(agg) = intermediateReferences
-            // Return a NOP so that we don't add the children of the aggregation
-            // to intermediate fields. We already added the necessary fields to the list
-            // of intermediate fields.
-            NopExpression()
-
-          case fa: ResolvedFieldReference =>
-            if (!fa.name.startsWith("intermediate")) {
-              intermediateFields += Naming(fa, fa.name)
-            }
-            fa
-        }
-      }
-
-      if (aggregations.isEmpty) {
-        // no aggregations, just return
-        return select
-      }
-
-      // also add the grouping keys to the set of intermediate fields, because we use a Set,
-      // they are only added when not already present
-      input match {
-        case GroupBy(_, groupingFields) =>
-          groupingFields foreach {
-            case fa: ResolvedFieldReference =>
-              intermediateFields += Naming(fa, fa.name)
-          }
-        case _ => // Nothing to add
-      }
-
-      val basicAggregations = aggregations.map {
-        case ((expr, basicAgg), fieldName) =>
-          (fieldName, basicAgg)
-      }
-
-      val finalFields = selection.map {  f =>
-        f.transformPre {
-          case agg: Aggregation =>
-            val intermediates = aggregationIntermediates(agg)
-            agg.getFinalField(intermediates)
-        }
-      }
-
-      val intermediateAnalyzer = new SelectionAnalyzer(input.outputFields)
-      val analyzedIntermediates = intermediateFields.toSeq.map(intermediateAnalyzer.analyze)
-
-      val finalAnalyzer =
-        new SelectionAnalyzer(analyzedIntermediates.map(e => (e.name, e.typeInfo)))
-      val analyzedFinals = finalFields.map(finalAnalyzer.analyze)
-
-      val result = input match {
-        case GroupBy(groupByInput, groupingFields) =>
-          Select(
-            Aggregate(
-              GroupBy(
-                Select(groupByInput, analyzedIntermediates),
-                groupingFields),
-              basicAggregations.toSeq),
-            analyzedFinals)
-
-        case _ =>
-          Select(
-            Aggregate(
-              Select(input, analyzedIntermediates),
-              basicAggregations.toSeq),
-            analyzedFinals)
-
-      }
-
-      result
-
-    case _ => select
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/TableTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/TableTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/TableTranslator.scala
deleted file mode 100644
index 194edda..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/TableTranslator.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.operations
-
-import java.lang.reflect.Modifier
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.parser.ExpressionParser
-import org.apache.flink.api.table.tree.{Expression, Naming, ResolvedFieldReference, UnresolvedFieldReference}
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
-import org.apache.flink.api.table.{ExpressionException, Table}
-
-import scala.language.reflectiveCalls
-
-/**
- * When an [[org.apache.flink.api.table.Table]] is created a [[TableTranslator]] corresponding to
- * the underlying representation (either [[org.apache.flink.api.scala.DataSet]] or
- * [[org.apache.flink.streaming.api.scala.DataStream]] is created. This way, the Table API can be
- * completely agnostic while translation back to the correct API is handled by the API specific
- * [[TableTranslator]].
- */
-abstract class TableTranslator {
-
-  type Representation[A] <: { def getType(): TypeInformation[A] }
-
-  /**
-   * Translates the given Table API [[Operation]] back to the underlying representation, i.e,
-   * a DataSet or a DataStream.
-   */
-  def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): Representation[A]
-
-  /**
-   * Creates a [[Table]] from a DataSet or a DataStream (the underlying representation).
-   */
-  def createTable[A](
-      repr: Representation[A],
-      inputType: CompositeType[A],
-      expressions: Array[Expression],
-      resultFields: Seq[(String, TypeInformation[_])]): Table[this.type]
-
-  /**
-   * Creates a [[Table]] from the given DataSet or DataStream.
-   */
-  def createTable[A](repr: Representation[A]): Table[this.type] = {
-
-    val fields = repr.getType() match {
-      case c: CompositeType[A] => c.getFieldNames.map(UnresolvedFieldReference)
-
-      case tpe => Array() // createTable will throw an exception for this later
-    }
-    createTable(
-      repr,
-      fields.toArray.asInstanceOf[Array[Expression]],
-      checkDeterministicFields = false)
-  }
-
-  /**
-   * Creates a [[Table]] from the given DataSet or DataStream while only taking those
-   * fields mentioned in the field expression.
-   */
-  def createTable[A](repr: Representation[A], expression: String): Table[this.type] = {
-
-    val fields = ExpressionParser.parseExpressionList(expression)
-
-    createTable(repr, fields.toArray, checkDeterministicFields = true)
-  }
-
-  /**
-   * Creates a [[Table]] from the given DataSet or DataStream while only taking those
-   * fields mentioned in the fields parameter.
-   *
-   * When checkDeterministicFields is true check whether the fields of the underlying
-   * [[TypeInformation]] have a deterministic ordering. This is only the case for Tuples
-   * and Case classes. For a POJO, the field order is not obvious, this can lead to problems
-   * when a user renames fields and assumes a certain ordering.
-   */
-  def createTable[A](
-      repr: Representation[A],
-      fields: Array[Expression],
-      checkDeterministicFields: Boolean = true): Table[this.type] = {
-
-    // shortcut for DataSet[Row] or DataStream[Row]
-    repr.getType() match {
-      case rowTypeInfo: RowTypeInfo =>
-        val expressions = rowTypeInfo.getFieldNames map {
-          name => (name, rowTypeInfo.getTypeAt(name))
-        }
-        new Table(
-          Root(repr, expressions), this)
-
-      case c: CompositeType[A] => // us ok
-
-      case tpe => throw new ExpressionException("Only DataSets or DataStreams of composite type" +
-        "can be transformed to a Table. These would be tuples, case classes and " +
-        "POJOs. Type is: " + tpe)
-
-    }
-
-    val clazz = repr.getType().getTypeClass
-    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
-      throw new ExpressionException("Cannot create Table from DataSet or DataStream of type " +
-        clazz.getName + ". Only top-level classes or static members classes " +
-        " are supported.")
-    }
-
-    val inputType = repr.getType().asInstanceOf[CompositeType[A]]
-
-    if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) {
-      throw new ExpressionException(s"You cannot rename fields upon Table creaton: " +
-        s"Field order of input type $inputType is not deterministic." )
-    }
-
-    if (fields.length != inputType.getFieldNames.length) {
-      throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") +
-        "' and number of fields in input type " + inputType + " do not match.")
-    }
-
-    val newFieldNames = fields map {
-      case UnresolvedFieldReference(name) => name
-      case e =>
-        throw new ExpressionException("Only field references allowed in 'as' operation, " +
-          " offending expression: " + e)
-    }
-
-    if (newFieldNames.toSet.size != newFieldNames.size) {
-      throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}")
-    }
-
-    val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map {
-      case (name, index) => (name, inputType.getTypeAt(index))
-    }
-
-    val inputFields = inputType.getFieldNames
-    val fieldMappings = inputFields.zip(resultFields)
-    val expressions: Array[Expression] = fieldMappings map {
-      case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName)
-    }
-
-    createTable(repr, inputType, expressions, resultFields)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/operations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/operations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/operations.scala
deleted file mode 100644
index 5b80570..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/operations.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.operations
-
-import org.apache.flink.api.table.tree.Expression
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.aggregation.Aggregations
-
-/**
- * Base class for all Table API operations.
- */
-sealed abstract class Operation {
-  def outputFields: Seq[(String, TypeInformation[_])]
-}
-
-/**
- * Operation that transforms a [[org.apache.flink.api.scala.DataSet]] or
- * [[org.apache.flink.streaming.api.scala.DataStream]] into a [[org.apache.flink.api.table.Table]].
- */
-case class Root[T](input: T, outputFields: Seq[(String, TypeInformation[_])]) extends Operation
-
-/**
- * Operation that joins two [[org.apache.flink.api.table.Table]]s. A "filter" and a "select"
- * should be applied after a join operation.
- */
-case class Join(left: Operation, right: Operation) extends Operation {
-  def outputFields = left.outputFields ++ right.outputFields
-
-  override def toString = s"Join($left, $right)"
-}
-
-/**
- * Operation that filters out elements that do not match the predicate expression.
- */
-case class Filter(input: Operation, predicate: Expression) extends Operation {
-  def outputFields = input.outputFields
-
-  override def toString = s"Filter($input, $predicate)"
-}
-
-/**
- * Selection expression. Similar to an SQL SELECT statement. The expressions can select fields
- * and perform arithmetic or logic operations. The expressions can also perform aggregates
- * on fields.
- */
-case class Select(input: Operation, selection: Seq[Expression]) extends Operation {
-  def outputFields = selection.toSeq map { e => (e.name, e.typeInfo) }
-
-  override def toString = s"Select($input, ${selection.mkString(",")})"
-}
-
-/**
- * Operation that gives new names to fields. Use this to disambiguate fields before a join
- * operation.
- */
-case class As(input: Operation, names: Seq[String]) extends Operation {
-  val outputFields = input.outputFields.zip(names) map {
-    case ((_, tpe), newName) => (newName, tpe)
-  }
-
-  override def toString = s"As($input, ${names.mkString(",")})"
-}
-
-/**
- * Grouping operation. Keys are specified using field references. A group by operation os only
- * useful when performing a select with aggregates afterwards.
- * @param input
- * @param fields
- */
-case class GroupBy(input: Operation, fields: Seq[Expression]) extends Operation {
-  def outputFields = input.outputFields
-
-  override def toString = s"GroupBy($input, ${fields.mkString(",")})"
-}
-
-/**
- * Internal operation. Selection operations containing aggregates are expanded to an [[Aggregate]]
- * and a simple [[Select]].
- */
-case class Aggregate(
-    input: Operation,
-    aggregations: Seq[(String, Aggregations)]) extends Operation {
-  def outputFields = input.outputFields
-
-  override def toString = s"Aggregate($input, ${aggregations.mkString(",")})"
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/package.scala
deleted file mode 100644
index 0f75424..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * The operations in this package are created by calling methods on [[Table]] they
- * should not be manually created by users of the API.
- */
-package object operations

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
index a0bc2b9..500f39f 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
@@ -18,8 +18,8 @@
 package org.apache.flink.api.table.parser
 
 import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.table.operations.As
-import org.apache.flink.api.table.tree._
+import org.apache.flink.api.table.plan.As
+import org.apache.flink.api.table.expressions._
 
 import scala.util.parsing.combinator.{PackratParsers, JavaTokenParsers}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala
new file mode 100644
index 0000000..65728c2
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan
+
+import org.apache.flink.api.table.expressions.analysis.SelectionAnalyzer
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.java.aggregation.Aggregations
+
+import scala.collection.mutable
+
+/**
+ * This is used to expand a [[Select]] that contains aggregations. If it is called on a [[Select]]
+ * without aggregations it is simply returned.
+ *
+ * This select:
+ * {{{
+ *   in.select('key, 'value.avg)
+ * }}}
+ *
+ * is transformed to this expansion:
+ * {{{
+ *   in
+ *     .select('key, 'value, Literal(1) as 'intermediate.1)
+ *     .aggregate('value.sum, 'intermediate.1.sum)
+ *     .select('key, 'value / 'intermediate.1)
+ * }}}
+ *
+ * If the input of the [[Select]] is a [[GroupBy]] this is preserved before the aggregation.
+ */
+object ExpandAggregations {
+  def apply(select: Select): PlanNode = select match {
+    case Select(input, selection) =>
+
+      val aggregations = mutable.HashMap[(Expression, Aggregations), String]()
+      val intermediateFields = mutable.HashSet[Expression]()
+      val aggregationIntermediates = mutable.HashMap[Aggregation, Seq[Expression]]()
+
+      var intermediateCount = 0
+      selection foreach {  f =>
+        f.transformPre {
+          case agg: Aggregation =>
+            val intermediateReferences = agg.getIntermediateFields.zip(agg.getAggregations) map {
+              case (expr, basicAgg) =>
+                aggregations.get((expr, basicAgg)) match {
+                  case Some(intermediateName) =>
+                    ResolvedFieldReference(intermediateName, expr.typeInfo)
+                  case None =>
+                    intermediateCount = intermediateCount + 1
+                    val intermediateName = s"intermediate.$intermediateCount"
+                    intermediateFields += Naming(expr, intermediateName)
+                    aggregations((expr, basicAgg)) = intermediateName
+                    ResolvedFieldReference(intermediateName, expr.typeInfo)
+                }
+            }
+
+            aggregationIntermediates(agg) = intermediateReferences
+            // Return a NOP so that we don't add the children of the aggregation
+            // to intermediate fields. We already added the necessary fields to the list
+            // of intermediate fields.
+            NopExpression()
+
+          case fa: ResolvedFieldReference =>
+            if (!fa.name.startsWith("intermediate")) {
+              intermediateFields += Naming(fa, fa.name)
+            }
+            fa
+        }
+      }
+
+      if (aggregations.isEmpty) {
+        // no aggregations, just return
+        return select
+      }
+
+      // also add the grouping keys to the set of intermediate fields, because we use a Set,
+      // they are only added when not already present
+      input match {
+        case GroupBy(_, groupingFields) =>
+          groupingFields foreach {
+            case fa: ResolvedFieldReference =>
+              intermediateFields += Naming(fa, fa.name)
+          }
+        case _ => // Nothing to add
+      }
+
+      val basicAggregations = aggregations.map {
+        case ((expr, basicAgg), fieldName) =>
+          (fieldName, basicAgg)
+      }
+
+      val finalFields = selection.map {  f =>
+        f.transformPre {
+          case agg: Aggregation =>
+            val intermediates = aggregationIntermediates(agg)
+            agg.getFinalField(intermediates)
+        }
+      }
+
+      val intermediateAnalyzer = new SelectionAnalyzer(input.outputFields)
+      val analyzedIntermediates = intermediateFields.toSeq.map(intermediateAnalyzer.analyze)
+
+      val finalAnalyzer =
+        new SelectionAnalyzer(analyzedIntermediates.map(e => (e.name, e.typeInfo)))
+      val analyzedFinals = finalFields.map(finalAnalyzer.analyze)
+
+      val result = input match {
+        case GroupBy(groupByInput, groupingFields) =>
+          Select(
+            Aggregate(
+              GroupBy(
+                Select(groupByInput, analyzedIntermediates),
+                groupingFields),
+              basicAggregations.toSeq),
+            analyzedFinals)
+
+        case _ =>
+          Select(
+            Aggregate(
+              Select(input, analyzedIntermediates),
+              basicAggregations.toSeq),
+            analyzedFinals)
+
+      }
+
+      result
+
+    case _ => select
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
new file mode 100644
index 0000000..354c7d4
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan
+
+import java.lang.reflect.Modifier
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.table.parser.ExpressionParser
+import org.apache.flink.api.table.expressions.{Expression, Naming, ResolvedFieldReference, UnresolvedFieldReference}
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+import org.apache.flink.api.table.{ExpressionException, Table}
+
+import scala.language.reflectiveCalls
+
+/**
+ * Base class for translators that transform the logical plan in a [[Table]] to an executable
+ * Flink plan and also for creating a [[Table]] from a DataSet or DataStream.
+ */
+abstract class PlanTranslator {
+
+  type Representation[A] <: { def getType(): TypeInformation[A] }
+
+  /**
+   * Translates the given Table API [[PlanNode]] back to the underlying representation, i.e,
+   * a DataSet or a DataStream.
+   */
+  def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): Representation[A]
+
+  /**
+   * Creates a [[Table]] from a DataSet or a DataStream (the underlying representation).
+   */
+  def createTable[A](
+      repr: Representation[A],
+      inputType: CompositeType[A],
+      expressions: Array[Expression],
+      resultFields: Seq[(String, TypeInformation[_])]): Table
+
+  /**
+   * Creates a [[Table]] from the given DataSet or DataStream.
+   */
+  def createTable[A](repr: Representation[A]): Table = {
+
+    val fields = repr.getType() match {
+      case c: CompositeType[A] => c.getFieldNames.map(UnresolvedFieldReference)
+
+      case tpe => Array() // createTable will throw an exception for this later
+    }
+    createTable(
+      repr,
+      fields.toArray.asInstanceOf[Array[Expression]],
+      checkDeterministicFields = false)
+  }
+
+  /**
+   * Creates a [[Table]] from the given DataSet or DataStream while only taking those
+   * fields mentioned in the field expression.
+   */
+  def createTable[A](repr: Representation[A], expression: String): Table = {
+
+    val fields = ExpressionParser.parseExpressionList(expression)
+
+    createTable(repr, fields.toArray, checkDeterministicFields = true)
+  }
+
+  /**
+   * Creates a [[Table]] from the given DataSet or DataStream while only taking those
+   * fields mentioned in the fields parameter.
+   *
+   * When checkDeterministicFields is true check whether the fields of the underlying
+   * [[TypeInformation]] have a deterministic ordering. This is only the case for Tuples
+   * and Case classes. For a POJO, the field order is not obvious, this can lead to problems
+   * when a user renames fields and assumes a certain ordering.
+   */
+  def createTable[A](
+      repr: Representation[A],
+      fields: Array[Expression],
+      checkDeterministicFields: Boolean = true): Table = {
+
+    // shortcut for DataSet[Row] or DataStream[Row]
+    repr.getType() match {
+      case rowTypeInfo: RowTypeInfo =>
+        val expressions = rowTypeInfo.getFieldNames map {
+          name => (name, rowTypeInfo.getTypeAt(name))
+        }
+        new Table(
+          Root(repr, expressions))
+
+      case c: CompositeType[A] => // us ok
+
+      case tpe => throw new ExpressionException("Only DataSets or DataStreams of composite type" +
+        "can be transformed to a Table. These would be tuples, case classes and " +
+        "POJOs. Type is: " + tpe)
+
+    }
+
+    val clazz = repr.getType().getTypeClass
+    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
+      throw new ExpressionException("Cannot create Table from DataSet or DataStream of type " +
+        clazz.getName + ". Only top-level classes or static members classes " +
+        " are supported.")
+    }
+
+    val inputType = repr.getType().asInstanceOf[CompositeType[A]]
+
+    if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) {
+      throw new ExpressionException(s"You cannot rename fields upon Table creation: " +
+        s"Field order of input type $inputType is not deterministic." )
+    }
+
+    if (fields.length != inputType.getFieldNames.length) {
+      throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") +
+        "' and number of fields in input type " + inputType + " do not match.")
+    }
+
+    val newFieldNames = fields map {
+      case UnresolvedFieldReference(name) => name
+      case e =>
+        throw new ExpressionException("Only field references allowed in 'as' operation, " +
+          " offending expression: " + e)
+    }
+
+    if (newFieldNames.toSet.size != newFieldNames.size) {
+      throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}")
+    }
+
+    val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map {
+      case (name, index) => (name, inputType.getTypeAt(index))
+    }
+
+    val inputFields = inputType.getFieldNames
+    val fieldMappings = inputFields.zip(resultFields)
+    val expressions: Array[Expression] = fieldMappings map {
+      case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName)
+    }
+
+    createTable(repr, inputType, expressions, resultFields)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
new file mode 100644
index 0000000..453acd7
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.plan
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.aggregation.Aggregations
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.trees.TreeNode
+
+/**
+ * Base class for all Table API operations.
+ */
+sealed abstract class PlanNode extends TreeNode[PlanNode] { self: Product =>
+  def outputFields: Seq[(String, TypeInformation[_])]
+}
+
+/**
+ * Operation that transforms a [[org.apache.flink.api.scala.DataSet]] or
+ * [[org.apache.flink.streaming.api.scala.DataStream]] into a [[org.apache.flink.api.table.Table]].
+ */
+case class Root[T](input: T, outputFields: Seq[(String, TypeInformation[_])]) extends PlanNode {
+  val children = Nil
+}
+
+/**
+ * Operation that joins two [[org.apache.flink.api.table.Table]]s. A "filter" and a "select"
+ * should be applied after a join operation.
+ */
+case class Join(left: PlanNode, right: PlanNode) extends PlanNode {
+
+  val children = Seq(left, right)
+
+  def outputFields = left.outputFields ++ right.outputFields
+
+  override def toString = s"Join($left, $right)"
+}
+
+/**
+ * Operation that filters out elements that do not match the predicate expression.
+ */
+case class Filter(input: PlanNode, predicate: Expression) extends PlanNode {
+
+  val children = Seq(input)
+
+  def outputFields = input.outputFields
+
+  override def toString = s"Filter($input, $predicate)"
+}
+
+/**
+ * Selection expression. Similar to an SQL SELECT statement. The expressions can select fields
+ * and perform arithmetic or logic operations. The expressions can also perform aggregates
+ * on fields.
+ */
+case class Select(input: PlanNode, selection: Seq[Expression]) extends PlanNode {
+
+  val children = Seq(input)
+
+  def outputFields = selection.toSeq map { e => (e.name, e.typeInfo) }
+
+  override def toString = s"Select($input, ${selection.mkString(",")})"
+}
+
+/**
+ * Operation that gives new names to fields. Use this to disambiguate fields before a join
+ * operation.
+ */
+case class As(input: PlanNode, names: Seq[String]) extends PlanNode {
+
+  val children = Seq(input)
+
+  val outputFields = input.outputFields.zip(names) map {
+    case ((_, tpe), newName) => (newName, tpe)
+  }
+
+  override def toString = s"As($input, ${names.mkString(",")})"
+}
+
+/**
+ * Grouping operation. Keys are specified using field references. A group by operation os only
+ * useful when performing a select with aggregates afterwards.
+ * @param input
+ * @param fields
+ */
+case class GroupBy(input: PlanNode, fields: Seq[Expression]) extends PlanNode {
+
+  val children = Seq(input)
+
+  def outputFields = input.outputFields
+
+  override def toString = s"GroupBy($input, ${fields.mkString(",")})"
+}
+
+/**
+ * Internal operation. Selection operations containing aggregates are expanded to an [[Aggregate]]
+ * and a simple [[Select]].
+ */
+case class Aggregate(
+    input: PlanNode,
+    aggregations: Seq[(String, Aggregations)]) extends PlanNode {
+
+  val children = Seq(input)
+
+  def outputFields = input.outputFields
+
+  override def toString = s"Aggregate($input, ${aggregations.mkString(",")})"
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2d55cf0d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
new file mode 100644
index 0000000..a598483
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * The operations in this package are created by calling methods on [[Table]] they
+ * should not be manually created by users of the API.
+ */
+package object plan