You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by on 2016/12/16 15:47:06 UTC

[37/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
deleted file mode 100644
index c960a79..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ /dev/null
@@ -1,489 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-import org.apache.calcite.avatica.util.DateTimeUtils.{MILLIS_PER_DAY, MILLIS_PER_HOUR, MILLIS_PER_MINUTE, MILLIS_PER_SECOND}
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.table.ExpressionParserException
-import org.apache.flink.api.table.expressions.ExpressionUtils.{toMilliInterval, toMonthInterval}
-import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit
-import org.apache.flink.api.table.expressions.TimePointUnit.TimePointUnit
-import org.apache.flink.api.table.expressions.TrimMode.TrimMode
-import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo
-import scala.language.implicitConversions
-import scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
-/**
- * Parser for expressions inside a String. This parses exactly the same expressions that
- * would be accepted by the Scala Expression DSL.
- *
- * See [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and
- * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] for the constructs
- * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL
- * defined in the above files.
- */
-object ExpressionParser extends JavaTokenParsers with PackratParsers {
-  case class Keyword(key: String)
-  // Convert the keyword into a case-insensitive Parser
-  implicit def keyword2Parser(kw: Keyword): Parser[String] = {
-    ("""(?i)\Q""" + kw.key + """\E""").r
-  }
-  // Keyword
-  lazy val ARRAY: Keyword = Keyword("Array")
-  lazy val AS: Keyword = Keyword("as")
-  lazy val COUNT: Keyword = Keyword("count")
-  lazy val AVG: Keyword = Keyword("avg")
-  lazy val MIN: Keyword = Keyword("min")
-  lazy val MAX: Keyword = Keyword("max")
-  lazy val SUM: Keyword = Keyword("sum")
-  lazy val START: Keyword = Keyword("start")
-  lazy val END: Keyword = Keyword("end")
-  lazy val CAST: Keyword = Keyword("cast")
-  lazy val NULL: Keyword = Keyword("Null")
-  lazy val IF: Keyword = Keyword("?")
-  lazy val ASC: Keyword = Keyword("asc")
-  lazy val DESC: Keyword = Keyword("desc")
-  lazy val TO_DATE: Keyword = Keyword("toDate")
-  lazy val TO_TIME: Keyword = Keyword("toTime")
-  lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp")
-  lazy val TRIM: Keyword = Keyword("trim")
-  lazy val EXTRACT: Keyword = Keyword("extract")
-  lazy val FLOOR: Keyword = Keyword("floor")
-  lazy val CEIL: Keyword = Keyword("ceil")
-  lazy val YEARS: Keyword = Keyword("years")
-  lazy val YEAR: Keyword = Keyword("year")
-  lazy val MONTHS: Keyword = Keyword("months")
-  lazy val MONTH: Keyword = Keyword("month")
-  lazy val DAYS: Keyword = Keyword("days")
-  lazy val DAY: Keyword = Keyword("day")
-  lazy val HOURS: Keyword = Keyword("hours")
-  lazy val HOUR: Keyword = Keyword("hour")
-  lazy val MINUTES: Keyword = Keyword("minutes")
-  lazy val MINUTE: Keyword = Keyword("minute")
-  lazy val SECONDS: Keyword = Keyword("seconds")
-  lazy val SECOND: Keyword = Keyword("second")
-  lazy val MILLIS: Keyword = Keyword("millis")
-  lazy val MILLI: Keyword = Keyword("milli")
-  lazy val ROWS: Keyword = Keyword("rows")
-  lazy val STAR: Keyword = Keyword("*")
-  lazy val GET: Keyword = Keyword("get")
-  lazy val FLATTEN: Keyword = Keyword("flatten")
-  def functionIdent: ExpressionParser.Parser[String] =
-    not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
-      not(SUM) ~ not(START) ~ not(END)~ not(CAST) ~ not(NULL) ~
-      not(IF) ~> super.ident
-  // symbols
-  lazy val timeIntervalUnit: PackratParser[Expression] = TimeIntervalUnit.values map {
-    case unit: TimeIntervalUnit => literal(unit.toString) ^^^ unit.toExpr
-  } reduceLeft(_ | _)
-  lazy val timePointUnit: PackratParser[Expression] = TimePointUnit.values map {
-    case unit: TimePointUnit => literal(unit.toString) ^^^ unit.toExpr
-  } reduceLeft(_ | _)
-  lazy val trimMode: PackratParser[Expression] = TrimMode.values map {
-    case mode: TrimMode => literal(mode.toString) ^^^ mode.toExpr
-  } reduceLeft(_ | _)
-  // data types
-  lazy val dataType: PackratParser[TypeInformation[_]] =
-    "BYTE" ^^ { ti => BasicTypeInfo.BYTE_TYPE_INFO } |
-      "SHORT" ^^ { ti => BasicTypeInfo.SHORT_TYPE_INFO } |
-      "INTERVAL_MONTHS" ^^ {
-        ti => TimeIntervalTypeInfo.INTERVAL_MONTHS.asInstanceOf[TypeInformation[_]]
-      } |
-      "INTERVAL_MILLIS" ^^ {
-        ti => TimeIntervalTypeInfo.INTERVAL_MILLIS.asInstanceOf[TypeInformation[_]]
-      } |
-      "INT" ^^ { ti => BasicTypeInfo.INT_TYPE_INFO } |
-      "LONG" ^^ { ti => BasicTypeInfo.LONG_TYPE_INFO } |
-      "FLOAT" ^^ { ti => BasicTypeInfo.FLOAT_TYPE_INFO } |
-      "DOUBLE" ^^ { ti => BasicTypeInfo.DOUBLE_TYPE_INFO } |
-      ("BOOLEAN" | "BOOL") ^^ { ti => BasicTypeInfo.BOOLEAN_TYPE_INFO } |
-      "STRING" ^^ { ti => BasicTypeInfo.STRING_TYPE_INFO } |
-      "DATE" ^^ { ti => SqlTimeTypeInfo.DATE.asInstanceOf[TypeInformation[_]] } |
-      "TIMESTAMP" ^^ { ti => SqlTimeTypeInfo.TIMESTAMP } |
-      "TIME" ^^ { ti => SqlTimeTypeInfo.TIME } |
-      "DECIMAL" ^^ { ti => BasicTypeInfo.BIG_DEC_TYPE_INFO }
-  // Literals
-  // same as floatingPointNumber but we do not allow trailing dot "12.d" or "2."
-  lazy val floatingPointNumberFlink: Parser[String] =
-    """-?(\d+(\.\d+)?|\d*\.\d+)([eE][+-]?\d+)?[fFdD]?""".r
-  lazy val numberLiteral: PackratParser[Expression] =
-    (wholeNumber <~ ("l" | "L")) ^^ { n => Literal(n.toLong) } |
-      (decimalNumber <~ ("p" | "P")) ^^ { n => Literal(BigDecimal(n)) } |
-      (floatingPointNumberFlink | decimalNumber) ^^ {
-        n =>
-          if (n.matches("""-?\d+""")) {
-            Literal(n.toInt)
-          } else if (n.endsWith("f") || n.endsWith("F")) {
-            Literal(n.toFloat)
-          } else {
-            Literal(n.toDouble)
-          }
-      }
-  lazy val singleQuoteStringLiteral: Parser[Expression] =
-    ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r ^^ {
-      str => Literal(str.substring(1, str.length - 1))
-    }
-  lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral ^^ {
-    str => Literal(str.substring(1, str.length - 1))
-  }
-  lazy val boolLiteral: PackratParser[Expression] = ("true" | "false") ^^ {
-    str => Literal(str.toBoolean)
-  }
-  lazy val nullLiteral: PackratParser[Expression] = NULL ~ "(" ~> dataType <~ ")" ^^ {
-    dt => Null(dt)
-  }
-  lazy val literalExpr: PackratParser[Expression] =
-    numberLiteral | stringLiteralFlink | singleQuoteStringLiteral | boolLiteral | nullLiteral
-  lazy val fieldReference: PackratParser[NamedExpression] = (STAR | ident) ^^ {
-    sym => UnresolvedFieldReference(sym)
-  }
-  lazy val atom: PackratParser[Expression] =
-    ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference
-  // suffix operators
-  lazy val suffixSum: PackratParser[Expression] =
-    composite <~ "." ~ SUM ~ opt("()") ^^ { e => Sum(e) }
-  lazy val suffixMin: PackratParser[Expression] =
-    composite <~ "." ~ MIN ~ opt("()") ^^ { e => Min(e) }
-  lazy val suffixMax: PackratParser[Expression] =
-    composite <~ "." ~ MAX ~ opt("()") ^^ { e => Max(e) }
-  lazy val suffixCount: PackratParser[Expression] =
-    composite <~ "." ~ COUNT ~ opt("()") ^^ { e => Count(e) }
-  lazy val suffixAvg: PackratParser[Expression] =
-    composite <~ "." ~ AVG ~ opt("()") ^^ { e => Avg(e) }
-  lazy val suffixStart: PackratParser[Expression] =
-    composite <~ "." ~ START ~ opt("()") ^^ { e => WindowStart(e) }
-  lazy val suffixEnd: PackratParser[Expression] =
-    composite <~ "." ~ END ~ opt("()") ^^ { e => WindowEnd(e) }
-  lazy val suffixCast: PackratParser[Expression] =
-    composite ~ "." ~ CAST ~ "(" ~ dataType ~ ")" ^^ {
-    case e ~ _ ~ _ ~ _ ~ dt ~ _ => Cast(e, dt)
-  }
-  lazy val suffixAs: PackratParser[Expression] =
-    composite ~ "." ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
-    case e ~ _ ~ _ ~ _ ~ target ~ _ => Alias(e, target.head.name, target.tail.map(_.name))
-  }
-  lazy val suffixTrim = composite ~ "." ~ TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ ")" ^^ {
-    case operand ~ _ ~ _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ => Trim(mode, trimCharacter, operand)
-  }
-  lazy val suffixTrimWithoutArgs = composite <~ "." ~ TRIM ~ opt("()") ^^ {
-    e => Trim(TrimMode.BOTH, TrimConstants.TRIM_DEFAULT_CHAR, e)
-  }
-  lazy val suffixIf: PackratParser[Expression] =
-    composite ~ "." ~ IF ~ "(" ~ expression ~ "," ~ expression ~ ")" ^^ {
-    case condition ~ _ ~ _ ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ => If(condition, ifTrue, ifFalse)
-  }
-  lazy val suffixExtract = composite ~ "." ~ EXTRACT ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
-    case operand ~ _  ~ _ ~ _ ~ unit ~ _ => Extract(unit, operand)
-  }
-  lazy val suffixFloor = composite ~ "." ~ FLOOR ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
-    case operand ~ _  ~ _ ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
-  }
-  lazy val suffixCeil = composite ~ "." ~ CEIL ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
-    case operand ~ _  ~ _ ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
-  }
-  lazy val suffixFunctionCall =
-    composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
-    case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args)
-  }
-  lazy val suffixFunctionCallOneArg = composite ~ "." ~ functionIdent ^^ {
-    case operand ~ _ ~ name => Call(name.toUpperCase, Seq(operand))
-  }
-  lazy val suffixAsc : PackratParser[Expression] =
-    atom <~ "." ~ ASC ~ opt("()") ^^ { e => Asc(e) }
-  lazy val suffixDesc : PackratParser[Expression] =
-    atom <~ "." ~ DESC ~ opt("()") ^^ { e => Desc(e) }
-  lazy val suffixToDate: PackratParser[Expression] =
-    composite <~ "." ~ TO_DATE ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.DATE) }
-  lazy val suffixToTimestamp: PackratParser[Expression] =
-    composite <~ "." ~ TO_TIMESTAMP ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.TIMESTAMP) }
-  lazy val suffixToTime: PackratParser[Expression] =
-    composite <~ "." ~ TO_TIME ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.TIME) }
-  lazy val suffixTimeInterval : PackratParser[Expression] =
-    composite ~ "." ~ (YEARS | MONTHS | DAYS | HOURS | MINUTES | SECONDS | MILLIS |
-      YEAR | MONTH | DAY | HOUR | MINUTE | SECOND | MILLI) ^^ {
-    case expr ~ _ ~ (YEARS.key | YEAR.key) => toMonthInterval(expr, 12)
-    case expr ~ _ ~ (MONTHS.key | MONTH.key) => toMonthInterval(expr, 1)
-    case expr ~ _ ~ (DAYS.key | DAY.key) => toMilliInterval(expr, MILLIS_PER_DAY)
-    case expr ~ _ ~ (HOURS.key | HOUR.key) => toMilliInterval(expr, MILLIS_PER_HOUR)
-    case expr ~ _ ~ (MINUTES.key | MINUTE.key) => toMilliInterval(expr, MILLIS_PER_MINUTE)
-    case expr ~ _ ~ (SECONDS.key | SECOND.key) => toMilliInterval(expr, MILLIS_PER_SECOND)
-    case expr ~ _ ~ (MILLIS.key | MILLI.key)=> toMilliInterval(expr, 1)
-  }
-  lazy val suffixRowInterval : PackratParser[Expression] =
-    composite <~ "." ~ ROWS ^^ { e => ExpressionUtils.toRowInterval(e) }
-  lazy val suffixGet: PackratParser[Expression] =
-    composite ~ "." ~ GET ~ "(" ~ literalExpr ~ ")" ^^ {
-      case e ~ _ ~ _ ~ _ ~ index ~ _ =>
-        GetCompositeField(e, index.asInstanceOf[Literal].value)
-  }
-  lazy val suffixFlattening: PackratParser[Expression] =
-    composite <~ "." ~ FLATTEN ~ opt("()") ^^ { e => Flattening(e) }
-  lazy val suffixed: PackratParser[Expression] =
-    suffixTimeInterval | suffixRowInterval | suffixSum | suffixMin | suffixMax | suffixStart |
-      suffixEnd | suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim |
-      suffixTrimWithoutArgs | suffixIf | suffixAsc | suffixDesc | suffixToDate |
-      suffixToTimestamp | suffixToTime | suffixExtract | suffixFloor | suffixCeil |
-      suffixGet | suffixFlattening |
-      suffixFunctionCall | suffixFunctionCallOneArg // function call must always be at the end
-  // prefix operators
-  lazy val prefixArray: PackratParser[Expression] =
-    ARRAY ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { elements => ArrayConstructor(elements) }
-  lazy val prefixSum: PackratParser[Expression] =
-    SUM ~ "(" ~> expression <~ ")" ^^ { e => Sum(e) }
-  lazy val prefixMin: PackratParser[Expression] =
-    MIN ~ "(" ~> expression <~ ")" ^^ { e => Min(e) }
-  lazy val prefixMax: PackratParser[Expression] =
-    MAX ~ "(" ~> expression <~ ")" ^^ { e => Max(e) }
-  lazy val prefixCount: PackratParser[Expression] =
-    COUNT ~ "(" ~> expression <~ ")" ^^ { e => Count(e) }
-  lazy val prefixAvg: PackratParser[Expression] =
-    AVG ~ "(" ~> expression <~ ")" ^^ { e => Avg(e) }
-  lazy val prefixStart: PackratParser[Expression] =
-    START ~ "(" ~> expression <~ ")" ^^ { e => WindowStart(e) }
-  lazy val prefixEnd: PackratParser[Expression] =
-    END ~ "(" ~> expression <~ ")" ^^ { e => WindowEnd(e) }
-  lazy val prefixCast: PackratParser[Expression] =
-    CAST ~ "(" ~ expression ~ "," ~ dataType ~ ")" ^^ {
-    case _ ~ _ ~ e ~ _ ~ dt ~ _ => Cast(e, dt)
-  }
-  lazy val prefixAs: PackratParser[Expression] =
-    AS ~ "(" ~ expression ~ "," ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
-    case _ ~ _ ~ e ~ _ ~ target ~ _ => Alias(e, target.head.name, target.tail.map(_.name))
-  }
-  lazy val prefixIf: PackratParser[Expression] =
-      IF ~ "(" ~ expression ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
-    case _ ~ _ ~ condition ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ => If(condition, ifTrue, ifFalse)
-  }
-  lazy val prefixFunctionCall = functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
-    case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args)
-  }
-  lazy val prefixFunctionCallOneArg = functionIdent ~ "(" ~ expression ~ ")" ^^ {
-    case name ~ _ ~ arg ~ _ => Call(name.toUpperCase, Seq(arg))
-  }
-  lazy val prefixTrim = TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
-    case _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ ~ operand ~ _ => Trim(mode, trimCharacter, operand)
-  }
-  lazy val prefixTrimWithoutArgs = TRIM ~ "(" ~ expression ~ ")" ^^ {
-    case _ ~ _ ~ operand ~ _ => Trim(TrimMode.BOTH, TrimConstants.TRIM_DEFAULT_CHAR, operand)
-  }
-  lazy val prefixExtract = EXTRACT ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
-    case _ ~ _ ~ operand ~ _ ~ unit ~ _ => Extract(unit, operand)
-  }
-  lazy val prefixFloor = FLOOR ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
-    case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
-  }
-  lazy val prefixCeil = CEIL ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
-    case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
-  }
-  lazy val prefixGet: PackratParser[Expression] =
-    GET ~ "(" ~ composite ~ ","  ~ literalExpr ~ ")" ^^ {
-      case _ ~ _ ~ e ~ _ ~ index ~ _ =>
-        GetCompositeField(e, index.asInstanceOf[Literal].value)
-  }
-  lazy val prefixFlattening: PackratParser[Expression] =
-    FLATTEN ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) }
-  lazy val prefixed: PackratParser[Expression] =
-    prefixArray | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg |
-      prefixStart | prefixEnd |
-      prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract |
-      prefixFloor | prefixCeil | prefixGet | prefixFlattening |
-      prefixFunctionCall | prefixFunctionCallOneArg // function call must always be at the end
-  // suffix/prefix composite
-  lazy val composite: PackratParser[Expression] = suffixed | prefixed | atom |
-    failure("Composite expression expected.")
-  // unary ops
-  lazy val unaryNot: PackratParser[Expression] = "!" ~> composite ^^ { e => Not(e) }
-  lazy val unaryMinus: PackratParser[Expression] = "-" ~> composite ^^ { e => UnaryMinus(e) }
-  lazy val unary = composite | unaryNot | unaryMinus |
-    failure("Unary expression expected.")
-  // arithmetic
-  lazy val product = unary * (
-    "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } |
-    "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } |
-    "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } ) |
-    failure("Product expected.")
-  lazy val term = product * (
-    "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } |
-    "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } ) |
-    failure("Term expected.")
-  // Comparison
-  lazy val equalTo: PackratParser[Expression] = term ~ ("===" | "==" | "=") ~ term ^^ {
-    case l ~ _ ~ r => EqualTo(l, r)
-  }
-  lazy val notEqualTo: PackratParser[Expression] = term ~ ("!==" | "!=" | "<>") ~ term ^^ {
-    case l ~ _ ~ r => NotEqualTo(l, r)
-  }
-  lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ {
-    case l ~ _ ~ r => GreaterThan(l, r)
-  }
-  lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term ^^ {
-    case l ~ _ ~ r => GreaterThanOrEqual(l, r)
-  }
-  lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ {
-    case l ~ _ ~ r => LessThan(l, r)
-  }
-  lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ {
-    case l ~ _ ~ r => LessThanOrEqual(l, r)
-  }
-  lazy val comparison: PackratParser[Expression] =
-    equalTo | notEqualTo |
-    greaterThan | greaterThanOrEqual |
-    lessThan | lessThanOrEqual | term |
-    failure("Comparison expected.")
-  // logic
-  lazy val logic = comparison * (
-    "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } |
-    "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } ) |
-    failure("Logic expected.")
-  // alias
-  lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
-      case e ~ _ ~ name => Alias(e, name.name)
-  } | logic ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
-    case e ~ _ ~ _ ~ names ~ _ => Alias(e, names.head.name, names.tail.map(_.name))
-  } | logic
-  lazy val expression: PackratParser[Expression] = alias |
-    failure("Invalid expression.")
-  lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
-  def parseExpressionList(expression: String): List[Expression] = {
-    parseAll(expressionList, expression) match {
-      case Success(lst, _) => lst
-      case NoSuccess(msg, next) =>
-        throwError(msg, next)
-    }
-  }
-  def parseExpression(exprString: String): Expression = {
-    parseAll(expression, exprString) match {
-      case Success(lst, _) => lst
-      case NoSuccess(msg, next) =>
-        throwError(msg, next)
-    }
-  }
-  private def throwError(msg: String, next: Input): Nothing = {
-    val improvedMsg = msg.replace("string matching regex `\\z'", "End of expression")
-    throw ExpressionParserException(
-      s"""Could not parse expression at column ${next.pos.column}: $improvedMsg
-        |${next.pos.longString}""".stripMargin)
-  }
-}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala
deleted file mode 100644
index 8657534..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-import java.lang.{Boolean => JBoolean, Byte => JByte, Short => JShort, Integer => JInteger, Long => JLong, Float => JFloat, Double => JDouble}
-import java.math.{BigDecimal => JBigDecimal}
-import java.sql.{Date, Time, Timestamp}
-import org.apache.calcite.avatica.util.TimeUnit
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexBuilder, RexNode}
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.ValidationException
-import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
-object ExpressionUtils {
-  private[flink] def toMonthInterval(expr: Expression, multiplier: Int): Expression = expr match {
-    case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) =>
-      Literal(value * multiplier, TimeIntervalTypeInfo.INTERVAL_MONTHS)
-    case _ =>
-      Cast(Mul(expr, Literal(multiplier)), TimeIntervalTypeInfo.INTERVAL_MONTHS)
-  }
-  private[flink] def toMilliInterval(expr: Expression, multiplier: Long): Expression = expr match {
-    case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) =>
-      Literal(value * multiplier, TimeIntervalTypeInfo.INTERVAL_MILLIS)
-    case Literal(value: Long, BasicTypeInfo.LONG_TYPE_INFO) =>
-      Literal(value * multiplier, TimeIntervalTypeInfo.INTERVAL_MILLIS)
-    case _ =>
-      Cast(Mul(expr, Literal(multiplier)), TimeIntervalTypeInfo.INTERVAL_MILLIS)
-  }
-  private[flink] def toRowInterval(expr: Expression): Expression = expr match {
-    case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) =>
-      Literal(value.toLong, RowIntervalTypeInfo.INTERVAL_ROWS)
-    case Literal(value: Long, BasicTypeInfo.LONG_TYPE_INFO) =>
-      Literal(value, RowIntervalTypeInfo.INTERVAL_ROWS)
-    case _ =>
-      throw new IllegalArgumentException("Invalid value for row interval literal.")
-  }
-  private[flink] def convertArray(array: Array[_]): Expression = {
-    def createArray(): Expression = {
-      ArrayConstructor(array.map(Literal(_)))
-    }
-    array match {
-      // primitives
-      case _: Array[Boolean] => createArray()
-      case _: Array[Byte] => createArray()
-      case _: Array[Short] => createArray()
-      case _: Array[Int] => createArray()
-      case _: Array[Long] => createArray()
-      case _: Array[Float] => createArray()
-      case _: Array[Double] => createArray()
-      // boxed types
-      case _: Array[JBoolean] => createArray()
-      case _: Array[JByte] => createArray()
-      case _: Array[JShort] => createArray()
-      case _: Array[JInteger] => createArray()
-      case _: Array[JLong] => createArray()
-      case _: Array[JFloat] => createArray()
-      case _: Array[JDouble] => createArray()
-      // others
-      case _: Array[String] => createArray()
-      case _: Array[JBigDecimal] => createArray()
-      case _: Array[Date] => createArray()
-      case _: Array[Time] => createArray()
-      case _: Array[Timestamp] => createArray()
-      case bda: Array[BigDecimal] => ArrayConstructor(bda.map { bd => Literal(bd.bigDecimal) })
-      case _ =>
-        // nested
-        if (array.length > 0 && array.head.isInstanceOf[Array[_]]) {
-          ArrayConstructor(array.map { na => convertArray(na.asInstanceOf[Array[_]]) })
-        } else {
-          throw ValidationException("Unsupported array type.")
-        }
-    }
-  }
-  // ----------------------------------------------------------------------------------------------
-  // RexNode conversion functions (see org.apache.calcite.sql2rel.StandardConvertletTable)
-  // ----------------------------------------------------------------------------------------------
-  /**
-    * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#getFactor()]].
-    */
-  private[flink] def getFactor(unit: TimeUnit): JBigDecimal = unit match {
-    case TimeUnit.DAY => java.math.BigDecimal.ONE
-    case TimeUnit.HOUR => TimeUnit.DAY.multiplier
-    case TimeUnit.MINUTE => TimeUnit.HOUR.multiplier
-    case TimeUnit.SECOND => TimeUnit.MINUTE.multiplier
-    case TimeUnit.YEAR => java.math.BigDecimal.ONE
-    case TimeUnit.MONTH => TimeUnit.YEAR.multiplier
-    case _ => throw new IllegalArgumentException("Invalid start unit.")
-  }
-  /**
-    * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#mod()]].
-    */
-  private[flink] def mod(
-      rexBuilder: RexBuilder,
-      resType: RelDataType,
-      res: RexNode,
-      value: JBigDecimal)
-    : RexNode = {
-    if (value == JBigDecimal.ONE) return res
-    rexBuilder.makeCall(SqlStdOperatorTable.MOD, res, rexBuilder.makeExactLiteral(value, resType))
-  }
-  /**
-    * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#divide()]].
-    */
-  private[flink] def divide(rexBuilder: RexBuilder, res: RexNode, value: JBigDecimal): RexNode = {
-    if (value == JBigDecimal.ONE) return res
-    if (value.compareTo(JBigDecimal.ONE) < 0 && value.signum == 1) {
-      try {
-        val reciprocal = JBigDecimal.ONE.divide(value, JBigDecimal.ROUND_UNNECESSARY)
-        return rexBuilder.makeCall(
-          SqlStdOperatorTable.MULTIPLY,
-          res,
-          rexBuilder.makeExactLiteral(reciprocal))
-      } catch {
-        case e: ArithmeticException => // ignore
-      }
-    }
-    rexBuilder.makeCall(
-      SqlStdOperatorTable.DIVIDE_INTEGER,
-      res,
-      rexBuilder.makeExactLiteral(value))
-  }
-}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
deleted file mode 100644
index 67e44a1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-import scala.collection.mutable
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.validate._
-/**
-  * Expressions that have specifications on their inputs.
-  */
-trait InputTypeSpec extends Expression {
-  /**
-    * Input type specification for each child.
-    *
-    * For example, [[Power]] expecting both of the children be of Double Type should use:
-    * {{{
-    *   def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: DOUBLE_TYPE_INFO :: Nil
-    * }}}
-    */
-  private[flink] def expectedTypes: Seq[TypeInformation[_]]
-  override private[flink] def validateInput(): ValidationResult = {
-    val typeMismatches = mutable.ArrayBuffer.empty[String]
-    children.zip(expectedTypes).zipWithIndex.foreach { case ((e, tpe), i) =>
-      if (e.resultType != tpe) {
-        typeMismatches += s"expecting $tpe on ${i}th input, get ${e.resultType}"
-      }
-    }
-    if (typeMismatches.isEmpty) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(
-        s"""|$this fails on input type checking: ${typeMismatches.mkString("[", ", ", "]")}.
-            |Operand should be casted to proper type
-            |""".stripMargin)
-    }
-  }
-}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
deleted file mode 100644
index 259f7e5..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.typeutils.TypeCheckUtils
-abstract sealed class Aggregation extends UnaryExpression {
-  override def toString = s"Aggregate($child)"
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
-    throw new UnsupportedOperationException("Aggregate cannot be transformed to RexNode")
-  /**
-    * Convert Aggregate to its counterpart in Calcite, i.e. AggCall
-    */
-  private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall
-}
-case class Sum(child: Expression) extends Aggregation {
-  override def toString = s"sum($child)"
-  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
-    relBuilder.aggregateCall(SqlStdOperatorTable.SUM, false, null, name, child.toRexNode)
-  }
-  override private[flink] def resultType = child.resultType
-  override private[flink] def validateInput() =
-    TypeCheckUtils.assertNumericExpr(child.resultType, "sum")
-}
-case class Min(child: Expression) extends Aggregation {
-  override def toString = s"min($child)"
-  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
-    relBuilder.aggregateCall(SqlStdOperatorTable.MIN, false, null, name, child.toRexNode)
-  }
-  override private[flink] def resultType = child.resultType
-  override private[flink] def validateInput() =
-    TypeCheckUtils.assertOrderableExpr(child.resultType, "min")
-}
-case class Max(child: Expression) extends Aggregation {
-  override def toString = s"max($child)"
-  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
-    relBuilder.aggregateCall(SqlStdOperatorTable.MAX, false, null, name, child.toRexNode)
-  }
-  override private[flink] def resultType = child.resultType
-  override private[flink] def validateInput() =
-    TypeCheckUtils.assertOrderableExpr(child.resultType, "max")
-}
-case class Count(child: Expression) extends Aggregation {
-  override def toString = s"count($child)"
-  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
-    relBuilder.aggregateCall(SqlStdOperatorTable.COUNT, false, null, name, child.toRexNode)
-  }
-  override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
-}
-case class Avg(child: Expression) extends Aggregation {
-  override def toString = s"avg($child)"
-  override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
-    relBuilder.aggregateCall(SqlStdOperatorTable.AVG, false, null, name, child.toRexNode)
-  }
-  override private[flink] def resultType = child.resultType
-  override private[flink] def validateInput() =
-    TypeCheckUtils.assertNumericExpr(child.resultType, "avg")
-}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
deleted file mode 100644
index 8702886..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
+++ /dev/null
@@ -1,158 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.`type`.IntervalSqlType
-import org.apache.calcite.sql.SqlOperator
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.typeutils.TypeCheckUtils._
-import org.apache.flink.api.table.typeutils.TypeCoercion
-import org.apache.flink.api.table.validate._
-import scala.collection.JavaConversions._
-abstract class BinaryArithmetic extends BinaryExpression {
-  private[flink] def sqlOperator: SqlOperator
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-  }
-  override private[flink] def resultType: TypeInformation[_] =
-    TypeCoercion.widerTypeOf(left.resultType, right.resultType) match {
-      case Some(t) => t
-      case None =>
-        throw new RuntimeException("This should never happen.")
-    }
-  // TODO: tighten this rule once we implemented type coercion rules during validation
-  override private[flink] def validateInput(): ValidationResult = {
-    if (!isNumeric(left.resultType) || !isNumeric(right.resultType)) {
-      ValidationFailure(s"$this requires both operands Numeric, get " +
-        s"$left : ${left.resultType} and $right : ${right.resultType}")
-    } else {
-      ValidationSuccess
-    }
-  }
-case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left + $right)"
-  private[flink] val sqlOperator = SqlStdOperatorTable.PLUS
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    if(isString(left.resultType)) {
-      val castedRight = Cast(right, BasicTypeInfo.STRING_TYPE_INFO)
-, left.toRexNode, castedRight.toRexNode)
-    } else if(isString(right.resultType)) {
-      val castedLeft = Cast(left, BasicTypeInfo.STRING_TYPE_INFO)
-, castedLeft.toRexNode, right.toRexNode)
-    } else if (isTimeInterval(left.resultType) && left.resultType == right.resultType) {
-, left.toRexNode, right.toRexNode)
-    } else if (isTimeInterval(left.resultType) && isTemporal(right.resultType)) {
-      // Calcite has a bug that can't apply INTERVAL + DATETIME (INTERVAL at left)
-      // we manually switch them here
-, right.toRexNode, left.toRexNode)
-    } else if (isTemporal(left.resultType) && isTemporal(right.resultType)) {
-, left.toRexNode, right.toRexNode)
-    } else {
-      val castedLeft = Cast(left, resultType)
-      val castedRight = Cast(right, resultType)
-, castedLeft.toRexNode, castedRight.toRexNode)
-    }
-  }
-  override private[flink] def validateInput(): ValidationResult = {
-    if (isString(left.resultType) || isString(right.resultType)) {
-      ValidationSuccess
-    } else if (isTimeInterval(left.resultType) && left.resultType == right.resultType) {
-      ValidationSuccess
-    } else if (isTimePoint(left.resultType) && isTimeInterval(right.resultType)) {
-      ValidationSuccess
-    } else if (isTimeInterval(left.resultType) && isTimePoint(right.resultType)) {
-      ValidationSuccess
-    } else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(
-        s"$this requires Numeric, String, Intervals of same type, " +
-        s"or Interval and a time point input, " +
-        s"get $left : ${left.resultType} and $right : ${right.resultType}")
-    }
-  }
-case class UnaryMinus(child: Expression) extends UnaryExpression {
-  override def toString = s"-($child)"
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-, child.toRexNode)
-  }
-  override private[flink] def resultType = child.resultType
-  override private[flink] def validateInput(): ValidationResult = {
-    if (isNumeric(child.resultType)) {
-      ValidationSuccess
-    } else if (isTimeInterval(child.resultType)) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(s"$this requires Numeric, or Interval input, get ${child.resultType}")
-    }
-  }
-case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left - $right)"
-  private[flink] val sqlOperator = SqlStdOperatorTable.MINUS
-  override private[flink] def validateInput(): ValidationResult = {
-    if (isTimeInterval(left.resultType) && left.resultType == right.resultType) {
-      ValidationSuccess
-    } else if (isTimePoint(left.resultType) && isTimeInterval(right.resultType)) {
-      ValidationSuccess
-    } else if (isTimeInterval(left.resultType) && isTimePoint(right.resultType)) {
-      ValidationSuccess
-    } else {
-      super.validateInput()
-    }
-  }
-case class Div(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left / $right)"
-  private[flink] val sqlOperator = SqlStdOperatorTable.DIVIDE
-case class Mul(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left * $right)"
-  private[flink] val sqlOperator = SqlStdOperatorTable.MULTIPLY
-case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
-  override def toString = s"($left % $right)"
-  private[flink] val sqlOperator = SqlStdOperatorTable.MOD
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/array.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/array.scala
deleted file mode 100644
index 78084de..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/array.scala
+++ /dev/null
@@ -1,146 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.table.FlinkRelBuilder
-import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
-import scala.collection.JavaConverters._
-case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
-  override private[flink] def children: Seq[Expression] = elements
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val relDataType = relBuilder
-      .asInstanceOf[FlinkRelBuilder]
-      .getTypeFactory
-      .createTypeFromTypeInfo(resultType)
-    val values =
-    relBuilder
-      .getRexBuilder
-      .makeCall(relDataType, SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR, values)
-  }
-  override def toString = s"array(${elements.mkString(", ")})"
-  override private[flink] def resultType = ObjectArrayTypeInfo.getInfoFor(elements.head.resultType)
-  override private[flink] def validateInput(): ValidationResult = {
-    if (elements.isEmpty) {
-      return ValidationFailure("Empty arrays are not supported yet.")
-    }
-    val elementType = elements.head.resultType
-    if (!elements.forall(_.resultType == elementType)) {
-      ValidationFailure("Not all elements of the array have the same type.")
-    } else {
-      ValidationSuccess
-    }
-  }
-case class ArrayElementAt(array: Expression, index: Expression) extends Expression {
-  override private[flink] def children: Seq[Expression] = Seq(array, index)
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder
-      .getRexBuilder
-      .makeCall(SqlStdOperatorTable.ITEM, array.toRexNode, index.toRexNode)
-  }
-  override def toString = s"($array).at($index)"
-  override private[flink] def resultType = array.resultType match {
-    case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
-    case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
-  }
-  override private[flink] def validateInput(): ValidationResult = {
-    array.resultType match {
-      case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] =>
-        if (index.resultType == INT_TYPE_INFO) {
-          // check for common user mistake
-          index match {
-            case Literal(value: Int, INT_TYPE_INFO) if value < 1 =>
-              ValidationFailure(
-                s"Array element access needs an index starting at 1 but was $value.")
-            case _ => ValidationSuccess
-          }
-        } else {
-          ValidationFailure(
-            s"Array element access needs an integer index but was '${index.resultType}'.")
-        }
-      case other@_ => ValidationFailure(s"Array expected but was '$other'.")
-    }
-  }
-case class ArrayCardinality(array: Expression) extends Expression {
-  override private[flink] def children: Seq[Expression] = Seq(array)
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder
-      .getRexBuilder
-      .makeCall(SqlStdOperatorTable.CARDINALITY, array.toRexNode)
-  }
-  override def toString = s"($array).cardinality()"
-  override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO
-  override private[flink] def validateInput(): ValidationResult = {
-    array.resultType match {
-      case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => ValidationSuccess
-      case other@_ => ValidationFailure(s"Array expected but was '$other'.")
-    }
-  }
-case class ArrayElement(array: Expression) extends Expression {
-  override private[flink] def children: Seq[Expression] = Seq(array)
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder
-      .getRexBuilder
-      .makeCall(SqlStdOperatorTable.ELEMENT, array.toRexNode)
-  }
-  override def toString = s"($array).element()"
-  override private[flink] def resultType = array.resultType match {
-    case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
-    case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
-  }
-  override private[flink] def validateInput(): ValidationResult = {
-    array.resultType match {
-      case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => ValidationSuccess
-      case other@_ => ValidationFailure(s"Array expected but was '$other'.")
-    }
-  }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
deleted file mode 100644
index 3bb9dac..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
+++ /dev/null
@@ -1,159 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.functions.{ScalarFunction, TableFunction}
-import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._
-import org.apache.flink.api.table.plan.logical.{LogicalNode, LogicalTableFunctionCall}
-import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
-import org.apache.flink.api.table.{FlinkTypeFactory, UnresolvedException, ValidationException}
-  * General expression for unresolved function calls. The function can be a built-in
-  * scalar function or a user-defined scalar function.
-  */
-case class Call(functionName: String, args: Seq[Expression]) extends Expression {
-  override private[flink] def children: Seq[Expression] = args
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    throw UnresolvedException(s"trying to convert UnresolvedFunction $functionName to RexNode")
-  }
-  override def toString = s"\\$functionName(${args.mkString(", ")})"
-  override private[flink] def resultType =
-    throw UnresolvedException(s"calling resultType on UnresolvedFunction $functionName")
-  override private[flink] def validateInput(): ValidationResult =
-    ValidationFailure(s"Unresolved function call: $functionName")
-  * Expression for calling a user-defined scalar function.
-  *
-  * @param scalarFunction scalar function to be called (might be overloaded)
-  * @param parameters actual parameters that determine target evaluation method
-  */
-case class ScalarFunctionCall(
-    scalarFunction: ScalarFunction,
-    parameters: Seq[Expression])
-  extends Expression {
-  private var foundSignature: Option[Array[Class[_]]] = None
-  override private[flink] def children: Seq[Expression] = parameters
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-      createScalarSqlFunction(
-        scalarFunction.getClass.getCanonicalName,
-        scalarFunction,
-        typeFactory),
- _*)
-  }
-  override def toString =
-    s"${scalarFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
-  override private[flink] def resultType = getResultType(scalarFunction, foundSignature.get)
-  override private[flink] def validateInput(): ValidationResult = {
-    val signature =
-    // look for a signature that matches the input types
-    foundSignature = getSignature(scalarFunction, signature)
-    if (foundSignature.isEmpty) {
-      ValidationFailure(s"Given parameters do not match any signature. \n" +
-        s"Actual: ${signatureToString(signature)} \n" +
-        s"Expected: ${signaturesToString(scalarFunction)}")
-    } else {
-      ValidationSuccess
-    }
-  }
-  *
-  * Expression for calling a user-defined table function with actual parameters.
-  *
-  * @param functionName function name
-  * @param tableFunction user-defined table function
-  * @param parameters actual parameters of function
-  * @param resultType type information of returned table
-  */
-case class TableFunctionCall(
-    functionName: String,
-    tableFunction: TableFunction[_],
-    parameters: Seq[Expression],
-    resultType: TypeInformation[_])
-  extends Expression {
-  private var aliases: Option[Seq[String]] = None
-  override private[flink] def children: Seq[Expression] = parameters
-  /**
-    * Assigns an alias for this table function's returned fields that the following operator
-    * can refer to.
-    *
-    * @param aliasList alias for this table function's returned fields
-    * @return this table function call
-    */
-  private[flink] def as(aliasList: Option[Seq[String]]): TableFunctionCall = {
-    this.aliases = aliasList
-    this
-  }
-  /**
-    * Converts an API class to a logical node for planning.
-    */
-  private[flink] def toLogicalTableFunctionCall(child: LogicalNode): LogicalTableFunctionCall = {
-    val originNames = getFieldInfo(resultType)._1
-    // determine the final field names
-    val fieldNames = if (aliases.isDefined) {
-      val aliasList = aliases.get
-      if (aliasList.length != originNames.length) {
-        throw ValidationException(
-          s"List of column aliases must have same degree as table; " +
-            s"the returned table of function '$functionName' has ${originNames.length} " +
-            s"columns (${originNames.mkString(",")}), " +
-            s"whereas alias list has ${aliasList.length} columns")
-      } else {
-        aliasList.toArray
-      }
-    } else {
-      originNames
-    }
-    LogicalTableFunctionCall(
-      functionName,
-      tableFunction,
-      parameters,
-      resultType,
-      fieldNames,
-      child)
-  }
-  override def toString =
-    s"${tableFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
deleted file mode 100644
index 2232a91..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
+++ /dev/null
@@ -1,53 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.FlinkTypeFactory
-import org.apache.flink.api.table.typeutils.TypeCoercion
-import org.apache.flink.api.table.validate._
-case class Cast(child: Expression, resultType: TypeInformation[_]) extends UnaryExpression {
-  override def toString = s"$child.cast($resultType)"
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    relBuilder
-      .getRexBuilder
-      // we use abstract cast here because RelBuilder.cast() has too many side effects
-      .makeAbstractCast(
-        typeFactory.createTypeFromTypeInfo(resultType),
-        child.toRexNode)
-  }
-  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
-    val child: Expression = anyRefs.head.asInstanceOf[Expression]
-    copy(child, resultType).asInstanceOf[this.type]
-  }
-  override private[flink] def validateInput(): ValidationResult = {
-    if (TypeCoercion.canCast(child.resultType, resultType)) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(s"Unsupported cast from ${child.resultType} to $resultType")
-    }
-  }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
deleted file mode 100644
index 5a150f8..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
+++ /dev/null
@@ -1,160 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlOperator
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.table.typeutils.TypeCheckUtils.{isComparable, isNumeric}
-import org.apache.flink.api.table.validate._
-import scala.collection.JavaConversions._
-abstract class BinaryComparison extends BinaryExpression {
-  private[flink] def sqlOperator: SqlOperator
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-  }
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-  override private[flink] def validateInput(): ValidationResult =
-    (left.resultType, right.resultType) match {
-      case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
-      case (lType, rType) if isComparable(lType) && lType == rType => ValidationSuccess
-      case (lType, rType) =>
-        ValidationFailure(
-          s"Comparison is only supported for numeric types and " +
-            s"comparable types of same type, got $lType and $rType")
-    }
-case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left === $right"
-  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.EQUALS
-  override private[flink] def validateInput(): ValidationResult =
-    (left.resultType, right.resultType) match {
-      case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
-      case (lType, rType) if lType == rType => ValidationSuccess
-      case (lType, rType) =>
-        ValidationFailure(s"Equality predicate on incompatible types: $lType and $rType")
-    }
-case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left !== $right"
-  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.NOT_EQUALS
-  override private[flink] def validateInput(): ValidationResult =
-    (left.resultType, right.resultType) match {
-      case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
-      case (lType, rType) if lType == rType => ValidationSuccess
-      case (lType, rType) =>
-        ValidationFailure(s"Inequality predicate on incompatible types: $lType and $rType")
-    }
-case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left > $right"
-  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.GREATER_THAN
-case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left >= $right"
-  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.GREATER_THAN_OR_EQUAL
-case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left < $right"
-  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.LESS_THAN
-case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
-  override def toString = s"$left <= $right"
-  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.LESS_THAN_OR_EQUAL
-case class IsNull(child: Expression) extends UnaryExpression {
-  override def toString = s"($child).isNull"
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.isNull(child.toRexNode)
-  }
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-case class IsNotNull(child: Expression) extends UnaryExpression {
-  override def toString = s"($child).isNotNull"
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder.isNotNull(child.toRexNode)
-  }
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-case class IsTrue(child: Expression) extends UnaryExpression {
-  override def toString = s"($child).isTrue"
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-, child.toRexNode)
-  }
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-case class IsFalse(child: Expression) extends UnaryExpression {
-  override def toString = s"($child).isFalse"
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-, child.toRexNode)
-  }
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-case class IsNotTrue(child: Expression) extends UnaryExpression {
-  override def toString = s"($child).isNotTrue"
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-, child.toRexNode)
-  }
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-case class IsNotFalse(child: Expression) extends UnaryExpression {
-  override def toString = s"($child).isNotFalse"
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-, child.toRexNode)
-  }
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
deleted file mode 100644
index ee1eb46..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
+++ /dev/null
@@ -1,106 +0,0 @@
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.UnresolvedException
-import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
-  * Flattening of composite types. All flattenings are resolved into
-  * `GetCompositeField` expressions.
-  */
-case class Flattening(child: Expression) extends UnaryExpression {
-  override def toString = s"$child.flatten()"
-  override private[flink] def resultType: TypeInformation[_] =
-    throw UnresolvedException(s"Invalid call to on ${this.getClass}.")
-  override private[flink] def validateInput(): ValidationResult =
-    ValidationFailure(s"Unresolved flattening of $child")
-case class GetCompositeField(child: Expression, key: Any) extends UnaryExpression {
-  private var fieldIndex: Option[Int] = None
-  override def toString = s"$child.get($key)"
-  override private[flink] def validateInput(): ValidationResult = {
-    // check for composite type
-    if (!child.resultType.isInstanceOf[CompositeType[_]]) {
-      return ValidationFailure(s"Cannot access field of non-composite type '${child.resultType}'.")
-    }
-    val compositeType = child.resultType.asInstanceOf[CompositeType[_]]
-    // check key
-    key match {
-      case name: String =>
-        val index = compositeType.getFieldIndex(name)
-        if (index < 0) {
-          ValidationFailure(s"Field name '$name' could not be found.")
-        } else {
-          fieldIndex = Some(index)
-          ValidationSuccess
-        }
-      case index: Int =>
-        if (index >= compositeType.getArity) {
-          ValidationFailure(s"Field index '$index' exceeds arity.")
-        } else {
-          fieldIndex = Some(index)
-          ValidationSuccess
-        }
-      case _ =>
-        ValidationFailure(s"Invalid key '$key'.")
-    }
-  }
-  override private[flink] def resultType: TypeInformation[_] =
-    child.resultType.asInstanceOf[CompositeType[_]].getTypeAt(fieldIndex.get)
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder
-      .getRexBuilder
-      .makeFieldAccess(child.toRexNode, fieldIndex.get)
-  }
-  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
-    val child: Expression = anyRefs.head.asInstanceOf[Expression]
-    copy(child, key).asInstanceOf[this.type]
-  }
-  /**
-    * Gives a meaningful alias if possible (e.g. a$mypojo$field).
-    *
-    * The alias is built by walking down the chain of nested field accesses and joining the
-    * name of the underlying resolved field with every accessed key, separated by `$`.
-    *
-    * @return the alias, or None if the access chain does not end in a resolved field reference
-    */
-  private[flink] def aliasName(): Option[String] = child match {
-    case gcf: GetCompositeField =>
-      val alias = gcf.aliasName()
-      if (alias.isDefined) {
-        Some(s"${alias.get}$$$key")
-      } else {
-        None
-      }
-    // the root of the chain contributes the field's own name as the alias prefix
-    case c: ResolvedFieldReference => Some(s"${c.name}$$$key")
-    case _ => None
-  }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
deleted file mode 100644
index e651bb3..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.{UnresolvedException, ValidationException}
-import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
-/**
-  * An expression that has a name and can be converted into an [[Attribute]].
-  */
-trait NamedExpression extends Expression {
-  /** The name of this expression. */
-  private[flink] def name: String
-  /** The attribute that corresponds to this expression. */
-  private[flink] def toAttribute: Attribute
-}
-/**
-  * A named leaf expression. An attribute is its own attribute form.
-  */
-abstract class Attribute extends LeafExpression with NamedExpression {
-  /** Returns this instance: no conversion is needed. */
-  override private[flink] def toAttribute: Attribute = this
-  /** Returns an attribute for `newName`; the exact behavior is defined by each subclass. */
-  private[flink] def withName(newName: String): Attribute
-}
-/**
-  * A field reference that is known by name only.
-  * It has no result type and always fails input validation.
-  *
-  * @param name the referenced field name
-  */
-case class UnresolvedFieldReference(name: String) extends Attribute {
-  /** Renders as a double quote followed by the name. */
-  override def toString = "\"" + name
-  /** Creates a new unresolved reference carrying `newName`. */
-  override private[flink] def withName(newName: String): Attribute = {
-    UnresolvedFieldReference(newName)
-  }
-  /** Always throws: an unresolved reference has no type yet. */
-  override private[flink] def resultType: TypeInformation[_] = {
-    throw UnresolvedException(s"Calling resultType on ${this.getClass}.")
-  }
-  /** Always reports a failure that names the unresolved field. */
-  override private[flink] def validateInput(): ValidationResult = {
-    ValidationFailure(s"Unresolved reference $name.")
-  }
-}
-/**
-  * A field reference whose type is known.
-  *
-  * @param name       the referenced field name
-  * @param resultType the type of the referenced field
-  */
-case class ResolvedFieldReference(
-    name: String,
-    resultType: TypeInformation[_]) extends Attribute {
-  /** Renders as a single quote followed by the name. */
-  override def toString = s"'$name"
-  /** Translates to a Calcite field reference looked up by name. */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
-    relBuilder.field(name)
-  /** Returns this instance if the name is unchanged, otherwise a renamed copy of the same type. */
-  override private[flink] def withName(newName: String): Attribute =
-    if (newName == name) this else ResolvedFieldReference(newName, resultType)
-}
-/**
-  * Gives a name to an expression.
-  *
-  * @param child      the expression being named
-  * @param name       the alias
-  * @param extraNames additional names; validation rejects the alias if any are present
-  */
-case class Alias(child: Expression, name: String, extraNames: Seq[String] = Seq())
-    extends UnaryExpression with NamedExpression {
-  override def toString = s"$child as '$name"
-  /** Translates to a Calcite alias over the child's RexNode. */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
-    relBuilder.alias(child.toRexNode, name)
-  /** An alias does not change the type of the aliased expression. */
-  override private[flink] def resultType: TypeInformation[_] = child.resultType
-  /** Rebuilds the alias around a new child (first element of `anyRefs`), keeping all names. */
-  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
-    val replacement = anyRefs.head.asInstanceOf[Expression]
-    copy(replacement, name, extraNames).asInstanceOf[this.type]
-  }
-  /**
-    * A resolved reference typed like the child when `valid` holds, otherwise an unresolved
-    * reference. NOTE(review): `valid` is inherited and not visible here - presumably it
-    * reflects the validation outcome; confirm in Expression.
-    */
-  override private[flink] def toAttribute: Attribute =
-    if (valid) ResolvedFieldReference(name, child.resultType) else UnresolvedFieldReference(name)
-  /** Rejects the name "*" first, then any alias that carries extra names. */
-  override private[flink] def validateInput(): ValidationResult = name match {
-    case "*" =>
-      ValidationFailure("Alias can not accept '*' as name.")
-    case _ if extraNames.nonEmpty =>
-      ValidationFailure("Invalid call to Alias with multiple names.")
-    case _ =>
-      ValidationSuccess
-  }
-}
-/**
-  * Placeholder for an alias that has not been resolved.
-  * It is never valid, and asking it for a name, attribute or type throws.
-  *
-  * @param child the wrapped expression
-  */
-case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression {
-  /** Always throws: the name is not known yet. */
-  override private[flink] def name: String = {
-    throw UnresolvedException("Invalid call to name on UnresolvedAlias")
-  }
-  /** Always throws: there is no attribute form yet. */
-  override private[flink] def toAttribute: Attribute = {
-    throw UnresolvedException("Invalid call to toAttribute on UnresolvedAlias")
-  }
-  /** Always throws: the type is not known yet. */
-  override private[flink] def resultType: TypeInformation[_] = {
-    throw UnresolvedException("Invalid call to resultType on UnresolvedAlias")
-  }
-  // Fixed to false so that this expression is never treated as valid.
-  override private[flink] lazy val valid = false
-}
-/**
-  * The attribute named "rowtime". It is of type Long, cannot be renamed and cannot be
-  * translated to a RexNode on its own.
-  */
-case class RowtimeAttribute() extends Attribute {
-  /**
-    * Accepts only the name "rowtime" and then returns this instance.
-    *
-    * @throws ValidationException for any other name
-    */
-  override private[flink] def withName(newName: String): Attribute = newName match {
-    case "rowtime" => this
-    case _ => throw new ValidationException("Cannot rename streaming rowtime attribute.")
-  }
-  override private[flink] def name: String = "rowtime"
-  /** Always throws: the attribute has no stand-alone RexNode form. */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
-    throw new UnsupportedOperationException("A rowtime attribute can not be used solely.")
-  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.LONG_TYPE_INFO
-}
-/**
-  * Reference to a window by name. It has no result type, cannot be renamed and cannot be
-  * translated to a RexNode on its own.
-  *
-  * @param name the name of the referenced window
-  */
-case class WindowReference(name: String) extends Attribute {
-  /** Always throws: a window reference has no stand-alone RexNode form. */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    throw new UnsupportedOperationException("A window reference can not be used solely.")
-  }
-  /** Always throws: a window reference has no type. */
-  override private[flink] def resultType: TypeInformation[_] = {
-    throw new UnsupportedOperationException("A window reference has no result type.")
-  }
-  /**
-    * Returns this instance when the name is unchanged.
-    *
-    * @throws ValidationException when `newName` differs from the current name
-    */
-  override private[flink] def withName(newName: String): Attribute = {
-    if (newName != name) {
-      throw new ValidationException("Cannot rename window reference.")
-    }
-    this
-  }
-}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
deleted file mode 100644
index 6382abe..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-import java.sql.{Date, Time, Timestamp}
-import java.util.{Calendar, TimeZone}
-import org.apache.calcite.avatica.util.TimeUnit
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlIntervalQualifier
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.parser.SqlParserPos
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.table.FlinkTypeFactory
-import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
-/**
-  * Factory that derives the type information of a literal from the runtime type of its value.
-  */
-object Literal {
-  /**
-    * Wraps `l` into a [[Literal]] with the matching type information.
-    * A Scala BigDecimal is stored as its underlying java.math.BigDecimal.
-    *
-    * NOTE(review): the match has no default case, so a value of any other runtime type
-    * (including null) raises a scala.MatchError.
-    *
-    * @param l the value to wrap
-    */
-  private[flink] def apply(l: Any): Literal = l match {
-    case int: Int => Literal(int, BasicTypeInfo.INT_TYPE_INFO)
-    case short: Short => Literal(short, BasicTypeInfo.SHORT_TYPE_INFO)
-    case byte: Byte => Literal(byte, BasicTypeInfo.BYTE_TYPE_INFO)
-    case long: Long => Literal(long, BasicTypeInfo.LONG_TYPE_INFO)
-    case double: Double => Literal(double, BasicTypeInfo.DOUBLE_TYPE_INFO)
-    case float: Float => Literal(float, BasicTypeInfo.FLOAT_TYPE_INFO)
-    case string: String => Literal(string, BasicTypeInfo.STRING_TYPE_INFO)
-    case boolean: Boolean => Literal(boolean, BasicTypeInfo.BOOLEAN_TYPE_INFO)
-    case javaDec: java.math.BigDecimal => Literal(javaDec, BasicTypeInfo.BIG_DEC_TYPE_INFO)
-    case scalaDec: scala.math.BigDecimal =>
-      Literal(scalaDec.bigDecimal, BasicTypeInfo.BIG_DEC_TYPE_INFO)
-    case date: Date => Literal(date, SqlTimeTypeInfo.DATE)
-    case time: Time => Literal(time, SqlTimeTypeInfo.TIME)
-    case timestamp: Timestamp => Literal(timestamp, SqlTimeTypeInfo.TIMESTAMP)
-  }
-}
-/**
-  * A constant value together with its type information.
-  *
-  * @param value      the constant
-  * @param resultType the type information describing `value`
-  */
-case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpression {
-  /**
-    * Renders basic types as the plain value, SQL time and interval types as the value with a
-    * conversion suffix, and everything else as `Literal(value, type)`.
-    */
-  override def toString = resultType match {
-    case _: BasicTypeInfo[_] => value.toString
-    case SqlTimeTypeInfo.DATE => value.toString + ".toDate"
-    case SqlTimeTypeInfo.TIME => value.toString + ".toTime"
-    case SqlTimeTypeInfo.TIMESTAMP => value.toString + ".toTimestamp"
-    case TimeIntervalTypeInfo.INTERVAL_MILLIS => value.toString + ".millis"
-    case TimeIntervalTypeInfo.INTERVAL_MONTHS => value.toString + ".months"
-    case RowIntervalTypeInfo.INTERVAL_ROWS => value.toString + ".rows"
-    case _ => s"Literal($value, $resultType)"
-  }
-  /**
-    * Translates the literal to a Calcite RexNode. Decimals, date/time values and intervals
-    * need dedicated RexBuilder calls; every other type goes through `relBuilder.literal`.
-    */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    // Builds an interval literal of the given amount for the qualifier `start TO end`.
-    def intervalLiteral(amount: java.math.BigDecimal, start: TimeUnit, end: TimeUnit): RexNode = {
-      val qualifier = new SqlIntervalQualifier(start, end, SqlParserPos.ZERO)
-      relBuilder.getRexBuilder.makeIntervalLiteral(amount, qualifier)
-    }
-    resultType match {
-      case BasicTypeInfo.BIG_DEC_TYPE_INFO =>
-        val decimal = value.asInstanceOf[java.math.BigDecimal]
-        val decimalType = relBuilder.getTypeFactory.createSqlType(SqlTypeName.DECIMAL)
-        relBuilder.getRexBuilder.makeExactLiteral(decimal, decimalType)
-      // date/time
-      case SqlTimeTypeInfo.DATE =>
-        relBuilder.getRexBuilder.makeDateLiteral(dateToCalendar)
-      case SqlTimeTypeInfo.TIME =>
-        relBuilder.getRexBuilder.makeTimeLiteral(dateToCalendar, 0)
-      case SqlTimeTypeInfo.TIMESTAMP =>
-        relBuilder.getRexBuilder.makeTimestampLiteral(dateToCalendar, 3)
-      // intervals: month intervals carry an Int value, millisecond intervals a Long value
-      case TimeIntervalTypeInfo.INTERVAL_MONTHS =>
-        val months = java.math.BigDecimal.valueOf(value.asInstanceOf[Int])
-        intervalLiteral(months, TimeUnit.YEAR, TimeUnit.MONTH)
-      case TimeIntervalTypeInfo.INTERVAL_MILLIS =>
-        val millis = java.math.BigDecimal.valueOf(value.asInstanceOf[Long])
-        intervalLiteral(millis, TimeUnit.DAY, TimeUnit.SECOND)
-      case _ => relBuilder.literal(value)
-    }
-  }
-  /**
-    * Converts the value (which must be a java.util.Date) into a Calendar whose time is the
-    * value's epoch milliseconds shifted by the default time zone's offset at that instant.
-    */
-  private def dateToCalendar: Calendar = {
-    val epochMillis = value.asInstanceOf[java.util.Date].getTime
-    val calendar = Calendar.getInstance()
-    // according to Calcite's SqlFunctions.internalToXXX methods
-    calendar.setTimeInMillis(epochMillis + TimeZone.getDefault.getOffset(epochMillis))
-    calendar
-  }
-}
-/**
-  * A typed null value.
-  *
-  * @param resultType the type the null is cast to
-  */
-case class Null(resultType: TypeInformation[_]) extends LeafExpression {
-  override def toString = "null"
-  /**
-    * Translates to a cast of Calcite's constant null to the type derived from `resultType`.
-    * Requires the builder's type factory to be a FlinkTypeFactory.
-    */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val rexBuilder = relBuilder.getRexBuilder
-    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    val targetType = typeFactory.createTypeFromTypeInfo(resultType)
-    rexBuilder.makeCast(targetType, rexBuilder.constantNull())
-  }
-}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
deleted file mode 100644
index 9c8e279..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-import org.apache.calcite.rex.RexNode
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.validate._
-/**
-  * Base class of binary expressions that take two Boolean operands and yield a Boolean.
-  */
-abstract class BinaryPredicate extends BinaryExpression {
-  override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
-  /** Succeeds only if both operands are of Boolean type. */
-  override private[flink] def validateInput(): ValidationResult = {
-    val booleanType = BasicTypeInfo.BOOLEAN_TYPE_INFO
-    if (left.resultType != booleanType || right.resultType != booleanType) {
-      ValidationFailure(s"$this only accepts children of Boolean type, " +
-        s"get $left : ${left.resultType} and $right : ${right.resultType}")
-    } else {
-      ValidationSuccess
-    }
-  }
-}
-/**
-  * Logical negation of a Boolean expression.
-  *
-  * @param child the expression to negate
-  */
-case class Not(child: Expression) extends UnaryExpression {
-  override def toString = s"!($child)"
-  /** Translates to Calcite's NOT over the child's RexNode. */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
-    relBuilder.not(child.toRexNode)
-  override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
-  /** Succeeds only if the child is of Boolean type. */
-  override private[flink] def validateInput(): ValidationResult = {
-    if (child.resultType != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      ValidationFailure(s"Not operator requires a boolean expression as input, " +
-        s"but $child is of type ${child.resultType}")
-    } else {
-      ValidationSuccess
-    }
-  }
-}
-/**
-  * Logical conjunction of two Boolean expressions.
-  */
-case class And(left: Expression, right: Expression) extends BinaryPredicate {
-  override def toString = s"$left && $right"
-  /** Translates to Calcite's AND over both operands (left is translated first). */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val leftNode = left.toRexNode
-    val rightNode = right.toRexNode
-    relBuilder.and(leftNode, rightNode)
-  }
-}
-/**
-  * Logical disjunction of two Boolean expressions.
-  */
-case class Or(left: Expression, right: Expression) extends BinaryPredicate {
-  override def toString = s"$left || $right"
-  /** Translates to Calcite's OR over both operands (left is translated first). */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val leftNode = left.toRexNode
-    val rightNode = right.toRexNode
-    relBuilder.or(leftNode, rightNode)
-  }
-}
-case class If(
-    condition: Expression,
-    ifTrue: Expression,
-    ifFalse: Expression)
-  extends Expression {
-  private[flink] def children = Seq(condition, ifTrue, ifFalse)
-  override private[flink] def resultType = ifTrue.resultType
-  override def toString = s"($condition)? $ifTrue : $ifFalse"
-  /**
-    * Translates the conditional into a Calcite CASE call with the operands
-    * (condition, value if true, value if false), translated in that order.
-    */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val c = condition.toRexNode
-    val t = ifTrue.toRexNode
-    val f = ifFalse.toRexNode
-    // Fully qualified so that the call does not depend on an import of SqlStdOperatorTable.
-    relBuilder.call(org.apache.calcite.sql.fun.SqlStdOperatorTable.CASE, c, t, f)
-  }
-  /**
-    * Succeeds only if the condition is of Boolean type and both branches have the same type.
-    */
-  override private[flink] def validateInput(): ValidationResult = {
-    val conditionIsBoolean = condition.resultType == BasicTypeInfo.BOOLEAN_TYPE_INFO
-    if (!conditionIsBoolean || ifTrue.resultType != ifFalse.resultType) {
-      ValidationFailure(
-        s"If should have boolean condition and same type of ifTrue and ifFalse, get " +
-          s"(${condition.resultType}, ${ifTrue.resultType}, ${ifFalse.resultType})")
-    } else {
-      ValidationSuccess
-    }
-  }