You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/04/12 18:50:27 UTC

flink git commit: [FLINK-3736] [tableAPI] Move toRexNode logic into each expression's implementation.

Repository: flink
Updated Branches:
  refs/heads/master 427de663c -> ba46ab6b6


[FLINK-3736] [tableAPI] Move toRexNode logic into each expression's implementation.

This closes #1870


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba46ab6b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba46ab6b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba46ab6b

Branch: refs/heads/master
Commit: ba46ab6b659ffca60ea4a7b69f637622b9eb000c
Parents: 427de66
Author: Yijie Shen <he...@gmail.com>
Authored: Tue Apr 12 01:33:15 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Apr 12 18:49:22 2016 +0200

----------------------------------------------------------------------
 .../api/table/expressions/Expression.scala      |  19 ++-
 .../api/table/expressions/aggregations.scala    |  34 ++++
 .../api/table/expressions/arithmetic.scala      |  49 +++++-
 .../flink/api/table/expressions/call.scala      |  33 ++++
 .../flink/api/table/expressions/cast.scala      |   8 +
 .../api/table/expressions/comparison.scala      |  35 +++-
 .../api/table/expressions/fieldExpression.scala |  15 +-
 .../flink/api/table/expressions/literals.scala  |   7 +
 .../flink/api/table/expressions/logic.scala     |  16 ++
 .../api/table/plan/RexNodeTranslator.scala      | 162 +------------------
 .../org/apache/flink/api/table/table.scala      |  26 ++-
 .../scala/table/test/AggregationsITCase.scala   |   2 +-
 .../table/test/utils/ExpressionEvaluator.scala  |   4 +-
 13 files changed, 221 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba46ab6b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
index cd278d0..6960a9f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
@@ -18,30 +18,39 @@
 package org.apache.flink.api.table.expressions
 
 import java.util.concurrent.atomic.AtomicInteger
-import scala.language.postfixOps
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
 
 abstract class Expression extends TreeNode[Expression] { self: Product =>
   def name: String = Expression.freshName("expression")
+
+  /**
+    * Converts this expression to its Calcite counterpart, a RexNode.
+    */
+  def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    throw new UnsupportedOperationException(
+      s"${this.getClass.getName} cannot be transformed to RexNode"
+    )
 }
 
-abstract class BinaryExpression() extends Expression { self: Product =>
+abstract class BinaryExpression extends Expression { self: Product =>
   def left: Expression
   def right: Expression
   def children = Seq(left, right)
 }
 
-abstract class UnaryExpression() extends Expression { self: Product =>
+abstract class UnaryExpression extends Expression { self: Product =>
   def child: Expression
   def children = Seq(child)
 }
 
-abstract class LeafExpression() extends Expression { self: Product =>
+abstract class LeafExpression extends Expression { self: Product =>
   val children = Nil
 }
 
 case class NopExpression() extends LeafExpression {
   override val name = Expression.freshName("nop")
-
 }
 
 object Expression {

http://git-wip-us.apache.org/repos/asf/flink/blob/ba46ab6b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
index d9d5fa8..8cd9dc3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
@@ -17,26 +17,60 @@
  */
 package org.apache.flink.api.table.expressions
 
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.tools.RelBuilder.AggCall
+
 abstract sealed class Aggregation extends UnaryExpression { self: Product =>
+
   override def toString = s"Aggregate($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    throw new UnsupportedOperationException("Aggregate cannot be transformed to RexNode")
+
+  /**
+    * Converts this aggregation to its Calcite counterpart, an AggCall with the given name.
+    */
+  def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall
 }
 
 case class Sum(child: Expression) extends Aggregation {
   override def toString = s"($child).sum"
+
+  override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(SqlStdOperatorTable.SUM, false, null, name, child.toRexNode)
+  }
 }
 
 case class Min(child: Expression) extends Aggregation {
   override def toString = s"($child).min"
+
+  override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(SqlStdOperatorTable.MIN, false, null, name, child.toRexNode)
+  }
 }
 
 case class Max(child: Expression) extends Aggregation {
   override def toString = s"($child).max"
+
+  override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(SqlStdOperatorTable.MAX, false, null, name, child.toRexNode)
+  }
 }
 
 case class Count(child: Expression) extends Aggregation {
   override def toString = s"($child).count"
+
+  override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(SqlStdOperatorTable.COUNT, false, null, name, child.toRexNode)
+  }
 }
 
 case class Avg(child: Expression) extends Aggregation {
   override def toString = s"($child).avg"
+
+  override def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(SqlStdOperatorTable.AVG, false, null, name, child.toRexNode)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba46ab6b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
index b0bfa86..ca67697 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
@@ -17,28 +17,75 @@
  */
 package org.apache.flink.api.table.expressions
 
-abstract class BinaryArithmetic extends BinaryExpression { self: Product => }
+import scala.collection.JavaConversions._
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.table.typeutils.TypeConverter
+
+abstract class BinaryArithmetic extends BinaryExpression { self: Product =>
+  def sqlOperator: SqlOperator
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(sqlOperator, children.map(_.toRexNode))
+  }
+}
 
 case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
   override def toString = s"($left + $right)"
