You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:52 UTC
[23/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
new file mode 100644
index 0000000..48dbce6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionParser.scala
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.avatica.util.DateTimeUtils.{MILLIS_PER_DAY, MILLIS_PER_HOUR, MILLIS_PER_MINUTE, MILLIS_PER_SECOND}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.api.ExpressionParserException
+import org.apache.flink.table.expressions.ExpressionUtils.{toMilliInterval, toMonthInterval}
+import org.apache.flink.table.expressions.TimeIntervalUnit.TimeIntervalUnit
+import org.apache.flink.table.expressions.TimePointUnit.TimePointUnit
+import org.apache.flink.table.expressions.TrimMode.TrimMode
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+
+import scala.language.implicitConversions
+import scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
+
+/**
+ * Parser for expressions inside a String. This parses exactly the same expressions that
+ * would be accepted by the Scala Expression DSL.
+ *
+ * See [[org.apache.flink.table.api.scala.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.table.api.scala.ImplicitExpressionOperations]] for the constructs
+ * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL
+ * defined in the above files.
+ */
+object ExpressionParser extends JavaTokenParsers with PackratParsers {
+ // A reserved word of the expression language; `key` is the literal text to be matched.
+ case class Keyword(key: String)
+
+ // Convert the keyword into a case-insensitive Parser.
+ // The key is quoted (\Q...\E) so that symbolic keywords such as "?" and "*" are matched
+ // literally. The trailing negative lookahead ensures that a keyword is not matched as a
+ // mere prefix of a longer identifier (e.g. "as" in "asin" or "min" in "minus"), i.e., the
+ // keyword must not be followed by a Java identifier character.
+ implicit def keyword2Parser(kw: Keyword): Parser[String] = {
+ ("""(?i)\Q""" + kw.key + """\E(?![_$\p{javaJavaIdentifierPart}])""").r
+ }
+
+ // Keyword
+
+ // Reserved words of the expression language. A Keyword is implicitly converted into a
+ // Parser (see keyword2Parser) wherever a parser is expected. Time units are declared in
+ // both plural and singular form (e.g. "years" / "year").
+ lazy val ARRAY: Keyword = Keyword("Array")
+ lazy val AS: Keyword = Keyword("as")
+ lazy val COUNT: Keyword = Keyword("count")
+ lazy val AVG: Keyword = Keyword("avg")
+ lazy val MIN: Keyword = Keyword("min")
+ lazy val MAX: Keyword = Keyword("max")
+ lazy val SUM: Keyword = Keyword("sum")
+ lazy val START: Keyword = Keyword("start")
+ lazy val END: Keyword = Keyword("end")
+ lazy val CAST: Keyword = Keyword("cast")
+ lazy val NULL: Keyword = Keyword("Null")
+ lazy val IF: Keyword = Keyword("?")
+ lazy val ASC: Keyword = Keyword("asc")
+ lazy val DESC: Keyword = Keyword("desc")
+ lazy val TO_DATE: Keyword = Keyword("toDate")
+ lazy val TO_TIME: Keyword = Keyword("toTime")
+ lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp")
+ lazy val TRIM: Keyword = Keyword("trim")
+ lazy val EXTRACT: Keyword = Keyword("extract")
+ lazy val FLOOR: Keyword = Keyword("floor")
+ lazy val CEIL: Keyword = Keyword("ceil")
+ lazy val YEARS: Keyword = Keyword("years")
+ lazy val YEAR: Keyword = Keyword("year")
+ lazy val MONTHS: Keyword = Keyword("months")
+ lazy val MONTH: Keyword = Keyword("month")
+ lazy val DAYS: Keyword = Keyword("days")
+ lazy val DAY: Keyword = Keyword("day")
+ lazy val HOURS: Keyword = Keyword("hours")
+ lazy val HOUR: Keyword = Keyword("hour")
+ lazy val MINUTES: Keyword = Keyword("minutes")
+ lazy val MINUTE: Keyword = Keyword("minute")
+ lazy val SECONDS: Keyword = Keyword("seconds")
+ lazy val SECOND: Keyword = Keyword("second")
+ lazy val MILLIS: Keyword = Keyword("millis")
+ lazy val MILLI: Keyword = Keyword("milli")
+ lazy val ROWS: Keyword = Keyword("rows")
+ lazy val STAR: Keyword = Keyword("*")
+ lazy val GET: Keyword = Keyword("get")
+ lazy val FLATTEN: Keyword = Keyword("flatten")
+
+ // Parses the name of a generic function call: an identifier (super.ident) at a position
+ // where none of the listed keywords matches. The `not(...)` guards consume no input, so
+ // only the identifier is consumed and returned. Only the keywords listed here are
+ // excluded, not every keyword declared above.
+ def functionIdent: ExpressionParser.Parser[String] =
+ not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
+ not(SUM) ~ not(START) ~ not(END)~ not(CAST) ~ not(NULL) ~
+ not(IF) ~> super.ident
+
+ // symbols
+
+ // Each of the three parsers below is built the same way: every value in `values` becomes
+ // one alternative that matches the value's toString text as an exact literal and yields
+ // the value's expression form (toExpr); the alternatives are then combined with `|`.
+
+ // Matches one of the TimeIntervalUnit values.
+ lazy val timeIntervalUnit: PackratParser[Expression] = TimeIntervalUnit.values map {
+ case unit: TimeIntervalUnit => literal(unit.toString) ^^^ unit.toExpr
+ } reduceLeft(_ | _)
+
+ // Matches one of the TimePointUnit values.
+ lazy val timePointUnit: PackratParser[Expression] = TimePointUnit.values map {
+ case unit: TimePointUnit => literal(unit.toString) ^^^ unit.toExpr
+ } reduceLeft(_ | _)
+
+ // Matches one of the TrimMode values.
+ lazy val trimMode: PackratParser[Expression] = TrimMode.values map {
+ case mode: TrimMode => literal(mode.toString) ^^^ mode.toExpr
+ } reduceLeft(_ | _)
+
+ // data types
+
+ // Maps a type name to its TypeInformation. The names are plain string literals, i.e. they
+ // are matched exactly as written here (upper case).
+ // The alternatives are tried in order and the first match wins, so a name that starts with
+ // another name has to come first: INTERVAL_MONTHS / INTERVAL_MILLIS before INT,
+ // BOOLEAN before BOOL, and TIMESTAMP before TIME. Keep this in mind when adding types.
+ lazy val dataType: PackratParser[TypeInformation[_]] =
+ "BYTE" ^^ { ti => BasicTypeInfo.BYTE_TYPE_INFO } |
+ "SHORT" ^^ { ti => BasicTypeInfo.SHORT_TYPE_INFO } |
+ "INTERVAL_MONTHS" ^^ {
+ ti => TimeIntervalTypeInfo.INTERVAL_MONTHS.asInstanceOf[TypeInformation[_]]
+ } |
+ "INTERVAL_MILLIS" ^^ {
+ ti => TimeIntervalTypeInfo.INTERVAL_MILLIS.asInstanceOf[TypeInformation[_]]
+ } |
+ "INT" ^^ { ti => BasicTypeInfo.INT_TYPE_INFO } |
+ "LONG" ^^ { ti => BasicTypeInfo.LONG_TYPE_INFO } |
+ "FLOAT" ^^ { ti => BasicTypeInfo.FLOAT_TYPE_INFO } |
+ "DOUBLE" ^^ { ti => BasicTypeInfo.DOUBLE_TYPE_INFO } |
+ ("BOOLEAN" | "BOOL") ^^ { ti => BasicTypeInfo.BOOLEAN_TYPE_INFO } |
+ "STRING" ^^ { ti => BasicTypeInfo.STRING_TYPE_INFO } |
+ "DATE" ^^ { ti => SqlTimeTypeInfo.DATE.asInstanceOf[TypeInformation[_]] } |
+ "TIMESTAMP" ^^ { ti => SqlTimeTypeInfo.TIMESTAMP } |
+ "TIME" ^^ { ti => SqlTimeTypeInfo.TIME } |
+ "DECIMAL" ^^ { ti => BasicTypeInfo.BIG_DEC_TYPE_INFO }
+
+ // Literals
+
+ // Same as floatingPointNumber but we do not allow a trailing dot such as "12.d" or "2.".
+ // Accepted shape: optional "-", then either digits with an optional ".digits" fraction or
+ // a fraction-only form ".digits", then an optional exponent and an optional f/F/d/D suffix.
+ // NOTE(review): the trailing dot is presumably rejected so that it stays available for
+ // suffix operators like "12.days" -- confirm.
+ lazy val floatingPointNumberFlink: Parser[String] =
+ """-?(\d+(\.\d+)?|\d*\.\d+)([eE][+-]?\d+)?[fFdD]?""".r
+
+ // Numeric literals, tried in this order:
+ // 1. a whole number followed by "l" or "L" becomes a Long literal,
+ // 2. a decimal number followed by "p" or "P" becomes a BigDecimal literal,
+ // 3. anything else is classified by its text: optional "-" plus digits only => Int,
+ // trailing "f"/"F" => Float, otherwise => Double.
+ // NOTE(review): an integer text outside the Int range makes `toInt` throw a
+ // NumberFormatException instead of producing a parse failure.
+ lazy val numberLiteral: PackratParser[Expression] =
+ (wholeNumber <~ ("l" | "L")) ^^ { n => Literal(n.toLong) } |
+ (decimalNumber <~ ("p" | "P")) ^^ { n => Literal(BigDecimal(n)) } |
+ (floatingPointNumberFlink | decimalNumber) ^^ {
+ n =>
+ if (n.matches("""-?\d+""")) {
+ Literal(n.toInt)
+ } else if (n.endsWith("f") || n.endsWith("F")) {
+ Literal(n.toFloat)
+ } else {
+ Literal(n.toDouble)
+ }
+ }
+
+ // String literal in single quotes. The regex accepts backslash escapes and \uXXXX
+ // sequences, but the literal value is just the text between the quotes: the escape
+ // sequences are not translated here.
+ lazy val singleQuoteStringLiteral: Parser[Expression] =
+ ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r ^^ {
+ str => Literal(str.substring(1, str.length - 1))
+ }
+
+ // String literal in double quotes (stringLiteral of the parent parser) with the
+ // surrounding quotes removed.
+ lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral ^^ {
+ str => Literal(str.substring(1, str.length - 1))
+ }
+
+ // Boolean literal; only the exact texts "true" and "false" are accepted.
+ lazy val boolLiteral: PackratParser[Expression] = ("true" | "false") ^^ {
+ str => Literal(str.toBoolean)
+ }
+
+ // Typed null literal of the form Null(TYPE), e.g. Null(INT).
+ lazy val nullLiteral: PackratParser[Expression] = NULL ~ "(" ~> dataType <~ ")" ^^ {
+ dt => Null(dt)
+ }
+
+ // Any literal. Alternatives are tried from left to right.
+ lazy val literalExpr: PackratParser[Expression] =
+ numberLiteral | stringLiteralFlink | singleQuoteStringLiteral | boolLiteral | nullLiteral
+
+ // A reference to a field by name, or "*" (STAR). The name is not resolved here.
+ lazy val fieldReference: PackratParser[NamedExpression] = (STAR | ident) ^^ {
+ sym => UnresolvedFieldReference(sym)
+ }
+
+ // The smallest building block of an expression: a parenthesized expression, a literal,
+ // or a field reference. Literals are tried before field references.
+ lazy val atom: PackratParser[Expression] =
+ ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference
+
+ // suffix operators
+
+ // Suffix operators have the form `operand.op` or `operand.op(args)`. Unless noted
+ // otherwise the operand is parsed with `composite`, which itself includes `suffixed`.
+ // `opt("()")` makes the empty parentheses optional, so both `a.sum` and `a.sum()` parse.
+
+ // Aggregations: operand.sum / .min / .max / .count / .avg
+ lazy val suffixSum: PackratParser[Expression] =
+ composite <~ "." ~ SUM ~ opt("()") ^^ { e => Sum(e) }
+
+ lazy val suffixMin: PackratParser[Expression] =
+ composite <~ "." ~ MIN ~ opt("()") ^^ { e => Min(e) }
+
+ lazy val suffixMax: PackratParser[Expression] =
+ composite <~ "." ~ MAX ~ opt("()") ^^ { e => Max(e) }
+
+ lazy val suffixCount: PackratParser[Expression] =
+ composite <~ "." ~ COUNT ~ opt("()") ^^ { e => Count(e) }
+
+ lazy val suffixAvg: PackratParser[Expression] =
+ composite <~ "." ~ AVG ~ opt("()") ^^ { e => Avg(e) }
+
+ // Window properties: operand.start / operand.end
+ lazy val suffixStart: PackratParser[Expression] =
+ composite <~ "." ~ START ~ opt("()") ^^ { e => WindowStart(e) }
+
+ lazy val suffixEnd: PackratParser[Expression] =
+ composite <~ "." ~ END ~ opt("()") ^^ { e => WindowEnd(e) }
+
+ // operand.cast(TYPE)
+ lazy val suffixCast: PackratParser[Expression] =
+ composite ~ "." ~ CAST ~ "(" ~ dataType ~ ")" ^^ {
+ case e ~ _ ~ _ ~ _ ~ dt ~ _ => Cast(e, dt)
+ }
+
+ // operand.as(name, moreNames...): the first name is the alias, the remaining names are
+ // passed as additional names.
+ lazy val suffixAs: PackratParser[Expression] =
+ composite ~ "." ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+ case e ~ _ ~ _ ~ _ ~ target ~ _ => Alias(e, target.head.name, target.tail.map(_.name))
+ }
+
+ // operand.trim(MODE, character)
+ lazy val suffixTrim = composite ~ "." ~ TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ ")" ^^ {
+ case operand ~ _ ~ _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ => Trim(mode, trimCharacter, operand)
+ }
+
+ // operand.trim without arguments: trims TrimConstants.TRIM_DEFAULT_CHAR on both sides.
+ lazy val suffixTrimWithoutArgs = composite <~ "." ~ TRIM ~ opt("()") ^^ {
+ e => Trim(TrimMode.BOTH, TrimConstants.TRIM_DEFAULT_CHAR, e)
+ }
+
+ // condition.?(ifTrue, ifFalse)
+ lazy val suffixIf: PackratParser[Expression] =
+ composite ~ "." ~ IF ~ "(" ~ expression ~ "," ~ expression ~ ")" ^^ {
+ case condition ~ _ ~ _ ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ => If(condition, ifTrue, ifFalse)
+ }
+
+ // operand.extract(UNIT) / operand.floor(UNIT) / operand.ceil(UNIT) with a time interval unit
+ lazy val suffixExtract = composite ~ "." ~ EXTRACT ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+ case operand ~ _ ~ _ ~ _ ~ unit ~ _ => Extract(unit, operand)
+ }
+
+ lazy val suffixFloor = composite ~ "." ~ FLOOR ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+ case operand ~ _ ~ _ ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
+ }
+
+ lazy val suffixCeil = composite ~ "." ~ CEIL ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
+ case operand ~ _ ~ _ ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
+ }
+
+ // Generic call operand.name(args...): the operand becomes the first argument and the
+ // function name is converted to upper case.
+ lazy val suffixFunctionCall =
+ composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
+ case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args)
+ }
+
+ // Generic call without parentheses: operand.name
+ lazy val suffixFunctionCallOneArg = composite ~ "." ~ functionIdent ^^ {
+ case operand ~ _ ~ name => Call(name.toUpperCase, Seq(operand))
+ }
+
+ // Sort order: note that the operand of asc/desc is an `atom`, not a `composite`.
+ lazy val suffixAsc : PackratParser[Expression] =
+ atom <~ "." ~ ASC ~ opt("()") ^^ { e => Asc(e) }
+
+ lazy val suffixDesc : PackratParser[Expression] =
+ atom <~ "." ~ DESC ~ opt("()") ^^ { e => Desc(e) }
+
+ // Shortcuts for casts to the SQL time types.
+ lazy val suffixToDate: PackratParser[Expression] =
+ composite <~ "." ~ TO_DATE ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.DATE) }
+
+ lazy val suffixToTimestamp: PackratParser[Expression] =
+ composite <~ "." ~ TO_TIMESTAMP ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.TIMESTAMP) }
+
+ lazy val suffixToTime: PackratParser[Expression] =
+ composite <~ "." ~ TO_TIME ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.TIME) }
+
+ // Time interval suffixes such as `12.hours` or `a.day`.
+ // Keywords are matched case-insensitively and the keyword parser yields the text as it
+ // was written in the input. The unit is therefore normalized to lower case before it is
+ // compared with the (lower-case) keyword keys; without this, an input such as `1.Days`
+ // would end in a scala.MatchError instead of an interval.
+ // Plural forms are listed before the singular forms so that the longer keyword is tried
+ // first.
+ lazy val suffixTimeInterval : PackratParser[Expression] =
+ composite ~ "." ~ (YEARS | MONTHS | DAYS | HOURS | MINUTES | SECONDS | MILLIS |
+ YEAR | MONTH | DAY | HOUR | MINUTE | SECOND | MILLI) ^^ {
+
+ // Locale.ROOT keeps the conversion independent of the JVM default locale
+ case expr ~ _ ~ unit => unit.toLowerCase(java.util.Locale.ROOT) match {
+
+ case YEARS.key | YEAR.key => toMonthInterval(expr, 12)
+
+ case MONTHS.key | MONTH.key => toMonthInterval(expr, 1)
+
+ case DAYS.key | DAY.key => toMilliInterval(expr, MILLIS_PER_DAY)
+
+ case HOURS.key | HOUR.key => toMilliInterval(expr, MILLIS_PER_HOUR)
+
+ case MINUTES.key | MINUTE.key => toMilliInterval(expr, MILLIS_PER_MINUTE)
+
+ case SECONDS.key | SECOND.key => toMilliInterval(expr, MILLIS_PER_SECOND)
+
+ case MILLIS.key | MILLI.key => toMilliInterval(expr, 1)
+ }
+ }
+
+ // Row-count interval: operand.rows
+ lazy val suffixRowInterval : PackratParser[Expression] =
+ composite <~ "." ~ ROWS ^^ { e => ExpressionUtils.toRowInterval(e) }
+
+ // operand.get(literal): access to a field of a composite value.
+ // NOTE(review): literalExpr can also produce the result of nullLiteral (Null(...)); if
+ // that is not a Literal, the cast below fails with a ClassCastException -- confirm.
+ lazy val suffixGet: PackratParser[Expression] =
+ composite ~ "." ~ GET ~ "(" ~ literalExpr ~ ")" ^^ {
+ case e ~ _ ~ _ ~ _ ~ index ~ _ =>
+ GetCompositeField(e, index.asInstanceOf[Literal].value)
+ }
+
+ // operand.flatten
+ lazy val suffixFlattening: PackratParser[Expression] =
+ composite <~ "." ~ FLATTEN ~ opt("()") ^^ { e => Flattening(e) }
+
+ // All suffix operators. Alternatives are tried from left to right, so the dedicated
+ // operators take precedence over the generic function calls at the end.
+ lazy val suffixed: PackratParser[Expression] =
+ suffixTimeInterval | suffixRowInterval | suffixSum | suffixMin | suffixMax | suffixStart |
+ suffixEnd | suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim |
+ suffixTrimWithoutArgs | suffixIf | suffixAsc | suffixDesc | suffixToDate |
+ suffixToTimestamp | suffixToTime | suffixExtract | suffixFloor | suffixCeil |
+ suffixGet | suffixFlattening |
+ suffixFunctionCall | suffixFunctionCallOneArg // function call must always be at the end
+
+ // prefix operators
+
+ // Prefix operators have the form `op(operand, args...)`. They build the same expressions
+ // as the corresponding suffix operators.
+
+ // Array(e1, e2, ...): the element list may be empty as far as this parser is concerned.
+ lazy val prefixArray: PackratParser[Expression] =
+ ARRAY ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { elements => ArrayConstructor(elements) }
+
+ // Aggregations: sum(e) / min(e) / max(e) / count(e) / avg(e)
+ lazy val prefixSum: PackratParser[Expression] =
+ SUM ~ "(" ~> expression <~ ")" ^^ { e => Sum(e) }
+
+ lazy val prefixMin: PackratParser[Expression] =
+ MIN ~ "(" ~> expression <~ ")" ^^ { e => Min(e) }
+
+ lazy val prefixMax: PackratParser[Expression] =
+ MAX ~ "(" ~> expression <~ ")" ^^ { e => Max(e) }
+
+ lazy val prefixCount: PackratParser[Expression] =
+ COUNT ~ "(" ~> expression <~ ")" ^^ { e => Count(e) }
+
+ lazy val prefixAvg: PackratParser[Expression] =
+ AVG ~ "(" ~> expression <~ ")" ^^ { e => Avg(e) }
+
+ // Window properties: start(e) / end(e)
+ lazy val prefixStart: PackratParser[Expression] =
+ START ~ "(" ~> expression <~ ")" ^^ { e => WindowStart(e) }
+
+ lazy val prefixEnd: PackratParser[Expression] =
+ END ~ "(" ~> expression <~ ")" ^^ { e => WindowEnd(e) }
+
+ // cast(e, TYPE)
+ lazy val prefixCast: PackratParser[Expression] =
+ CAST ~ "(" ~ expression ~ "," ~ dataType ~ ")" ^^ {
+ case _ ~ _ ~ e ~ _ ~ dt ~ _ => Cast(e, dt)
+ }
+
+ // as(e, name, moreNames...)
+ lazy val prefixAs: PackratParser[Expression] =
+ AS ~ "(" ~ expression ~ "," ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+ case _ ~ _ ~ e ~ _ ~ target ~ _ => Alias(e, target.head.name, target.tail.map(_.name))
+ }
+
+ // ?(condition, ifTrue, ifFalse)
+ lazy val prefixIf: PackratParser[Expression] =
+ IF ~ "(" ~ expression ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
+ case _ ~ _ ~ condition ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ => If(condition, ifTrue, ifFalse)
+ }
+
+ // Generic call name(args...): the function name is converted to upper case.
+ lazy val prefixFunctionCall = functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
+ case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args)
+ }
+
+ // NOTE(review): prefixFunctionCall already accepts exactly one argument through repsep and
+ // is tried first in `prefixed`, so this alternative looks redundant -- confirm.
+ lazy val prefixFunctionCallOneArg = functionIdent ~ "(" ~ expression ~ ")" ^^ {
+ case name ~ _ ~ arg ~ _ => Call(name.toUpperCase, Seq(arg))
+ }
+
+ // trim(MODE, character, operand)
+ lazy val prefixTrim = TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
+ case _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ ~ operand ~ _ => Trim(mode, trimCharacter, operand)
+ }
+
+ // trim(operand): trims TrimConstants.TRIM_DEFAULT_CHAR on both sides.
+ lazy val prefixTrimWithoutArgs = TRIM ~ "(" ~ expression ~ ")" ^^ {
+ case _ ~ _ ~ operand ~ _ => Trim(TrimMode.BOTH, TrimConstants.TRIM_DEFAULT_CHAR, operand)
+ }
+
+ // extract(operand, UNIT) / floor(operand, UNIT) / ceil(operand, UNIT)
+ lazy val prefixExtract = EXTRACT ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+ case _ ~ _ ~ operand ~ _ ~ unit ~ _ => Extract(unit, operand)
+ }
+
+ lazy val prefixFloor = FLOOR ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+ case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
+ }
+
+ lazy val prefixCeil = CEIL ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
+ case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
+ }
+
+ // get(operand, literal)
+ // NOTE(review): literalExpr can also produce the result of nullLiteral (Null(...)); if
+ // that is not a Literal, the cast below fails with a ClassCastException -- confirm.
+ lazy val prefixGet: PackratParser[Expression] =
+ GET ~ "(" ~ composite ~ "," ~ literalExpr ~ ")" ^^ {
+ case _ ~ _ ~ e ~ _ ~ index ~ _ =>
+ GetCompositeField(e, index.asInstanceOf[Literal].value)
+ }
+
+ // flatten(operand)
+ lazy val prefixFlattening: PackratParser[Expression] =
+ FLATTEN ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) }
+
+ // All prefix operators. Alternatives are tried from left to right, so the dedicated
+ // operators take precedence over the generic function calls at the end.
+ lazy val prefixed: PackratParser[Expression] =
+ prefixArray | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg |
+ prefixStart | prefixEnd |
+ prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract |
+ prefixFloor | prefixCeil | prefixGet | prefixFlattening |
+ prefixFunctionCall | prefixFunctionCallOneArg // function call must always be at the end
+
+ // suffix/prefix composite
+
+ // An operand together with its prefix/suffix operators. Suffixed forms are tried first,
+ // then prefixed forms, then a plain atom. `suffixed` refers back to `composite`, i.e. the
+ // rule is left-recursive.
+ lazy val composite: PackratParser[Expression] = suffixed | prefixed | atom |
+ failure("Composite expression expected.")
+
+ // unary ops
+
+ // Logical negation: !operand
+ lazy val unaryNot: PackratParser[Expression] = "!" ~> composite ^^ { e => Not(e) }
+
+ // Arithmetic negation: -operand
+ lazy val unaryMinus: PackratParser[Expression] = "-" ~> composite ^^ { e => UnaryMinus(e) }
+
+ // `composite` is tried before the unary operators. A negative number literal such as
+ // "-1" is therefore parsed as a literal (the number patterns accept a leading "-") and
+ // not as UnaryMinus.
+ lazy val unary = composite | unaryNot | unaryMinus |
+ failure("Unary expression expected.")
+
+ // arithmetic
+
+ // Multiplicative level: a chain of unary expressions separated by *, / or %. The `*`
+ // combinator of the parser library combines the operands from left to right.
+ lazy val product = unary * (
+ "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } |
+ "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } |
+ "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } ) |
+ failure("Product expected.")
+
+ // Additive level: a chain of products separated by + or -. Because it is built on top of
+ // `product`, multiplicative operators bind tighter than additive ones.
+ lazy val term = product * (
+ "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } |
+ "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } ) |
+ failure("Term expected.")
+
+ // Comparison
+
+ // Every comparison takes a `term` on both sides, so the result of a comparison can only
+ // be compared again when it is put in parentheses.
+
+ // Equality: the longest spelling of the operator is tried first.
+ lazy val equalTo: PackratParser[Expression] = term ~ ("===" | "==" | "=") ~ term ^^ {
+ case l ~ _ ~ r => EqualTo(l, r)
+ }
+
+ // Inequality: "!==", "!=" or "<>".
+ lazy val notEqualTo: PackratParser[Expression] = term ~ ("!==" | "!=" | "<>") ~ term ^^ {
+ case l ~ _ ~ r => NotEqualTo(l, r)
+ }
+
+ lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ {
+ case l ~ _ ~ r => GreaterThan(l, r)
+ }
+
+ lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term ^^ {
+ case l ~ _ ~ r => GreaterThanOrEqual(l, r)
+ }
+
+ lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ {
+ case l ~ _ ~ r => LessThan(l, r)
+ }
+
+ lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ {
+ case l ~ _ ~ r => LessThanOrEqual(l, r)
+ }
+
+ // Any comparison, or a plain `term` if no comparison operator follows.
+ lazy val comparison: PackratParser[Expression] =
+ equalTo | notEqualTo |
+ greaterThan | greaterThanOrEqual |
+ lessThan | lessThanOrEqual | term |
+ failure("Comparison expected.")
+
+ // logic
+
+ // Logical level: a chain of comparisons separated by && or ||. Both operators are handled
+ // on the same level, i.e. this rule gives neither of them precedence over the other.
+ lazy val logic = comparison * (
+ "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } |
+ "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } ) |
+ failure("Logic expected.")
+
+ // alias
+
+ // Infix alias: `expr as name` or `expr as (name1, name2, ...)`; without `as` the logic
+ // expression is returned unchanged.
+ lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
+ case e ~ _ ~ name => Alias(e, name.name)
+ } | logic ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+ case e ~ _ ~ _ ~ names ~ _ => Alias(e, names.head.name, names.tail.map(_.name))
+ } | logic
+
+ // Top-level rule for a single expression.
+ lazy val expression: PackratParser[Expression] = alias |
+ failure("Invalid expression.")
+
+ // One or more expressions separated by commas.
+ lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
+
+ /**
+ * Parses a comma-separated list of expressions. The complete input has to be consumed.
+ *
+ * @param expression the text to parse
+ * @return the parsed expressions in the order of their appearance
+ */
+ def parseExpressionList(expression: String): List[Expression] = {
+ val outcome = parseAll(expressionList, expression)
+ outcome match {
+ case Success(expressions, _) => expressions
+ case NoSuccess(message, remaining) => throwError(message, remaining)
+ }
+ }
+
+ /**
+ * Parses a single expression. The complete input has to be consumed.
+ *
+ * @param exprString the text to parse
+ * @return the parsed expression
+ */
+ def parseExpression(exprString: String): Expression = {
+ val outcome = parseAll(expression, exprString)
+ outcome match {
+ case Success(parsed, _) => parsed
+ case NoSuccess(message, remaining) => throwError(message, remaining)
+ }
+ }
+
+ /**
+ * Raises an ExpressionParserException for a failed parse. The message contains the column
+ * of the failure and the position's long description; the library's wording for
+ * "end of input expected" is replaced by "End of expression".
+ */
+ private def throwError(msg: String, next: Input): Nothing = {
+ val position = next.pos
+ val improvedMsg = msg.replace("string matching regex `\\z'", "End of expression")
+
+ throw ExpressionParserException(
+ s"""Could not parse expression at column ${position.column}: $improvedMsg
+ |${position.longString}""".stripMargin)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
new file mode 100644
index 0000000..4b5781f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong, Short => JShort}
+import java.math.{BigDecimal => JBigDecimal}
+import java.sql.{Date, Time, Timestamp}
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexBuilder, RexNode}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+
+object ExpressionUtils {
+
+ /**
+ * Turns `expr` into an interval of months. An Int literal is multiplied right away and
+ * becomes an interval literal; any other expression is wrapped into a multiplication
+ * followed by a cast to the interval type.
+ */
+ private[flink] def toMonthInterval(expr: Expression, multiplier: Int): Expression = {
+ expr match {
+ case Literal(count: Int, BasicTypeInfo.INT_TYPE_INFO) =>
+ val months = count * multiplier
+ Literal(months, TimeIntervalTypeInfo.INTERVAL_MONTHS)
+ case other =>
+ val scaled = Mul(other, Literal(multiplier))
+ Cast(scaled, TimeIntervalTypeInfo.INTERVAL_MONTHS)
+ }
+ }
+
+ /**
+ * Turns `expr` into an interval of milliseconds. Int and Long literals are multiplied
+ * right away (as Long) and become interval literals; any other expression is wrapped into
+ * a multiplication followed by a cast to the interval type.
+ */
+ private[flink] def toMilliInterval(expr: Expression, multiplier: Long): Expression = {
+ expr match {
+ case Literal(count: Int, BasicTypeInfo.INT_TYPE_INFO) =>
+ val millis = count * multiplier
+ Literal(millis, TimeIntervalTypeInfo.INTERVAL_MILLIS)
+ case Literal(count: Long, BasicTypeInfo.LONG_TYPE_INFO) =>
+ val millis = count * multiplier
+ Literal(millis, TimeIntervalTypeInfo.INTERVAL_MILLIS)
+ case other =>
+ val scaled = Mul(other, Literal(multiplier))
+ Cast(scaled, TimeIntervalTypeInfo.INTERVAL_MILLIS)
+ }
+ }
+
+ /**
+ * Turns an Int or Long literal into a row interval literal (always carrying a Long).
+ * Anything else is rejected with an IllegalArgumentException.
+ */
+ private[flink] def toRowInterval(expr: Expression): Expression = {
+ expr match {
+ case Literal(rows: Int, BasicTypeInfo.INT_TYPE_INFO) =>
+ Literal(rows.toLong, RowIntervalTypeInfo.INTERVAL_ROWS)
+ case Literal(rows: Long, BasicTypeInfo.LONG_TYPE_INFO) =>
+ Literal(rows, RowIntervalTypeInfo.INTERVAL_ROWS)
+ case _ =>
+ throw new IllegalArgumentException("Invalid value for row interval literal.")
+ }
+ }
+
+ /**
+ * Converts an array into an ArrayConstructor expression.
+ *
+ * Arrays of primitives, boxed primitives, String, java.math.BigDecimal and the java.sql
+ * time types become arrays of literals. Scala BigDecimal elements are unwrapped to
+ * java.math.BigDecimal first. An array whose first element is itself an array is converted
+ * recursively. Everything else (including an empty array of an unlisted type) is rejected
+ * with a ValidationException.
+ */
+ private[flink] def convertArray(array: Array[_]): Expression = {
+ // wraps every element into a Literal
+ def literalArray(): Expression = {
+ ArrayConstructor(array.map(Literal(_)))
+ }
+
+ array match {
+ // primitives
+ case _: Array[Boolean] | _: Array[Byte] | _: Array[Short] | _: Array[Int] |
+ _: Array[Long] | _: Array[Float] | _: Array[Double] =>
+ literalArray()
+
+ // boxed types
+ case _: Array[JBoolean] | _: Array[JByte] | _: Array[JShort] | _: Array[JInteger] |
+ _: Array[JLong] | _: Array[JFloat] | _: Array[JDouble] =>
+ literalArray()
+
+ // others
+ case _: Array[String] | _: Array[JBigDecimal] | _: Array[Date] | _: Array[Time] |
+ _: Array[Timestamp] =>
+ literalArray()
+
+ case decimals: Array[BigDecimal] =>
+ ArrayConstructor(decimals.map { dec => Literal(dec.bigDecimal) })
+
+ // nested
+ case _ if array.length > 0 && array.head.isInstanceOf[Array[_]] =>
+ ArrayConstructor(array.map { inner => convertArray(inner.asInstanceOf[Array[_]]) })
+
+ case _ =>
+ throw ValidationException("Unsupported array type.")
+ }
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // RexNode conversion functions (see org.apache.calcite.sql2rel.StandardConvertletTable)
+ // ----------------------------------------------------------------------------------------------
+
+ /**
+ * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#getFactor()]].
+ *
+ * DAY and YEAR yield one; HOUR, MINUTE, SECOND and MONTH yield the multiplier of the next
+ * larger unit. Any other unit is rejected with an IllegalArgumentException.
+ */
+ private[flink] def getFactor(unit: TimeUnit): JBigDecimal = {
+ unit match {
+ case TimeUnit.DAY | TimeUnit.YEAR => JBigDecimal.ONE
+ case TimeUnit.HOUR => TimeUnit.DAY.multiplier
+ case TimeUnit.MINUTE => TimeUnit.HOUR.multiplier
+ case TimeUnit.SECOND => TimeUnit.MINUTE.multiplier
+ case TimeUnit.MONTH => TimeUnit.YEAR.multiplier
+ case _ => throw new IllegalArgumentException("Invalid start unit.")
+ }
+ }
+
+ /**
+ * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#mod()]].
+ *
+ * Returns `res` unchanged if `value` equals BigDecimal.ONE, otherwise a MOD call of `res`
+ * and `value` (as exact literal of type `resType`).
+ */
+ private[flink] def mod(
+ rexBuilder: RexBuilder,
+ resType: RelDataType,
+ res: RexNode,
+ value: JBigDecimal)
+ : RexNode = {
+ if (value == JBigDecimal.ONE) {
+ res
+ } else {
+ val divisor = rexBuilder.makeExactLiteral(value, resType)
+ rexBuilder.makeCall(SqlStdOperatorTable.MOD, res, divisor)
+ }
+ }
+
+ /**
+ * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#divide()]].
+ *
+ * Returns `res` unchanged if `value` equals BigDecimal.ONE. For a value strictly between
+ * zero and one, a multiplication by the exact reciprocal is attempted; if that raises an
+ * ArithmeticException (e.g. no exact reciprocal), or for any other value, an integer
+ * division by `value` is emitted.
+ */
+ private[flink] def divide(rexBuilder: RexBuilder, res: RexNode, value: JBigDecimal): RexNode = {
+ if (value == JBigDecimal.ONE) {
+ res
+ } else {
+ val viaReciprocal: Option[RexNode] =
+ if (value.compareTo(JBigDecimal.ONE) < 0 && value.signum == 1) {
+ try {
+ val reciprocal = JBigDecimal.ONE.divide(value, JBigDecimal.ROUND_UNNECESSARY)
+ Some(
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.MULTIPLY,
+ res,
+ rexBuilder.makeExactLiteral(reciprocal)))
+ } catch {
+ // fall back to the integer division below
+ case _: ArithmeticException => None
+ }
+ } else {
+ None
+ }
+ viaReciprocal.getOrElse(
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.DIVIDE_INTEGER,
+ res,
+ rexBuilder.makeExactLiteral(value)))
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
new file mode 100644
index 0000000..39e1fe2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/InputTypeSpec.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import scala.collection.mutable
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.validate._
+
+/**
+ * Mixin for expressions that declare the result type they expect from each of their
+ * children.
+ */
+trait InputTypeSpec extends Expression {
+
+ /**
+ * Input type specification for each child.
+ *
+ * For example, [[Power]] expecting both of the children be of Double Type should use:
+ * {{{
+ * def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: DOUBLE_TYPE_INFO :: Nil
+ * }}}
+ */
+ private[flink] def expectedTypes: Seq[TypeInformation[_]]
+
+ /**
+ * Compares the result type of every child with the expected type at the same position.
+ * Children and expected types are paired positionally; entries without a counterpart on
+ * the other side are not checked. All mismatches are reported in a single failure.
+ */
+ override private[flink] def validateInput(): ValidationResult = {
+ val typeMismatches = children.zip(expectedTypes).zipWithIndex.collect {
+ case ((child, expected), idx) if child.resultType != expected =>
+ s"expecting $expected on ${idx}th input, get ${child.resultType}"
+ }
+ if (typeMismatches.nonEmpty) {
+ ValidationFailure(
+ s"""|$this fails on input type checking: ${typeMismatches.mkString("[", ", ", "]")}.
+ |Operand should be casted to proper type
+ |""".stripMargin)
+ } else {
+ ValidationSuccess
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
new file mode 100644
index 0000000..b2fca88
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.tools.RelBuilder.AggCall
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.typeutils.TypeCheckUtils
+
+/**
+ * Base class of the aggregation expressions. An aggregation has a single child and is
+ * translated into a Calcite AggCall instead of a RexNode.
+ */
+abstract sealed class Aggregation extends UnaryExpression {
+
+ /**
+ * Convert Aggregate to its counterpart in Calcite, i.e. AggCall
+ */
+ private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall
+
+ // aggregations have no RexNode representation; use toAggCall instead
+ override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+ val reason = "Aggregate cannot be transformed to RexNode"
+ throw new UnsupportedOperationException(reason)
+ }
+
+ override def toString = s"Aggregate($child)"
+}
+
+/**
+ * SUM aggregation. The child has to be numeric; the result has the type of the child.
+ */
+case class Sum(child: Expression) extends Aggregation {
+
+ override private[flink] def resultType = child.resultType
+
+ override private[flink] def validateInput() =
+ TypeCheckUtils.assertNumericExpr(child.resultType, "sum")
+
+ override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+ val operand = child.toRexNode
+ relBuilder.aggregateCall(SqlStdOperatorTable.SUM, false, null, name, operand)
+ }
+
+ override def toString = s"sum($child)"
+}
+
+/**
+ * MIN aggregation. The child has to be orderable; the result has the type of the child.
+ */
+case class Min(child: Expression) extends Aggregation {
+
+ override private[flink] def resultType = child.resultType
+
+ override private[flink] def validateInput() =
+ TypeCheckUtils.assertOrderableExpr(child.resultType, "min")
+
+ override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+ val operand = child.toRexNode
+ relBuilder.aggregateCall(SqlStdOperatorTable.MIN, false, null, name, operand)
+ }
+
+ override def toString = s"min($child)"
+}
+
+/**
+ * MAX aggregation. The child has to be orderable; the result has the type of the child.
+ */
+case class Max(child: Expression) extends Aggregation {
+
+ override private[flink] def resultType = child.resultType
+
+ override private[flink] def validateInput() =
+ TypeCheckUtils.assertOrderableExpr(child.resultType, "max")
+
+ override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+ val operand = child.toRexNode
+ relBuilder.aggregateCall(SqlStdOperatorTable.MAX, false, null, name, operand)
+ }
+
+ override def toString = s"max($child)"
+}
+
+/**
+ * COUNT aggregation. The result is always of type Long; the child type is not restricted
+ * by this class.
+ */
+case class Count(child: Expression) extends Aggregation {
+
+ override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
+
+ override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+ val operand = child.toRexNode
+ relBuilder.aggregateCall(SqlStdOperatorTable.COUNT, false, null, name, operand)
+ }
+
+ override def toString = s"count($child)"
+}
+
+/**
+ * AVG aggregation. The child has to be numeric; the result has the type of the child.
+ */
+case class Avg(child: Expression) extends Aggregation {
+
+ override private[flink] def resultType = child.resultType
+
+ override private[flink] def validateInput() =
+ TypeCheckUtils.assertNumericExpr(child.resultType, "avg")
+
+ override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
+ val operand = child.toRexNode
+ relBuilder.aggregateCall(SqlStdOperatorTable.AVG, false, null, name, operand)
+ }
+
+ override def toString = s"avg($child)"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
new file mode 100644
index 0000000..ad1af63
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/arithmetic.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.`type`.IntervalSqlType
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.typeutils.TypeCheckUtils._
+import org.apache.flink.table.typeutils.TypeCoercion
+import org.apache.flink.table.validate._
+
+import scala.collection.JavaConversions._
+
+/**
+ * Base class of binary arithmetic expressions that are translated to a single Calcite
+ * operator applied to both children.
+ */
+abstract class BinaryArithmetic extends BinaryExpression {
+
+  /** The Calcite operator this expression is translated to. */
+  private[flink] def sqlOperator: SqlOperator
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    // Pass the operands as explicit varargs rather than relying on the deprecated implicit
+    // JavaConversions to turn the Scala Seq into a java.lang.Iterable behind the scenes.
+    relBuilder.call(sqlOperator, children.map(_.toRexNode): _*)
+  }
+
+  /**
+   * The wider of the two operand types as determined by `TypeCoercion.widerTypeOf`.
+   *
+   * @throws RuntimeException if `TypeCoercion.widerTypeOf` yields no common type
+   */
+  override private[flink] def resultType: TypeInformation[_] =
+    TypeCoercion.widerTypeOf(left.resultType, right.resultType) match {
+      case Some(t) => t
+      case None =>
+        // name the offending types so that a broken invariant can actually be diagnosed
+        throw new RuntimeException(
+          s"This should never happen. " +
+            s"No common type for ${left.resultType} and ${right.resultType}.")
+    }
+
+  // TODO: tighten this rule once we implemented type coercion rules during validation
+  /** Accepts the expression only if both operands are numeric. */
+  override private[flink] def validateInput(): ValidationResult = {
+    if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"$this requires both operands Numeric, get " +
+        s"$left : ${left.resultType} and $right : ${right.resultType}")
+    }
+  }
+}
+
+/**
+ * Addition of two expressions. Depending on the operand types this is translated to a string
+ * concatenation, an interval/datetime addition, or a numeric addition.
+ */
+case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
+
+  private[flink] val sqlOperator = SqlStdOperatorTable.PLUS
+
+  override def toString: String = s"($left + $right)"
+
+  /** Chooses the Calcite operator by inspecting the operand types; the first match wins. */
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    left.resultType match {
+      case l if isString(l) =>
+        // string on the left: cast the right operand to string and concatenate
+        val rightAsString = Cast(right, BasicTypeInfo.STRING_TYPE_INFO)
+        relBuilder.call(SqlStdOperatorTable.CONCAT, left.toRexNode, rightAsString.toRexNode)
+
+      case _ if isString(right.resultType) =>
+        // string on the right: cast the left operand to string and concatenate
+        val leftAsString = Cast(left, BasicTypeInfo.STRING_TYPE_INFO)
+        relBuilder.call(SqlStdOperatorTable.CONCAT, leftAsString.toRexNode, right.toRexNode)
+
+      case l if isTimeInterval(l) && l == right.resultType =>
+        relBuilder.call(SqlStdOperatorTable.PLUS, left.toRexNode, right.toRexNode)
+
+      case l if isTimeInterval(l) && isTemporal(right.resultType) =>
+        // Calcite has a bug that can't apply INTERVAL + DATETIME (INTERVAL at left)
+        // we manually switch them here
+        relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, right.toRexNode, left.toRexNode)
+
+      case l if isTemporal(l) && isTemporal(right.resultType) =>
+        relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, left.toRexNode, right.toRexNode)
+
+      case _ =>
+        // plain addition: both operands are cast to the common result type first
+        val widenedLeft = Cast(left, resultType)
+        val widenedRight = Cast(right, resultType)
+        relBuilder.call(SqlStdOperatorTable.PLUS, widenedLeft.toRexNode, widenedRight.toRexNode)
+    }
+
+  /**
+   * Accepts string operands on either side, two intervals of the same type, a time point
+   * combined with an interval (in either order), or two numeric operands.
+   */
+  override private[flink] def validateInput(): ValidationResult = {
+    // local defs keep the operand types lazily evaluated, exactly like the inline calls
+    def l = left.resultType
+    def r = right.resultType
+
+    val supported =
+      isString(l) || isString(r) ||
+        (isTimeInterval(l) && l == r) ||
+        (isTimePoint(l) && isTimeInterval(r)) ||
+        (isTimeInterval(l) && isTimePoint(r)) ||
+        (isNumeric(l) && isNumeric(r))
+
+    if (supported) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(
+        s"$this requires Numeric, String, Intervals of same type, " +
+        s"or Interval and a time point input, " +
+        s"get $left : ${left.resultType} and $right : ${right.resultType}")
+    }
+  }
+}
+
+/**
+ * Arithmetic negation of `child`, translated to Calcite's `UNARY_MINUS` operator.
+ */
+case class UnaryMinus(child: Expression) extends UnaryExpression {
+
+  override def toString: String = s"-($child)"
+
+  /** Negation does not change the type of its operand. */
+  override private[flink] def resultType = child.resultType
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val operand = child.toRexNode
+    relBuilder.call(SqlStdOperatorTable.UNARY_MINUS, operand)
+  }
+
+  /** Accepts numeric and time interval operands only. */
+  override private[flink] def validateInput(): ValidationResult = {
+    def operandType = child.resultType
+    if (isNumeric(operandType) || isTimeInterval(operandType)) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"$this requires Numeric, or Interval input, get ${child.resultType}")
+    }
+  }
+}
+
+/**
+ * Subtraction of two expressions, translated to Calcite's `MINUS` operator.
+ */
+case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
+
+  private[flink] val sqlOperator = SqlStdOperatorTable.MINUS
+
+  override def toString: String = s"($left - $right)"
+
+  /**
+   * Accepts two intervals of the same type or a time point combined with an interval;
+   * everything else is delegated to the numeric check of the super class.
+   */
+  override private[flink] def validateInput(): ValidationResult = {
+    // local defs keep the operand types lazily evaluated, exactly like the inline calls
+    def l = left.resultType
+    def r = right.resultType
+
+    // NOTE(review): the last alternative also accepts "interval - time point";
+    // confirm that this is intended.
+    val temporalOperands =
+      (isTimeInterval(l) && l == r) ||
+        (isTimePoint(l) && isTimeInterval(r)) ||
+        (isTimeInterval(l) && isTimePoint(r))
+
+    if (temporalOperands) {
+      ValidationSuccess
+    } else {
+      super.validateInput()
+    }
+  }
+}
+
+/** Division `left / right`, translated to Calcite's `DIVIDE` operator. */
+case class Div(left: Expression, right: Expression) extends BinaryArithmetic {
+
+  private[flink] val sqlOperator = SqlStdOperatorTable.DIVIDE
+
+  override def toString: String = s"($left / $right)"
+}
+
+/** Multiplication `left * right`, translated to Calcite's `MULTIPLY` operator. */
+case class Mul(left: Expression, right: Expression) extends BinaryArithmetic {
+
+  private[flink] val sqlOperator = SqlStdOperatorTable.MULTIPLY
+
+  override def toString: String = s"($left * $right)"
+}
+
+/** Remainder `left % right`, translated to Calcite's `MOD` operator. */
+case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
+
+  private[flink] val sqlOperator = SqlStdOperatorTable.MOD
+
+  override def toString: String = s"($left % $right)"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
new file mode 100644
index 0000000..b087b61
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Creates an array from a non-empty list of elements that all have the same type.
+ */
+case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = elements
+
+  override def toString: String = s"array(${elements.mkString(", ")})"
+
+  /** Object array type whose component type is taken from the first element. */
+  override private[flink] def resultType =
+    ObjectArrayTypeInfo.getInfoFor(elements.head.resultType)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+    val arrayType = typeFactory.createTypeFromTypeInfo(resultType)
+    val operands = elements.map(_.toRexNode).toList.asJava
+    relBuilder
+      .getRexBuilder
+      .makeCall(arrayType, SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR, operands)
+  }
+
+  /** Rejects empty arrays and arrays whose elements differ in type from the first element. */
+  override private[flink] def validateInput(): ValidationResult = {
+    if (elements.isEmpty) {
+      ValidationFailure("Empty arrays are not supported yet.")
+    } else {
+      val expectedType = elements.head.resultType
+      if (elements.exists(_.resultType != expectedType)) {
+        ValidationFailure("Not all elements of the array have the same type.")
+      } else {
+        ValidationSuccess
+      }
+    }
+  }
+}
+
+/**
+ * Accesses the element of `array` at position `index`, translated to Calcite's `ITEM`
+ * operator.
+ */
+case class ArrayElementAt(array: Expression, index: Expression) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = Seq(array, index)
+
+  override def toString: String = s"($array).at($index)"
+
+  /**
+   * Component type of the array. Only object and primitive array types are matched here;
+   * other operand types are rejected by `validateInput()`.
+   */
+  override private[flink] def resultType = array.resultType match {
+    case objectArray: ObjectArrayTypeInfo[_, _] => objectArray.getComponentInfo
+    case primitiveArray: PrimitiveArrayTypeInfo[_] => primitiveArray.getComponentType
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val rexBuilder = relBuilder.getRexBuilder
+    rexBuilder.makeCall(SqlStdOperatorTable.ITEM, array.toRexNode, index.toRexNode)
+  }
+
+  /**
+   * Requires an array operand and an integer index. Integer literals smaller than 1 are
+   * rejected as well.
+   */
+  override private[flink] def validateInput(): ValidationResult = {
+    array.resultType match {
+      case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] =>
+        index match {
+          case _ if index.resultType != INT_TYPE_INFO =>
+            ValidationFailure(
+              s"Array element access needs an integer index but was '${index.resultType}'.")
+          // check for common user mistake
+          case Literal(value: Int, INT_TYPE_INFO) if value < 1 =>
+            ValidationFailure(
+              s"Array element access needs an index starting at 1 but was $value.")
+          case _ =>
+            ValidationSuccess
+        }
+      case other =>
+        ValidationFailure(s"Array expected but was '$other'.")
+    }
+  }
+}
+
+/**
+ * Number of elements of `array`, translated to Calcite's `CARDINALITY` operator.
+ */
+case class ArrayCardinality(array: Expression) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = Seq(array)
+
+  override def toString: String = s"($array).cardinality()"
+
+  /** The cardinality is always reported as an `Int`. */
+  override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val rexBuilder = relBuilder.getRexBuilder
+    rexBuilder.makeCall(SqlStdOperatorTable.CARDINALITY, array.toRexNode)
+  }
+
+  /** Requires the operand to be an object array or a primitive array. */
+  override private[flink] def validateInput(): ValidationResult = {
+    array.resultType match {
+      case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] =>
+        ValidationSuccess
+      case other =>
+        ValidationFailure(s"Array expected but was '$other'.")
+    }
+  }
+}
+
+/**
+ * Applies Calcite's `ELEMENT` operator to `array`.
+ */
+case class ArrayElement(array: Expression) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = Seq(array)
+
+  override def toString: String = s"($array).element()"
+
+  /**
+   * Component type of the array. Only object and primitive array types are matched here;
+   * other operand types are rejected by `validateInput()`.
+   */
+  override private[flink] def resultType = array.resultType match {
+    case objectArray: ObjectArrayTypeInfo[_, _] => objectArray.getComponentInfo
+    case primitiveArray: PrimitiveArrayTypeInfo[_] => primitiveArray.getComponentType
+  }
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val rexBuilder = relBuilder.getRexBuilder
+    rexBuilder.makeCall(SqlStdOperatorTable.ELEMENT, array.toRexNode)
+  }
+
+  /** Requires the operand to be an object array or a primitive array. */
+  override private[flink] def validateInput(): ValidationResult = {
+    array.resultType match {
+      case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] =>
+        ValidationSuccess
+      case other =>
+        ValidationFailure(s"Array expected but was '$other'.")
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
new file mode 100644
index 0000000..ef2cf4e
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
+import org.apache.flink.table.plan.logical.{LogicalNode, LogicalTableFunctionCall}
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+import org.apache.flink.table.api.{UnresolvedException, ValidationException}
+
+/**
+  * General expression for unresolved function calls. The function can be a built-in
+  * scalar function or a user-defined scalar function.
+  *
+  * As long as the call is unresolved it neither validates nor offers a result type or a
+  * RexNode translation.
+  */
+case class Call(functionName: String, args: Seq[Expression]) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = args
+
+  override def toString: String = s"\\$functionName(${args.mkString(", ")})"
+
+  /** Always fails validation because the function has not been resolved. */
+  override private[flink] def validateInput(): ValidationResult = {
+    ValidationFailure(s"Unresolved function call: $functionName")
+  }
+
+  /** Not available for unresolved calls; always throws an `UnresolvedException`. */
+  override private[flink] def resultType = {
+    throw UnresolvedException(s"calling resultType on UnresolvedFunction $functionName")
+  }
+
+  /** Not available for unresolved calls; always throws an `UnresolvedException`. */
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    throw UnresolvedException(s"trying to convert UnresolvedFunction $functionName to RexNode")
+  }
+}
+
+/**
+  * Expression for calling a user-defined scalar function.
+  *
+  * @param scalarFunction scalar function to be called (might be overloaded)
+  * @param parameters actual parameters that determine target evaluation method
+  */
+case class ScalarFunctionCall(
+    scalarFunction: ScalarFunction,
+    parameters: Seq[Expression])
+  extends Expression {
+
+  // signature selected by validateInput(); resultType reads it and fails while it is empty
+  private var foundSignature: Option[Array[Class[_]]] = None
+
+  override private[flink] def children: Seq[Expression] = parameters
+
+  override def toString: String =
+    s"${scalarFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
+
+  /** Result type of the function for the signature found by `validateInput()`. */
+  override private[flink] def resultType = getResultType(scalarFunction, foundSignature.get)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    val sqlFunction = createScalarSqlFunction(
+      scalarFunction.getClass.getCanonicalName,
+      scalarFunction,
+      typeFactory)
+    val operands = parameters.map(_.toRexNode)
+    relBuilder.call(sqlFunction, operands: _*)
+  }
+
+  /**
+   * Looks for a signature of the function that matches the parameter types and remembers
+   * the outcome in `foundSignature`.
+   */
+  override private[flink] def validateInput(): ValidationResult = {
+    val signature = children.map(_.resultType)
+    foundSignature = getSignature(scalarFunction, signature)
+    foundSignature match {
+      case Some(_) =>
+        ValidationSuccess
+      case None =>
+        ValidationFailure(s"Given parameters do not match any signature. \n" +
+          s"Actual: ${signatureToString(signature)} \n" +
+          s"Expected: ${signaturesToString(scalarFunction)}")
+    }
+  }
+}
+
+/**
+  * Expression for calling a user-defined table function with actual parameters.
+  *
+  * @param functionName function name
+  * @param tableFunction user-defined table function
+  * @param parameters actual parameters of function
+  * @param resultType type information of returned table
+  */
+case class TableFunctionCall(
+    functionName: String,
+    tableFunction: TableFunction[_],
+    parameters: Seq[Expression],
+    resultType: TypeInformation[_])
+  extends Expression {
+
+  // optional aliases for the returned fields, assigned through as()
+  private var aliases: Option[Seq[String]] = None
+
+  override private[flink] def children: Seq[Expression] = parameters
+
+  override def toString: String =
+    s"${tableFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
+
+  /**
+    * Assigns an alias for this table function's returned fields that the following operator
+    * can refer to. The aliases are stored in this instance, which is returned again.
+    *
+    * @param aliasList alias for this table function's returned fields
+    * @return this table function call
+    */
+  private[flink] def as(aliasList: Option[Seq[String]]): TableFunctionCall = {
+    aliases = aliasList
+    this
+  }
+
+  /**
+    * Converts an API class to a logical node for planning.
+    *
+    * The field names are the aliases if any were assigned, otherwise the field names derived
+    * from `resultType`. A `ValidationException` is thrown if the number of aliases differs
+    * from the number of returned fields.
+    */
+  private[flink] def toLogicalTableFunctionCall(child: LogicalNode): LogicalTableFunctionCall = {
+    val originNames = getFieldInfo(resultType)._1
+
+    // determine the final field names
+    val fieldNames = aliases match {
+      case Some(aliasList) if aliasList.length != originNames.length =>
+        throw ValidationException(
+          s"List of column aliases must have same degree as table; " +
+            s"the returned table of function '$functionName' has ${originNames.length} " +
+            s"columns (${originNames.mkString(",")}), " +
+            s"whereas alias list has ${aliasList.length} columns")
+      case Some(aliasList) =>
+        aliasList.toArray
+      case None =>
+        originNames
+    }
+
+    LogicalTableFunctionCall(
+      functionName,
+      tableFunction,
+      parameters,
+      resultType,
+      fieldNames,
+      child)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
new file mode 100644
index 0000000..312bf12
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cast.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.typeutils.TypeCoercion
+import org.apache.flink.table.validate._
+
+/**
+ * Casts `child` to `resultType`.
+ */
+case class Cast(child: Expression, resultType: TypeInformation[_]) extends UnaryExpression {
+
+  override def toString: String = s"$child.cast($resultType)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+    val targetType = typeFactory.createTypeFromTypeInfo(resultType)
+    // we use abstract cast here because RelBuilder.cast() has too many side effects
+    relBuilder.getRexBuilder.makeAbstractCast(targetType, child.toRexNode)
+  }
+
+  /** Copies this cast with the first element of `anyRefs` as child; the target type is kept. */
+  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+    val newChild: Expression = anyRefs.head.asInstanceOf[Expression]
+    copy(newChild, resultType).asInstanceOf[this.type]
+  }
+
+  /** Accepts the cast only if `TypeCoercion.canCast` allows it. */
+  override private[flink] def validateInput(): ValidationResult = {
+    val castable = TypeCoercion.canCast(child.resultType, resultType)
+    if (!castable) {
+      ValidationFailure(s"Unsupported cast from ${child.resultType} to $resultType")
+    } else {
+      ValidationSuccess
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala
new file mode 100644
index 0000000..0c7e57c
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.table.typeutils.TypeCheckUtils.{isComparable, isNumeric}
+import org.apache.flink.table.validate._
+
+import scala.collection.JavaConversions._
+
+/**
+ * Base class of binary comparisons that are translated to a single Calcite operator applied
+ * to both children. The result is always a boolean.
+ */
+abstract class BinaryComparison extends BinaryExpression {
+
+  /** The Calcite operator this comparison is translated to. */
+  private[flink] def sqlOperator: SqlOperator
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    // Pass the operands as explicit varargs rather than relying on the deprecated implicit
+    // JavaConversions to turn the Scala Seq into a java.lang.Iterable behind the scenes.
+    relBuilder.call(sqlOperator, children.map(_.toRexNode): _*)
+  }
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+
+  /**
+   * Accepts two numeric operands, or two operands of the same type for which
+   * `isComparable` holds.
+   */
+  override private[flink] def validateInput(): ValidationResult =
+    (left.resultType, right.resultType) match {
+      case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
+      case (lType, rType) if isComparable(lType) && lType == rType => ValidationSuccess
+      case (lType, rType) =>
+        ValidationFailure(
+          s"Comparison is only supported for numeric types and " +
+            s"comparable types of same type, got $lType and $rType")
+    }
+}
+
+/**
+ * Equality predicate, translated to Calcite's `EQUALS` operator.
+ */
+case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
+
+  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.EQUALS
+
+  override def toString: String = s"$left === $right"
+
+  /** Accepts two numeric operands or two operands of the same type. */
+  override private[flink] def validateInput(): ValidationResult = {
+    val lType = left.resultType
+    val rType = right.resultType
+    if ((isNumeric(lType) && isNumeric(rType)) || lType == rType) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Equality predicate on incompatible types: $lType and $rType")
+    }
+  }
+}
+
+/**
+ * Inequality predicate, translated to Calcite's `NOT_EQUALS` operator.
+ */
+case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
+
+  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.NOT_EQUALS
+
+  override def toString: String = s"$left !== $right"
+
+  /** Accepts two numeric operands or two operands of the same type. */
+  override private[flink] def validateInput(): ValidationResult = {
+    val lType = left.resultType
+    val rType = right.resultType
+    if ((isNumeric(lType) && isNumeric(rType)) || lType == rType) {
+      ValidationSuccess
+    } else {
+      ValidationFailure(s"Inequality predicate on incompatible types: $lType and $rType")
+    }
+  }
+}
+
+/** Comparison `left > right`, translated to Calcite's `GREATER_THAN` operator. */
+case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
+
+  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.GREATER_THAN
+
+  override def toString: String = s"$left > $right"
+}
+
+/** Comparison `left >= right`, translated to Calcite's `GREATER_THAN_OR_EQUAL` operator. */
+case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+
+  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.GREATER_THAN_OR_EQUAL
+
+  override def toString: String = s"$left >= $right"
+}
+
+/** Comparison `left < right`, translated to Calcite's `LESS_THAN` operator. */
+case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
+
+  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.LESS_THAN
+
+  override def toString: String = s"$left < $right"
+}
+
+/** Comparison `left <= right`, translated to Calcite's `LESS_THAN_OR_EQUAL` operator. */
+case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+
+  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.LESS_THAN_OR_EQUAL
+
+  override def toString: String = s"$left <= $right"
+}
+
+/** Boolean predicate on `child`, built with `RelBuilder.isNull`. */
+case class IsNull(child: Expression) extends UnaryExpression {
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+
+  override def toString: String = s"($child).isNull"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val operand = child.toRexNode
+    relBuilder.isNull(operand)
+  }
+}
+
+/** Boolean predicate on `child`, built with `RelBuilder.isNotNull`. */
+case class IsNotNull(child: Expression) extends UnaryExpression {
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+
+  override def toString: String = s"($child).isNotNull"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val operand = child.toRexNode
+    relBuilder.isNotNull(operand)
+  }
+}
+
+/** Boolean predicate on `child`, translated to Calcite's `IS_TRUE` operator. */
+case class IsTrue(child: Expression) extends UnaryExpression {
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+
+  override def toString: String = s"($child).isTrue"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val operand = child.toRexNode
+    relBuilder.call(SqlStdOperatorTable.IS_TRUE, operand)
+  }
+}
+
+/** Boolean predicate on `child`, translated to Calcite's `IS_FALSE` operator. */
+case class IsFalse(child: Expression) extends UnaryExpression {
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+
+  override def toString: String = s"($child).isFalse"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val operand = child.toRexNode
+    relBuilder.call(SqlStdOperatorTable.IS_FALSE, operand)
+  }
+}
+
+/** Boolean predicate on `child`, translated to Calcite's `IS_NOT_TRUE` operator. */
+case class IsNotTrue(child: Expression) extends UnaryExpression {
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+
+  override def toString: String = s"($child).isNotTrue"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val operand = child.toRexNode
+    relBuilder.call(SqlStdOperatorTable.IS_NOT_TRUE, operand)
+  }
+}
+
+/** Boolean predicate on `child`, translated to Calcite's `IS_NOT_FALSE` operator. */
+case class IsNotFalse(child: Expression) extends UnaryExpression {
+
+  override private[flink] def resultType = BOOLEAN_TYPE_INFO
+
+  override def toString: String = s"($child).isNotFalse"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    val operand = child.toRexNode
+    relBuilder.call(SqlStdOperatorTable.IS_NOT_FALSE, operand)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/composite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/composite.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/composite.scala
new file mode 100644
index 0000000..2f3fdb1
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/composite.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.table.api.UnresolvedException
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+/**
+  * Flattening of composite types. All flattenings are resolved into
+  * `GetCompositeField` expressions.
+  *
+  * As long as the flattening is unresolved it neither validates nor offers a result type.
+  */
+case class Flattening(child: Expression) extends UnaryExpression {
+
+  override def toString: String = s"$child.flatten()"
+
+  /** Not available for unresolved flattenings; always throws an `UnresolvedException`. */
+  override private[flink] def resultType: TypeInformation[_] =
+    // the message previously lacked the name of the method that was called
+    throw UnresolvedException(s"Invalid call to resultType on ${this.getClass}.")
+
+  /** Always fails validation because the flattening has not been resolved. */
+  override private[flink] def validateInput(): ValidationResult =
+    ValidationFailure(s"Unresolved flattening of $child")
+}
+
+/**
+ * Accesses a field of a composite-typed `child`, addressed either by field name (`String`)
+ * or by field position (`Int`).
+ *
+ * `validateInput()` resolves the key to a field index; `resultType` and `toRexNode` read that
+ * index and fail if it has not been resolved.
+ */
+case class GetCompositeField(child: Expression, key: Any) extends UnaryExpression {
+
+  // index of the accessed field, resolved by validateInput()
+  private var fieldIndex: Option[Int] = None
+
+  override def toString: String = s"$child.get($key)"
+
+  /**
+   * Checks that the child has a composite type and that the key addresses an existing field.
+   * On success the resolved field index is remembered in `fieldIndex`.
+   */
+  override private[flink] def validateInput(): ValidationResult = child.resultType match {
+    case compositeType: CompositeType[_] =>
+      // check key
+      key match {
+        case name: String =>
+          val index = compositeType.getFieldIndex(name)
+          if (index < 0) {
+            ValidationFailure(s"Field name '$name' could not be found.")
+          } else {
+            fieldIndex = Some(index)
+            ValidationSuccess
+          }
+        case index: Int =>
+          if (index < 0) {
+            // a negative position can never address a field; reject it here instead of
+            // letting the later type lookup / field access fail
+            ValidationFailure(s"Field index '$index' must not be negative.")
+          } else if (index >= compositeType.getArity) {
+            ValidationFailure(s"Field index '$index' exceeds arity.")
+          } else {
+            fieldIndex = Some(index)
+            ValidationSuccess
+          }
+        case _ =>
+          ValidationFailure(s"Invalid key '$key'.")
+      }
+
+    // check for composite type
+    case other =>
+      ValidationFailure(s"Cannot access field of non-composite type '$other'.")
+  }
+
+  /** Type of the accessed field; requires the field index resolved by `validateInput()`. */
+  override private[flink] def resultType: TypeInformation[_] =
+    child.resultType.asInstanceOf[CompositeType[_]].getTypeAt(fieldIndex.get)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+    relBuilder
+      .getRexBuilder
+      .makeFieldAccess(child.toRexNode, fieldIndex.get)
+  }
+
+  /** Copies this expression with the first element of `anyRefs` as child; the key is kept. */
+  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+    val newChild: Expression = anyRefs.head.asInstanceOf[Expression]
+    copy(newChild, key).asInstanceOf[this.type]
+  }
+
+  /**
+    * Gives a meaningful alias if possible (e.g. a$mypojo$field).
+    *
+    * @return the alias, or `None` if the child is neither a resolved field reference nor a
+    *         nested field access that has an alias itself
+    */
+  private[flink] def aliasName(): Option[String] = child match {
+    case gcf: GetCompositeField => gcf.aliasName().map(prefix => s"$prefix$$$key")
+    case c: ResolvedFieldReference => Some(s"${c.name}$$$key")
+    case _ => None
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
new file mode 100644
index 0000000..299a850
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.table.api.{UnresolvedException, ValidationException}
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+ /**
+ * An expression that exposes a name and can be converted to an [[Attribute]].
+ */
+ trait NamedExpression extends Expression {
+ private[flink] def name: String
+ private[flink] def toAttribute: Attribute
+}
+
+ /**
+ * A named leaf expression. An attribute is its own attribute representation
+ * and must be able to produce a variant of itself for a given name.
+ */
+ abstract class Attribute extends LeafExpression with NamedExpression {
+ override private[flink] def toAttribute: Attribute = this
+
+ /**
+ * Returns an attribute for `newName`. The implementations in this file either return a
+ * renamed copy or throw when renaming is not allowed.
+ */
+ private[flink] def withName(newName: String): Attribute
+}
+
+ /**
+  * A reference to a field by name that carries no type information.
+  *
+  * Asking for its result type throws, and validation of this expression always fails.
+  */
+ case class UnresolvedFieldReference(name: String) extends Attribute {
+
+   // Rendered in the same 'name notation that ResolvedFieldReference and Alias use,
+   // instead of a lone, unbalanced double quote in front of the name.
+   override def toString = s"'$name"
+
+   /** Returns a new unresolved reference with the given name. */
+   override private[flink] def withName(newName: String): Attribute =
+     UnresolvedFieldReference(newName)
+
+   /** Always throws an UnresolvedException. */
+   override private[flink] def resultType: TypeInformation[_] =
+     throw UnresolvedException(s"Calling resultType on ${this.getClass}.")
+
+   /** Always reports a failure naming the unresolved field. */
+   override private[flink] def validateInput(): ValidationResult =
+     ValidationFailure(s"Unresolved reference $name.")
+}
+
+ /**
+  * A field reference with a known name and result type.
+  */
+ case class ResolvedFieldReference(
+     name: String,
+     resultType: TypeInformation[_]) extends Attribute {
+
+   override def toString = s"'$name"
+
+   /** Looks the field up by name on the given builder. */
+   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+     relBuilder.field(name)
+
+   /** Returns this instance when the name is unchanged, otherwise a reference with the new name. */
+   override private[flink] def withName(newName: String): Attribute =
+     if (newName != name) ResolvedFieldReference(newName, resultType) else this
+}
+
+ /**
+  * Gives a child expression a name. `extraNames` may carry additional names;
+  * validation rejects an alias that has any.
+  */
+ case class Alias(child: Expression, name: String, extraNames: Seq[String] = Seq())
+   extends UnaryExpression with NamedExpression {
+
+   override def toString = s"$child as '$name"
+
+   /** An alias has the type of the expression it names. */
+   override private[flink] def resultType: TypeInformation[_] = child.resultType
+
+   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+     relBuilder.alias(child.toRexNode, name)
+
+   /** Copies this alias with the first element of `anyRefs` as the new child. */
+   override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
+     val newChild = anyRefs.head.asInstanceOf[Expression]
+     copy(newChild, name, extraNames).asInstanceOf[this.type]
+   }
+
+   /** A resolved reference typed like the child when this alias is valid, else an unresolved one. */
+   override private[flink] def toAttribute: Attribute =
+     if (!valid) {
+       UnresolvedFieldReference(name)
+     } else {
+       ResolvedFieldReference(name, child.resultType)
+     }
+
+   /** Rejects the name "*" and aliases that carry extra names. */
+   override private[flink] def validateInput(): ValidationResult = name match {
+     case "*" =>
+       ValidationFailure("Alias can not accept '*' as name.")
+     case _ if extraNames.nonEmpty =>
+       ValidationFailure("Invalid call to Alias with multiple names.")
+     case _ =>
+       ValidationSuccess
+   }
+}
+
+ /**
+  * Wraps a child expression that has not been given a name.
+  *
+  * Name, attribute and result type are unavailable (each access throws), and the
+  * expression is never valid.
+  */
+ case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression {
+
+   // single place that raises the failure for all unavailable members
+   private def unavailable(message: String): Nothing = throw UnresolvedException(message)
+
+   override private[flink] def name: String =
+     unavailable("Invalid call to name on UnresolvedAlias")
+
+   override private[flink] def toAttribute: Attribute =
+     unavailable("Invalid call to toAttribute on UnresolvedAlias")
+
+   override private[flink] def resultType: TypeInformation[_] =
+     unavailable("Invalid call to resultType on UnresolvedAlias")
+
+   override private[flink] lazy val valid = false
+}
+
+ /**
+  * The streaming rowtime attribute: a LONG-typed attribute with the fixed name "rowtime".
+  */
+ case class RowtimeAttribute() extends Attribute {
+
+   override private[flink] def name: String = "rowtime"
+
+   override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.LONG_TYPE_INFO
+
+   /** Accepts only the name "rowtime", returning this instance; any other name is rejected. */
+   override private[flink] def withName(newName: String): Attribute = newName match {
+     case "rowtime" => this
+     case _ => throw new ValidationException("Cannot rename streaming rowtime attribute.")
+   }
+
+   /** Always throws: this attribute has no RexNode translation of its own. */
+   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+     throw new UnsupportedOperationException("A rowtime attribute can not be used solely.")
+}
+
+ /**
+  * A named attribute that cannot be translated to a RexNode, has no result type
+  * and cannot be renamed.
+  */
+ case class WindowReference(name: String) extends Attribute {
+
+   /** Returns this instance for the current name; any other name is rejected. */
+   override private[flink] def withName(newName: String): Attribute = {
+     if (newName != name) {
+       throw new ValidationException("Cannot rename window reference.")
+     }
+     this
+   }
+
+   /** Always throws. */
+   override private[flink] def resultType: TypeInformation[_] =
+     throw new UnsupportedOperationException("A window reference has no result type.")
+
+   /** Always throws. */
+   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+     throw new UnsupportedOperationException("A window reference can not be used solely.")
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
new file mode 100644
index 0000000..ccdfc2d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import java.sql.{Date, Time, Timestamp}
+import java.util.{Calendar, TimeZone}
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlIntervalQualifier
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
+
+ /**
+  * Factory that derives a literal's type information from the runtime class of its value.
+  */
+ object Literal {
+
+   /**
+    * Wraps `l` in a [[Literal]] with the matching type information.
+    *
+    * Supported are the primitive number types, String, Boolean, Java and Scala BigDecimal
+    * and java.sql Date, Time and Timestamp. Anything else (including null) fails with a
+    * MatchError.
+    */
+   private[flink] def apply(l: Any): Literal = {
+     // Scala decimals are stored as java.math.BigDecimal; all other values are kept as they are
+     val stored: Any = l match {
+       case scalaDec: scala.math.BigDecimal => scalaDec.bigDecimal
+       case other => other
+     }
+
+     val typeInfo: TypeInformation[_] = stored match {
+       case _: Int => BasicTypeInfo.INT_TYPE_INFO
+       case _: Short => BasicTypeInfo.SHORT_TYPE_INFO
+       case _: Byte => BasicTypeInfo.BYTE_TYPE_INFO
+       case _: Long => BasicTypeInfo.LONG_TYPE_INFO
+       case _: Double => BasicTypeInfo.DOUBLE_TYPE_INFO
+       case _: Float => BasicTypeInfo.FLOAT_TYPE_INFO
+       case _: String => BasicTypeInfo.STRING_TYPE_INFO
+       case _: Boolean => BasicTypeInfo.BOOLEAN_TYPE_INFO
+       case _: java.math.BigDecimal => BasicTypeInfo.BIG_DEC_TYPE_INFO
+       case _: Date => SqlTimeTypeInfo.DATE
+       case _: Time => SqlTimeTypeInfo.TIME
+       case _: Timestamp => SqlTimeTypeInfo.TIMESTAMP
+     }
+
+     Literal(stored, typeInfo)
+   }
+}
+
+ /**
+  * A constant value together with its type information.
+  */
+ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpression {
+
+   /**
+    * Basic types print as the plain value; date/time and interval types print the value
+    * followed by a type-specific suffix; everything else prints value and type.
+    */
+   override def toString = resultType match {
+     case _: BasicTypeInfo[_] => value.toString
+     case SqlTimeTypeInfo.DATE => s"${value.toString}.toDate"
+     case SqlTimeTypeInfo.TIME => s"${value.toString}.toTime"
+     case SqlTimeTypeInfo.TIMESTAMP => s"${value.toString}.toTimestamp"
+     case TimeIntervalTypeInfo.INTERVAL_MILLIS => s"${value.toString}.millis"
+     case TimeIntervalTypeInfo.INTERVAL_MONTHS => s"${value.toString}.months"
+     case RowIntervalTypeInfo.INTERVAL_ROWS => s"${value.toString}.rows"
+     case _ => s"Literal($value, $resultType)"
+   }
+
+   /**
+    * Translates the literal depending on its type: decimals become exact literals,
+    * date/time values are converted through a Calendar, intervals become interval
+    * literals, and all remaining types are passed to the builder as they are.
+    */
+   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+     resultType match {
+       case BasicTypeInfo.BIG_DEC_TYPE_INFO =>
+         val decimal = value.asInstanceOf[java.math.BigDecimal]
+         val decimalType = relBuilder.getTypeFactory.createSqlType(SqlTypeName.DECIMAL)
+         relBuilder.getRexBuilder.makeExactLiteral(decimal, decimalType)
+
+       // date/time
+       case SqlTimeTypeInfo.DATE =>
+         relBuilder.getRexBuilder.makeDateLiteral(dateToCalendar)
+       case SqlTimeTypeInfo.TIME =>
+         relBuilder.getRexBuilder.makeTimeLiteral(dateToCalendar, 0)
+       case SqlTimeTypeInfo.TIMESTAMP =>
+         relBuilder.getRexBuilder.makeTimestampLiteral(dateToCalendar, 3)
+
+       // intervals: month intervals hold an Int, millisecond intervals a Long
+       case TimeIntervalTypeInfo.INTERVAL_MONTHS =>
+         intervalLiteral(
+           java.math.BigDecimal.valueOf(value.asInstanceOf[Int]),
+           TimeUnit.YEAR,
+           TimeUnit.MONTH)
+       case TimeIntervalTypeInfo.INTERVAL_MILLIS =>
+         intervalLiteral(
+           java.math.BigDecimal.valueOf(value.asInstanceOf[Long]),
+           TimeUnit.DAY,
+           TimeUnit.SECOND)
+
+       case _ => relBuilder.literal(value)
+     }
+   }
+
+   /** Builds an interval literal of `amount` qualified by the given start and end unit. */
+   private def intervalLiteral(
+       amount: java.math.BigDecimal,
+       startUnit: TimeUnit,
+       endUnit: TimeUnit)(implicit relBuilder: RelBuilder): RexNode = {
+     val qualifier = new SqlIntervalQualifier(startUnit, endUnit, SqlParserPos.ZERO)
+     relBuilder.getRexBuilder.makeIntervalLiteral(amount, qualifier)
+   }
+
+   /**
+    * Converts the value (which must be a java.util.Date) to a Calendar whose time is the
+    * date's epoch milliseconds shifted by the default time zone's offset at that instant.
+    */
+   private def dateToCalendar: Calendar = {
+     val date = value.asInstanceOf[java.util.Date]
+     val calendar = Calendar.getInstance()
+     val millis = date.getTime
+     // according to Calcite's SqlFunctions.internalToXXX methods
+     calendar.setTimeInMillis(millis + TimeZone.getDefault.getOffset(millis))
+     calendar
+   }
+}
+
+ /**
+  * A null constant of a given type.
+  */
+ case class Null(resultType: TypeInformation[_]) extends LeafExpression {
+   override def toString = "null"
+
+   /** Casts the builder's null constant to the relational type created from `resultType`. */
+   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+     val builder = relBuilder.getRexBuilder
+     val flinkTypeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
+     val targetType = flinkTypeFactory.createTypeFromTypeInfo(resultType)
+     builder.makeCast(targetType, builder.constantNull())
+   }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/logic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/logic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/logic.scala
new file mode 100644
index 0000000..dfe00cc
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/logic.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.validate._
+
+ /**
+  * Base class for binary expressions that take two boolean operands and yield a boolean.
+  */
+ abstract class BinaryPredicate extends BinaryExpression {
+   override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+   /** Succeeds only when both operands are of boolean type. */
+   override private[flink] def validateInput(): ValidationResult = {
+     val booleanType = BasicTypeInfo.BOOLEAN_TYPE_INFO
+     val operandsAreBoolean =
+       left.resultType == booleanType && right.resultType == booleanType
+
+     if (!operandsAreBoolean) {
+       ValidationFailure(s"$this only accepts children of Boolean type, " +
+         s"get $left : ${left.resultType} and $right : ${right.resultType}")
+     } else {
+       ValidationSuccess
+     }
+   }
+}
+
+ /**
+  * Logical negation of a boolean expression.
+  */
+ case class Not(child: Expression) extends UnaryExpression {
+
+   override def toString = s"!($child)"
+
+   override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+     relBuilder.not(child.toRexNode)
+
+   /** Succeeds only when the operand is of boolean type. */
+   override private[flink] def validateInput(): ValidationResult = {
+     if (child.resultType != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+       ValidationFailure(s"Not operator requires a boolean expression as input, " +
+         s"but $child is of type ${child.resultType}")
+     } else {
+       ValidationSuccess
+     }
+   }
+}
+
+ /**
+  * Logical conjunction of two boolean expressions.
+  */
+ case class And(left: Expression, right: Expression) extends BinaryPredicate {
+
+   override def toString = s"$left && $right"
+
+   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+     val leftOperand = left.toRexNode
+     val rightOperand = right.toRexNode
+     relBuilder.and(leftOperand, rightOperand)
+   }
+}
+
+ /**
+  * Logical disjunction of two boolean expressions.
+  */
+ case class Or(left: Expression, right: Expression) extends BinaryPredicate {
+
+   override def toString = s"$left || $right"
+
+   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
+     val leftOperand = left.toRexNode
+     val rightOperand = right.toRexNode
+     relBuilder.or(leftOperand, rightOperand)
+   }
+}
+
+ /**
+  * Conditional expression: evaluates to `ifTrue` or `ifFalse` depending on `condition`.
+  * The result type is taken from the `ifTrue` branch.
+  */
+ case class If(
+     condition: Expression,
+     ifTrue: Expression,
+     ifFalse: Expression)
+   extends Expression {
+
+   private[flink] def children = Seq(condition, ifTrue, ifFalse)
+
+   override private[flink] def resultType = ifTrue.resultType
+
+   override def toString = s"($condition)? $ifTrue : $ifFalse"
+
+   /** Translates to a CASE call with the condition and both branches as operands. */
+   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+     relBuilder.call(
+       SqlStdOperatorTable.CASE,
+       condition.toRexNode,
+       ifTrue.toRexNode,
+       ifFalse.toRexNode)
+
+   /** Requires a boolean condition and identical result types for both branches. */
+   override private[flink] def validateInput(): ValidationResult = {
+     val invalid =
+       condition.resultType != BasicTypeInfo.BOOLEAN_TYPE_INFO ||
+         ifTrue.resultType != ifFalse.resultType
+
+     if (invalid) {
+       ValidationFailure(
+         s"If should have boolean condition and same type of ifTrue and ifFalse, get " +
+           s"(${condition.resultType}, ${ifTrue.resultType}, ${ifFalse.resultType})")
+     } else {
+       ValidationSuccess
+     }
+   }
+}