You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:06 UTC
[37/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
deleted file mode 100644
index c960a79..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ /dev/null
@@ -1,489 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.avatica.util.DateTimeUtils.{MILLIS_PER_DAY, MILLIS_PER_HOUR, MILLIS_PER_MINUTE, MILLIS_PER_SECOND}
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.table.ExpressionParserException
-import org.apache.flink.api.table.expressions.ExpressionUtils.{toMilliInterval, toMonthInterval}
-import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit
-import org.apache.flink.api.table.expressions.TimePointUnit.TimePointUnit
-import org.apache.flink.api.table.expressions.TrimMode.TrimMode
-import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo
-
-import scala.language.implicitConversions
-import scala.util.parsing.combinator.{JavaTokenParsers, PackratParsers}
-
-/**
- * Parser for expressions inside a String. This parses exactly the same expressions that
- * would be accepted by the Scala Expression DSL.
- *
- * See [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and
- * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] for the constructs
- * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL
- * defined in the above files.
- */
-object ExpressionParser extends JavaTokenParsers with PackratParsers {
- case class Keyword(key: String)
-
- // Convert the keyword into a case-insensitive Parser. The negative lookahead keeps a keyword
- // from matching as a mere prefix of a longer identifier (e.g. "min" inside "minus").
- implicit def keyword2Parser(kw: Keyword): Parser[String] =
-   ("""(?i)\Q""" + kw.key + """\E(?![_$\p{javaJavaIdentifierPart}])""").r
-
- // Keyword
-
- lazy val ARRAY: Keyword = Keyword("Array")
- lazy val AS: Keyword = Keyword("as")
- lazy val COUNT: Keyword = Keyword("count")
- lazy val AVG: Keyword = Keyword("avg")
- lazy val MIN: Keyword = Keyword("min")
- lazy val MAX: Keyword = Keyword("max")
- lazy val SUM: Keyword = Keyword("sum")
- lazy val START: Keyword = Keyword("start")
- lazy val END: Keyword = Keyword("end")
- lazy val CAST: Keyword = Keyword("cast")
- lazy val NULL: Keyword = Keyword("Null")
- lazy val IF: Keyword = Keyword("?")
- lazy val ASC: Keyword = Keyword("asc")
- lazy val DESC: Keyword = Keyword("desc")
- lazy val TO_DATE: Keyword = Keyword("toDate")
- lazy val TO_TIME: Keyword = Keyword("toTime")
- lazy val TO_TIMESTAMP: Keyword = Keyword("toTimestamp")
- lazy val TRIM: Keyword = Keyword("trim")
- lazy val EXTRACT: Keyword = Keyword("extract")
- lazy val FLOOR: Keyword = Keyword("floor")
- lazy val CEIL: Keyword = Keyword("ceil")
- lazy val YEARS: Keyword = Keyword("years")
- lazy val YEAR: Keyword = Keyword("year")
- lazy val MONTHS: Keyword = Keyword("months")
- lazy val MONTH: Keyword = Keyword("month")
- lazy val DAYS: Keyword = Keyword("days")
- lazy val DAY: Keyword = Keyword("day")
- lazy val HOURS: Keyword = Keyword("hours")
- lazy val HOUR: Keyword = Keyword("hour")
- lazy val MINUTES: Keyword = Keyword("minutes")
- lazy val MINUTE: Keyword = Keyword("minute")
- lazy val SECONDS: Keyword = Keyword("seconds")
- lazy val SECOND: Keyword = Keyword("second")
- lazy val MILLIS: Keyword = Keyword("millis")
- lazy val MILLI: Keyword = Keyword("milli")
- lazy val ROWS: Keyword = Keyword("rows")
- lazy val STAR: Keyword = Keyword("*")
- lazy val GET: Keyword = Keyword("get")
- lazy val FLATTEN: Keyword = Keyword("flatten")
-
- def functionIdent: ExpressionParser.Parser[String] =
- not(ARRAY) ~ not(AS) ~ not(COUNT) ~ not(AVG) ~ not(MIN) ~ not(MAX) ~
- not(SUM) ~ not(START) ~ not(END)~ not(CAST) ~ not(NULL) ~
- not(IF) ~> super.ident
-
- // symbols
-
- lazy val timeIntervalUnit: PackratParser[Expression] = TimeIntervalUnit.values map {
- case unit: TimeIntervalUnit => literal(unit.toString) ^^^ unit.toExpr
- } reduceLeft(_ | _)
-
- lazy val timePointUnit: PackratParser[Expression] = TimePointUnit.values map {
- case unit: TimePointUnit => literal(unit.toString) ^^^ unit.toExpr
- } reduceLeft(_ | _)
-
- lazy val trimMode: PackratParser[Expression] = TrimMode.values map {
- case mode: TrimMode => literal(mode.toString) ^^^ mode.toExpr
- } reduceLeft(_ | _)
-
- // data types
-
- lazy val dataType: PackratParser[TypeInformation[_]] =
- "BYTE" ^^ { ti => BasicTypeInfo.BYTE_TYPE_INFO } |
- "SHORT" ^^ { ti => BasicTypeInfo.SHORT_TYPE_INFO } |
- "INTERVAL_MONTHS" ^^ {
- ti => TimeIntervalTypeInfo.INTERVAL_MONTHS.asInstanceOf[TypeInformation[_]]
- } |
- "INTERVAL_MILLIS" ^^ {
- ti => TimeIntervalTypeInfo.INTERVAL_MILLIS.asInstanceOf[TypeInformation[_]]
- } |
- "INT" ^^ { ti => BasicTypeInfo.INT_TYPE_INFO } |
- "LONG" ^^ { ti => BasicTypeInfo.LONG_TYPE_INFO } |
- "FLOAT" ^^ { ti => BasicTypeInfo.FLOAT_TYPE_INFO } |
- "DOUBLE" ^^ { ti => BasicTypeInfo.DOUBLE_TYPE_INFO } |
- ("BOOLEAN" | "BOOL") ^^ { ti => BasicTypeInfo.BOOLEAN_TYPE_INFO } |
- "STRING" ^^ { ti => BasicTypeInfo.STRING_TYPE_INFO } |
- "DATE" ^^ { ti => SqlTimeTypeInfo.DATE.asInstanceOf[TypeInformation[_]] } |
- "TIMESTAMP" ^^ { ti => SqlTimeTypeInfo.TIMESTAMP } |
- "TIME" ^^ { ti => SqlTimeTypeInfo.TIME } |
- "DECIMAL" ^^ { ti => BasicTypeInfo.BIG_DEC_TYPE_INFO }
-
- // Literals
-
- // same as floatingPointNumber but we do not allow trailing dot "12.d" or "2."
- lazy val floatingPointNumberFlink: Parser[String] =
- """-?(\d+(\.\d+)?|\d*\.\d+)([eE][+-]?\d+)?[fFdD]?""".r
-
- lazy val numberLiteral: PackratParser[Expression] =
- (wholeNumber <~ ("l" | "L")) ^^ { n => Literal(n.toLong) } |
- (decimalNumber <~ ("p" | "P")) ^^ { n => Literal(BigDecimal(n)) } |
- (floatingPointNumberFlink | decimalNumber) ^^ {
- n =>
- if (n.matches("""-?\d+""")) {
- Literal(n.toInt)
- } else if (n.endsWith("f") || n.endsWith("F")) {
- Literal(n.toFloat)
- } else {
- Literal(n.toDouble)
- }
- }
-
- lazy val singleQuoteStringLiteral: Parser[Expression] =
- ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r ^^ {
- str => Literal(str.substring(1, str.length - 1))
- }
-
- lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral ^^ {
- str => Literal(str.substring(1, str.length - 1))
- }
-
- lazy val boolLiteral: PackratParser[Expression] = ("true" | "false") ^^ {
- str => Literal(str.toBoolean)
- }
-
- lazy val nullLiteral: PackratParser[Expression] = NULL ~ "(" ~> dataType <~ ")" ^^ {
- dt => Null(dt)
- }
-
- lazy val literalExpr: PackratParser[Expression] =
- numberLiteral | stringLiteralFlink | singleQuoteStringLiteral | boolLiteral | nullLiteral
-
- lazy val fieldReference: PackratParser[NamedExpression] = (STAR | ident) ^^ {
- sym => UnresolvedFieldReference(sym)
- }
-
- lazy val atom: PackratParser[Expression] =
- ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference
-
- // suffix operators
-
- lazy val suffixSum: PackratParser[Expression] =
- composite <~ "." ~ SUM ~ opt("()") ^^ { e => Sum(e) }
-
- lazy val suffixMin: PackratParser[Expression] =
- composite <~ "." ~ MIN ~ opt("()") ^^ { e => Min(e) }
-
- lazy val suffixMax: PackratParser[Expression] =
- composite <~ "." ~ MAX ~ opt("()") ^^ { e => Max(e) }
-
- lazy val suffixCount: PackratParser[Expression] =
- composite <~ "." ~ COUNT ~ opt("()") ^^ { e => Count(e) }
-
- lazy val suffixAvg: PackratParser[Expression] =
- composite <~ "." ~ AVG ~ opt("()") ^^ { e => Avg(e) }
-
- lazy val suffixStart: PackratParser[Expression] =
- composite <~ "." ~ START ~ opt("()") ^^ { e => WindowStart(e) }
-
- lazy val suffixEnd: PackratParser[Expression] =
- composite <~ "." ~ END ~ opt("()") ^^ { e => WindowEnd(e) }
-
- lazy val suffixCast: PackratParser[Expression] =
- composite ~ "." ~ CAST ~ "(" ~ dataType ~ ")" ^^ {
- case e ~ _ ~ _ ~ _ ~ dt ~ _ => Cast(e, dt)
- }
-
- lazy val suffixAs: PackratParser[Expression] =
- composite ~ "." ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
- case e ~ _ ~ _ ~ _ ~ target ~ _ => Alias(e, target.head.name, target.tail.map(_.name))
- }
-
- lazy val suffixTrim = composite ~ "." ~ TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ ")" ^^ {
- case operand ~ _ ~ _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ => Trim(mode, trimCharacter, operand)
- }
-
- lazy val suffixTrimWithoutArgs = composite <~ "." ~ TRIM ~ opt("()") ^^ {
- e => Trim(TrimMode.BOTH, TrimConstants.TRIM_DEFAULT_CHAR, e)
- }
-
- lazy val suffixIf: PackratParser[Expression] =
- composite ~ "." ~ IF ~ "(" ~ expression ~ "," ~ expression ~ ")" ^^ {
- case condition ~ _ ~ _ ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ => If(condition, ifTrue, ifFalse)
- }
-
- lazy val suffixExtract = composite ~ "." ~ EXTRACT ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
- case operand ~ _ ~ _ ~ _ ~ unit ~ _ => Extract(unit, operand)
- }
-
- lazy val suffixFloor = composite ~ "." ~ FLOOR ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
- case operand ~ _ ~ _ ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
- }
-
- lazy val suffixCeil = composite ~ "." ~ CEIL ~ "(" ~ timeIntervalUnit ~ ")" ^^ {
- case operand ~ _ ~ _ ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
- }
-
- lazy val suffixFunctionCall =
- composite ~ "." ~ functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
- case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args)
- }
-
- lazy val suffixFunctionCallOneArg = composite ~ "." ~ functionIdent ^^ {
- case operand ~ _ ~ name => Call(name.toUpperCase, Seq(operand))
- }
-
- lazy val suffixAsc : PackratParser[Expression] =
- atom <~ "." ~ ASC ~ opt("()") ^^ { e => Asc(e) }
-
- lazy val suffixDesc : PackratParser[Expression] =
- atom <~ "." ~ DESC ~ opt("()") ^^ { e => Desc(e) }
-
- lazy val suffixToDate: PackratParser[Expression] =
- composite <~ "." ~ TO_DATE ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.DATE) }
-
- lazy val suffixToTimestamp: PackratParser[Expression] =
- composite <~ "." ~ TO_TIMESTAMP ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.TIMESTAMP) }
-
- lazy val suffixToTime: PackratParser[Expression] =
- composite <~ "." ~ TO_TIME ~ opt("()") ^^ { e => Cast(e, SqlTimeTypeInfo.TIME) }
-
- // Time-interval suffixes such as "12.years" or "1.hour". Year and month units are converted
- // with toMonthInterval; day and smaller units are converted with toMilliInterval.
- lazy val suffixTimeInterval : PackratParser[Expression] =
-   composite ~ "." ~ (YEARS | MONTHS | DAYS | HOURS | MINUTES | SECONDS | MILLIS |
-     YEAR | MONTH | DAY | HOUR | MINUTE | SECOND | MILLI) ^^ {
-     // Keywords match case-insensitively, so lower-case the matched text before comparing it
-     // with the lower-case keys; otherwise input such as "1.Days" ends in a scala.MatchError.
-     // Locale.ROOT keeps the case folding independent of the JVM default locale.
-     case expr ~ _ ~ unit => unit.toLowerCase(java.util.Locale.ROOT) match {
-       case YEARS.key | YEAR.key => toMonthInterval(expr, 12)
-       case MONTHS.key | MONTH.key => toMonthInterval(expr, 1)
-       case DAYS.key | DAY.key => toMilliInterval(expr, MILLIS_PER_DAY)
-       case HOURS.key | HOUR.key => toMilliInterval(expr, MILLIS_PER_HOUR)
-       case MINUTES.key | MINUTE.key => toMilliInterval(expr, MILLIS_PER_MINUTE)
-       case SECONDS.key | SECOND.key => toMilliInterval(expr, MILLIS_PER_SECOND)
-       case MILLIS.key | MILLI.key => toMilliInterval(expr, 1)
-     }
-   }
-
- lazy val suffixRowInterval : PackratParser[Expression] =
- composite <~ "." ~ ROWS ^^ { e => ExpressionUtils.toRowInterval(e) }
-
- lazy val suffixGet: PackratParser[Expression] =
- composite ~ "." ~ GET ~ "(" ~ literalExpr ~ ")" ^^ {
- case e ~ _ ~ _ ~ _ ~ index ~ _ =>
- GetCompositeField(e, index.asInstanceOf[Literal].value)
- }
-
- lazy val suffixFlattening: PackratParser[Expression] =
- composite <~ "." ~ FLATTEN ~ opt("()") ^^ { e => Flattening(e) }
-
- lazy val suffixed: PackratParser[Expression] =
- suffixTimeInterval | suffixRowInterval | suffixSum | suffixMin | suffixMax | suffixStart |
- suffixEnd | suffixCount | suffixAvg | suffixCast | suffixAs | suffixTrim |
- suffixTrimWithoutArgs | suffixIf | suffixAsc | suffixDesc | suffixToDate |
- suffixToTimestamp | suffixToTime | suffixExtract | suffixFloor | suffixCeil |
- suffixGet | suffixFlattening |
- suffixFunctionCall | suffixFunctionCallOneArg // function call must always be at the end
-
- // prefix operators
-
- lazy val prefixArray: PackratParser[Expression] =
- ARRAY ~ "(" ~> repsep(expression, ",") <~ ")" ^^ { elements => ArrayConstructor(elements) }
-
- lazy val prefixSum: PackratParser[Expression] =
- SUM ~ "(" ~> expression <~ ")" ^^ { e => Sum(e) }
-
- lazy val prefixMin: PackratParser[Expression] =
- MIN ~ "(" ~> expression <~ ")" ^^ { e => Min(e) }
-
- lazy val prefixMax: PackratParser[Expression] =
- MAX ~ "(" ~> expression <~ ")" ^^ { e => Max(e) }
-
- lazy val prefixCount: PackratParser[Expression] =
- COUNT ~ "(" ~> expression <~ ")" ^^ { e => Count(e) }
-
- lazy val prefixAvg: PackratParser[Expression] =
- AVG ~ "(" ~> expression <~ ")" ^^ { e => Avg(e) }
-
- lazy val prefixStart: PackratParser[Expression] =
- START ~ "(" ~> expression <~ ")" ^^ { e => WindowStart(e) }
-
- lazy val prefixEnd: PackratParser[Expression] =
- END ~ "(" ~> expression <~ ")" ^^ { e => WindowEnd(e) }
-
- lazy val prefixCast: PackratParser[Expression] =
- CAST ~ "(" ~ expression ~ "," ~ dataType ~ ")" ^^ {
- case _ ~ _ ~ e ~ _ ~ dt ~ _ => Cast(e, dt)
- }
-
- lazy val prefixAs: PackratParser[Expression] =
- AS ~ "(" ~ expression ~ "," ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
- case _ ~ _ ~ e ~ _ ~ target ~ _ => Alias(e, target.head.name, target.tail.map(_.name))
- }
-
- lazy val prefixIf: PackratParser[Expression] =
- IF ~ "(" ~ expression ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
- case _ ~ _ ~ condition ~ _ ~ ifTrue ~ _ ~ ifFalse ~ _ => If(condition, ifTrue, ifFalse)
- }
-
- lazy val prefixFunctionCall = functionIdent ~ "(" ~ repsep(expression, ",") ~ ")" ^^ {
- case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args)
- }
-
- lazy val prefixFunctionCallOneArg = functionIdent ~ "(" ~ expression ~ ")" ^^ {
- case name ~ _ ~ arg ~ _ => Call(name.toUpperCase, Seq(arg))
- }
-
- lazy val prefixTrim = TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ "," ~ expression ~ ")" ^^ {
- case _ ~ _ ~ mode ~ _ ~ trimCharacter ~ _ ~ operand ~ _ => Trim(mode, trimCharacter, operand)
- }
-
- lazy val prefixTrimWithoutArgs = TRIM ~ "(" ~ expression ~ ")" ^^ {
- case _ ~ _ ~ operand ~ _ => Trim(TrimMode.BOTH, TrimConstants.TRIM_DEFAULT_CHAR, operand)
- }
-
- lazy val prefixExtract = EXTRACT ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
- case _ ~ _ ~ operand ~ _ ~ unit ~ _ => Extract(unit, operand)
- }
-
- lazy val prefixFloor = FLOOR ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
- case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalFloor(unit, operand)
- }
-
- lazy val prefixCeil = CEIL ~ "(" ~ expression ~ "," ~ timeIntervalUnit ~ ")" ^^ {
- case _ ~ _ ~ operand ~ _ ~ unit ~ _ => TemporalCeil(unit, operand)
- }
-
- lazy val prefixGet: PackratParser[Expression] =
- GET ~ "(" ~ composite ~ "," ~ literalExpr ~ ")" ^^ {
- case _ ~ _ ~ e ~ _ ~ index ~ _ =>
- GetCompositeField(e, index.asInstanceOf[Literal].value)
- }
-
- lazy val prefixFlattening: PackratParser[Expression] =
- FLATTEN ~ "(" ~> composite <~ ")" ^^ { e => Flattening(e) }
-
- lazy val prefixed: PackratParser[Expression] =
- prefixArray | prefixSum | prefixMin | prefixMax | prefixCount | prefixAvg |
- prefixStart | prefixEnd |
- prefixCast | prefixAs | prefixTrim | prefixTrimWithoutArgs | prefixIf | prefixExtract |
- prefixFloor | prefixCeil | prefixGet | prefixFlattening |
- prefixFunctionCall | prefixFunctionCallOneArg // function call must always be at the end
-
- // suffix/prefix composite
-
- lazy val composite: PackratParser[Expression] = suffixed | prefixed | atom |
- failure("Composite expression expected.")
-
- // unary ops
-
- lazy val unaryNot: PackratParser[Expression] = "!" ~> composite ^^ { e => Not(e) }
-
- lazy val unaryMinus: PackratParser[Expression] = "-" ~> composite ^^ { e => UnaryMinus(e) }
-
- lazy val unary = composite | unaryNot | unaryMinus |
- failure("Unary expression expected.")
-
- // arithmetic
-
- lazy val product = unary * (
- "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } |
- "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } |
- "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } ) |
- failure("Product expected.")
-
- lazy val term = product * (
- "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } |
- "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } ) |
- failure("Term expected.")
-
- // Comparison
-
- lazy val equalTo: PackratParser[Expression] = term ~ ("===" | "==" | "=") ~ term ^^ {
- case l ~ _ ~ r => EqualTo(l, r)
- }
-
- lazy val notEqualTo: PackratParser[Expression] = term ~ ("!==" | "!=" | "<>") ~ term ^^ {
- case l ~ _ ~ r => NotEqualTo(l, r)
- }
-
- lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ {
- case l ~ _ ~ r => GreaterThan(l, r)
- }
-
- lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term ^^ {
- case l ~ _ ~ r => GreaterThanOrEqual(l, r)
- }
-
- lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ {
- case l ~ _ ~ r => LessThan(l, r)
- }
-
- lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ {
- case l ~ _ ~ r => LessThanOrEqual(l, r)
- }
-
- lazy val comparison: PackratParser[Expression] =
- equalTo | notEqualTo |
- greaterThan | greaterThanOrEqual |
- lessThan | lessThanOrEqual | term |
- failure("Comparison expected.")
-
- // logic
-
- lazy val logic = comparison * (
- "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } |
- "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } ) |
- failure("Logic expected.")
-
- // alias
-
- lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
- case e ~ _ ~ name => Alias(e, name.name)
- } | logic ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
- case e ~ _ ~ _ ~ names ~ _ => Alias(e, names.head.name, names.tail.map(_.name))
- } | logic
-
- lazy val expression: PackratParser[Expression] = alias |
- failure("Invalid expression.")
-
- lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
-
- def parseExpressionList(expression: String): List[Expression] = {
- parseAll(expressionList, expression) match {
- case Success(lst, _) => lst
-
- case NoSuccess(msg, next) =>
- throwError(msg, next)
- }
- }
-
- def parseExpression(exprString: String): Expression = {
- parseAll(expression, exprString) match {
- case Success(lst, _) => lst
-
- case NoSuccess(msg, next) =>
- throwError(msg, next)
- }
- }
-
- private def throwError(msg: String, next: Input): Nothing = {
- val improvedMsg = msg.replace("string matching regex `\\z'", "End of expression")
-
- throw ExpressionParserException(
- s"""Could not parse expression at column ${next.pos.column}: $improvedMsg
- |${next.pos.longString}""".stripMargin)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala
deleted file mode 100644
index 8657534..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionUtils.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions
-
-import java.lang.{Boolean => JBoolean, Byte => JByte, Short => JShort, Integer => JInteger, Long => JLong, Float => JFloat, Double => JDouble}
-import java.math.{BigDecimal => JBigDecimal}
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.calcite.avatica.util.TimeUnit
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexBuilder, RexNode}
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.ValidationException
-import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
-
-object ExpressionUtils {
-
- private[flink] def toMonthInterval(expr: Expression, multiplier: Int): Expression = expr match {
- case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) =>
- Literal(value * multiplier, TimeIntervalTypeInfo.INTERVAL_MONTHS)
- case _ =>
- Cast(Mul(expr, Literal(multiplier)), TimeIntervalTypeInfo.INTERVAL_MONTHS)
- }
-
- private[flink] def toMilliInterval(expr: Expression, multiplier: Long): Expression = expr match {
- case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) =>
- Literal(value * multiplier, TimeIntervalTypeInfo.INTERVAL_MILLIS)
- case Literal(value: Long, BasicTypeInfo.LONG_TYPE_INFO) =>
- Literal(value * multiplier, TimeIntervalTypeInfo.INTERVAL_MILLIS)
- case _ =>
- Cast(Mul(expr, Literal(multiplier)), TimeIntervalTypeInfo.INTERVAL_MILLIS)
- }
-
- private[flink] def toRowInterval(expr: Expression): Expression = expr match {
- case Literal(value: Int, BasicTypeInfo.INT_TYPE_INFO) =>
- Literal(value.toLong, RowIntervalTypeInfo.INTERVAL_ROWS)
- case Literal(value: Long, BasicTypeInfo.LONG_TYPE_INFO) =>
- Literal(value, RowIntervalTypeInfo.INTERVAL_ROWS)
- case _ =>
- throw new IllegalArgumentException("Invalid value for row interval literal.")
- }
-
- private[flink] def convertArray(array: Array[_]): Expression = {
- def createArray(): Expression = {
- ArrayConstructor(array.map(Literal(_)))
- }
-
- array match {
- // primitives
- case _: Array[Boolean] => createArray()
- case _: Array[Byte] => createArray()
- case _: Array[Short] => createArray()
- case _: Array[Int] => createArray()
- case _: Array[Long] => createArray()
- case _: Array[Float] => createArray()
- case _: Array[Double] => createArray()
-
- // boxed types
- case _: Array[JBoolean] => createArray()
- case _: Array[JByte] => createArray()
- case _: Array[JShort] => createArray()
- case _: Array[JInteger] => createArray()
- case _: Array[JLong] => createArray()
- case _: Array[JFloat] => createArray()
- case _: Array[JDouble] => createArray()
-
- // others
- case _: Array[String] => createArray()
- case _: Array[JBigDecimal] => createArray()
- case _: Array[Date] => createArray()
- case _: Array[Time] => createArray()
- case _: Array[Timestamp] => createArray()
- case bda: Array[BigDecimal] => ArrayConstructor(bda.map { bd => Literal(bd.bigDecimal) })
-
- case _ =>
- // nested
- if (array.length > 0 && array.head.isInstanceOf[Array[_]]) {
- ArrayConstructor(array.map { na => convertArray(na.asInstanceOf[Array[_]]) })
- } else {
- throw ValidationException("Unsupported array type.")
- }
- }
- }
-
- // ----------------------------------------------------------------------------------------------
- // RexNode conversion functions (see org.apache.calcite.sql2rel.StandardConvertletTable)
- // ----------------------------------------------------------------------------------------------
-
- /**
- * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#getFactor()]].
- */
- private[flink] def getFactor(unit: TimeUnit): JBigDecimal = unit match {
- case TimeUnit.DAY => java.math.BigDecimal.ONE
- case TimeUnit.HOUR => TimeUnit.DAY.multiplier
- case TimeUnit.MINUTE => TimeUnit.HOUR.multiplier
- case TimeUnit.SECOND => TimeUnit.MINUTE.multiplier
- case TimeUnit.YEAR => java.math.BigDecimal.ONE
- case TimeUnit.MONTH => TimeUnit.YEAR.multiplier
- case _ => throw new IllegalArgumentException("Invalid start unit.")
- }
-
- /**
- * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#mod()]].
- */
- private[flink] def mod(
- rexBuilder: RexBuilder,
- resType: RelDataType,
- res: RexNode,
- value: JBigDecimal)
- : RexNode = {
- if (value == JBigDecimal.ONE) return res
- rexBuilder.makeCall(SqlStdOperatorTable.MOD, res, rexBuilder.makeExactLiteral(value, resType))
- }
-
- /**
- * Copy of [[org.apache.calcite.sql2rel.StandardConvertletTable#divide()]].
- */
- private[flink] def divide(rexBuilder: RexBuilder, res: RexNode, value: JBigDecimal): RexNode = {
- if (value == JBigDecimal.ONE) return res
- if (value.compareTo(JBigDecimal.ONE) < 0 && value.signum == 1) {
- try {
- val reciprocal = JBigDecimal.ONE.divide(value, JBigDecimal.ROUND_UNNECESSARY)
- return rexBuilder.makeCall(
- SqlStdOperatorTable.MULTIPLY,
- res,
- rexBuilder.makeExactLiteral(reciprocal))
- } catch {
- case e: ArithmeticException => // ignore
- }
- }
- rexBuilder.makeCall(
- SqlStdOperatorTable.DIVIDE_INTEGER,
- res,
- rexBuilder.makeExactLiteral(value))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
deleted file mode 100644
index 67e44a1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/InputTypeSpec.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions
-
-import scala.collection.mutable
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.validate._
-
-/**
- * An expression that specifies the expected types of its inputs.
- */
-trait InputTypeSpec extends Expression {
-
- /**
- * Input type specification for each child.
- *
- * For example, [[Power]], which expects both children to be of Double type, should use:
- * {{{
- * def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: DOUBLE_TYPE_INFO :: Nil
- * }}}
- */
- private[flink] def expectedTypes: Seq[TypeInformation[_]]
-
- override private[flink] def validateInput(): ValidationResult = {
-   // Pair every child with its expected type and keep one message per mismatch;
-   // zip stops at the shorter of the two sequences.
-   val typeMismatches = children.zip(expectedTypes).zipWithIndex.collect {
-     case ((e, tpe), i) if e.resultType != tpe =>
-       s"expecting $tpe on ${i}th input, get ${e.resultType}"
-   }
-   if (typeMismatches.nonEmpty) {
-     ValidationFailure(
-       s"""|$this fails on input type checking: ${typeMismatches.mkString("[", ", ", "]")}.
-           |Operand should be casted to proper type
-           |""".stripMargin)
-   } else {
-     ValidationSuccess
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
deleted file mode 100644
index 259f7e5..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.calcite.tools.RelBuilder.AggCall
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.typeutils.TypeCheckUtils
-
-abstract sealed class Aggregation extends UnaryExpression {
-
- override def toString = s"Aggregate($child)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
- throw new UnsupportedOperationException("Aggregate cannot be transformed to RexNode")
-
- /**
- * Convert Aggregate to its counterpart in Calcite, i.e. AggCall
- */
- private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall
-}
-
-case class Sum(child: Expression) extends Aggregation {
- override def toString = s"sum($child)"
-
- override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
- relBuilder.aggregateCall(SqlStdOperatorTable.SUM, false, null, name, child.toRexNode)
- }
-
- override private[flink] def resultType = child.resultType
-
- override private[flink] def validateInput() =
- TypeCheckUtils.assertNumericExpr(child.resultType, "sum")
-}
-
-case class Min(child: Expression) extends Aggregation {
- override def toString = s"min($child)"
-
- override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
- relBuilder.aggregateCall(SqlStdOperatorTable.MIN, false, null, name, child.toRexNode)
- }
-
- override private[flink] def resultType = child.resultType
-
- override private[flink] def validateInput() =
- TypeCheckUtils.assertOrderableExpr(child.resultType, "min")
-}
-
-case class Max(child: Expression) extends Aggregation {
- override def toString = s"max($child)"
-
- override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
- relBuilder.aggregateCall(SqlStdOperatorTable.MAX, false, null, name, child.toRexNode)
- }
-
- override private[flink] def resultType = child.resultType
-
- override private[flink] def validateInput() =
- TypeCheckUtils.assertOrderableExpr(child.resultType, "max")
-}
-
-case class Count(child: Expression) extends Aggregation {
- override def toString = s"count($child)"
-
- override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
- relBuilder.aggregateCall(SqlStdOperatorTable.COUNT, false, null, name, child.toRexNode)
- }
-
- override private[flink] def resultType = BasicTypeInfo.LONG_TYPE_INFO
-}
-
-case class Avg(child: Expression) extends Aggregation {
- override def toString = s"avg($child)"
-
- override private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall = {
- relBuilder.aggregateCall(SqlStdOperatorTable.AVG, false, null, name, child.toRexNode)
- }
-
- override private[flink] def resultType = child.resultType
-
- override private[flink] def validateInput() =
- TypeCheckUtils.assertNumericExpr(child.resultType, "avg")
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
deleted file mode 100644
index 8702886..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.`type`.IntervalSqlType
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.typeutils.TypeCheckUtils._
-import org.apache.flink.api.table.typeutils.TypeCoercion
-import org.apache.flink.api.table.validate._
-
-import scala.collection.JavaConversions._
-
-/**
- * Base class of binary arithmetic expressions that are translated into a call of a single
- * Calcite operator applied to both children.
- */
-abstract class BinaryArithmetic extends BinaryExpression {
-
-  /** Calcite operator used by the default `toRexNode` translation. */
-  private[flink] def sqlOperator: SqlOperator
-
-  /** Calls `sqlOperator` with the RexNodes of all children, in order. */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    // Pass the operands as varargs rather than depending on the deprecated implicit
-    // JavaConversions Seq-to-Iterable conversion.
-    relBuilder.call(sqlOperator, children.map(_.toRexNode): _*)
-  }
-
-  /**
-   * The type returned by `TypeCoercion.widerTypeOf` for the two operand types.
-   * Throws a RuntimeException if no such type exists.
-   */
-  override private[flink] def resultType: TypeInformation[_] =
-    TypeCoercion.widerTypeOf(left.resultType, right.resultType) match {
-      case Some(t) => t
-      case None =>
-        throw new RuntimeException("This should never happen.")
-    }
-
-  // TODO: tighten this rule once we implemented type coercion rules during validation
-  /** Succeeds only if both operands are numeric. */
-  override private[flink] def validateInput(): ValidationResult = {
-    if (!isNumeric(left.resultType) || !isNumeric(right.resultType)) {
-      ValidationFailure(s"$this requires both operands Numeric, get " +
-        s"$left : ${left.resultType} and $right : ${right.resultType}")
-    } else {
-      ValidationSuccess
-    }
-  }
-}
-
-/**
- * Addition expression rendered as `(left + right)`.
- *
- * Besides numeric addition, the translation also covers string concatenation (when either
- * operand is a string) and the interval/temporal combinations handled in `toRexNode`.
- */
-case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
-
-  private[flink] val sqlOperator = SqlStdOperatorTable.PLUS
-
-  override def toString = s"($left + $right)"
-
-  /**
-   * Picks the Calcite operator based on the operand types. The checks are ordered and the
-   * first matching one wins.
-   */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    // evaluated on demand, only where an individual check needs them
-    def leftType = left.resultType
-    def rightType = right.resultType
-
-    if (isString(leftType)) {
-      // string on the left: cast the right operand to string and concatenate
-      val rightAsString = Cast(right, BasicTypeInfo.STRING_TYPE_INFO)
-      relBuilder.call(SqlStdOperatorTable.CONCAT, left.toRexNode, rightAsString.toRexNode)
-    } else if (isString(rightType)) {
-      // string on the right: cast the left operand to string and concatenate
-      val leftAsString = Cast(left, BasicTypeInfo.STRING_TYPE_INFO)
-      relBuilder.call(SqlStdOperatorTable.CONCAT, leftAsString.toRexNode, right.toRexNode)
-    } else if (isTimeInterval(leftType) && leftType == rightType) {
-      // two intervals of the same type
-      relBuilder.call(SqlStdOperatorTable.PLUS, left.toRexNode, right.toRexNode)
-    } else if (isTimeInterval(leftType) && isTemporal(rightType)) {
-      // Calcite has a bug that can't apply INTERVAL + DATETIME (INTERVAL at left)
-      // we manually switch them here
-      relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, right.toRexNode, left.toRexNode)
-    } else if (isTemporal(leftType) && isTemporal(rightType)) {
-      relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, left.toRexNode, right.toRexNode)
-    } else {
-      // remaining case: cast both operands to the common result type and add them
-      val operands = Seq(left, right).map(operand => Cast(operand, resultType).toRexNode)
-      relBuilder.call(SqlStdOperatorTable.PLUS, operands: _*)
-    }
-  }
-
-  /**
-   * Accepts a string on either side, two intervals of the same type, a time point combined
-   * with an interval (in either order), or two numeric operands.
-   */
-  override private[flink] def validateInput(): ValidationResult = {
-    def leftType = left.resultType
-    def rightType = right.resultType
-
-    val supported =
-      isString(leftType) || isString(rightType) ||
-        (isTimeInterval(leftType) && leftType == rightType) ||
-        (isTimePoint(leftType) && isTimeInterval(rightType)) ||
-        (isTimeInterval(leftType) && isTimePoint(rightType)) ||
-        (isNumeric(leftType) && isNumeric(rightType))
-
-    if (supported) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(
-        s"$this requires Numeric, String, Intervals of same type, " +
-          s"or Interval and a time point input, " +
-          s"get $left : ${left.resultType} and $right : ${right.resultType}")
-    }
-  }
-}
-
-/**
- * Negation expression rendered as `-(child)`, translated into a call of
- * `SqlStdOperatorTable.UNARY_MINUS`. The result type equals the child's result type.
- *
- * @param child expression to negate
- */
-case class UnaryMinus(child: Expression) extends UnaryExpression {
-
-  override private[flink] def resultType = child.resultType
-
-  /** Succeeds for numeric and time interval operands, fails for everything else. */
-  override private[flink] def validateInput(): ValidationResult = {
-    val operandType = child.resultType
-    if (isNumeric(operandType) || isTimeInterval(operandType)) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(s"$this requires Numeric, or Interval input, get ${child.resultType}")
-    }
-  }
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val operand = child.toRexNode
-    relBuilder.call(SqlStdOperatorTable.UNARY_MINUS, operand)
-  }
-
-  override def toString = s"-($child)"
-}
-
-/**
- * Subtraction expression rendered as `(left - right)` and backed by
- * `SqlStdOperatorTable.MINUS`.
- */
-case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
-
-  private[flink] val sqlOperator = SqlStdOperatorTable.MINUS
-
-  override def toString = s"($left - $right)"
-
-  /**
-   * Accepts two intervals of the same type and a time point combined with an interval
-   * (in either order); every other combination is checked by the parent class.
-   */
-  override private[flink] def validateInput(): ValidationResult = {
-    def leftType = left.resultType
-    def rightType = right.resultType
-
-    // NOTE(review): the last alternative accepts `interval - time point`, the same set of
-    // combinations that Plus accepts -- confirm that this is intended for subtraction.
-    val temporalOperands =
-      (isTimeInterval(leftType) && leftType == rightType) ||
-        (isTimePoint(leftType) && isTimeInterval(rightType)) ||
-        (isTimeInterval(leftType) && isTimePoint(rightType))
-
-    if (temporalOperands) ValidationSuccess else super.validateInput()
-  }
-}
-
-/**
- * Division expression rendered as `(left / right)` and backed by
- * `SqlStdOperatorTable.DIVIDE`.
- */
-case class Div(left: Expression, right: Expression) extends BinaryArithmetic {
-
-  private[flink] val sqlOperator = SqlStdOperatorTable.DIVIDE
-
-  override def toString = s"($left / $right)"
-}
-
-/**
- * Multiplication expression rendered as `(left * right)` and backed by
- * `SqlStdOperatorTable.MULTIPLY`.
- */
-case class Mul(left: Expression, right: Expression) extends BinaryArithmetic {
-
-  private[flink] val sqlOperator = SqlStdOperatorTable.MULTIPLY
-
-  override def toString = s"($left * $right)"
-}
-
-/**
- * Modulo expression rendered as `(left % right)` and backed by
- * `SqlStdOperatorTable.MOD`.
- */
-case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
-
-  private[flink] val sqlOperator = SqlStdOperatorTable.MOD
-
-  override def toString = s"($left % $right)"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/array.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/array.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/array.scala
deleted file mode 100644
index 78084de..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/array.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
-import org.apache.flink.api.table.FlinkRelBuilder
-import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
-
-import scala.collection.JavaConverters._
-
-/**
- * Expression that builds an array from the given element expressions.
- *
- * @param elements expressions producing the array elements
- */
-case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
-
-  override def toString = s"array(${elements.mkString(", ")})"
-
-  override private[flink] def children: Seq[Expression] = elements
-
-  // the array type is derived from the first element; validateInput() rejects empty arrays
-  override private[flink] def resultType = ObjectArrayTypeInfo.getInfoFor(elements.head.resultType)
-
-  /**
-   * Fails for an empty element list and for elements whose result type differs from the
-   * result type of the first element.
-   */
-  override private[flink] def validateInput(): ValidationResult = {
-    if (elements.isEmpty) {
-      ValidationFailure("Empty arrays are not supported yet.")
-    } else {
-      val expectedType = elements.head.resultType
-      if (elements.exists(_.resultType != expectedType)) {
-        ValidationFailure("Not all elements of the array have the same type.")
-      } else {
-        ValidationSuccess
-      }
-    }
-  }
-
-  /** Creates an `ARRAY_VALUE_CONSTRUCTOR` call typed with the array's result type. */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val flinkBuilder = relBuilder.asInstanceOf[FlinkRelBuilder]
-    val arrayType = flinkBuilder.getTypeFactory.createTypeFromTypeInfo(resultType)
-    val operands = elements.map(_.toRexNode).toList.asJava
-    val rexBuilder = relBuilder.getRexBuilder
-    rexBuilder.makeCall(arrayType, SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR, operands)
-  }
-}
-
-/**
- * Expression that accesses the element of `array` at position `index`, translated into a
- * call of `SqlStdOperatorTable.ITEM`.
- *
- * @param array expression producing the array
- * @param index expression producing the position; validation expects an integer and
- *              rejects integer literals smaller than 1
- */
-case class ArrayElementAt(array: Expression, index: Expression) extends Expression {
-
-  override def toString = s"($array).at($index)"
-
-  override private[flink] def children: Seq[Expression] = Seq(array, index)
-
-  // only the two array type infos are matched here; validateInput() reports any other type
-  override private[flink] def resultType = array.resultType match {
-    case objectArray: ObjectArrayTypeInfo[_, _] => objectArray.getComponentInfo
-    case primitiveArray: PrimitiveArrayTypeInfo[_] => primitiveArray.getComponentType
-  }
-
-  override private[flink] def validateInput(): ValidationResult =
-    array.resultType match {
-      case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] =>
-        index match {
-          case _ if index.resultType != INT_TYPE_INFO =>
-            ValidationFailure(
-              s"Array element access needs an integer index but was '${index.resultType}'.")
-          // check for common user mistake
-          case Literal(value: Int, INT_TYPE_INFO) if value < 1 =>
-            ValidationFailure(
-              s"Array element access needs an index starting at 1 but was $value.")
-          case _ =>
-            ValidationSuccess
-        }
-      case other =>
-        ValidationFailure(s"Array expected but was '$other'.")
-    }
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val rexBuilder = relBuilder.getRexBuilder
-    rexBuilder.makeCall(SqlStdOperatorTable.ITEM, array.toRexNode, index.toRexNode)
-  }
-}
-
-/**
- * Expression rendered as `(array).cardinality()`, translated into a call of
- * `SqlStdOperatorTable.CARDINALITY`. The result type is `BasicTypeInfo.INT_TYPE_INFO`.
- *
- * @param array expression producing the array
- */
-case class ArrayCardinality(array: Expression) extends Expression {
-
-  override def toString = s"($array).cardinality()"
-
-  override private[flink] def children: Seq[Expression] = Seq(array)
-
-  override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO
-
-  /** Succeeds only if the operand has an object or primitive array type. */
-  override private[flink] def validateInput(): ValidationResult =
-    array.resultType match {
-      case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] =>
-        ValidationSuccess
-      case other =>
-        ValidationFailure(s"Array expected but was '$other'.")
-    }
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val rexBuilder = relBuilder.getRexBuilder
-    rexBuilder.makeCall(SqlStdOperatorTable.CARDINALITY, array.toRexNode)
-  }
-}
-
-/**
- * Expression rendered as `(array).element()`, translated into a call of
- * `SqlStdOperatorTable.ELEMENT`. The result type is the component type of the array.
- *
- * @param array expression producing the array
- */
-case class ArrayElement(array: Expression) extends Expression {
-
-  override def toString = s"($array).element()"
-
-  override private[flink] def children: Seq[Expression] = Seq(array)
-
-  // only the two array type infos are matched here; validateInput() reports any other type
-  override private[flink] def resultType = array.resultType match {
-    case objectArray: ObjectArrayTypeInfo[_, _] => objectArray.getComponentInfo
-    case primitiveArray: PrimitiveArrayTypeInfo[_] => primitiveArray.getComponentType
-  }
-
-  /** Succeeds only if the operand has an object or primitive array type. */
-  override private[flink] def validateInput(): ValidationResult =
-    array.resultType match {
-      case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] =>
-        ValidationSuccess
-      case other =>
-        ValidationFailure(s"Array expected but was '$other'.")
-    }
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val rexBuilder = relBuilder.getRexBuilder
-    rexBuilder.makeCall(SqlStdOperatorTable.ELEMENT, array.toRexNode)
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
deleted file mode 100644
index 3bb9dac..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.functions.{ScalarFunction, TableFunction}
-import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._
-import org.apache.flink.api.table.plan.logical.{LogicalNode, LogicalTableFunctionCall}
-import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
-import org.apache.flink.api.table.{FlinkTypeFactory, UnresolvedException, ValidationException}
-
-/**
- * Placeholder expression for a function call that has not been resolved yet. The function
- * can be a built-in scalar function or a user-defined scalar function.
- *
- * Validation always fails, and both `resultType` and `toRexNode` throw an
- * `UnresolvedException`.
- *
- * @param functionName name of the called function
- * @param args         argument expressions of the call
- */
-case class Call(functionName: String, args: Seq[Expression]) extends Expression {
-
-  override def toString = s"\\$functionName(${args.mkString(", ")})"
-
-  override private[flink] def children: Seq[Expression] = args
-
-  override private[flink] def validateInput(): ValidationResult =
-    ValidationFailure(s"Unresolved function call: $functionName")
-
-  override private[flink] def resultType =
-    throw UnresolvedException(s"calling resultType on UnresolvedFunction $functionName")
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    throw UnresolvedException(s"trying to convert UnresolvedFunction $functionName to RexNode")
-  }
-}
-
-/**
- * Expression for calling a user-defined scalar function.
- *
- * @param scalarFunction scalar function to be called (might be overloaded)
- * @param parameters actual parameters that determine target evaluation method
- */
-case class ScalarFunctionCall(
-    scalarFunction: ScalarFunction,
-    parameters: Seq[Expression])
-  extends Expression {
-
-  // signature selected by validateInput(); resultType reads it, so validation has to
-  // succeed before resultType is requested
-  private var foundSignature: Option[Array[Class[_]]] = None
-
-  override def toString =
-    s"${scalarFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
-
-  override private[flink] def children: Seq[Expression] = parameters
-
-  override private[flink] def resultType = getResultType(scalarFunction, foundSignature.get)
-
-  /**
-   * Looks up a signature of the function that matches the parameter types and remembers
-   * the outcome of the lookup; fails if none matches.
-   */
-  override private[flink] def validateInput(): ValidationResult = {
-    val signature = children.map(_.resultType)
-    // look for a signature that matches the input types
-    foundSignature = getSignature(scalarFunction, signature)
-    foundSignature match {
-      case Some(_) =>
-        ValidationSuccess
-      case None =>
-        ValidationFailure(s"Given parameters do not match any signature. \n" +
-          s"Actual: ${signatureToString(signature)} \n" +
-          s"Expected: ${signaturesToString(scalarFunction)}")
-    }
-  }
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    val sqlFunction = createScalarSqlFunction(
-      scalarFunction.getClass.getCanonicalName,
-      scalarFunction,
-      typeFactory)
-    relBuilder.call(sqlFunction, parameters.map(_.toRexNode): _*)
-  }
-}
-
-/**
- * Expression for calling a user-defined table function with actual parameters.
- *
- * @param functionName function name
- * @param tableFunction user-defined table function
- * @param parameters actual parameters of function
- * @param resultType type information of returned table
- */
-case class TableFunctionCall(
-    functionName: String,
-    tableFunction: TableFunction[_],
-    parameters: Seq[Expression],
-    resultType: TypeInformation[_])
-  extends Expression {
-
-  // optional field aliases, set through as()
-  private var aliases: Option[Seq[String]] = None
-
-  override def toString =
-    s"${tableFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
-
-  override private[flink] def children: Seq[Expression] = parameters
-
-  /**
-   * Assigns an alias for this table function's returned fields that the following operator
-   * can refer to. The alias list is stored in this instance, which is returned itself.
-   *
-   * @param aliasList alias for this table function's returned fields
-   * @return this table function call
-   */
-  private[flink] def as(aliasList: Option[Seq[String]]): TableFunctionCall = {
-    aliases = aliasList
-    this
-  }
-
-  /**
-   * Converts an API class to a logical node for planning.
-   *
-   * Throws a `ValidationException` if aliases were assigned and their number differs from
-   * the number of fields of the result type.
-   */
-  private[flink] def toLogicalTableFunctionCall(child: LogicalNode): LogicalTableFunctionCall = {
-    val originNames = getFieldInfo(resultType)._1
-
-    // determine the final field names
-    val fieldNames = aliases match {
-      case None =>
-        originNames
-      case Some(aliasList) if aliasList.length == originNames.length =>
-        aliasList.toArray
-      case Some(aliasList) =>
-        throw ValidationException(
-          s"List of column aliases must have same degree as table; " +
-            s"the returned table of function '$functionName' has ${originNames.length} " +
-            s"columns (${originNames.mkString(",")}), " +
-            s"whereas alias list has ${aliasList.length} columns")
-    }
-
-    LogicalTableFunctionCall(
-      functionName,
-      tableFunction,
-      parameters,
-      resultType,
-      fieldNames,
-      child)
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
deleted file mode 100644
index 2232a91..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/cast.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.FlinkTypeFactory
-import org.apache.flink.api.table.typeutils.TypeCoercion
-import org.apache.flink.api.table.validate._
-
-/**
- * Expression that casts `child` to `resultType`, rendered as `child.cast(resultType)`.
- *
- * @param child      expression to cast
- * @param resultType target type of the cast
- */
-case class Cast(child: Expression, resultType: TypeInformation[_]) extends UnaryExpression {
-
-  override def toString = s"$child.cast($resultType)"
-
-  /** Succeeds if `TypeCoercion.canCast` allows casting the child's type to `resultType`. */
-  override private[flink] def validateInput(): ValidationResult = {
-    val castable = TypeCoercion.canCast(child.resultType, resultType)
-    if (!castable) {
-      ValidationFailure(s"Unsupported cast from ${child.resultType} to $resultType")
-    } else {
-      ValidationSuccess
-    }
-  }
-
-  /** Copies this cast with the first element of `anyRefs` as the new child. */
-  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
-    val newChild = anyRefs.head.asInstanceOf[Expression]
-    copy(child = newChild).asInstanceOf[this.type]
-  }
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    val targetType = typeFactory.createTypeFromTypeInfo(resultType)
-    // we use abstract cast here because RelBuilder.cast() has too many side effects
-    relBuilder.getRexBuilder.makeAbstractCast(targetType, child.toRexNode)
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
deleted file mode 100644
index 5a150f8..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/comparison.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.table.typeutils.TypeCheckUtils.{isComparable, isNumeric}
-import org.apache.flink.api.table.validate._
-
-import scala.collection.JavaConversions._
-
-/**
- * Base class of binary comparison expressions that are translated into a call of a single
- * Calcite operator applied to both children. The result type is always boolean.
- */
-abstract class BinaryComparison extends BinaryExpression {
-
-  /** Calcite operator used by the `toRexNode` translation. */
-  private[flink] def sqlOperator: SqlOperator
-
-  /** Calls `sqlOperator` with the RexNodes of all children, in order. */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    // Pass the operands as varargs rather than depending on the deprecated implicit
-    // JavaConversions Seq-to-Iterable conversion.
-    relBuilder.call(sqlOperator, children.map(_.toRexNode): _*)
-  }
-
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-
-  /**
-   * Succeeds if both operands are numeric, or if both have the same type and that type
-   * is comparable.
-   */
-  override private[flink] def validateInput(): ValidationResult =
-    (left.resultType, right.resultType) match {
-      case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
-      case (lType, rType) if isComparable(lType) && lType == rType => ValidationSuccess
-      case (lType, rType) =>
-        ValidationFailure(
-          s"Comparison is only supported for numeric types and " +
-            s"comparable types of same type, got $lType and $rType")
-    }
-}
-
-/**
- * Equality comparison rendered as `left === right` and backed by
- * `SqlStdOperatorTable.EQUALS`.
- */
-case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
-
-  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.EQUALS
-
-  override def toString = s"$left === $right"
-
-  /** Succeeds if both operands are numeric or if both have the same type. */
-  override private[flink] def validateInput(): ValidationResult = {
-    val lType = left.resultType
-    val rType = right.resultType
-    if ((isNumeric(lType) && isNumeric(rType)) || lType == rType) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(s"Equality predicate on incompatible types: $lType and $rType")
-    }
-  }
-}
-
-/**
- * Inequality comparison rendered as `left !== right` and backed by
- * `SqlStdOperatorTable.NOT_EQUALS`.
- */
-case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
-
-  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.NOT_EQUALS
-
-  override def toString = s"$left !== $right"
-
-  /** Succeeds if both operands are numeric or if both have the same type. */
-  override private[flink] def validateInput(): ValidationResult = {
-    val lType = left.resultType
-    val rType = right.resultType
-    if ((isNumeric(lType) && isNumeric(rType)) || lType == rType) {
-      ValidationSuccess
-    } else {
-      ValidationFailure(s"Inequality predicate on incompatible types: $lType and $rType")
-    }
-  }
-}
-
-/**
- * Comparison rendered as `left > right` and backed by
- * `SqlStdOperatorTable.GREATER_THAN`.
- */
-case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
-
-  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.GREATER_THAN
-
-  override def toString = s"$left > $right"
-}
-
-/**
- * Comparison rendered as `left >= right` and backed by
- * `SqlStdOperatorTable.GREATER_THAN_OR_EQUAL`.
- */
-case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
-
-  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.GREATER_THAN_OR_EQUAL
-
-  override def toString = s"$left >= $right"
-}
-
-/**
- * Comparison rendered as `left < right` and backed by
- * `SqlStdOperatorTable.LESS_THAN`.
- */
-case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
-
-  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.LESS_THAN
-
-  override def toString = s"$left < $right"
-}
-
-/**
- * Comparison rendered as `left <= right` and backed by
- * `SqlStdOperatorTable.LESS_THAN_OR_EQUAL`.
- */
-case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
-
-  private[flink] val sqlOperator: SqlOperator = SqlStdOperatorTable.LESS_THAN_OR_EQUAL
-
-  override def toString = s"$left <= $right"
-}
-
-/**
- * Boolean-typed expression rendered as `(child).isNull`, translated with
- * `RelBuilder.isNull`.
- *
- * @param child expression to test
- */
-case class IsNull(child: Expression) extends UnaryExpression {
-
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val operand = child.toRexNode
-    relBuilder.isNull(operand)
-  }
-
-  override def toString = s"($child).isNull"
-}
-
-/**
- * Boolean-typed expression rendered as `(child).isNotNull`, translated with
- * `RelBuilder.isNotNull`.
- *
- * @param child expression to test
- */
-case class IsNotNull(child: Expression) extends UnaryExpression {
-
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val operand = child.toRexNode
-    relBuilder.isNotNull(operand)
-  }
-
-  override def toString = s"($child).isNotNull"
-}
-
-/**
- * Boolean-typed expression rendered as `(child).isTrue`, translated into a call of
- * `SqlStdOperatorTable.IS_TRUE`.
- *
- * @param child expression to test
- */
-case class IsTrue(child: Expression) extends UnaryExpression {
-
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val operand = child.toRexNode
-    relBuilder.call(SqlStdOperatorTable.IS_TRUE, operand)
-  }
-
-  override def toString = s"($child).isTrue"
-}
-
-/**
- * Boolean-typed expression rendered as `(child).isFalse`, translated into a call of
- * `SqlStdOperatorTable.IS_FALSE`.
- *
- * @param child expression to test
- */
-case class IsFalse(child: Expression) extends UnaryExpression {
-
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val operand = child.toRexNode
-    relBuilder.call(SqlStdOperatorTable.IS_FALSE, operand)
-  }
-
-  override def toString = s"($child).isFalse"
-}
-
-/**
- * Boolean-typed expression rendered as `(child).isNotTrue`, translated into a call of
- * `SqlStdOperatorTable.IS_NOT_TRUE`.
- *
- * @param child expression to test
- */
-case class IsNotTrue(child: Expression) extends UnaryExpression {
-
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val operand = child.toRexNode
-    relBuilder.call(SqlStdOperatorTable.IS_NOT_TRUE, operand)
-  }
-
-  override def toString = s"($child).isNotTrue"
-}
-
-/**
- * Boolean-typed expression rendered as `(child).isNotFalse`, translated into a call of
- * `SqlStdOperatorTable.IS_NOT_FALSE`.
- *
- * @param child expression to test
- */
-case class IsNotFalse(child: Expression) extends UnaryExpression {
-
-  override private[flink] def resultType = BOOLEAN_TYPE_INFO
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val operand = child.toRexNode
-    relBuilder.call(SqlStdOperatorTable.IS_NOT_FALSE, operand)
-  }
-
-  override def toString = s"($child).isNotFalse"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
deleted file mode 100644
index ee1eb46..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/composite.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.UnresolvedException
-import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
-
-/**
- * Flattening of composite types. All flattenings are resolved into
- * `GetCompositeField` expressions.
- *
- * As long as the flattening is unresolved, validation fails and requesting the result
- * type throws an `UnresolvedException`.
- *
- * @param child expression whose composite result is to be flattened
- */
-case class Flattening(child: Expression) extends UnaryExpression {
-
-  override def toString = s"$child.flatten()"
-
-  // the message names the offending method (the original text read "Invalid call to on ...")
-  override private[flink] def resultType: TypeInformation[_] =
-    throw UnresolvedException(s"Invalid call to resultType on ${this.getClass}.")
-
-  override private[flink] def validateInput(): ValidationResult =
-    ValidationFailure(s"Unresolved flattening of $child")
-}
-
-/**
- * Access to a single field of a composite-typed expression, rendered as `child.get(key)`.
- *
- * `validateInput()` resolves `key` to a field index and stores it; `resultType` and
- * `toRexNode` read that stored index, so they require a preceding successful validation.
- *
- * @param child expression producing the composite value
- * @param key   field name (String) or field index (Int); other key types fail validation
- */
-case class GetCompositeField(child: Expression, key: Any) extends UnaryExpression {
-
-  // field position resolved by validateInput()
-  private var fieldIndex: Option[Int] = None
-
-  override def toString = s"$child.get($key)"
-
-  /**
-   * Checks that the child has a composite type and that `key` identifies one of its
-   * fields. Integer keys must lie in `[0, arity)`; negative indices are rejected here
-   * instead of failing later when the field is accessed.
-   */
-  override private[flink] def validateInput(): ValidationResult =
-    child.resultType match {
-      case compositeType: CompositeType[_] =>
-        // check key
-        key match {
-          case name: String =>
-            val index = compositeType.getFieldIndex(name)
-            if (index < 0) {
-              ValidationFailure(s"Field name '$name' could not be found.")
-            } else {
-              fieldIndex = Some(index)
-              ValidationSuccess
-            }
-          case index: Int =>
-            if (index < 0) {
-              ValidationFailure(s"Field index '$index' must not be negative.")
-            } else if (index >= compositeType.getArity) {
-              ValidationFailure(s"Field index '$index' exceeds arity.")
-            } else {
-              fieldIndex = Some(index)
-              ValidationSuccess
-            }
-          case _ =>
-            ValidationFailure(s"Invalid key '$key'.")
-        }
-      case other =>
-        // check for composite type
-        ValidationFailure(s"Cannot access field of non-composite type '$other'.")
-    }
-
-  /** Type of the resolved field within the child's composite type. */
-  override private[flink] def resultType: TypeInformation[_] =
-    child.resultType.asInstanceOf[CompositeType[_]].getTypeAt(fieldIndex.get)
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    relBuilder
-      .getRexBuilder
-      .makeFieldAccess(child.toRexNode, fieldIndex.get)
-  }
-
-  /** Copies this expression with the first element of `anyRefs` as the new child. */
-  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
-    val child: Expression = anyRefs.head.asInstanceOf[Expression]
-    copy(child, key).asInstanceOf[this.type]
-  }
-
-  /**
-   * Gives a meaningful alias if possible (e.g. a$mypojo$field).
-   *
-   * An alias is only produced if the child is a resolved field reference or another
-   * field access that itself has an alias.
-   */
-  private[flink] def aliasName(): Option[String] = child match {
-    case gcf: GetCompositeField => gcf.aliasName().map(alias => s"$alias$$$key")
-    case c: ResolvedFieldReference => Some(s"${c.name}$$$key")
-    case _ => None
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
deleted file mode 100644
index e651bb3..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/fieldExpression.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.{UnresolvedException, ValidationException}
-import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
-
-/**
- * An expression that exposes a name and can be turned into an [[Attribute]].
- */
-trait NamedExpression extends Expression {
-
-  /** The attribute form of this expression. */
-  private[flink] def toAttribute: Attribute
-
-  /** The name of this expression. */
-  private[flink] def name: String
-}
-
-/**
- * A named leaf expression. An attribute is its own attribute form.
- */
-abstract class Attribute extends LeafExpression with NamedExpression {
-
-  /** Returns an attribute for `newName`; the concrete subclass decides how to handle it. */
-  private[flink] def withName(newName: String): Attribute
-
-  override private[flink] def toAttribute: Attribute = this
-}
-
-/**
- * A reference to a field by name that has not been resolved.
- *
- * It never passes validation and asking for its result type throws an
- * `UnresolvedException`.
- */
-case class UnresolvedFieldReference(name: String) extends Attribute {
-
-  override private[flink] def validateInput(): ValidationResult = {
-    ValidationFailure(s"Unresolved reference $name.")
-  }
-
-  override private[flink] def resultType: TypeInformation[_] = {
-    throw UnresolvedException(s"Calling resultType on ${this.getClass}.")
-  }
-
-  override private[flink] def withName(newName: String): Attribute = {
-    UnresolvedFieldReference(newName)
-  }
-
-  // rendered as a double quote followed by the field name
-  override def toString = "\"" + name
-}
-
-/**
- * A reference to a field identified by `name` whose type is known.
- */
-case class ResolvedFieldReference(
-    name: String,
-    resultType: TypeInformation[_]) extends Attribute {
-
-  // rendered as a single quote followed by the field name
-  override def toString = "'" + name
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
-    relBuilder.field(name)
-
-  /** Returns this instance when the name is unchanged, otherwise a renamed copy. */
-  override private[flink] def withName(newName: String): Attribute =
-    if (newName != name) ResolvedFieldReference(newName, resultType) else this
-}
-
-/**
- * Gives the result of `child` the name `name`.
- *
- * Validation rejects the name `*` and any alias constructed with a non-empty `extraNames`.
- */
-case class Alias(child: Expression, name: String, extraNames: Seq[String] = Seq())
-  extends UnaryExpression with NamedExpression {
-
-  override def toString = s"$child as '$name"
-
-  override private[flink] def resultType: TypeInformation[_] = child.resultType
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val aliased = child.toRexNode
-    relBuilder.alias(aliased, name)
-  }
-
-  /** Copies this alias with the first element of `anyRefs` as the new child. */
-  override private[flink] def makeCopy(anyRefs: Array[AnyRef]): this.type = {
-    val newChild = anyRefs.head.asInstanceOf[Expression]
-    copy(newChild, name, extraNames).asInstanceOf[this.type]
-  }
-
-  /** A resolved reference carrying the child's type if this alias is valid, else unresolved. */
-  override private[flink] def toAttribute: Attribute = {
-    if (!valid) {
-      UnresolvedFieldReference(name)
-    } else {
-      ResolvedFieldReference(name, child.resultType)
-    }
-  }
-
-  override private[flink] def validateInput(): ValidationResult = name match {
-    case "*" =>
-      ValidationFailure("Alias can not accept '*' as name.")
-    case _ if extraNames.nonEmpty =>
-      ValidationFailure("Invalid call to Alias with multiple names.")
-    case _ =>
-      ValidationSuccess
-  }
-}
-
-/**
- * Wraps a child expression that has not been given a name.
- *
- * It is never valid; asking for its name, attribute or result type throws an
- * `UnresolvedException`.
- */
-case class UnresolvedAlias(child: Expression) extends UnaryExpression with NamedExpression {
-
-  override private[flink] lazy val valid = false
-
-  override private[flink] def name: String = {
-    throw UnresolvedException("Invalid call to name on UnresolvedAlias")
-  }
-
-  override private[flink] def toAttribute: Attribute = {
-    throw UnresolvedException("Invalid call to toAttribute on UnresolvedAlias")
-  }
-
-  override private[flink] def resultType: TypeInformation[_] = {
-    throw UnresolvedException("Invalid call to resultType on UnresolvedAlias")
-  }
-}
-
-/**
- * Attribute with the fixed name `rowtime` and result type LONG.
- *
- * It cannot be renamed and cannot be translated into a RexNode.
- */
-case class RowtimeAttribute() extends Attribute {
-
-  override private[flink] def name: String = "rowtime"
-
-  override private[flink] def resultType: TypeInformation[_] = BasicTypeInfo.LONG_TYPE_INFO
-
-  /** Accepts only the name `rowtime`; any other name is a validation error. */
-  override private[flink] def withName(newName: String): Attribute = {
-    if (newName != "rowtime") {
-      throw new ValidationException("Cannot rename streaming rowtime attribute.")
-    }
-    this
-  }
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
-    throw new UnsupportedOperationException("A rowtime attribute can not be used solely.")
-}
-
-/**
- * Attribute that refers to a window by `name`.
- *
- * It has no result type, cannot be translated into a RexNode and cannot be renamed.
- */
-case class WindowReference(name: String) extends Attribute {
-
-  /** Accepts only the current name; any other name is a validation error. */
-  override private[flink] def withName(newName: String): Attribute = {
-    if (newName != name) {
-      throw new ValidationException("Cannot rename window reference.")
-    }
-    this
-  }
-
-  override private[flink] def resultType: TypeInformation[_] = {
-    throw new UnsupportedOperationException("A window reference has no result type.")
-  }
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    throw new UnsupportedOperationException("A window reference can not be used solely.")
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
deleted file mode 100644
index 6382abe..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import java.sql.{Date, Time, Timestamp}
-import java.util.{Calendar, TimeZone}
-
-import org.apache.calcite.avatica.util.TimeUnit
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.SqlIntervalQualifier
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.parser.SqlParserPos
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.table.FlinkTypeFactory
-import org.apache.flink.api.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo}
-
-object Literal {
-
-  /**
-   * Wraps a plain value into a [[Literal]], choosing the type information from the runtime
-   * class of the value.
-   *
-   * Scala big decimals are stored as their underlying `java.math.BigDecimal`.
-   *
-   * @param l the value to wrap
-   * @return a literal carrying the value and its matching type information
-   * @throws IllegalArgumentException if `l` is `null` or of a class that is not handled here
-   */
-  private[flink] def apply(l: Any): Literal = l match {
-    case i: Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO)
-    case s: Short => Literal(s, BasicTypeInfo.SHORT_TYPE_INFO)
-    case b: Byte => Literal(b, BasicTypeInfo.BYTE_TYPE_INFO)
-    case lng: Long => Literal(lng, BasicTypeInfo.LONG_TYPE_INFO)
-    case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO)
-    case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO)
-    case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO)
-    case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO)
-    case javaDec: java.math.BigDecimal => Literal(javaDec, BasicTypeInfo.BIG_DEC_TYPE_INFO)
-    case scalaDec: scala.math.BigDecimal =>
-      Literal(scalaDec.bigDecimal, BasicTypeInfo.BIG_DEC_TYPE_INFO)
-    case sqlDate: Date => Literal(sqlDate, SqlTimeTypeInfo.DATE)
-    case sqlTime: Time => Literal(sqlTime, SqlTimeTypeInfo.TIME)
-    case sqlTimestamp: Timestamp => Literal(sqlTimestamp, SqlTimeTypeInfo.TIMESTAMP)
-    case unsupported =>
-      // Without this case an unsupported value surfaces as a bare scala.MatchError.
-      val className = Option(unsupported).map(_.getClass.getName).getOrElse("null")
-      throw new IllegalArgumentException(
-        s"Cannot create a literal from value '$unsupported' of unsupported class $className.")
-  }
-}
-
-/**
- * A constant `value` together with the type information that describes it.
- */
-case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpression {
-
-  /**
-   * Renders basic types through the value's own `toString`, date/time and interval types
-   * with a type-specific suffix, and anything else as `Literal(value, resultType)`.
-   */
-  override def toString = {
-    // textual form of the value followed by a type-specific suffix
-    def suffixed(suffix: String): String = value.toString + suffix
-
-    resultType match {
-      case _: BasicTypeInfo[_] => value.toString
-      case SqlTimeTypeInfo.DATE => suffixed(".toDate")
-      case SqlTimeTypeInfo.TIME => suffixed(".toTime")
-      case SqlTimeTypeInfo.TIMESTAMP => suffixed(".toTimestamp")
-      case TimeIntervalTypeInfo.INTERVAL_MILLIS => suffixed(".millis")
-      case TimeIntervalTypeInfo.INTERVAL_MONTHS => suffixed(".months")
-      case RowIntervalTypeInfo.INTERVAL_ROWS => suffixed(".rows")
-      case _ => s"Literal($value, $resultType)"
-    }
-  }
-
-  /**
-   * Translates the literal into a RexNode. Decimals, date/time values and time intervals
-   * are built through the RexBuilder; every other type is handed to `relBuilder.literal`.
-   */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    // fetched on demand so that the fallback branch never asks for the RexBuilder
-    def rexBuilder = relBuilder.getRexBuilder
-
-    // interval literal of the given amount, qualified from `start` to `end`
-    def intervalLiteral(amount: java.math.BigDecimal, start: TimeUnit, end: TimeUnit): RexNode = {
-      val qualifier = new SqlIntervalQualifier(start, end, SqlParserPos.ZERO)
-      rexBuilder.makeIntervalLiteral(amount, qualifier)
-    }
-
-    resultType match {
-      case BasicTypeInfo.BIG_DEC_TYPE_INFO =>
-        val decimal = value.asInstanceOf[java.math.BigDecimal]
-        val decimalType = relBuilder.getTypeFactory.createSqlType(SqlTypeName.DECIMAL)
-        rexBuilder.makeExactLiteral(decimal, decimalType)
-
-      // date/time
-      case SqlTimeTypeInfo.DATE => rexBuilder.makeDateLiteral(dateToCalendar)
-      case SqlTimeTypeInfo.TIME => rexBuilder.makeTimeLiteral(dateToCalendar, 0)
-      case SqlTimeTypeInfo.TIMESTAMP => rexBuilder.makeTimestampLiteral(dateToCalendar, 3)
-
-      // intervals: months are stored as Int, milliseconds as Long
-      case TimeIntervalTypeInfo.INTERVAL_MONTHS =>
-        val months = java.math.BigDecimal.valueOf(value.asInstanceOf[Int])
-        intervalLiteral(months, TimeUnit.YEAR, TimeUnit.MONTH)
-      case TimeIntervalTypeInfo.INTERVAL_MILLIS =>
-        val millis = java.math.BigDecimal.valueOf(value.asInstanceOf[Long])
-        intervalLiteral(millis, TimeUnit.DAY, TimeUnit.SECOND)
-
-      case _ => relBuilder.literal(value)
-    }
-  }
-
-  /**
-   * Converts the value (which must be a `java.util.Date`) into a calendar whose time is the
-   * date's epoch milliseconds shifted by the default time zone's offset at that instant.
-   */
-  private def dateToCalendar: Calendar = {
-    val source = value.asInstanceOf[java.util.Date]
-    val calendar = Calendar.getInstance()
-    val epochMillis = source.getTime
-    // according to Calcite's SqlFunctions.internalToXXX methods
-    calendar.setTimeInMillis(epochMillis + TimeZone.getDefault.getOffset(epochMillis))
-    calendar
-  }
-}
-
-/**
- * A null value of the type described by `resultType`.
- */
-case class Null(resultType: TypeInformation[_]) extends LeafExpression {
-
-  override def toString = "null"
-
-  /** Builds a constant null and casts it to the type created from `resultType`. */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val builder = relBuilder.getRexBuilder
-    val flinkTypeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
-    val targetType = flinkTypeFactory.createTypeFromTypeInfo(resultType)
-    val untypedNull = builder.constantNull()
-    builder.makeCast(targetType, untypedNull)
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
deleted file mode 100644
index 9c8e279..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/logic.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.table.validate._
-
-/**
- * Base class for binary expressions that take two boolean operands and yield a boolean.
- */
-abstract class BinaryPredicate extends BinaryExpression {
-
-  override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
-
-  /** Fails unless both the left and the right operand are of boolean type. */
-  override private[flink] def validateInput(): ValidationResult = {
-    if (left.resultType != BasicTypeInfo.BOOLEAN_TYPE_INFO ||
-        right.resultType != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      ValidationFailure(s"$this only accepts children of Boolean type, " +
-        s"get $left : ${left.resultType} and $right : ${right.resultType}")
-    } else {
-      ValidationSuccess
-    }
-  }
-}
-
-/**
- * Logical negation of a boolean child expression.
- */
-case class Not(child: Expression) extends UnaryExpression {
-
-  override def toString = s"!($child)"
-
-  override private[flink] def resultType = BasicTypeInfo.BOOLEAN_TYPE_INFO
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
-    relBuilder.not(child.toRexNode)
-
-  /** Fails unless the child is of boolean type. */
-  override private[flink] def validateInput(): ValidationResult = {
-    if (child.resultType != BasicTypeInfo.BOOLEAN_TYPE_INFO) {
-      ValidationFailure(s"Not operator requires a boolean expression as input, " +
-        s"but $child is of type ${child.resultType}")
-    } else {
-      ValidationSuccess
-    }
-  }
-}
-
-/**
- * Logical conjunction of two boolean expressions.
- */
-case class And(left: Expression, right: Expression) extends BinaryPredicate {
-
-  override def toString = s"$left && $right"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
-    relBuilder.and(left.toRexNode, right.toRexNode)
-}
-
-/**
- * Logical disjunction of two boolean expressions.
- */
-case class Or(left: Expression, right: Expression) extends BinaryPredicate {
-
-  override def toString = s"$left || $right"
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
-    relBuilder.or(left.toRexNode, right.toRexNode)
-}
-
-/**
- * Conditional expression: yields `ifTrue` or `ifFalse` depending on `condition`.
- *
- * The result type is the type of `ifTrue`; validation requires a boolean condition and
- * equal types for both branches.
- */
-case class If(
-    condition: Expression,
-    ifTrue: Expression,
-    ifFalse: Expression)
-  extends Expression {
-
-  private[flink] def children = Seq(condition, ifTrue, ifFalse)
-
-  override def toString = s"($condition)? $ifTrue : $ifFalse"
-
-  override private[flink] def resultType = ifTrue.resultType
-
-  /** Translates into a CASE call with the condition and the two branches as operands. */
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
-    val conditionNode = condition.toRexNode
-    val trueNode = ifTrue.toRexNode
-    val falseNode = ifFalse.toRexNode
-    relBuilder.call(SqlStdOperatorTable.CASE, conditionNode, trueNode, falseNode)
-  }
-
-  override private[flink] def validateInput(): ValidationResult = {
-    // the branch types are only compared once the condition is known to be boolean
-    if (condition.resultType != BasicTypeInfo.BOOLEAN_TYPE_INFO ||
-        ifTrue.resultType != ifFalse.resultType) {
-      ValidationFailure(
-        s"If should have boolean condition and same type of ifTrue and ifFalse, get " +
-          s"(${condition.resultType}, ${ifTrue.resultType}, ${ifFalse.resultType})")
-    } else {
-      ValidationSuccess
-    }
-  }
-}