+
+  val sqlOperator = SqlStdOperatorTable.PLUS
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val l = left.toRexNode
+    val r = right.toRexNode
+    if(SqlTypeName.STRING_TYPES.contains(l.getType.getSqlTypeName)) {
+      val cast: RexNode = relBuilder.cast(r,
+        TypeConverter.typeInfoToSqlType(BasicTypeInfo.STRING_TYPE_INFO))
+      relBuilder.call(SqlStdOperatorTable.PLUS, l, cast)
+    } else if(SqlTypeName.STRING_TYPES.contains(r.getType.getSqlTypeName)) {
+      val cast: RexNode = relBuilder.cast(l,
+        TypeConverter.typeInfoToSqlType(BasicTypeInfo.STRING_TYPE_INFO))
+      relBuilder.call(SqlStdOperatorTable.PLUS, cast, r)
+    } else {
+      relBuilder.call(SqlStdOperatorTable.PLUS, l, r)
+    }
+  }
 }
 
 case class UnaryMinus(child: Expression) extends UnaryExpression {
   override def toString = s"-($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.UNARY_MINUS, child.toRexNode)
+  }
 }
 
 case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
   override def toString = s"($left - $right)"
+
+  val sqlOperator = SqlStdOperatorTable.MINUS
 }
 
 case class Div(left: Expression, right: Expression) extends BinaryArithmetic {
   override def toString = s"($left / $right)"
+
+  val sqlOperator = SqlStdOperatorTable.DIVIDE
 }
 
 case class Mul(left: Expression, right: Expression) extends BinaryArithmetic {
   override def toString = s"($left * $right)"
+
+  val sqlOperator = SqlStdOperatorTable.MULTIPLY
 }
 
 case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
   override def toString = s"($left % $right)"
+
+  val sqlOperator = SqlStdOperatorTable.MOD
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba46ab6b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
index 9f74414..280d213 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
@@ -17,6 +17,13 @@
  */
 package org.apache.flink.api.table.expressions
 
