Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:52 UTC

[23/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
new file mode 100644
index 0000000..48dbce6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.avatica.util.DateTimeUtils.{MILLIS_PER_DAY, MILLIS_PER_HOUR, MILLIS_PER_MINUTE, MILLIS_PER_SECOND}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.api.ExpressionParserException
+import org.apache.flink.table.expressions.ExpressionUtils.{toMilliInterval, toMonthInterval}
+import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
+import org.apache.flink.table.expressions.TimePointUnit.TimePointUnit
+import org.apache.flink.table.expressions.TrimMode.TrimMode
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+
+import scala.language.implicitConversions
+import scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
+
+/**
+ * Parser for expressions inside a String. This parses exactly the same expressions that
+ * would be accepted by the Scala Expression DSL.
+ *
+ * See [[org.apache.flink.table.api.scala.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.table.api.scala.ImplicitExpressionOperations]] for the constructs
+ * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL
+ * defined in the above files.
+ */
+object ExpressionParser extends JavaTokenParsers with PackratParsers {
+  case class Keyword(key: String)
+
+  // Converts the keyword into a case-insensitive Parser
+  implicit def keyword2Parser(kw: Keyword): Parser[String] = {
+    ("""(?i)\Q""" + kw.key + """\E""").r
+  }
+
+  // Keywords
+
+  lazy val ARRAY: Keyword = Keyword("Array")
+  lazy val AS: Keyword = Keyword("as")
+  lazy val COUNT: Keyword = Keyword("count")
+  lazy val AVG: Keyword = Keyword("avg")
+  lazy val MIN: Keyword = Keyword("min")
+  lazy val MAX: Keyword = Keyword("max")
+  lazy val SUM: Keyword = Keyword("sum")
+  lazy val START: Keyword = Keyword("start")
+  lazy val END: Keyword = Keyword("end")
+  lazy val CAST: Keyword = Keyword("cast")
+  lazy val NULL: Keyword = Keyword("Null")
+  lazy val IF: Keyword = Keyword("?")
+  lazy val ASC: Keyword = Keyword("asc")
+  lazy val DESC: Keyword = Keyword("desc")
+  lazy val TO_DATE: Keyword = Keyword("toDate")
+  lazy val TO_TIME: Keyword = Keyword("toTime")
+  lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp")
+  lazy val TRIM: Keyword = Keyword("trim")
+  lazy val EXTRACT: Keyword = Keyword("extract")
+  lazy val FLOOR: Keyword = Keyword("floor")
+  lazy val CEIL: Keyword = Keyword("ceil")
+  lazy val YEARS: Keyword = Keyword("years")
+  lazy val YEAR: Keyword = Keyword("year")
+  lazy val MONTHS: Keyword = Keyword("months")
+  lazy val MONTH: Keyword = Keyword("month")
+  lazy val DAYS: Keyword = Keyword("days")
+  lazy val DAY: Keyword = Keyword("day")
+  lazy val HOURS: Keyword = Keyword("hours")
+  lazy val HOUR: Keyword = Keyword("hour")
+  lazy val MINUTES: Keyword = Keyword("minutes")
+  lazy val MINUTE: Keyword = Keyword("minute")
+  lazy val SECONDS: Keyword = Keyword("seconds")
+  lazy val SECOND: Keyword = Keyword("second")
+  lazy val MILLIS: Keyword = Keyword("millis")
+  lazy val MILLI: Keyword = Keyword("milli")
+  lazy val ROWS: Keyword = Keyword("rows")
+  lazy val STAR: Keyword = Keyword("*")
+  lazy val GET: Keyword = Keyword("get")
+  lazy val FLATTEN: Keyword = Keyword("flatten")
+
+  def functionIdent: ExpressionParser.Parser[String] =
+    not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
+      not(SUM) ~ not(START) ~ not(END) ~ not(CAST) ~ not(NULL) ~
+      not(IF) ~> super.ident
+
+  // symbols
+
+  lazy val timeIntervalUnit: PackratParser[Expression] = TimeIntervalUnit.values map {
+    case unit: TimeIntervalUnit => literal(unit.toString) ^^^ unit.toExpr
+  } reduceLeft(_ | _)
+
+  lazy val timePointUnit: PackratParser[Expression] = TimePointUnit.values map {
+    case unit: TimePointUnit => literal(unit.toString) ^^^ unit.toExpr
+  } reduceLeft(_ | _)
+
+  lazy val trimMode: PackratParser[Expression] = TrimMode.values map {
+    case mode: TrimMode => literal(mode.toString) ^^^ mode.toExpr
+  } reduceLeft(_ | _)
+
+  // data types
+
+  lazy val dataType: PackratParser[TypeInformation[_]] =
+    "BYTE" ^^ { ti => BasicTypeInfo.BYTE_TYPE_INFO } |
+      "SHORT" ^^ { ti => BasicTypeInfo.SHORT_TYPE_INFO } |
+      "INTERVAL_MONTHS" ^^ {
+        ti => TimeIntervalTypeInfo.INTERVAL_MONTHS.asInstanceOf[TypeInformation[_]]
+      } |
+      "INTERVAL_MILLIS" ^^ {
+        ti => TimeIntervalTypeInfo.INTERVAL_MILLIS.asInstanceOf[TypeInformation[_]]
+      } |
+      "INT" ^^ { ti => BasicTypeInfo.INT_TYPE_INFO } |
+      "LONG" ^^ { ti => BasicTypeInfo.LONG_TYPE_INFO } |
+      "FLOAT" ^^ { ti => BasicTypeInfo.FLOAT_TYPE_INFO } |
+      "DOUBLE" ^^ { ti => BasicTypeInfo.DOUBLE_TYPE_INFO } |
+      ("BOOLEAN" | "BOOL") ^^ { ti => BasicTypeInfo.BOOLEAN_TYPE_INFO } |
+      "STRING" ^^ { ti => BasicTypeInfo.STRING_TYPE_INFO } |
+      "DATE" ^^ { ti => SqlTimeTypeInfo.DATE.asInstanceOf[TypeInformation[_]] } |
+      "TIMESTAMP" ^^ { ti => SqlTimeTypeInfo.TIMESTAMP } |
+      "TIME" ^^ { ti => SqlTimeTypeInfo.TIME } |
+      "DECIMAL" ^^ { ti => BasicTypeInfo.BIG_DEC_TYPE_INFO }
+
+  // Literals
+
+  // same as floatingPointNumber but we do not allow a trailing dot, as in "12.d" or "2."
+  lazy val floatingPointNumberFlink: Parser[String] =
+    """-?(\d+(\.\d+)?|\d*\.\d+)([eE][+-]?\d+)?[fFdD]?""".r
+
+  lazy val numberLiteral: PackratParser[Expression] =
+    (wholeNumber <~ ("l" | "L")) ^^ { n => Literal(n.toLong) } |
+      (decimalNumber <~ ("p" | "P")) ^^ { n => Literal(BigDecimal(n)) } |
+      (floatingPointNumberFlink | decimalNumber) ^^ {
+        n =>
+          if (n.matches("""-?\d+""")) {
+            Literal(n.toInt)
+          } else if (n.endsWith("f") || n.endsWith("F")) {
+            Literal(n.toFloat)
+          } else {
+            Literal(n.toDouble)
+          }
+      }
+
+  lazy val singleQuoteStringLiteral: Parser[Expression] =
+    ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r ^^ {
+      str => Literal(str.substring(1, str.length - 1))
+    }
+
+  lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral ^^ {
+    str => Literal(str.substring(1, str.length - 1))
+  }
+
+  lazy val boolLiteral: PackratParser[Expression] = ("true" | "false") ^^ {
+    str => Literal(str.toBoolean)
+  }
+
+  lazy val nullLiteral: PackratParser[Expression] = NULL ~ "(" ~> dataType <~ ")" ^^ {
+    dt => Null(dt)
+  }
+
+  lazy val literalExpr: PackratParser[Expression] =
+    numberLiteral | stringLiteralFlink | singleQuoteStringLiteral | boolLiteral | nullLiteral
+
+  lazy val fieldReference: PackratParser[NamedExpression] = (STAR | ident) ^^ {
+    sym => UnresolvedFieldReference(sym)
+  }
+
+  lazy val atom: PackratParser[Expression] =
+    ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference
+
+  // suffix operators
+
+  lazy val suffixSum: PackratParser[Expression] =
+    composite <~ "." ~ SUM ~ opt("()") ^^ { e => Sum(e) }
+
+  lazy val suffixMin: PackratParser[Expression] =
+    composite <~ "." ~ MIN ~ opt("()") ^^ { e => Min(e) }
+
+  lazy val suffixMax: PackratParser[Expression] =
+    composite <~ "." ~ MAX ~ opt("()") ^^ { e => Max(e) }
+
+  lazy val suffixCount: PackratParser[Expression] =
+    composite <~ "." ~ COUNT ~ opt("()") ^^ { e => Count(e) }
+
+  lazy val suffixAvg: PackratParser[Expression] =
+    composite <~ "." ~ AVG ~ opt("()") ^^ { e => Avg(e) }
+
+  lazy val suffixStart: PackratParser[Expression] =
+    composite <~ "." ~ START ~ opt("()") ^^ { e => WindowStart(e) }
+
+  lazy val suffixEnd: PackratParser[Expression] =
+    composite <~ "." ~ END ~ opt("()") ^^ { e => WindowEnd(e) }
+
+  lazy val suffixCast: PackratParser[Expression] =
+    composite ~ "." ~ CAST ~ "(" ~ dataType ~ ")" ^^ {
+    case e ~ _ ~ _ ~ _ ~ dt ~ _ => Cast(e, dt)
+  }
+
+  lazy val suffixAs: PackratParser[Expression] =
+    composite ~ "." ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+    case e ~ _ ~ _ ~ _ ~ target ~ _ => Alias(e, target.head.name, target.tail.map(_.name))
+  }
+
+  lazy val suffixTrim = composite ~ "." ~ TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ ")" ^^ {
+    case operand ~ _ ~ _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ => Trim(mode, trimCharacter, operand)
+  }
+
+  lazy val suffixTrimWithoutArgs = composite <~ "." ~ TRIM ~ opt("()") ^^ {
+    e => Trim(TrimMode.BOTH, TrimConstants.TRIM_DEFAULT_CHAR, e)
+  }
+
+  lazy val suffixIf: PackratParser[Expression] =
+    composite ~ "." ~ IF ~ "(" ~ expression ~ "," ~ expression ~ ")" ^^ {
+    case condition ~ _ ~ _ ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ => If(condition, ifTrue, ifFalse)
+  }
+
+  lazy val suffixExtract = composite ~ "." ~ EXTRACT ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+    case operand ~ _  ~ _ ~ _ ~ unit ~ _ => Extract(unit, operand)
+  }
+
+  lazy val suffixFloor = composite ~ "." ~ FLOOR ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+    case operand ~ _  ~ _ ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
+  }
+
+  lazy val suffixCeil = composite ~ "." ~ CEIL ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+    case operand ~ _  ~ _ ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
+  }
+
+  lazy val suffixFunctionCall =
+    composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
+    case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args)
+  }
+
+  lazy val suffixFunctionCallOneArg = composite ~ "." ~ functionIdent ^^ {
+    case operand ~ _ ~ name => Call(name.toUpperCase, Seq(operand))
+  }
+
+  lazy val suffixAsc: PackratParser[Expression] =
+    atom <~ "." ~ ASC ~ opt("()") ^^ { e => Asc(e) }
+
+  lazy val suffixDesc: PackratParser[Expression] =
+    atom <~ "." ~ DESC ~ opt("()") ^^ { e => Desc(e) }
+
+  lazy val suffixToDate: PackratParser[Expression] =
+    composite <~ "." ~ TO_DATE ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.DATE) }
+
+  lazy val suffixToTimestamp: PackratParser[Expression] =
+    composite <~ "." ~ TO_TIMESTAMP ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.TIMESTAMP) }
+
+  lazy val suffixToTime: PackratParser[Expression] =
+    composite <~ "." ~ TO_TIME ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.TIME) }
+
+  lazy val suffixTimeInterval: PackratParser[Expression] =
+    composite ~ "." ~ (YEARS | MONTHS | DAYS | HOURS | MINUTES | SECONDS | MILLIS |
+      YEAR | MONTH | DAY | HOUR | MINUTE | SECOND | MILLI) ^^ {
+
+    case expr ~ _ ~ (YEARS.key | YEAR.key) => toMonthInterval(expr, 12)
+
+    case expr ~ _ ~ (MONTHS.key | MONTH.key) => toMonthInterval(expr, 1)
+
+    case expr ~ _ ~ (DAYS.key | DAY.key) => toMilliInterval(expr, MILLIS_PER_DAY)
+
+    case expr ~ _ ~ (HOURS.key | HOUR.key) => toMilliInterval(expr, MILLIS_PER_HOUR)
+
+    case expr ~ _ ~ (MINUTES.key | MINUTE.key) => toMilliInterval(expr, MILLIS_PER_MINUTE)
+
+    case expr ~ _ ~ (SECONDS.key | SECOND.key) => toMilliInterval(expr, MILLIS_PER_SECOND)
+
+    case expr ~ _ ~ (MILLIS.key | MILLI.key) => toMilliInterval(expr, 1)
+  }
+
+  lazy val suffixRowInterval: PackratParser[Expression] =
+    composite <~ "." ~ ROWS ^^ { e => ExpressionUtils.toRowInterval(e) }
+
+  lazy val suffixGet: PackratParser[Expression] =
+    composite ~ "." ~ GET ~ "(" ~ literalExpr ~ ")" ^^ {
+      case e ~ _ ~ _ ~ _ ~ index ~ _ =>
+        GetCompositeField(e, index.asInstanceOf[Literal].value)
+  }
+
+  lazy val suffixFlattening: PackratParser[Expression] =
+    composite <~ "." ~ FLATTEN ~ opt("()") ^^ { e => Flattening(e) }
+
+  lazy val suffixed: PackratParser[Expression] =
+    suffixTimeInterval | suffixRowInterval | suffixSum | suffixMin | suffixMax | suffixStart |
+      suffixEnd | suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim |
+      suffixTrimWithoutArgs | suffixIf | suffixAsc | suffixDesc | suffixToDate |
+      suffixToTimestamp | suffixToTime | suffixExtract | suffixFloor | suffixCeil |
+      suffixGet | suffixFlattening |
+      suffixFunctionCall | suffixFunctionCallOneArg // function call must always be at the end
+
+  // prefix operators
+
+  lazy val prefixArray: PackratParser[Expression] =
+    ARRAY ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { elements => ArrayConstructor(elements) }
+
+  lazy val prefixSum: PackratParser[Expression] =
+    SUM ~ "(" ~> expression <~ ")" ^^ { e => Sum(e) }
+
+  lazy val prefixMin: PackratParser[Expression] =
+    MIN ~ "(" ~> expression <~ ")" ^^ { e => Min(e) }
+
+  lazy val prefixMax: PackratParser[Expression] =
+    MAX ~ "(" ~> expression <~ ")" ^^ { e => Max(e) }
+
+  lazy val prefixCount: PackratParser[Expression] =
+    COUNT ~ "(" ~> expression <~ ")" ^^ { e => Count(e) }
+
+  lazy val prefixAvg: PackratParser[Expression] =
+    AVG ~ "(" ~> expression <~ ")" ^^ { e => Avg(e) }
+
+  lazy val prefixStart: PackratParser[Expression] =
+    START ~ "(" ~> expression <~ ")" ^^ { e => WindowStart(e) }
+
+  lazy val prefixEnd: PackratParser[Expression] =
+    END ~ "(" ~> expression <~ ")" ^^ { e => WindowEnd(e) }
+
+  lazy val prefixCast: PackratParser[Expression] =
+    CAST ~ "(" ~ expression ~ "," ~ dataType ~ ")" ^^ {
+    case _ ~ _ ~ e ~ _ ~ dt ~ _ => Cast(e, dt)
+  }
+
+  lazy val prefixAs: PackratParser[Expression] =
+    AS ~ "(" ~ expression ~ "," ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+    case _ ~ _ ~ e ~ _ ~ target ~ _ => Alias(e, target.head.name, target.tail.map(_.name))
+  }
+
+  lazy val prefixIf: PackratParser[Expression] =
+      IF ~ "(" ~ expression ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
+    case _ ~ _ ~ condition ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ => If(condition, ifTrue, ifFalse)
+  }
+
+  lazy val prefixFunctionCall = functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
+    case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args)
+  }
+
+  lazy val prefixFunctionCallOneArg = functionIdent ~ "(" ~ expression ~ ")" ^^ {
+    case name ~ _ ~ arg ~ _ => Call(name.toUpperCase, Seq(arg))
+  }
+
+  lazy val prefixTrim = TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
+    case _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ ~ operand ~ _ => Trim(mode, trimCharacter, operand)
+  }
+
+  lazy val prefixTrimWithoutArgs = TRIM ~ "(" ~ expression ~ ")" ^^ {
+    case _ ~ _ ~ operand ~ _ => Trim(TrimMode.BOTH, TrimConstants.TRIM_DEFAULT_CHAR, operand)
+  }
+
+  lazy val prefixExtract = EXTRACT ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+    case _ ~ _ ~ operand ~ _ ~ unit ~ _ => Extract(unit, operand)
+  }
+
+  lazy val prefixFloor = FLOOR ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+    case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
+  }
+
+  lazy val prefixCeil = CEIL ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+    case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
+  }
+
+  lazy val prefixGet: PackratParser[Expression] =
+    GET ~ "(" ~ composite ~ ","  ~ literalExpr ~ ")" ^^ {
+      case _ ~ _ ~ e ~ _ ~ index ~ _ =>
+        GetCompositeField(e, index.asInstanceOf[Literal].value)
+  }
+
+  lazy val prefixFlattening: PackratParser[Expression] =
+    FLATTEN ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) }
+
+  lazy val prefixed: PackratParser[Expression] =
+    prefixArray | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg |
+      prefixStart | prefixEnd |
+      prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract |
+      prefixFloor | prefixCeil | prefixGet | prefixFlattening |
+      prefixFunctionCall | prefixFunctionCallOneArg // function call must always be at the end
+
+  // suffix/prefix composite
+
+  lazy val composite: PackratParser[Expression] = suffixed | prefixed | atom |
+    failure("Composite expression expected.")
+
+  // unary ops
+
+  lazy val unaryNot: PackratParser[Expression] = "!" ~> composite ^^ { e => Not(e) }
+
+  lazy val unaryMinus: PackratParser[Expression] = "-" ~> composite ^^ { e => UnaryMinus(e) }
+
+  lazy val unary = composite | unaryNot | unaryMinus |
+    failure("Unary expression expected.")
+
+  // arithmetic
+
+  lazy val product = unary * (
+    "*" ^^^ { (a: Expression, b: Expression) => Mul(a, b) } |
+    "/" ^^^ { (a: Expression, b: Expression) => Div(a, b) } |
+    "%" ^^^ { (a: Expression, b: Expression) => Mod(a, b) } ) |
+    failure("Product expected.")
+
+  lazy val term = product * (
+    "+" ^^^ { (a: Expression, b: Expression) => Plus(a, b) } |
+    "-" ^^^ { (a: Expression, b: Expression) => Minus(a, b) } ) |
+    failure("Term expected.")
+
+  // Comparison
+
+  lazy val equalTo: PackratParser[Expression] = term ~ ("===" | "==" | "=") ~ term ^^ {
+    case l ~ _ ~ r => EqualTo(l, r)
+  }
+
+  lazy val notEqualTo: PackratParser[Expression] = term ~ ("!==" | "!=" | "<>") ~ term ^^ {
+    case l ~ _ ~ r => NotEqualTo(l, r)
+  }
+
+  lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ {
+    case l ~ _ ~ r => GreaterThan(l, r)
+  }
+
+  lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term ^^ {
+    case l ~ _ ~ r => GreaterThanOrEqual(l, r)
+  }
+
+  lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ {
+    case l ~ _ ~ r => LessThan(l, r)
+  }
+
+  lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ {
+    case l ~ _ ~ r => LessThanOrEqual(l, r)
+  }
+
+  lazy val comparison: PackratParser[Expression] =
+    equalTo | notEqualTo |
+    greaterThan | greaterThanOrEqual |
+    lessThan | lessThanOrEqual | term |
+    failure("Comparison expected.")
+
+  // logic
+
+  lazy val logic = comparison * (
+    "&&" ^^^ { (a: Expression, b: Expression) => And(a, b) } |
+    "||" ^^^ { (a: Expression, b: Expression) => Or(a, b) } ) |
+    failure("Logic expected.")
+
+  // alias
+
+  lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
+      case e ~ _ ~ name => Alias(e, name.name)
+  } | logic ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+    case e ~ _ ~ _ ~ names ~ _ => Alias(e, names.head.name, names.tail.map(_.name))
+  } | logic
+
+  lazy val expression: PackratParser[Expression] = alias |
+    failure("Invalid expression.")
+
+  lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
+
+  def parseExpressionList(expression: String): List[Expression] = {
+    parseAll(expressionList, expression) match {
+      case Success(lst, _) => lst
+
+      case NoSuccess(msg, next) =>
+        throwError(msg, next)
+    }
+  }
+
+  def parseExpression(exprString: String): Expression = {
+    parseAll(expression, exprString) match {
+      case Success(lst, _) => lst
+
+      case NoSuccess(msg, next) =>
+        throwError(msg, next)
+    }
+  }
+
+  private def throwError(msg: String, next: Input): Nothing = {
+    val improvedMsg = msg.replace("string matching regex `\\z'", "End of expression")
+
+    throw ExpressionParserException(
+      s"""Could not parse expression at column ${next.pos.column}: $improvedMsg
+        |${next.pos.longString}""".stripMargin)
+  }
+}
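
For context, a minimal sketch of how this parser is typically exercised
(expression strings chosen for illustration only):

    import org.apache.flink.table.expressions.ExpressionParser

    // "a.count" uses the suffix rule, "count(a)" the prefix rule; both yield Count
    val count = ExpressionParser.parseExpression("a.count")
    // arithmetic, comparison and alias rules compose as defined above
    val aliased = ExpressionParser.parseExpression("amount * 2 as doubled")
    // comma-separated lists, e.g. the argument of Table.select(String)
    val fields = ExpressionParser.parseExpressionList("name, age, age.avg")
    // malformed input raises ExpressionParserException with the failing column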

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
new file mode 100644
index 0000000..4b5781f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong, Short => JShort}
+import java.math.{BigDecimal => JBigDecimal}
+import java.sql.{Date, Time, Timestamp}
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+
+object ExpressionUtils {
+
+  private[flink] def toMonthInterval(expr: Expression, multiplier: Int): Expression = expr match {
+    case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) =>
+      Literal(value * multiplier, TimeIntervalTypeInfo.INTERVAL_MONTHS)
+    case _ =>
+      Cast(Mul(expr, Literal(multiplier)), TimeIntervalTypeInfo.INTERVAL_MONTHS)
+  }
+
+  private[flink] def toMilliInterval(expr: Expression, multiplier: Long): Expression = expr match {
+    case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) =>
+      Literal(value * multiplier, TimeIntervalTypeInfo.INTERVAL_MILLIS)
+    case Literal(value: Long, BasicTypeInfo.LONG_TYPE_INFO) =>
+      Literal(value * multiplier, TimeIntervalTypeInfo.INTERVAL_MILLIS)
+    case _ =>
+      Cast(Mul(expr, Literal(multiplier)), TimeIntervalTypeInfo.INTERVAL_MILLIS)
+  }
+
+  private[flink] def toRowInterval(expr: Expression): Expression = expr match {
+    case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) =>
+      Literal(value.toLong, RowIntervalTypeInfo.INTERVAL_ROWS)
+    case Literal(value: Long, BasicTypeInfo.LONG_TYPE_INFO) =>
+      Literal(value, RowIntervalTypeInfo.INTERVAL_ROWS)
+    case _ =>
+      throw new IllegalArgumentException("Invalid value for row interval literal.")
+  }
+
+  private[flink] def convertArray(array: Array[_]): Expression = {
+    def createArray(): Expression = {
+      ArrayConstructor(array.map(Literal(_)))
+    }
+
+    array match {
+      // primitives
+      case _: Array[Boolean] => createArray()
+      case _: Array[Byte] => createArray()
+      case _: Array[Short] => createArray()
+      case _: Array[Int] => createArray()
+      case _: Array[Long] => createArray()
+      case _: Array[Float] => createArray()
+      case _: Array[Double] => createArray()
+
+      // boxed types
+      case _: Array[JBoolean] => createArray()
+      case _: Array[JByte] => createArray()
+      case _: Array[JShort] => createArray()
+      case _: Array[JInteger] => createArray()
+      case _: Array[JLong] => createArray()
+      case _: Array[JFloat] => createArray()
+      case _: Array[JDouble] => createArray()
+
+      // others
+      case _: Array[String] => createArray()
+      case _: Array[JBigDecimal] => createArray()
+      case _: Array[Date] => createArray()
+      case _: Array[Time] => createArray()
+      case _: Array[Timestamp] => createArray()
+      case bda: Array[BigDecimal] => ArrayConstructor(bda.map { bd => Literal(bd.bigDecimal) })
+
+      case _ =>
+        // nested
+        if (array.length > 0 && array.head.isInstanceOf[Array[_]]) {
+          ArrayConstructor(array.map { na => convertArray(na.asInstanceOf[Array[_]]) })
+        } else {
+          throw ValidationException("Unsupported array type.")
+        }
+    }
+  }
+
+  // ----------------------------------------------------------------------------------------------
+  // RexNode conversion functions (see org.apache.calcite.sql2rel.StandardConvertletTable)
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#getFactor()]].
+    */
+  private[flink] def getFactor(unit: TimeUnit): JBigDecimal = unit match {
+    case TimeUnit.DAY => java.math.BigDecimal.ONE
+    case TimeUnit.HOUR => TimeUnit.DAY.multiplier
+    case TimeUnit.MINUTE => TimeUnit.HOUR.multiplier
+    case TimeUnit.SECOND => TimeUnit.MINUTE.multiplier
+    case TimeUnit.YEAR => java.math.BigDecimal.ONE
+    case TimeUnit.MONTH => TimeUnit.YEAR.multiplier
+    case _ => throw new IllegalArgumentException("Invalid start unit.")
+  }
+
+  /**
+    * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#mod()]].
+    */
+  private[flink] def mod(
+      rexBuilder: RexBuilder,
+      resType: RelDataType,
+      res: RexNode,
+      value: JBigDecimal)
+    : RexNode = {
+    if (value == JBigDecimal.ONE) return res
+    rexBuilder.makeCall(SqlStdOperatorTable.MOD, res, rexBuilder.makeExactLiteral(value, resType))
+  }
+
+  /**
+    * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#divide()]].
+    */
+  private[flink] def divide(rexBuilder: RexBuilder, res: RexNode, value: JBigDecimal): RexNode = {
+    if (value == JBigDecimal.ONE) return res
+    if (value.compareTo(JBigDecimal.ONE) < 0 && value.signum == 1) {
+      try {
+        val reciprocal = JBigDecimal.ONE.divide(value, JBigDecimal.ROUND_UNNECESSARY)
+        return rexBuilder.makeCall(
+          SqlStdOperatorTable.MULTIPLY,
+          res,
+          rexBuilder.makeExactLiteral(reciprocal))
+      } catch {
+        case e: ArithmeticException => // ignore
+      }
+    }
+    rexBuilder.makeCall(
+      SqlStdOperatorTable.DIVIDE_INTEGER,
+      res,
+      rexBuilder.makeExactLiteral(value))
+  }
+
+}
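
A short sketch of the interval conversions defined above (values assumed; the
helpers are private[flink], so callers live inside the org.apache.flink package
tree). A literal operand is folded into an interval literal right away, anything
else becomes a Mul wrapped in a Cast:

    // 10.hours: an Int literal is scaled at conversion time to
    // Literal(36000000L, TimeIntervalTypeInfo.INTERVAL_MILLIS)
    val fixed = ExpressionUtils.toMilliInterval(Literal(10), 3600000L)

    // 'f0.hours: a non-literal child instead becomes
    // Cast(Mul(UnresolvedFieldReference("f0"), Literal(3600000L)), INTERVAL_MILLIS)
    val dynamic = ExpressionUtils.toMilliInterval(UnresolvedFieldReference("f0"), 3600000L)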

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
new file mode 100644
index 0000000..39e1fe2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import scala.collection.mutable
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.validate._
+
+/**
+  * Expressions that declare a type specification for their inputs.
+  */
+trait InputTypeSpec extends Expression {
+
+  /**
+    * Input type specification for each child.
+    *
+    * For example, [[Power]], which expects both of its children to be of type Double, should use:
+    * {{{
+    *   def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: DOUBLE_TYPE_INFO :: Nil
+    * }}}
+    */
+  private[flink] def expectedTypes: Seq[TypeInformation[_]]
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val typeMismatches = mutable.ArrayBuffer.empty[String]
+    children.zip(expectedTypes).zipWithIndex.foreach { case ((e, tpe), i) =>
+      if (e.resultType != tpe) {
+        typeMismatches += s"expecting $tpe on ${i}th input, got ${e.resultType}"
+      }
+    }
+    if (typeMismatches.isEmpty) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"""|$this fails on input type checking: ${typeMismatches.mkString("[", ", ", "]")}.
+            |Operands should be cast to proper types
+            |""".stripMargin)
+    }
+  }
+}
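
To make the contract concrete, a hypothetical expression mixing in this trait
could look as follows (HypotheticalPower and its POWER mapping are illustrative,
not part of this commit):

    import org.apache.calcite.rex.RexNode
    import org.apache.calcite.sql.fun.SqlStdOperatorTable
    import org.apache.calcite.tools.RelBuilder
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO
    import org.apache.flink.api.common.typeinfo.TypeInformation

    case class HypotheticalPower(left: Expression, right: Expression)
      extends BinaryExpression with InputTypeSpec {

      // both children must be DOUBLE; otherwise validateInput() above
      // reports the positions that mismatch
      override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
        DOUBLE_TYPE_INFO :: DOUBLE_TYPE_INFO :: Nil

      override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO

      override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
        relBuilder.call(SqlStdOperatorTable.POWER, left.toRexNode, right.toRexNode)
    }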

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
new file mode 100644
index 0000000..b2fca88
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.tools.RelBuilder.AggCall
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.typeutils.TypeCheckUtils
+
+abstract sealed class Aggregation extends UnaryExpression {
+
+  override def toString = s"Aggregate($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    throw new UnsupportedOperationException("Aggregate cannot be transformed to RexNode")
+
+  /**
+    * Converts this Aggregate to its counterpart in Calcite, i.e. an [[AggCall]].
+    */
+  private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall
+}
+
+case class Sum(child: Expression) extends Aggregation {
+  override def toString = s"sum($child)"
+
+  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(SqlStdOperatorTable.SUM, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "sum")
+}
+
+case class Min(child: Expression) extends Aggregation {
+  override def toString = s"min($child)"
+
+  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(SqlStdOperatorTable.MIN, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeCheckUtils.assertOrderableExpr(child.resultType, "min")
+}
+
+case class Max(child: Expression) extends Aggregation {
+  override def toString = s"max($child)"
+
+  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(SqlStdOperatorTable.MAX, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeCheckUtils.assertOrderableExpr(child.resultType, "max")
+}
+
+case class Count(child: Expression) extends Aggregation {
+  override def toString = s"count($child)"
+
+  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(SqlStdOperatorTable.COUNT, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
+}
+
+case class Avg(child: Expression) extends Aggregation {
+  override def toString = s"avg($child)"
+
+  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+    relBuilder.aggregateCall(SqlStdOperatorTable.AVG, false, null, name, child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput() =
+    TypeCheckUtils.assertNumericExpr(child.resultType, "avg")
+}
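
As a quick illustration of the validation rules above (literal children assumed;
validateInput() is private[flink]):

    Sum(Literal(1)).validateInput()    // ValidationSuccess
    Sum(Literal("x")).validateInput()  // ValidationFailure from assertNumericExpr
    // Count places no constraint on its child and always yields LONG
    Count(Literal("x")).resultType     // BasicTypeInfo.LONG_TYPE_INFO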

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
new file mode 100644
index 0000000..ad1af63
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.`type`.IntervalSqlType
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.typeutils.TypeCoercion
+import org.apache.flink.table.validate._
+
+import scala.collection.JavaConversions._
+
+abstract class BinaryArithmetic extends BinaryExpression {
+  private[flink] def sqlOperator: SqlOperator
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(sqlOperator, children.map(_.toRexNode))
+  }
+
+  override private[flink] def resultType: TypeInformation[_] =
+    TypeCoercion.widerTypeOf(left.resultType, right.resultType) match {
+      case Some(t) => t
+      case None =>
+        throw new RuntimeException("This should never happen.")
+    }
+
+  // TODO: tighten this rule once we implemented type coercion rules during validation
+  override private[flink] def validateInput(): ValidationResult = {
+    if (!isNumeric(left.resultType) || !isNumeric(right.resultType)) {
+      ValidationFailure(s"$this requires both operands to be numeric, got " +
+        s"$left : ${left.resultType} and $right : ${right.resultType}")
+    } else {
+      ValidationSuccess
+    }
+  }
+}
+
+case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString = s"($left + $right)"
+
+  private[flink] val sqlOperator = SqlStdOperatorTable.PLUS
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    if (isString(left.resultType)) {
+      val castedRight = Cast(right, BasicTypeInfo.STRING_TYPE_INFO)
+      relBuilder.call(SqlStdOperatorTable.CONCAT, left.toRexNode, castedRight.toRexNode)
+    } else if (isString(right.resultType)) {
+      val castedLeft = Cast(left, BasicTypeInfo.STRING_TYPE_INFO)
+      relBuilder.call(SqlStdOperatorTable.CONCAT, castedLeft.toRexNode, right.toRexNode)
+    } else if (isTimeInterval(left.resultType) && left.resultType == right.resultType) {
+      relBuilder.call(SqlStdOperatorTable.PLUS, left.toRexNode, right.toRexNode)
+    } else if (isTimeInterval(left.resultType) && isTemporal(right.resultType)) {
+      // Calcite has a bug and cannot apply INTERVAL + DATETIME (INTERVAL on the left),
+      // so we manually swap the operands here
+      relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, right.toRexNode, left.toRexNode)
+    } else if (isTemporal(left.resultType) && isTemporal(right.resultType)) {
+      relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, left.toRexNode, right.toRexNode)
+    } else {
+      val castedLeft = Cast(left, resultType)
+      val castedRight = Cast(right, resultType)
+      relBuilder.call(SqlStdOperatorTable.PLUS, castedLeft.toRexNode, castedRight.toRexNode)
+    }
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (isString(left.resultType) || isString(right.resultType)) {
+      ValidationSuccess
+    } else if (isTimeInterval(left.resultType) && left.resultType == right.resultType) {
+      ValidationSuccess
+    } else if (isTimePoint(left.resultType) && isTimeInterval(right.resultType)) {
+      ValidationSuccess
+    } else if (isTimeInterval(left.resultType) && isTimePoint(right.resultType)) {
+      ValidationSuccess
+    } else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"$this requires operands that are numeric, strings, intervals of the same type, " +
+        s"or an interval and a time point, " +
+        s"got $left : ${left.resultType} and $right : ${right.resultType}")
+    }
+  }
+}
+
+case class UnaryMinus(child: Expression) extends UnaryExpression {
+  override def toString = s"-($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.UNARY_MINUS, child.toRexNode)
+  }
+
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (isNumeric(child.resultType)) {
+      ValidationSuccess
+    } else if (isTimeInterval(child.resultType)) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"$this requires a numeric or interval input, got ${child.resultType}")
+    }
+  }
+}
+
+case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString = s"($left - $right)"
+
+  private[flink] val sqlOperator = SqlStdOperatorTable.MINUS
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (isTimeInterval(left.resultType) && left.resultType == right.resultType) {
+      ValidationSuccess
+    } else if (isTimePoint(left.resultType) && isTimeInterval(right.resultType)) {
+      ValidationSuccess
+    } else if (isTimeInterval(left.resultType) && isTimePoint(right.resultType)) {
+      ValidationSuccess
+    } else {
+      super.validateInput()
+    }
+  }
+}
+
+case class Div(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString = s"($left / $right)"
+
+  private[flink] val sqlOperator = SqlStdOperatorTable.DIVIDE
+}
+
+case class Mul(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString = s"($left * $right)"
+
+  private[flink] val sqlOperator = SqlStdOperatorTable.MULTIPLY
+}
+
+case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
+  override def toString = s"($left % $right)"
+
+  private[flink] val sqlOperator = SqlStdOperatorTable.MOD
+}
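
A sketch of how Plus dispatches on operand types (literal operands assumed): a
string operand reroutes the call to CONCAT, numeric operands widen via
TypeCoercion.widerTypeOf, and an interval left of a time point is swapped
before DATETIME_PLUS:

    // string concatenation: the non-string side is cast to STRING first
    Plus(Literal("id-"), Literal(42)).validateInput()  // ValidationSuccess, maps to CONCAT
    // numeric widening: widerTypeOf(INT, DOUBLE) yields DOUBLE
    Plus(Literal(1), Literal(2.0)).resultType          // DOUBLE_TYPE_INFO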

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
new file mode 100644
index 0000000..b087b61
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConverters._
+
+case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = elements
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val relDataType = relBuilder
+      .asInstanceOf[FlinkRelBuilder]
+      .getTypeFactory
+      .createTypeFromTypeInfo(resultType)
+    val values = elements.map(_.toRexNode).toList.asJava
+    relBuilder
+      .getRexBuilder
+      .makeCall(relDataType, SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR, values)
+  }
+
+  override def toString = s"array(${elements.mkString(", ")})"
+
+  override private[flink] def resultType = ObjectArrayTypeInfo.getInfoFor(elements.head.resultType)
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (elements.isEmpty) {
+      return ValidationFailure("Empty arrays are not supported yet.")
+    }
+    val elementType = elements.head.resultType
+    if (!elements.forall(_.resultType == elementType)) {
+      ValidationFailure("Not all elements of the array have the same type.")
+    } else {
+      ValidationSuccess
+    }
+  }
+}
+
+case class ArrayElementAt(array: Expression, index: Expression) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = Seq(array, index)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder
+      .getRexBuilder
+      .makeCall(SqlStdOperatorTable.ITEM, array.toRexNode, index.toRexNode)
+  }
+
+  override def toString = s"($array).at($index)"
+
+  override private[flink] def resultType = array.resultType match {
+    case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+    case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    array.resultType match {
+      case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] =>
+        if (index.resultType == INT_TYPE_INFO) {
+          // check for common user mistake
+          index match {
+            case Literal(value: Int, INT_TYPE_INFO) if value < 1 =>
+              ValidationFailure(
+                s"Array element access needs an index starting at 1 but was $value.")
+            case _ => ValidationSuccess
+          }
+        } else {
+          ValidationFailure(
+            s"Array element access needs an integer index but was '${index.resultType}'.")
+        }
+      case other@_ => ValidationFailure(s"Array expected but was '$other'.")
+    }
+  }
+}
+
+case class ArrayCardinality(array: Expression) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = Seq(array)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder
+      .getRexBuilder
+      .makeCall(SqlStdOperatorTable.CARDINALITY, array.toRexNode)
+  }
+
+  override def toString = s"($array).cardinality()"
+
+  override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    array.resultType match {
+      case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => ValidationSuccess
+      case other@_ => ValidationFailure(s"Array expected but was '$other'.")
+    }
+  }
+}
+
+case class ArrayElement(array: Expression) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = Seq(array)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder
+      .getRexBuilder
+      .makeCall(SqlStdOperatorTable.ELEMENT, array.toRexNode)
+  }
+
+  override def toString = s"($array).element()"
+
+  override private[flink] def resultType = array.resultType match {
+    case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+    case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    array.resultType match {
+      case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => ValidationSuccess
+      case other@_ => ValidationFailure(s"Array expected but was '$other'.")
+    }
+  }
+}
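
For illustration (element values assumed), construction and the 1-based index
check compose as follows:

    val arr = ArrayConstructor(Seq(Literal(1), Literal(2), Literal(3)))
    ArrayElementAt(arr, Literal(1)).validateInput()  // ValidationSuccess
    ArrayElementAt(arr, Literal(0)).validateInput()  // ValidationFailure: index starts at 1
    ArrayCardinality(arr).resultType                 // BasicTypeInfo.INT_TYPE_INFO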

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
new file mode 100644
index 0000000..ef2cf4e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.plan.logical.{LogicalNode, LogicalTableFunctionCall}
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+import org.apache.flink.table.api.{UnresolvedException, ValidationException}
+
+/**
+  * General expression for unresolved function calls. The function can be a built-in
+  * scalar function or a user-defined scalar function.
+  */
+case class Call(functionName: String, args: Seq[Expression]) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = args
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    throw UnresolvedException(s"trying to convert UnresolvedFunction $functionName to RexNode")
+  }
+
+  override def toString = s"\\$functionName(${args.mkString(", ")})"
+
+  override private[flink] def resultType =
+    throw UnresolvedException(s"calling resultType on UnresolvedFunction $functionName")
+
+  override private[flink] def validateInput(): ValidationResult =
+    ValidationFailure(s"Unresolved function call: $functionName")
+}
+
+/**
+  * Expression for calling a user-defined scalar function.
+  *
+  * @param scalarFunction scalar function to be called (might be overloaded)
+  * @param parameters actual parameters that determine target evaluation method
+  */
+case class ScalarFunctionCall(
+    scalarFunction: ScalarFunction,
+    parameters: Seq[Expression])
+  extends Expression {
+
+  private var foundSignature: Option[Array[Class[_]]] = None
+
+  override private[flink] def children: Seq[Expression] = parameters
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    relBuilder.call(
+      createScalarSqlFunction(
+        scalarFunction.getClass.getCanonicalName,
+        scalarFunction,
+        typeFactory),
+      parameters.map(_.toRexNode): _*)
+  }
+
+  override def toString =
+    s"${scalarFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
+
+  override private[flink] def resultType = getResultType(scalarFunction, foundSignature.get)
+
+  override private[flink] def validateInput(): ValidationResult = {
+    val signature = children.map(_.resultType)
+    // look for a signature that matches the input types
+    foundSignature = getSignature(scalarFunction, signature)
+    if (foundSignature.isEmpty) {
+      ValidationFailure(s"Given parameters do not match any signature. \n" +
+        s"Actual: ${signatureToString(signature)} \n" +
+        s"Expected: ${signaturesToString(scalarFunction)}")
+    } else {
+      ValidationSuccess
+    }
+  }
+}
+
+/**
+  * Expression for calling a user-defined table function with actual parameters.
+  *
+  * @param functionName function name
+  * @param tableFunction user-defined table function
+  * @param parameters actual parameters of function
+  * @param resultType type information of returned table
+  */
+case class TableFunctionCall(
+    functionName: String,
+    tableFunction: TableFunction[_],
+    parameters: Seq[Expression],
+    resultType: TypeInformation[_])
+  extends Expression {
+
+  private var aliases: Option[Seq[String]] = None
+
+  override private[flink] def children: Seq[Expression] = parameters
+
+  /**
+    * Assigns an alias for this table function's returned fields that the following operator
+    * can refer to.
+    *
+    * @param aliasList alias for this table function's returned fields
+    * @return this table function call
+    */
+  private[flink] def as(aliasList: Option[Seq[String]]): TableFunctionCall = {
+    this.aliases = aliasList
+    this
+  }
+
+  /**
+    * Converts an API class to a logical node for planning.
+    */
+  private[flink] def toLogicalTableFunctionCall(child: LogicalNode): LogicalTableFunctionCall = {
+    val originNames = getFieldInfo(resultType)._1
+
+    // determine the final field names
+    val fieldNames = if (aliases.isDefined) {
+      val aliasList = aliases.get
+      if (aliasList.length != originNames.length) {
+        throw ValidationException(
+          s"List of column aliases must have same degree as table; " +
+            s"the returned table of function '$functionName' has ${originNames.length} " +
+            s"columns (${originNames.mkString(",")}), " +
+            s"whereas alias list has ${aliasList.length} columns")
+      } else {
+        aliasList.toArray
+      }
+    } else {
+      originNames
+    }
+
+    LogicalTableFunctionCall(
+      functionName,
+      tableFunction,
+      parameters,
+      resultType,
+      fieldNames,
+      child)
+  }
+
+  override def toString =
+    s"${tableFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
+}
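
A sketch of the scalar UDF path (the HashCode class is illustrative and not
part of this commit):

    import org.apache.flink.table.functions.ScalarFunction

    // the eval() signatures drive getSignature/getResultType above
    class HashCode extends ScalarFunction {
      def eval(s: String): Int = s.hashCode
    }

    val call = ScalarFunctionCall(new HashCode, Seq(Literal("flink")))
    // validateInput() matches eval(String); resultType then reports INT_TYPE_INFO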

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
new file mode 100644
index 0000000..312bf12
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.typeutils.TypeCoercion
+import org.apache.flink.table.validate._
+
+case class Cast(child: Expression, resultType: TypeInformation[_]) extends UnaryExpression {
+
+  override def toString = s"$child.cast($resultType)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    relBuilder
+      .getRexBuilder
+      // we use abstract cast here because RelBuilder.cast() has too many side effects
+      .makeAbstractCast(
+        typeFactory.createTypeFromTypeInfo(resultType),
+        child.toRexNode)
+  }
+
+  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+    val child: Expression = anyRefs.head.asInstanceOf[Expression]
+    copy(child, resultType).asInstanceOf[this.type]
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (TypeCoercion.canCast(child.resultType, resultType)) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Unsupported cast from ${child.resultType} to $resultType")
+    }
+  }
+}
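
A short sketch of Cast validation, assuming TypeCoercion.canCast permits
INT -> STRING but rejects BOOLEAN -> TIMESTAMP:

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo}

    val ok  = Cast(Literal(42), BasicTypeInfo.STRING_TYPE_INFO)
    val bad = Cast(Literal(true), SqlTimeTypeInfo.TIMESTAMP)

    ok.validateInput()   // ValidationSuccess
    bad.validateInput()  // ValidationFailure: unsupported cast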

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala
new file mode 100644
index 0000000..0c7e57c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.table.typeutils.TypeCheckUtils.{isComparable, isNumeric}
+import org.apache.flink.table.validate._
+
+import scala.collection.JavaConversions._
+
+abstract class BinaryComparison extends BinaryExpression {
+  private[flink] def sqlOperator: SqlOperator
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(sqlOperator, children.map(_.toRexNode))
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult =
+    (left.resultType, right.resultType) match {
+      case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
+      case (lType, rType) if isComparable(lType) && lType == rType => ValidationSuccess
+      case (lType, rType) =>
+        ValidationFailure(
+          s"Comparison is only supported for numeric types and " +
+            s"comparable types of same type, got $lType and $rType")
+    }
+}
+
+case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"$left === $right"
+
+  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.EQUALS
+
+  override private[flink] def validateInput(): ValidationResult =
+    (left.resultType, right.resultType) match {
+      case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
+      case (lType, rType) if lType == rType => ValidationSuccess
+      case (lType, rType) =>
+        ValidationFailure(s"Equality predicate on incompatible types: $lType and $rType")
+    }
+}
+
+case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"$left !== $right"
+
+  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.NOT_EQUALS
+
+  override private[flink] def validateInput(): ValidationResult =
+    (left.resultType, right.resultType) match {
+      case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
+      case (lType, rType) if lType == rType => ValidationSuccess
+      case (lType, rType) =>
+        ValidationFailure(s"Inequality predicate on incompatible types: $lType and $rType")
+    }
+}
+
+case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"$left > $right"
+
+  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.GREATER_THAN
+}
+
+case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"$left >= $right"
+
+  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.GREATER_THAN_OR_EQUAL
+}
+
+case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"$left < $right"
+
+  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.LESS_THAN
+}
+
+case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"$left <= $right"
+
+  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.LESS_THAN_OR_EQUAL
+}
+
+case class IsNull(child: Expression) extends UnaryExpression {
+  override def toString = s"($child).isNull"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.isNull(child.toRexNode)
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+}
+
+case class IsNotNull(child: Expression) extends UnaryExpression {
+  override def toString = s"($child).isNotNull"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.isNotNull(child.toRexNode)
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+}
+
+case class IsTrue(child: Expression) extends UnaryExpression {
+  override def toString = s"($child).isTrue"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.IS_TRUE, child.toRexNode)
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+}
+
+case class IsFalse(child: Expression) extends UnaryExpression {
+  override def toString = s"($child).isFalse"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.IS_FALSE, child.toRexNode)
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+}
+
+case class IsNotTrue(child: Expression) extends UnaryExpression {
+  override def toString = s"($child).isNotTrue"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.IS_NOT_TRUE, child.toRexNode)
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+}
+
+case class IsNotFalse(child: Expression) extends UnaryExpression {
+  override def toString = s"($child).isNotFalse"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.call(SqlStdOperatorTable.IS_NOT_FALSE, child.toRexNode)
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+}
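
To summarize the rules above in a sketch: numeric operands may differ in type,
all other operands must match exactly, and EqualTo/NotEqualTo additionally drop
the comparability requirement:

    GreaterThan(Literal(1), Literal(2.0)).validateInput() // ValidationSuccess (both numeric)
    EqualTo(Literal("a"), Literal("b")).validateInput()   // ValidationSuccess (same type)
    LessThan(Literal("a"), Literal(1)).validateInput()    // ValidationFailure (String vs Int)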

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/composite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/composite.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/composite.scala
new file mode 100644
index 0000000..2f3fdb1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/composite.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.UnresolvedException
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+/**
+  * Flattening of composite types. All flattenings are resolved into
+  * `GetCompositeField` expressions.
+  */
+case class Flattening(child: Expression) extends UnaryExpression {
+
+  override def toString = s"$child.flatten()"
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw UnresolvedException(s"Invalid call to on ${this.getClass}.")
+
+  override private[flink] def validateInput(): ValidationResult =
+    ValidationFailure(s"Unresolved flattening of $child")
+}
+
+case class GetCompositeField(child: Expression, key: Any) extends UnaryExpression {
+
+  private var fieldIndex: Option[Int] = None
+
+  override def toString = s"$child.get($key)"
+
+  override private[flink] def validateInput(): ValidationResult = {
+    // check for composite type
+    if (!child.resultType.isInstanceOf[CompositeType[_]]) {
+      return ValidationFailure(s"Cannot access field of non-composite type '${child.resultType}'.")
+    }
+    val compositeType = child.resultType.asInstanceOf[CompositeType[_]]
+
+    // check key
+    key match {
+      case name: String =>
+        val index = compositeType.getFieldIndex(name)
+        if (index < 0) {
+          ValidationFailure(s"Field name '$name' could not be found.")
+        } else {
+          fieldIndex = Some(index)
+          ValidationSuccess
+        }
+      case index: Int =>
+        if (index >= compositeType.getArity) {
+          ValidationFailure(s"Field index '$index' exceeds arity.")
+        } else {
+          fieldIndex = Some(index)
+          ValidationSuccess
+        }
+      case _ =>
+        ValidationFailure(s"Invalid key '$key'.")
+    }
+  }
+
+  override private[flink] def resultType: TypeInformation[_] =
+    child.resultType.asInstanceOf[CompositeType[_]].getTypeAt(fieldIndex.get)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder
+      .getRexBuilder
+      .makeFieldAccess(child.toRexNode, fieldIndex.get)
+  }
+
+  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+    val child: Expression = anyRefs.head.asInstanceOf[Expression]
+    copy(child, key).asInstanceOf[this.type]
+  }
+
+  /**
+    * Gives a meaningful alias if possible (e.g. a$mypojo$field).
+    */
+  private[flink] def aliasName(): Option[String] = child match {
+    case gcf: GetCompositeField =>
+      val alias = gcf.aliasName()
+      if (alias.isDefined) {
+        Some(s"${alias.get}$$$key")
+      } else {
+        None
+      }
+    case c: ResolvedFieldReference => Some(s"${c.name}$$$key")
+    case _ => None
+  }
+}
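
For illustration, aliasName() recursing through nested accesses down to a
resolved field reference; the composite type userType is a hypothetical
placeholder:

    val address = GetCompositeField(ResolvedFieldReference("user", userType), "address")
    val city    = GetCompositeField(address, "city")

    // after validation has resolved the field indexes:
    city.aliasName()   // Some("user$address$city")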

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
new file mode 100644
index 0000000..299a850
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{UnresolvedException, ValidationException}
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+trait NamedExpression extends Expression {
+  private[flink] def name: String
+  private[flink] def toAttribute: Attribute
+}
+
+abstract class Attribute extends LeafExpression with NamedExpression {
+  override private[flink] def toAttribute: Attribute = this
+
+  private[flink] def withName(newName: String): Attribute
+}
+
+case class UnresolvedFieldReference(name: String) extends Attribute {
+
+  override def toString = "\"" + name
+
+  override private[flink] def withName(newName: String): Attribute =
+    UnresolvedFieldReference(newName)
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw UnresolvedException(s"Calling resultType on ${this.getClass}.")
+
+  override private[flink] def validateInput(): ValidationResult =
+    ValidationFailure(s"Unresolved reference $name.")
+}
+
+case class ResolvedFieldReference(
+    name: String,
+    resultType: TypeInformation[_]) extends Attribute {
+
+  override def toString = s"'$name"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.field(name)
+  }
+
+  override private[flink] def withName(newName: String): Attribute = {
+    if (newName == name) {
+      this
+    } else {
+      ResolvedFieldReference(newName, resultType)
+    }
+  }
+}
+
+case class Alias(child: Expression, name: String, extraNames: Seq[String] = Seq())
+    extends UnaryExpression with NamedExpression {
+
+  override def toString = s"$child as '$name"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.alias(child.toRexNode, name)
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = child.resultType
+
+  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+    val child: Expression = anyRefs.head.asInstanceOf[Expression]
+    copy(child, name, extraNames).asInstanceOf[this.type]
+  }
+
+  override private[flink] def toAttribute: Attribute = {
+    if (valid) {
+      ResolvedFieldReference(name, child.resultType)
+    } else {
+      UnresolvedFieldReference(name)
+    }
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (name == "*") {
+      ValidationFailure("Alias can not accept '*' as name.")
+    } else if (extraNames.nonEmpty) {
+      ValidationFailure("Invalid call to Alias with multiple names.")
+    } else {
+      ValidationSuccess
+    }
+  }
+}
+
+case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression {
+
+  override private[flink] def name: String =
+    throw UnresolvedException("Invalid call to name on UnresolvedAlias")
+
+  override private[flink] def toAttribute: Attribute =
+    throw UnresolvedException("Invalid call to toAttribute on UnresolvedAlias")
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw UnresolvedException("Invalid call to resultType on UnresolvedAlias")
+
+  override private[flink] lazy val valid = false
+}
+
+case class RowtimeAttribute() extends Attribute {
+  override private[flink] def withName(newName: String): Attribute = {
+    if (newName == "rowtime") {
+      this
+    } else {
+      throw new ValidationException("Cannot rename streaming rowtime attribute.")
+    }
+  }
+
+  override private[flink] def name: String = "rowtime"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    throw new UnsupportedOperationException("A rowtime attribute can not be used solely.")
+  }
+
+  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.LONG_TYPE_INFO
+}
+
+case class WindowReference(name: String) extends Attribute {
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    throw new UnsupportedOperationException("A window reference can not be used solely.")
+
+  override private[flink] def resultType: TypeInformation[_] =
+    throw new UnsupportedOperationException("A window reference has no result type.")
+
+  override private[flink] def withName(newName: String): Attribute = {
+    if (newName == name) {
+      this
+    } else {
+      throw new ValidationException("Cannot rename window reference.")
+    }
+  }
+}
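
A sketch of how Alias resolves, and of the name validation above:

    val a = Alias(ResolvedFieldReference("amount", BasicTypeInfo.INT_TYPE_INFO), "total")

    // once the subtree is valid, the alias collapses to a typed field reference
    a.toAttribute   // ResolvedFieldReference("total", Int)

    Alias(UnresolvedFieldReference("amount"), "*").validateInput()
    // ValidationFailure: "Alias cannot accept '*' as a name."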

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
new file mode 100644
index 0000000..ccdfc2d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import java.sql.{Date, Time, Timestamp}
+import java.util.{Calendar, TimeZone}
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlIntervalQualifier
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+
+object Literal {
+  private[flink] def apply(l: Any): Literal = l match {
+    case i: Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO)
+    case s: Short => Literal(s, BasicTypeInfo.SHORT_TYPE_INFO)
+    case b: Byte => Literal(b, BasicTypeInfo.BYTE_TYPE_INFO)
+    case l: Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO)
+    case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO)
+    case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
+    case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
+    case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
+    case javaDec: java.math.BigDecimal => Literal(javaDec, BasicTypeInfo.BIG_DEC_TYPE_INFO)
+    case scalaDec: scala.math.BigDecimal =>
+      Literal(scalaDec.bigDecimal, BasicTypeInfo.BIG_DEC_TYPE_INFO)
+    case sqlDate: Date => Literal(sqlDate, SqlTimeTypeInfo.DATE)
+    case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
+    case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP)
+  }
+}
+
+case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpression {
+  override def toString = resultType match {
+    case _: BasicTypeInfo[_] => value.toString
+    case _@SqlTimeTypeInfo.DATE => value.toString + ".toDate"
+    case _@SqlTimeTypeInfo.TIME => value.toString + ".toTime"
+    case _@SqlTimeTypeInfo.TIMESTAMP => value.toString + ".toTimestamp"
+    case _@TimeIntervalTypeInfo.INTERVAL_MILLIS => value.toString + ".millis"
+    case _@TimeIntervalTypeInfo.INTERVAL_MONTHS => value.toString + ".months"
+    case _@RowIntervalTypeInfo.INTERVAL_ROWS => value.toString + ".rows"
+    case _ => s"Literal($value, $resultType)"
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    resultType match {
+      case BasicTypeInfo.BIG_DEC_TYPE_INFO =>
+        val bigDecValue = value.asInstanceOf[java.math.BigDecimal]
+        val decType = relBuilder.getTypeFactory.createSqlType(SqlTypeName.DECIMAL)
+        relBuilder.getRexBuilder.makeExactLiteral(bigDecValue, decType)
+
+      // date/time
+      case SqlTimeTypeInfo.DATE =>
+        relBuilder.getRexBuilder.makeDateLiteral(dateToCalendar)
+      case SqlTimeTypeInfo.TIME =>
+        relBuilder.getRexBuilder.makeTimeLiteral(dateToCalendar, 0)
+      case SqlTimeTypeInfo.TIMESTAMP =>
+        relBuilder.getRexBuilder.makeTimestampLiteral(dateToCalendar, 3)
+
+      case TimeIntervalTypeInfo.INTERVAL_MONTHS =>
+        val interval = java.math.BigDecimal.valueOf(value.asInstanceOf[Int])
+        val intervalQualifier = new SqlIntervalQualifier(
+          TimeUnit.YEAR,
+          TimeUnit.MONTH,
+          SqlParserPos.ZERO)
+        relBuilder.getRexBuilder.makeIntervalLiteral(interval, intervalQualifier)
+
+      case TimeIntervalTypeInfo.INTERVAL_MILLIS =>
+        val interval = java.math.BigDecimal.valueOf(value.asInstanceOf[Long])
+        val intervalQualifier = new SqlIntervalQualifier(
+          TimeUnit.DAY,
+          TimeUnit.SECOND,
+          SqlParserPos.ZERO)
+        relBuilder.getRexBuilder.makeIntervalLiteral(interval, intervalQualifier)
+
+      case _ => relBuilder.literal(value)
+    }
+  }
+
+  private def dateToCalendar: Calendar = {
+    val date = value.asInstanceOf[java.util.Date]
+    val cal = Calendar.getInstance()
+    val t = date.getTime
+    // according to Calcite's SqlFunctions.internalToXXX methods
+    cal.setTimeInMillis(t + TimeZone.getDefault.getOffset(t))
+    cal
+  }
+}
+
+case class Null(resultType: TypeInformation[_]) extends LeafExpression {
+  override def toString = s"null"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val rexBuilder = relBuilder.getRexBuilder
+    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    rexBuilder
+      .makeCast(
+        typeFactory.createTypeFromTypeInfo(resultType),
+        rexBuilder.constantNull())
+  }
+}
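
Literal.apply derives the TypeInformation from the runtime value, so untyped
literals can be written directly; a value outside the matched cases (e.g. a
Char) falls through with a MatchError:

    Literal(42)                            // Literal(42, BasicTypeInfo.INT_TYPE_INFO)
    Literal("hello")                       // Literal("hello", BasicTypeInfo.STRING_TYPE_INFO)
    Literal(scala.math.BigDecimal("1.5"))  // unwrapped to java.math.BigDecimal
    Literal(java.sql.Date.valueOf("2016-06-27"))  // Literal(..., SqlTimeTypeInfo.DATE)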

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/logic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/logic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/logic.scala
new file mode 100644
index 0000000..dfe00cc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/logic.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.validate._
+
+abstract class BinaryPredicate extends BinaryExpression {
+  override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (left.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
+        right.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"$this only accepts children of Boolean type, " +
+        s"get $left : ${left.resultType} and $right : ${right.resultType}")
+    }
+  }
+}
+
+case class Not(child: Expression) extends UnaryExpression {
+
+  override def toString = s"!($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.not(child.toRexNode)
+  }
+
+  override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (child.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Not operator requires a boolean expression as input, " +
+        s"but $child is of type ${child.resultType}")
+    }
+  }
+}
+
+case class And(left: Expression, right: Expression) extends BinaryPredicate {
+
+  override def toString = s"$left && $right"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.and(left.toRexNode, right.toRexNode)
+  }
+}
+
+case class Or(left: Expression, right: Expression) extends BinaryPredicate {
+
+  override def toString = s"$left || $right"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder.or(left.toRexNode, right.toRexNode)
+  }
+}
+
+case class If(
+    condition: Expression,
+    ifTrue: Expression,
+    ifFalse: Expression)
+  extends Expression {
+  private[flink] def children = Seq(condition, ifTrue, ifFalse)
+
+  override private[flink] def resultType = ifTrue.resultType
+
+  override def toString = s"($condition)? $ifTrue : $ifFalse"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val c = condition.toRexNode
+    val t = ifTrue.toRexNode
+    val f = ifFalse.toRexNode
+    relBuilder.call(SqlStdOperatorTable.CASE, c, t, f)
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (condition.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
+        ifTrue.resultType == ifFalse.resultType) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"If should have boolean condition and same type of ifTrue and ifFalse, get " +
+          s"(${condition.resultType}, ${ifTrue.resultType}, ${ifFalse.resultType})")
+    }
+  }
+}
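
A sketch of the If validation above: the condition must be boolean and both
branches must share a result type:

    val cond = EqualTo(Literal(1), Literal(1))   // resultType is BOOLEAN

    If(cond, Literal("yes"), Literal("no")).validateInput()    // ValidationSuccess
    If(cond, Literal("yes"), Literal(0)).validateInput()       // ValidationFailure (String vs Int)
    If(Literal(1), Literal("a"), Literal("b")).validateInput() // ValidationFailure (non-boolean)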