+import scala.collection.JavaConversions._
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+
 /**
   * General expression for unresolved function calls. The function can be a built-in
   * scalar function or a user-defined scalar function.
@@ -25,6 +32,12 @@ case class Call(functionName: String, args: Expression*) extends Expression {
 
   override def children: Seq[Expression] = args
 
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(
+      BuiltInFunctionNames.toSqlOperator(functionName),
+      args.map(_.toRexNode))
+  }
+
   override def toString = s"\\$functionName(${args.mkString(", ")})"
 
   override def makeCopy(newArgs: Seq[AnyRef]): this.type = {
@@ -54,6 +67,26 @@ object BuiltInFunctionNames {
   val POWER = "POWER"
   val LN = "LN"
   val ABS = "ABS"
+
+  def toSqlOperator(name: String): SqlOperator = {
+    name match {
+      case BuiltInFunctionNames.SUBSTRING => SqlStdOperatorTable.SUBSTRING
+      case BuiltInFunctionNames.TRIM => SqlStdOperatorTable.TRIM
+      case BuiltInFunctionNames.CHAR_LENGTH => SqlStdOperatorTable.CHAR_LENGTH
+      case BuiltInFunctionNames.UPPER_CASE => SqlStdOperatorTable.UPPER
+      case BuiltInFunctionNames.LOWER_CASE => SqlStdOperatorTable.LOWER
+      case BuiltInFunctionNames.INIT_CAP => SqlStdOperatorTable.INITCAP
+      case BuiltInFunctionNames.LIKE => SqlStdOperatorTable.LIKE
+      case BuiltInFunctionNames.SIMILAR => SqlStdOperatorTable.SIMILAR_TO
+      case BuiltInFunctionNames.EXP => SqlStdOperatorTable.EXP
+      case BuiltInFunctionNames.LOG10 => SqlStdOperatorTable.LOG10
+      case BuiltInFunctionNames.POWER => SqlStdOperatorTable.POWER
+      case BuiltInFunctionNames.LN => SqlStdOperatorTable.LN
+      case BuiltInFunctionNames.ABS => SqlStdOperatorTable.ABS
+      case BuiltInFunctionNames.MOD => SqlStdOperatorTable.MOD
+      case _ => ???
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ba46ab6b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
index eb97d04..fdad1f6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
@@ -17,12 +17,20 @@
  */
 package org.apache.flink.api.table.expressions
 
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.typeutils.TypeConverter
 
 case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression {
 
   override def toString = s"$child.cast($tpe)"
 
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.cast(child.toRexNode, TypeConverter.typeInfoToSqlType(tpe))
+  }
+
   override def makeCopy(anyRefs: Seq[AnyRef]): this.type = {
     val child: Expression = anyRefs.head.asInstanceOf[Expression]
     copy(child, tpe).asInstanceOf[this.type]

http://git-wip-us.apache.org/repos/asf/flink/blob/ba46ab6b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
index d9e9198..124393c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
@@ -17,36 +17,69 @@
  */
 package org.apache.flink.api.table.expressions
 
-abstract class BinaryComparison extends BinaryExpression { self: Product => }
+import scala.collection.JavaConversions._
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+
+abstract class BinaryComparison extends BinaryExpression { self: Product =>
+  def sqlOperator: SqlOperator
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(sqlOperator, children.map(_.toRexNode))
+  }
+}
 
 case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
   override def toString = s"$left === $right"
+
+  val sqlOperator: SqlOperator = SqlStdOperatorTable.EQUALS
 }
 
 case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
   override def toString = s"$left !== $right"
+
+  val sqlOperator: SqlOperator = SqlStdOperatorTable.NOT_EQUALS
 }
 
 case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
   override def toString = s"$left > $right"
+
+  val sqlOperator: SqlOperator = SqlStdOperatorTable.GREATER_THAN
 }
 
 case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
   override def toString = s"$left >= $right"
+
+  val sqlOperator: SqlOperator = SqlStdOperatorTable.GREATER_THAN_OR_EQUAL
 }
 
 case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
   override def toString = s"$left < $right"
+
+  val sqlOperator: SqlOperator = SqlStdOperatorTable.LESS_THAN
 }
 
 case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
   override def toString = s"$left <= $right"
+
+  val sqlOperator: SqlOperator = SqlStdOperatorTable.LESS_THAN_OR_EQUAL
 }
 
 case class IsNull(child: Expression) extends UnaryExpression {
   override def toString = s"($child).isNull"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.isNull(child.toRexNode)
+  }
 }
 
 case class IsNotNull(child: Expression) extends UnaryExpression {
   override def toString = s"($child).isNotNull"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.isNotNull(child.toRexNode)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba46ab6b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
index f3cb77e..82f7653 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
@@ -17,19 +17,28 @@
  */
 package org.apache.flink.api.table.expressions
 
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
 case class UnresolvedFieldReference(override val name: String) extends LeafExpression {
   override def toString = "\"" + name
-}
 
-case class ResolvedFieldReference(
-    override val name: String) extends LeafExpression {
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.field(name)
+  }
+}
 
+case class ResolvedFieldReference(override val name: String) extends LeafExpression {
   override def toString = s"'$name"
 }
 
 case class Naming(child: Expression, override val name: String) extends UnaryExpression {
   override def toString = s"$child as '$name"
 
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.alias(child.toRexNode, name)
+  }
+
   override def makeCopy(anyRefs: Seq[AnyRef]): this.type = {
     val child: Expression = anyRefs.head.asInstanceOf[Expression]
     copy(child, name).asInstanceOf[this.type]

http://git-wip-us.apache.org/repos/asf/flink/blob/ba46ab6b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
index 85956a2..efaa96d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
@@ -18,6 +18,9 @@
 package org.apache.flink.api.table.expressions
 
 import java.util.Date
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.scala.table.ImplicitExpressionOperations
 
@@ -41,4 +44,8 @@ case class Literal(value: Any, tpe: TypeInformation[_])
   def typeInfo = tpe
 
   override def toString = s"$value"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.literal(value)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba46ab6b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
index 3f9b5c2..99da371 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
@@ -17,6 +17,9 @@
  */
 package org.apache.flink.api.table.expressions
 
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+
 abstract class BinaryPredicate extends BinaryExpression { self: Product => }
 
 case class Not(child: Expression) extends UnaryExpression {
@@ -24,17 +27,30 @@ case class Not(child: Expression) extends UnaryExpression {
   override val name = Expression.freshName("not-" + child.name)
 
   override def toString = s"!($child)"
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.not(child.toRexNode)
+  }
 }
 
 case class And(left: Expression, right: Expression) extends BinaryPredicate {
+
   override def toString = s"$left && $right"
 
   override val name = Expression.freshName(left.name + "-and-" + right.name)
+
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.and(left.toRexNode, right.toRexNode)
+  }
 }
 
 case class Or(left: Expression, right: Expression) extends BinaryPredicate {
+
   override def toString = s"$left || $right"
 
   override val name = Expression.freshName(left.name + "-or-" + right.name)
 
+  override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.or(left.toRexNode, right.toRexNode)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba46ab6b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
index b50b74b..926e023 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
@@ -18,17 +18,10 @@
 
 package org.apache.flink.api.table.plan
 
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 import org.apache.calcite.tools.RelBuilder.AggCall
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.typeutils.TypeConverter
 
-import scala.collection.JavaConversions._
+import org.apache.flink.api.table.expressions._
 
 object RexNodeTranslator {
 
@@ -42,11 +35,11 @@ object RexNodeTranslator {
     exp match {
       case agg: Aggregation =>
         val name = TranslationContext.getUniqueName
-        val aggCall = toAggCall(agg, name, relBuilder)
+        val aggCall = agg.toAggCall(name)(relBuilder)
         val fieldExp = new UnresolvedFieldReference(name)
         (fieldExp, List(aggCall))
       case n@Naming(agg: Aggregation, name) =>
-        val aggCall = toAggCall(agg, name, relBuilder)
+        val aggCall = agg.toAggCall(name)(relBuilder)
         val fieldExp = new UnresolvedFieldReference(name)
         (fieldExp, List(aggCall))
       case l: LeafExpression =>
@@ -69,153 +62,4 @@ object RexNodeTranslator {
           s"Expression $e of type ${e.getClass} not supported yet")
     }
   }
-
-  /**
-    * Translates a Table API expression into a Calcite RexNode.
-    */
-  def toRexNode(exp: Expression, relBuilder: RelBuilder): RexNode = {
-
-    exp match {
-      // Basic operators
-      case Literal(value, tpe) =>
-        relBuilder.literal(value)
-      case ResolvedFieldReference(name) =>
-        relBuilder.field(name)
-      case UnresolvedFieldReference(name) =>
-        relBuilder.field(name)
-      case NopExpression() =>
-        throw new IllegalArgumentException("NoOp expression encountered")
-      case Naming(child, name) =>
-        val c = toRexNode(child, relBuilder)
-        relBuilder.alias(c, name)
-      case Cast(child, tpe) =>
-        val c = toRexNode(child, relBuilder)
-        relBuilder.cast(c, TypeConverter.typeInfoToSqlType(tpe))
-      case Not(child) =>
-        val c = toRexNode(child, relBuilder)
-        relBuilder.not(c)
-      case Or(left, right) =>
-        val l = toRexNode(left, relBuilder)
-        val r = toRexNode(right, relBuilder)
-        relBuilder.or(l, r)
-      case And(left, right) =>
-        val l = toRexNode(left, relBuilder)
-        val r = toRexNode(right, relBuilder)
-        relBuilder.and(l, r)
-      case EqualTo(left, right) =>
-        val l = toRexNode(left, relBuilder)
-        val r = toRexNode(right, relBuilder)
-        relBuilder.equals(l, r)
-      case NotEqualTo(left, right) =>
-        val l = toRexNode(left, relBuilder)
-        val r = toRexNode(right, relBuilder)
-        relBuilder.not(relBuilder.equals(l, r))
-        relBuilder.call(SqlStdOperatorTable.NOT_EQUALS, l, r)
-      case LessThan(left, right) =>
-        val l = toRexNode(left, relBuilder)
-        val r = toRexNode(right, relBuilder)
-        relBuilder.call(SqlStdOperatorTable.LESS_THAN, l, r)
-      case LessThanOrEqual(left, right) =>
-        val l = toRexNode(left, relBuilder)
-        val r = toRexNode(right, relBuilder)
-        relBuilder.call(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, l, r)
-      case GreaterThan(left, right) =>
-        val l = toRexNode(left, relBuilder)
-        val r = toRexNode(right, relBuilder)
-        relBuilder.call(SqlStdOperatorTable.GREATER_THAN, l, r)
-      case GreaterThanOrEqual(left, right) =>
-        val l = toRexNode(left, relBuilder)
-        val r = toRexNode(right, relBuilder)
-        relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, l, r)
-      case IsNull(child) =>
-        val c = toRexNode(child, relBuilder)
-        relBuilder.isNull(c)
-      case IsNotNull(child) =>
-        val c = toRexNode(child, relBuilder)
-        relBuilder.isNotNull(c)
-      case Plus(left, right) =>
-        val l = toRexNode(left, relBuilder)
-        val r = toRexNode(right, relBuilder)
-        if(SqlTypeName.STRING_TYPES.contains(l.getType.getSqlTypeName)) {
-          val cast: RexNode = relBuilder.cast(r,
-            TypeConverter.typeInfoToSqlType(BasicTypeInfo.STRING_TYPE_INFO))
-          relBuilder.call(SqlStdOperatorTable.PLUS, l, cast)
-        } else if(SqlTypeName.STRING_TYPES.contains(r.getType.getSqlTypeName)) {
-          val cast: RexNode = relBuilder.cast(l,
-            TypeConverter.typeInfoToSqlType(BasicTypeInfo.STRING_TYPE_INFO))
-          relBuilder.call(SqlStdOperatorTable.PLUS, cast, r)
-        } else {
-          relBuilder.call(SqlStdOperatorTable.PLUS, l, r)
-        }
-      case Minus(left, right) =>
-        val l = toRexNode(left, relBuilder)
-        val r = toRexNode(right, relBuilder)
-        relBuilder.call(SqlStdOperatorTable.MINUS, l, r)
-      case Mul(left, right) =>
-        val l = toRexNode(left, relBuilder)
-        val r = toRexNode(right, relBuilder)
-        relBuilder.call(SqlStdOperatorTable.MULTIPLY, l, r)
-      case Div(left, right) =>
-        val l = toRexNode(left, relBuilder)
-        val r = toRexNode(right, relBuilder)
-        relBuilder.call(SqlStdOperatorTable.DIVIDE, l, r)
-      case Mod(left, right) =>
-        val l = toRexNode(left, relBuilder)
-        val r = toRexNode(right, relBuilder)
-        relBuilder.call(SqlStdOperatorTable.MOD, l, r)
-      case UnaryMinus(child) =>
-        val c = toRexNode(child, relBuilder)
-        relBuilder.call(SqlStdOperatorTable.UNARY_MINUS, c)
-
-      // Scalar functions
-      case Call(name, args@_*) =>
-        val rexArgs = args.map(toRexNode(_, relBuilder))
-        val sqlOperator = toSqlOperator(name)
-        relBuilder.call(sqlOperator, rexArgs)
-
-      case a: Aggregation =>
-        throw new IllegalArgumentException(s"Aggregation expression $a not allowed at this place")
-      case e@AnyRef =>
-        throw new IllegalArgumentException(
-          s"Expression $e of type ${e.getClass} not supported yet")
-    }
-  }
-
-  private def toAggCall(agg: Aggregation, name: String, relBuilder: RelBuilder): AggCall = {
-
-    val rexNode = toRexNode(agg.child, relBuilder)
-    agg match {
-      case s: Sum => relBuilder.aggregateCall(
-        SqlStdOperatorTable.SUM, false, null, name, rexNode)
-      case m: Min => relBuilder.aggregateCall(
-        SqlStdOperatorTable.MIN, false, null, name, rexNode)
-      case m: Max => relBuilder.aggregateCall(
-        SqlStdOperatorTable.MAX, false, null, name, rexNode)
-      case c: Count => relBuilder.aggregateCall(
-        SqlStdOperatorTable.COUNT, false, null, name, rexNode)
-      case a: Avg => relBuilder.aggregateCall(
-        SqlStdOperatorTable.AVG, false, null, name, rexNode)
-    }
-  }
-
-  private def toSqlOperator(name: String): SqlOperator = {
-    name match {
-      case BuiltInFunctionNames.SUBSTRING => SqlStdOperatorTable.SUBSTRING
-      case BuiltInFunctionNames.TRIM => SqlStdOperatorTable.TRIM
-      case BuiltInFunctionNames.CHAR_LENGTH => SqlStdOperatorTable.CHAR_LENGTH
-      case BuiltInFunctionNames.UPPER_CASE => SqlStdOperatorTable.UPPER
-      case BuiltInFunctionNames.LOWER_CASE => SqlStdOperatorTable.LOWER
-      case BuiltInFunctionNames.INIT_CAP => SqlStdOperatorTable.INITCAP
-      case BuiltInFunctionNames.LIKE => SqlStdOperatorTable.LIKE
-      case BuiltInFunctionNames.SIMILAR => SqlStdOperatorTable.SIMILAR_TO
-      case BuiltInFunctionNames.EXP => SqlStdOperatorTable.EXP
-      case BuiltInFunctionNames.LOG10 => SqlStdOperatorTable.LOG10
-      case BuiltInFunctionNames.POWER => SqlStdOperatorTable.POWER
-      case BuiltInFunctionNames.LN => SqlStdOperatorTable.LN
-      case BuiltInFunctionNames.ABS => SqlStdOperatorTable.ABS
-      case BuiltInFunctionNames.MOD => SqlStdOperatorTable.MOD
-      case _ => ???
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba46ab6b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index 53c3b4a..7b40c57 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -30,7 +30,7 @@ import org.apache.calcite.util.NlsString
 import org.apache.flink.api.java.io.DiscardingOutputFormat
 import org.apache.flink.api.table.explain.PlanJsonParser
 import org.apache.flink.api.table.plan.{PlanGenException, RexNodeTranslator}
-import RexNodeTranslator.{toRexNode, extractAggCalls}
+import RexNodeTranslator.extractAggCalls
 import org.apache.flink.api.table.expressions.{ExpressionParser, Naming, UnresolvedFieldReference, Expression}
 
 import org.apache.flink.api.scala._
@@ -96,8 +96,7 @@ class Table(
       .map(extractAggCalls(_, relBuilder)).toList
 
     // get aggregation calls
-    val aggCalls: List[AggCall] = extractedAggCalls
-      .map(_._2).reduce( (x,y) => x ::: y)
+    val aggCalls: List[AggCall] = extractedAggCalls.flatMap(_._2)
 
     // apply aggregations
     if (aggCalls.nonEmpty) {
@@ -106,9 +105,7 @@ class Table(
     }
 
     // get selection expressions
-    val exprs: List[RexNode] = extractedAggCalls
-      .map(_._1)
-      .map(toRexNode(_, relBuilder))
+    val exprs: List[RexNode] = extractedAggCalls.map(_._1.toRexNode(relBuilder))
 
     relBuilder.project(exprs.toIterable.asJava)
     val projected = relBuilder.build()
@@ -170,7 +167,7 @@ class Table(
 
     relBuilder.push(relNode)
 
-    val exprs = (renamings ++ remaining).map(toRexNode(_, relBuilder))
+    val exprs = (renamings ++ remaining).map(_.toRexNode(relBuilder))
 
     new Table(createRenamingProject(exprs), relBuilder)
   }
@@ -203,8 +200,7 @@ class Table(
   def filter(predicate: Expression): Table = {
 
     relBuilder.push(relNode)
-    val pred = toRexNode(predicate, relBuilder)
-    relBuilder.filter(pred)
+    relBuilder.filter(predicate.toRexNode(relBuilder))
     new Table(relBuilder.build(), relBuilder)
   }
 
@@ -264,7 +260,7 @@ class Table(
   def groupBy(fields: Expression*): GroupedTable = {
 
     relBuilder.push(relNode)
-    val groupExpr = fields.map(toRexNode(_, relBuilder)).toIterable.asJava
+    val groupExpr = fields.map(_.toRexNode(relBuilder)).toIterable.asJava
     val groupKey = relBuilder.groupKey(groupExpr)
 
     new GroupedTable(relBuilder.build(), relBuilder, groupKey)
@@ -450,19 +446,15 @@ class GroupedTable(
       .map(extractAggCalls(_, relBuilder)).toList
 
     // get aggregation calls
-    val aggCalls: List[AggCall] = extractedAggCalls
-      .map(_._2).reduce( (x,y) => x ::: y)
+    val aggCalls: List[AggCall] = extractedAggCalls.flatMap(_._2)
 
     // apply aggregations
     relBuilder.aggregate(groupKey, aggCalls.toIterable.asJava)
 
     // get selection expressions
     val exprs: List[RexNode] = try {
-      extractedAggCalls
-        .map(_._1)
-        .map(toRexNode(_, relBuilder))
-    }
-    catch {
+      extractedAggCalls.map(_._1.toRexNode(relBuilder))
+    } catch {
       case iae: IllegalArgumentException  =>
         throw new IllegalArgumentException(
           "Only grouping fields and aggregations allowed after groupBy.", iae)

http://git-wip-us.apache.org/repos/asf/flink/blob/ba46ab6b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 0741db8..abf2735 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -133,7 +133,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     t.collect()
   }
 
-  @Test(expected = classOf[IllegalArgumentException])
+  @Test(expected = classOf[UnsupportedOperationException])
   def testNoNestedAggregations(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/ba46ab6b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
index a52bbbd..48dea56 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.table.TableConfig
 import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction}
 import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.plan.{RexNodeTranslator, TranslationContext}
+import org.apache.flink.api.table.plan.TranslationContext
 import org.apache.flink.api.table.plan.schema.DataSetTable
 import org.apache.flink.api.table.runtime.FunctionCompiler
 import org.mockito.Mockito._
@@ -78,7 +78,7 @@ object ExpressionEvaluator {
 
   def evaluate(data: Any, typeInfo: TypeInformation[Any], expr: Expression): String = {
     val relBuilder = prepareTable(typeInfo)._2
-    evaluate(data, typeInfo, relBuilder, RexNodeTranslator.toRexNode(expr, relBuilder))
+    evaluate(data, typeInfo, relBuilder, expr.toRexNode(relBuilder))
   }
 
   def evaluate(