You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:05 UTC
[36/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
deleted file mode 100644
index e0f4691..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/mathExpressions.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.typeutils.TypeCheckUtils
-import org.apache.flink.api.table.validate._
-
-case class Abs(child: Expression) extends UnaryExpression {
- override private[flink] def resultType: TypeInformation[_] = child.resultType
-
- override private[flink] def validateInput(): ValidationResult =
- TypeCheckUtils.assertNumericExpr(child.resultType, "Abs")
-
- override def toString: String = s"abs($child)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.ABS, child.toRexNode)
- }
-}
-
-case class Ceil(child: Expression) extends UnaryExpression {
- override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult =
- TypeCheckUtils.assertNumericExpr(child.resultType, "Ceil")
-
- override def toString: String = s"ceil($child)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.CEIL, child.toRexNode)
- }
-}
-
-case class Exp(child: Expression) extends UnaryExpression with InputTypeSpec {
- override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
-
- override private[flink] def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
-
- override def toString: String = s"exp($child)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.EXP, child.toRexNode)
- }
-}
-
-
-case class Floor(child: Expression) extends UnaryExpression {
- override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult =
- TypeCheckUtils.assertNumericExpr(child.resultType, "Floor")
-
- override def toString: String = s"floor($child)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.FLOOR, child.toRexNode)
- }
-}
-
-case class Log10(child: Expression) extends UnaryExpression with InputTypeSpec {
- override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
-
- override private[flink] def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
-
- override def toString: String = s"log10($child)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.LOG10, child.toRexNode)
- }
-}
-
-case class Ln(child: Expression) extends UnaryExpression with InputTypeSpec {
- override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
-
- override private[flink] def expectedTypes: Seq[TypeInformation[_]] = DOUBLE_TYPE_INFO :: Nil
-
- override def toString: String = s"ln($child)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.LN, child.toRexNode)
- }
-}
-
-case class Power(left: Expression, right: Expression) extends BinaryExpression with InputTypeSpec {
- override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
-
- override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
- DOUBLE_TYPE_INFO :: DOUBLE_TYPE_INFO :: Nil
-
- override def toString: String = s"pow($left, $right)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.POWER, left.toRexNode, right.toRexNode)
- }
-}
-
-case class Sqrt(child: Expression) extends UnaryExpression with InputTypeSpec {
- override private[flink] def resultType: TypeInformation[_] = DOUBLE_TYPE_INFO
-
- override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
- Seq(DOUBLE_TYPE_INFO)
-
- override def toString: String = s"sqrt($child)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.POWER, child.toRexNode, Literal(0.5).toRexNode)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
deleted file mode 100644
index c15d462..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ordering.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.tools.RelBuilder
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.validate._
-
-abstract class Ordering extends UnaryExpression {
- override private[flink] def validateInput(): ValidationResult = {
- if (!child.isInstanceOf[NamedExpression]) {
- ValidationFailure(s"Sort should only based on field reference")
- } else {
- ValidationSuccess
- }
- }
-}
-
-case class Asc(child: Expression) extends Ordering {
- override def toString: String = s"($child).asc"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- child.toRexNode
- }
-
- override private[flink] def resultType: TypeInformation[_] = child.resultType
-}
-
-case class Desc(child: Expression) extends Ordering {
- override def toString: String = s"($child).desc"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.desc(child.toRexNode)
- }
-
- override private[flink] def resultType: TypeInformation[_] = child.resultType
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
deleted file mode 100644
index 2e5d0b2..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/package.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * This package contains the base class of AST nodes and all the expression language AST classes.
- * Expression trees should not be manually constructed by users. They are implicitly constructed
- * from the implicit DSL conversions in
- * [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and
- * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]]. For the Java API,
- * expression trees should be generated from a string parser that parses expressions and creates
- * AST nodes.
- */
-package object expressions
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
deleted file mode 100644
index 56b5b5e..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import scala.collection.JavaConversions._
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.expressions.TrimMode.TrimMode
-import org.apache.flink.api.table.validate._
-
-/**
- * Returns the length of this `str`.
- */
-case class CharLength(child: Expression) extends UnaryExpression {
- override private[flink] def resultType: TypeInformation[_] = INT_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult = {
- if (child.resultType == STRING_TYPE_INFO) {
- ValidationSuccess
- } else {
- ValidationFailure(s"CharLength operator requires String input, " +
- s"but $child is of type ${child.resultType}")
- }
- }
-
- override def toString: String = s"($child).charLength()"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.CHAR_LENGTH, child.toRexNode)
- }
-}
-
-/**
- * Returns str with the first letter of each word in uppercase.
- * All other letters are in lowercase. Words are delimited by white space.
- */
-case class InitCap(child: Expression) extends UnaryExpression {
- override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult = {
- if (child.resultType == STRING_TYPE_INFO) {
- ValidationSuccess
- } else {
- ValidationFailure(s"InitCap operator requires String input, " +
- s"but $child is of type ${child.resultType}")
- }
- }
-
- override def toString: String = s"($child).initCap()"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.INITCAP, child.toRexNode)
- }
-}
-
-/**
- * Returns true if `str` matches `pattern`.
- */
-case class Like(str: Expression, pattern: Expression) extends BinaryExpression {
- private[flink] def left: Expression = str
- private[flink] def right: Expression = pattern
-
- override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult = {
- if (str.resultType == STRING_TYPE_INFO && pattern.resultType == STRING_TYPE_INFO) {
- ValidationSuccess
- } else {
- ValidationFailure(s"Like operator requires (String, String) input, " +
- s"but ($str, $pattern) is of type (${str.resultType}, ${pattern.resultType})")
- }
- }
-
- override def toString: String = s"($str).like($pattern)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.LIKE, children.map(_.toRexNode))
- }
-}
-
-/**
- * Returns str with all characters changed to lowercase.
- */
-case class Lower(child: Expression) extends UnaryExpression {
- override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult = {
- if (child.resultType == STRING_TYPE_INFO) {
- ValidationSuccess
- } else {
- ValidationFailure(s"Lower operator requires String input, " +
- s"but $child is of type ${child.resultType}")
- }
- }
-
- override def toString: String = s"($child).toLowerCase()"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.LOWER, child.toRexNode)
- }
-}
-
-/**
- * Returns true if `str` is similar to `pattern`.
- */
-case class Similar(str: Expression, pattern: Expression) extends BinaryExpression {
- private[flink] def left: Expression = str
- private[flink] def right: Expression = pattern
-
- override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult = {
- if (str.resultType == STRING_TYPE_INFO && pattern.resultType == STRING_TYPE_INFO) {
- ValidationSuccess
- } else {
- ValidationFailure(s"Similar operator requires (String, String) input, " +
- s"but ($str, $pattern) is of type (${str.resultType}, ${pattern.resultType})")
- }
- }
-
- override def toString: String = s"($str).similarTo($pattern)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.SIMILAR_TO, children.map(_.toRexNode))
- }
-}
-
-/**
- * Returns substring of `str` from `begin`(inclusive) for `length`.
- */
-case class Substring(
- str: Expression,
- begin: Expression,
- length: Expression) extends Expression with InputTypeSpec {
-
- def this(str: Expression, begin: Expression) = this(str, begin, CharLength(str))
-
- override private[flink] def children: Seq[Expression] = str :: begin :: length :: Nil
-
- override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
-
- override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
- Seq(STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
-
- override def toString: String = s"($str).substring($begin, $length)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.SUBSTRING, children.map(_.toRexNode))
- }
-}
-
-/**
- * Trim `trimString` from `str` according to `trimMode`.
- */
-case class Trim(
- trimMode: Expression,
- trimString: Expression,
- str: Expression) extends Expression {
-
- override private[flink] def children: Seq[Expression] = trimMode :: trimString :: str :: Nil
-
- override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult = {
- trimMode match {
- case SymbolExpression(_: TrimMode) =>
- if (trimString.resultType != STRING_TYPE_INFO) {
- ValidationFailure(s"String expected for trimString, get ${trimString.resultType}")
- } else if (str.resultType != STRING_TYPE_INFO) {
- ValidationFailure(s"String expected for str, get ${str.resultType}")
- } else {
- ValidationSuccess
- }
- case _ => ValidationFailure("TrimMode symbol expected.")
- }
- }
-
- override def toString: String = s"($str).trim($trimMode, $trimString)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.TRIM, children.map(_.toRexNode))
- }
-}
-
-/**
- * Enumeration of trim flags.
- */
-object TrimConstants {
- val TRIM_DEFAULT_CHAR = Literal(" ")
-}
-
-/**
- * Returns str with all characters changed to uppercase.
- */
-case class Upper(child: Expression) extends UnaryExpression with InputTypeSpec {
-
- override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
-
- override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
- Seq(STRING_TYPE_INFO)
-
- override def toString: String = s"($child).upperCase()"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.UPPER, child.toRexNode)
- }
-}
-
-/**
- * Returns the position of string needle in string haystack.
- */
-case class Position(needle: Expression, haystack: Expression)
- extends Expression with InputTypeSpec {
-
- override private[flink] def children: Seq[Expression] = Seq(needle, haystack)
-
- override private[flink] def resultType: TypeInformation[_] = INT_TYPE_INFO
-
- override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
- Seq(STRING_TYPE_INFO, STRING_TYPE_INFO)
-
- override def toString: String = s"($needle).position($haystack)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.POSITION, needle.toRexNode, haystack.toRexNode)
- }
-}
-
-/**
- * Replaces a substring of a string with a replacement string.
- * Starting at a position for a given length.
- */
-case class Overlay(
- str: Expression,
- replacement: Expression,
- starting: Expression,
- position: Expression)
- extends Expression with InputTypeSpec {
-
- def this(str: Expression, replacement: Expression, starting: Expression) =
- this(str, replacement, starting, CharLength(replacement))
-
- override private[flink] def children: Seq[Expression] =
- Seq(str, replacement, starting, position)
-
- override private[flink] def resultType: TypeInformation[_] = STRING_TYPE_INFO
-
- override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
- Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO)
-
- override def toString: String = s"($str).overlay($replacement, $starting, $position)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(
- SqlStdOperatorTable.OVERLAY,
- str.toRexNode,
- replacement.toRexNode,
- starting.toRexNode,
- position.toRexNode)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/symbols.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/symbols.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/symbols.scala
deleted file mode 100644
index dfa8820..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/symbols.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange}
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlTrimFunction
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-import scala.language.{existentials, implicitConversions}
-
-/**
- * General expression class to represent a symbol.
- */
-case class SymbolExpression(symbol: TableSymbol) extends LeafExpression {
-
- override private[flink] def resultType: TypeInformation[_] =
- throw new UnsupportedOperationException("This should not happen. A symbol has no result type.")
-
- def toExpr = this // triggers implicit conversion
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- // dirty hack to pass Java enums to Java from Scala
- val enum = symbol.enum.asInstanceOf[Enum[T] forSome { type T <: Enum[T] }]
- relBuilder.getRexBuilder.makeFlag(enum)
- }
-
- override def toString: String = s"${symbol.symbols}.${symbol.name}"
-
-}
-
-/**
- * Symbol that wraps a Calcite symbol in form of a Java enum.
- */
-trait TableSymbol {
- def symbols: TableSymbols
- def name: String
- def enum: Enum[_]
-}
-
-/**
- * Enumeration of symbols.
- */
-abstract class TableSymbols extends Enumeration {
-
- class TableSymbolValue(e: Enum[_]) extends Val(e.name()) with TableSymbol {
- override def symbols: TableSymbols = TableSymbols.this
-
- override def enum: Enum[_] = e
-
- override def name: String = toString()
- }
-
- protected final def Value(enum: Enum[_]): TableSymbolValue = new TableSymbolValue(enum)
-
- implicit def symbolToExpression(symbol: TableSymbolValue): SymbolExpression =
- SymbolExpression(symbol)
-
-}
-
-/**
- * Units for working with time intervals.
- */
-object TimeIntervalUnit extends TableSymbols {
-
- type TimeIntervalUnit = TableSymbolValue
-
- val YEAR = Value(TimeUnitRange.YEAR)
- val YEAR_TO_MONTH = Value(TimeUnitRange.YEAR_TO_MONTH)
- val MONTH = Value(TimeUnitRange.MONTH)
- val DAY = Value(TimeUnitRange.DAY)
- val DAY_TO_HOUR = Value(TimeUnitRange.DAY_TO_HOUR)
- val DAY_TO_MINUTE = Value(TimeUnitRange.DAY_TO_MINUTE)
- val DAY_TO_SECOND = Value(TimeUnitRange.DAY_TO_SECOND)
- val HOUR = Value(TimeUnitRange.HOUR)
- val HOUR_TO_MINUTE = Value(TimeUnitRange.HOUR_TO_MINUTE)
- val HOUR_TO_SECOND = Value(TimeUnitRange.HOUR_TO_SECOND)
- val MINUTE = Value(TimeUnitRange.MINUTE)
- val MINUTE_TO_SECOND = Value(TimeUnitRange.MINUTE_TO_SECOND)
- val SECOND = Value(TimeUnitRange.SECOND)
-
-}
-
-/**
- * Units for working with time points.
- */
-object TimePointUnit extends TableSymbols {
-
- type TimePointUnit = TableSymbolValue
-
- val YEAR = Value(TimeUnit.YEAR)
- val MONTH = Value(TimeUnit.MONTH)
- val DAY = Value(TimeUnit.DAY)
- val HOUR = Value(TimeUnit.HOUR)
- val MINUTE = Value(TimeUnit.MINUTE)
- val SECOND = Value(TimeUnit.SECOND)
- val QUARTER = Value(TimeUnit.QUARTER)
- val WEEK = Value(TimeUnit.WEEK)
- val MILLISECOND = Value(TimeUnit.MILLISECOND)
- val MICROSECOND = Value(TimeUnit.MICROSECOND)
-
-}
-
-/**
- * Modes for trimming strings.
- */
-object TrimMode extends TableSymbols {
-
- type TrimMode = TableSymbolValue
-
- val BOTH = Value(SqlTrimFunction.Flag.BOTH)
- val LEADING = Value(SqlTrimFunction.Flag.LEADING)
- val TRAILING = Value(SqlTrimFunction.Flag.TRAILING)
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
deleted file mode 100644
index cd5ca0a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/time.scala
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.avatica.util.{TimeUnit, TimeUnitRange}
-import org.apache.calcite.rex._
-import org.apache.calcite.sql.`type`.SqlTypeName
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.table.FlinkRelBuilder
-import org.apache.flink.api.table.expressions.ExpressionUtils.{divide, getFactor, mod}
-import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit
-import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
-import org.apache.flink.api.table.typeutils.{TimeIntervalTypeInfo, TypeCheckUtils}
-import org.apache.flink.api.table.validate.{ValidationResult, ValidationFailure, ValidationSuccess}
-
-import scala.collection.JavaConversions._
-
-case class Extract(timeIntervalUnit: Expression, temporal: Expression) extends Expression {
-
- override private[flink] def children: Seq[Expression] = timeIntervalUnit :: temporal :: Nil
-
- override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult = {
- if (!TypeCheckUtils.isTemporal(temporal.resultType)) {
- return ValidationFailure(s"Extract operator requires Temporal input, " +
- s"but $temporal is of type ${temporal.resultType}")
- }
-
- timeIntervalUnit match {
- case SymbolExpression(TimeIntervalUnit.YEAR)
- | SymbolExpression(TimeIntervalUnit.MONTH)
- | SymbolExpression(TimeIntervalUnit.DAY)
- if temporal.resultType == SqlTimeTypeInfo.DATE
- || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
- || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS
- || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MONTHS =>
- ValidationSuccess
-
- case SymbolExpression(TimeIntervalUnit.HOUR)
- | SymbolExpression(TimeIntervalUnit.MINUTE)
- | SymbolExpression(TimeIntervalUnit.SECOND)
- if temporal.resultType == SqlTimeTypeInfo.TIME
- || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP
- || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS =>
- ValidationSuccess
-
- case _ =>
- ValidationFailure(s"Extract operator does not support unit '$timeIntervalUnit' for input" +
- s" of type '${temporal.resultType}'.")
- }
- }
-
- override def toString: String = s"($temporal).extract($timeIntervalUnit)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- // get wrapped Calcite unit
- val timeUnitRange = timeIntervalUnit
- .asInstanceOf[SymbolExpression]
- .symbol
- .enum
- .asInstanceOf[TimeUnitRange]
-
- // convert RexNodes
- convertExtract(
- timeIntervalUnit.toRexNode,
- timeUnitRange,
- temporal.toRexNode,
- relBuilder.asInstanceOf[FlinkRelBuilder])
- }
-
- /**
- * Standard conversion of the EXTRACT operator.
- * Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertExtract()]]
- */
- private def convertExtract(
- timeUnitRangeRexNode: RexNode,
- timeUnitRange: TimeUnitRange,
- temporal: RexNode,
- relBuilder: FlinkRelBuilder)
- : RexNode = {
-
- // TODO convert this into Table API expressions to make the code more readable
- val rexBuilder = relBuilder.getRexBuilder
- val resultType = relBuilder.getTypeFactory().createTypeFromTypeInfo(LONG_TYPE_INFO)
- var result = rexBuilder.makeReinterpretCast(
- resultType,
- temporal,
- rexBuilder.makeLiteral(false))
-
- val unit = timeUnitRange.startUnit
- val sqlTypeName = temporal.getType.getSqlTypeName
- unit match {
- case TimeUnit.YEAR | TimeUnit.MONTH | TimeUnit.DAY =>
- sqlTypeName match {
- case SqlTypeName.TIMESTAMP =>
- result = divide(rexBuilder, result, TimeUnit.DAY.multiplier)
- return rexBuilder.makeCall(
- resultType,
- SqlStdOperatorTable.EXTRACT_DATE,
- Seq(timeUnitRangeRexNode, result))
-
- case SqlTypeName.DATE =>
- return rexBuilder.makeCall(
- resultType,
- SqlStdOperatorTable.EXTRACT_DATE,
- Seq(timeUnitRangeRexNode, result))
-
- case _ => // do nothing
- }
-
- case _ => // do nothing
- }
-
- result = mod(rexBuilder, resultType, result, getFactor(unit))
- result = divide(rexBuilder, result, unit.multiplier)
- result
- }
-}
-
-abstract class TemporalCeilFloor(
- timeIntervalUnit: Expression,
- temporal: Expression)
- extends Expression {
-
- override private[flink] def children: Seq[Expression] = timeIntervalUnit :: temporal :: Nil
-
- override private[flink] def resultType: TypeInformation[_] = temporal.resultType
-
- override private[flink] def validateInput(): ValidationResult = {
- if (!TypeCheckUtils.isTimePoint(temporal.resultType)) {
- return ValidationFailure(s"Temporal ceil/floor operator requires Time Point input, " +
- s"but $temporal is of type ${temporal.resultType}")
- }
- val unit = timeIntervalUnit match {
- case SymbolExpression(u: TimeIntervalUnit) => Some(u)
- case _ => None
- }
- if (unit.isEmpty) {
- return ValidationFailure(s"Temporal ceil/floor operator requires Time Interval Unit " +
- s"input, but $timeIntervalUnit is of type ${timeIntervalUnit.resultType}")
- }
-
- (unit.get, temporal.resultType) match {
- case (TimeIntervalUnit.YEAR | TimeIntervalUnit.MONTH,
- SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIMESTAMP) =>
- ValidationSuccess
- case (TimeIntervalUnit.DAY, SqlTimeTypeInfo.TIMESTAMP) =>
- ValidationSuccess
- case (TimeIntervalUnit.HOUR | TimeIntervalUnit.MINUTE | TimeIntervalUnit.SECOND,
- SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP) =>
- ValidationSuccess
- case _ =>
- ValidationFailure(s"Temporal ceil/floor operator does not support " +
- s"unit '$timeIntervalUnit' for input of type '${temporal.resultType}'.")
- }
- }
-}
-
-case class TemporalFloor(
- timeIntervalUnit: Expression,
- temporal: Expression)
- extends TemporalCeilFloor(
- timeIntervalUnit,
- temporal) {
-
- override def toString: String = s"($temporal).floor($timeIntervalUnit)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.FLOOR, temporal.toRexNode, timeIntervalUnit.toRexNode)
- }
-}
-
-case class TemporalCeil(
- timeIntervalUnit: Expression,
- temporal: Expression)
- extends TemporalCeilFloor(
- timeIntervalUnit,
- temporal) {
-
- override def toString: String = s"($temporal).ceil($timeIntervalUnit)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- relBuilder.call(SqlStdOperatorTable.CEIL, temporal.toRexNode, timeIntervalUnit.toRexNode)
- }
-}
-
-abstract class CurrentTimePoint(
- targetType: TypeInformation[_],
- local: Boolean)
- extends LeafExpression {
-
- override private[flink] def resultType: TypeInformation[_] = targetType
-
- override private[flink] def validateInput(): ValidationResult = {
- if (!TypeCheckUtils.isTimePoint(targetType)) {
- ValidationFailure(s"CurrentTimePoint operator requires Time Point target type, " +
- s"but get $targetType.")
- } else if (local && targetType == SqlTimeTypeInfo.DATE) {
- ValidationFailure(s"Localized CurrentTimePoint operator requires Time or Timestamp target " +
- s"type, but get $targetType.")
- } else {
- ValidationSuccess
- }
- }
-
- override def toString: String = if (local) {
- s"local$targetType()"
- } else {
- s"current$targetType()"
- }
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- val operator = targetType match {
- case SqlTimeTypeInfo.TIME if local => SqlStdOperatorTable.LOCALTIME
- case SqlTimeTypeInfo.TIMESTAMP if local => SqlStdOperatorTable.LOCALTIMESTAMP
- case SqlTimeTypeInfo.DATE => SqlStdOperatorTable.CURRENT_DATE
- case SqlTimeTypeInfo.TIME => SqlStdOperatorTable.CURRENT_TIME
- case SqlTimeTypeInfo.TIMESTAMP => SqlStdOperatorTable.CURRENT_TIMESTAMP
- }
- relBuilder.call(operator)
- }
-}
-
-case class CurrentDate() extends CurrentTimePoint(SqlTimeTypeInfo.DATE, local = false)
-
-case class CurrentTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = false)
-
-case class CurrentTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = false)
-
-case class LocalTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = true)
-
-case class LocalTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = true)
-
-/**
- * Extracts the quarter of a year from a SQL date.
- */
-case class Quarter(child: Expression) extends UnaryExpression with InputTypeSpec {
-
- override private[flink] def expectedTypes: Seq[TypeInformation[_]] = Seq(SqlTimeTypeInfo.DATE)
-
- override private[flink] def resultType: TypeInformation[_] = LONG_TYPE_INFO
-
- override def toString: String = s"($child).quarter()"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- /**
- * Standard conversion of the QUARTER operator.
- * Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertQuarter()]]
- */
- Plus(
- Div(
- Minus(
- Extract(TimeIntervalUnit.MONTH, child),
- Literal(1L)),
- Literal(TimeUnit.QUARTER.multiplier.longValue())),
- Literal(1L)
- ).toRexNode
- }
-}
-
-/**
- * Determines whether two anchored time intervals overlap.
- */
-case class TemporalOverlaps(
- leftTimePoint: Expression,
- leftTemporal: Expression,
- rightTimePoint: Expression,
- rightTemporal: Expression)
- extends Expression {
-
- override private[flink] def children: Seq[Expression] =
- Seq(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal)
-
- override private[flink] def resultType: TypeInformation[_] = BOOLEAN_TYPE_INFO
-
- override private[flink] def validateInput(): ValidationResult = {
- if (!TypeCheckUtils.isTimePoint(leftTimePoint.resultType)) {
- return ValidationFailure(s"TemporalOverlaps operator requires leftTimePoint to be of type " +
- s"Time Point, but get ${leftTimePoint.resultType}.")
- }
- if (!TypeCheckUtils.isTimePoint(rightTimePoint.resultType)) {
- return ValidationFailure(s"TemporalOverlaps operator requires rightTimePoint to be of " +
- s"type Time Point, but get ${rightTimePoint.resultType}.")
- }
- if (leftTimePoint.resultType != rightTimePoint.resultType) {
- return ValidationFailure(s"TemporalOverlaps operator requires leftTimePoint and " +
- s"rightTimePoint to be of same type.")
- }
-
- // leftTemporal is point, then it must be comparable with leftTimePoint
- if (TypeCheckUtils.isTimePoint(leftTemporal.resultType)) {
- if (leftTemporal.resultType != leftTimePoint.resultType) {
- return ValidationFailure(s"TemporalOverlaps operator requires leftTemporal and " +
- s"leftTimePoint to be of same type if leftTemporal is of type Time Point.")
- }
- } else if (!isTimeInterval(leftTemporal.resultType)) {
- return ValidationFailure(s"TemporalOverlaps operator requires leftTemporal to be of " +
- s"type Time Point or Time Interval.")
- }
-
- // rightTemporal is point, then it must be comparable with rightTimePoint
- if (TypeCheckUtils.isTimePoint(rightTemporal.resultType)) {
- if (rightTemporal.resultType != rightTimePoint.resultType) {
- return ValidationFailure(s"TemporalOverlaps operator requires rightTemporal and " +
- s"rightTimePoint to be of same type if rightTemporal is of type Time Point.")
- }
- } else if (!isTimeInterval(rightTemporal.resultType)) {
- return ValidationFailure(s"TemporalOverlaps operator requires rightTemporal to be of " +
- s"type Time Point or Time Interval.")
- }
- ValidationSuccess
- }
-
- override def toString: String = s"temporalOverlaps(${children.mkString(", ")})"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- convertOverlaps(
- leftTimePoint.toRexNode,
- leftTemporal.toRexNode,
- rightTimePoint.toRexNode,
- rightTemporal.toRexNode,
- relBuilder.asInstanceOf[FlinkRelBuilder])
- }
-
- /**
- * Standard conversion of the OVERLAPS operator.
- * Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertOverlaps()]]
- */
- private def convertOverlaps(
- leftP: RexNode,
- leftT: RexNode,
- rightP: RexNode,
- rightT: RexNode,
- relBuilder: FlinkRelBuilder)
- : RexNode = {
- // leftT = leftP + leftT if leftT is an interval
- val convLeftT = if (isTimeInterval(leftTemporal.resultType)) {
- relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, leftP, leftT)
- } else {
- leftT
- }
- // rightT = rightP + rightT if rightT is an interval
- val convRightT = if (isTimeInterval(rightTemporal.resultType)) {
- relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, rightP, rightT)
- } else {
- rightT
- }
- // leftT >= rightP
- val leftPred = relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, convLeftT, rightP)
- // rightT >= leftP
- val rightPred = relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, convRightT, leftP)
-
- // leftT >= rightP and rightT >= leftP
- relBuilder.call(SqlStdOperatorTable.AND, leftPred, rightPred)
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/windowProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/windowProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/windowProperties.scala
deleted file mode 100644
index 8386c46..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/windowProperties.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
-import org.apache.flink.api.table.FlinkRelBuilder.NamedWindowProperty
-import org.apache.flink.api.table.validate.{ValidationFailure, ValidationSuccess}
-
-abstract class WindowProperty(child: Expression) extends UnaryExpression {
-
- override def toString = s"WindowProperty($child)"
-
- override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
- throw new UnsupportedOperationException("WindowProperty cannot be transformed to RexNode.")
-
- override private[flink] def validateInput() =
- if (child.isInstanceOf[WindowReference]) {
- ValidationSuccess
- } else {
- ValidationFailure("Child must be a window reference.")
- }
-
- private[flink] def toNamedWindowProperty(name: String)(implicit relBuilder: RelBuilder)
- : NamedWindowProperty = NamedWindowProperty(name, this)
-}
-
-case class WindowStart(child: Expression) extends WindowProperty(child) {
-
- override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP
-
- override def toString: String = s"start($child)"
-}
-
-case class WindowEnd(child: Expression) extends WindowProperty(child) {
-
- override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP
-
- override def toString: String = s"end($child)"
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala
deleted file mode 100644
index 2e16096..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.functions
-
-import org.apache.flink.api.common.functions.InvalidTypesException
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.table.ValidationException
-import org.apache.flink.api.table.expressions.{Expression, ScalarFunctionCall}
-
-/**
- * Base class for a user-defined scalar function. A user-defined scalar functions maps zero, one,
- * or multiple scalar values to a new scalar value.
- *
- * The behavior of a [[ScalarFunction]] can be defined by implementing a custom evaluation
- * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
- * can also be overloaded by implementing multiple methods named "eval".
- *
- * User-defined functions must have a default constructor and must be instantiable during runtime.
- *
- * By default the result type of an evaluation method is determined by Flink's type extraction
- * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
- * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
- * can be manually defined by overriding [[getResultType()]].
- *
- * Internally, the Table/SQL API code generation works with primitive values as much as possible.
- * If a user-defined scalar function should not introduce much overhead during runtime, it is
- * recommended to declare parameters and result types as primitive types instead of their boxed
- * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
- */
-abstract class ScalarFunction extends UserDefinedFunction {
-
- /**
- * Creates a call to a [[ScalarFunction]] in Scala Table API.
- *
- * @param params actual parameters of function
- * @return [[Expression]] in form of a [[ScalarFunctionCall]]
- */
- final def apply(params: Expression*): Expression = {
- ScalarFunctionCall(this, params)
- }
-
- override def toString: String = getClass.getCanonicalName
-
- // ----------------------------------------------------------------------------------------------
-
- /**
- * Returns the result type of the evaluation method with a given signature.
- *
- * This method needs to be overriden in case Flink's type extraction facilities are not
- * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
- * method. Flink's type extraction facilities can handle basic types or
- * simple POJOs but might be wrong for more complex, custom, or composite types.
- *
- * @param signature signature of the method the return type needs to be determined
- * @return [[TypeInformation]] of result type or null if Flink should determine the type
- */
- def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null
-
- /**
- * Returns [[TypeInformation]] about the operands of the evaluation method with a given
- * signature.
- *
- * In order to perform operand type inference in SQL (especially when NULL is used) it might be
- * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
- * By default Flink's type extraction facilities are used for this but might be wrong for
- * more complex, custom, or composite types.
- *
- * @param signature signature of the method the operand types need to be determined
- * @return [[TypeInformation]] of operand types
- */
- def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
- signature.map { c =>
- try {
- TypeExtractor.getForClass(c)
- } catch {
- case ite: InvalidTypesException =>
- throw new ValidationException(
- s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be " +
- s"automatically determined. Please provide type information manually.")
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
deleted file mode 100644
index ca9aaf1..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.functions
-
-import java.util
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.expressions.{Expression, TableFunctionCall}
-
-/**
- * Base class for a user-defined table function (UDTF). A user-defined table functions works on
- * zero, one, or multiple scalar values as input and returns multiple rows as output.
- *
- * The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation
- * method. An evaluation method must be declared publicly, not static and named "eval".
- * Evaluation methods can also be overloaded by implementing multiple methods named "eval".
- *
- * User-defined functions must have a default constructor and must be instantiable during runtime.
- *
- * By default the result type of an evaluation method is determined by Flink's type extraction
- * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
- * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
- * can be manually defined by overriding [[getResultType()]].
- *
- * Internally, the Table/SQL API code generation works with primitive values as much as possible.
- * If a user-defined table function should not introduce much overhead during runtime, it is
- * recommended to declare parameters and result types as primitive types instead of their boxed
- * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
- *
- * Example:
- *
- * {{{
- *
- * public class Split extends TableFunction<String> {
- *
- * // implement an "eval" method with as many parameters as you want
- * public void eval(String str) {
- * for (String s : str.split(" ")) {
- * collect(s); // use collect(...) to emit an output row
- * }
- * }
- *
- * // you can overload the eval method here ...
- * }
- *
- * val tEnv: TableEnvironment = ...
- * val table: Table = ... // schema: [a: String]
- *
- * // for Scala users
- * val split = new Split()
- * table.join(split('c) as ('s)).select('a, 's)
- *
- * // for Java users
- * tEnv.registerFunction("split", new Split()) // register table function first
- * table.join("split(a) as (s)").select("a, s")
- *
- * // for SQL users
- * tEnv.registerFunction("split", new Split()) // register table function first
- * tEnv.sql("SELECT a, s FROM MyTable, LATERAL TABLE(split(a)) as T(s)")
- *
- * }}}
- *
- * @tparam T The type of the output row
- */
-abstract class TableFunction[T] extends UserDefinedFunction {
-
- /**
- * Creates a call to a [[TableFunction]] in Scala Table API.
- *
- * @param params actual parameters of function
- * @return [[Expression]] in form of a [[TableFunctionCall]]
- */
- final def apply(params: Expression*)(implicit typeInfo: TypeInformation[T]): Expression = {
- val resultType = if (getResultType == null) {
- typeInfo
- } else {
- getResultType
- }
- TableFunctionCall(getClass.getSimpleName, this, params, resultType)
- }
-
- override def toString: String = getClass.getCanonicalName
-
- // ----------------------------------------------------------------------------------------------
-
- private val rows: util.ArrayList[T] = new util.ArrayList[T]()
-
- /**
- * Emit an output row.
- *
- * @param row the output row
- */
- protected def collect(row: T): Unit = {
- // cache rows for now, maybe immediately process them further
- rows.add(row)
- }
-
- /**
- * Internal use. Get an iterator of the buffered rows.
- */
- def getRowsIterator = rows.iterator()
-
- /**
- * Internal use. Clear buffered rows.
- */
- def clear() = rows.clear()
-
- // ----------------------------------------------------------------------------------------------
-
- /**
- * Returns the result type of the evaluation method with a given signature.
- *
- * This method needs to be overriden in case Flink's type extraction facilities are not
- * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
- * method. Flink's type extraction facilities can handle basic types or
- * simple POJOs but might be wrong for more complex, custom, or composite types.
- *
- * @return [[TypeInformation]] of result type or null if Flink should determine the type
- */
- def getResultType: TypeInformation[T] = null
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/UserDefinedFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/UserDefinedFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/UserDefinedFunction.scala
deleted file mode 100644
index cdf6b07..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/UserDefinedFunction.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.functions
-
-/**
- * Base class for all user-defined functions such as scalar functions, table functions,
- * or aggregation functions.
- *
- * User-defined functions must have a default constructor and must be instantiable during runtime.
- */
-trait UserDefinedFunction {
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/MathFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/MathFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/MathFunctions.scala
deleted file mode 100644
index 8a0fe65..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/MathFunctions.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.functions.utils
-
-import java.math.{BigDecimal => JBigDecimal}
-
-class MathFunctions {}
-
-object MathFunctions {
- def power(a: Double, b: JBigDecimal): Double = {
- Math.pow(a, b.doubleValue())
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala
deleted file mode 100644
index 7953b25..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.functions.utils
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql._
-import org.apache.calcite.sql.`type`.SqlOperandTypeChecker.Consistency
-import org.apache.calcite.sql.`type`._
-import org.apache.calcite.sql.parser.SqlParserPos
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.functions.ScalarFunction
-import org.apache.flink.api.table.functions.utils.ScalarSqlFunction.{createOperandTypeChecker, createOperandTypeInference, createReturnTypeInference}
-import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils.{getResultType, getSignature, getSignatures, signatureToString, signaturesToString}
-import org.apache.flink.api.table.{FlinkTypeFactory, ValidationException}
-
-import scala.collection.JavaConverters._
-
-/**
- * Calcite wrapper for user-defined scalar functions.
- *
- * @param name function name (used by SQL parser)
- * @param scalarFunction scalar function to be called
- * @param typeFactory type factory for converting Flink's between Calcite's types
- */
-class ScalarSqlFunction(
- name: String,
- scalarFunction: ScalarFunction,
- typeFactory: FlinkTypeFactory)
- extends SqlFunction(
- new SqlIdentifier(name, SqlParserPos.ZERO),
- createReturnTypeInference(name, scalarFunction, typeFactory),
- createOperandTypeInference(scalarFunction, typeFactory),
- createOperandTypeChecker(name, scalarFunction),
- null,
- SqlFunctionCategory.USER_DEFINED_FUNCTION) {
-
- def getScalarFunction = scalarFunction
-
-}
-
-object ScalarSqlFunction {
-
- private[flink] def createReturnTypeInference(
- name: String,
- scalarFunction: ScalarFunction,
- typeFactory: FlinkTypeFactory)
- : SqlReturnTypeInference = {
- /**
- * Return type inference based on [[ScalarFunction]] given information.
- */
- new SqlReturnTypeInference {
- override def inferReturnType(opBinding: SqlOperatorBinding): RelDataType = {
- val parameters = opBinding
- .collectOperandTypes()
- .asScala
- .map { operandType =>
- if (operandType.getSqlTypeName == SqlTypeName.NULL) {
- null
- } else {
- FlinkTypeFactory.toTypeInfo(operandType)
- }
- }
- val foundSignature = getSignature(scalarFunction, parameters)
- if (foundSignature.isEmpty) {
- throw new ValidationException(
- s"Given parameters of function '$name' do not match any signature. \n" +
- s"Actual: ${signatureToString(parameters)} \n" +
- s"Expected: ${signaturesToString(scalarFunction)}")
- }
- val resultType = getResultType(scalarFunction, foundSignature.get)
- typeFactory.createTypeFromTypeInfo(resultType)
- }
- }
- }
-
- private[flink] def createOperandTypeInference(
- scalarFunction: ScalarFunction,
- typeFactory: FlinkTypeFactory)
- : SqlOperandTypeInference = {
- /**
- * Operand type inference based on [[ScalarFunction]] given information.
- */
- new SqlOperandTypeInference {
- override def inferOperandTypes(
- callBinding: SqlCallBinding,
- returnType: RelDataType,
- operandTypes: Array[RelDataType]): Unit = {
-
- val operandTypeInfo = getOperandTypeInfo(callBinding)
-
- val foundSignature = getSignature(scalarFunction, operandTypeInfo)
- .getOrElse(throw new ValidationException(s"Operand types of could not be inferred."))
-
- val inferredTypes = scalarFunction
- .getParameterTypes(foundSignature)
- .map(typeFactory.createTypeFromTypeInfo)
-
- inferredTypes.zipWithIndex.foreach {
- case (inferredType, i) =>
- operandTypes(i) = inferredType
- }
- }
- }
- }
-
- private[flink] def createOperandTypeChecker(
- name: String,
- scalarFunction: ScalarFunction)
- : SqlOperandTypeChecker = {
-
- val signatures = getSignatures(scalarFunction)
-
- /**
- * Operand type checker based on [[ScalarFunction]] given information.
- */
- new SqlOperandTypeChecker {
- override def getAllowedSignatures(op: SqlOperator, opName: String): String = {
- s"$opName[${signaturesToString(scalarFunction)}]"
- }
-
- override def getOperandCountRange: SqlOperandCountRange = {
- val signatureLengths = signatures.map(_.length)
- SqlOperandCountRanges.between(signatureLengths.min, signatureLengths.max)
- }
-
- override def checkOperandTypes(
- callBinding: SqlCallBinding,
- throwOnFailure: Boolean)
- : Boolean = {
- val operandTypeInfo = getOperandTypeInfo(callBinding)
-
- val foundSignature = getSignature(scalarFunction, operandTypeInfo)
-
- if (foundSignature.isEmpty) {
- if (throwOnFailure) {
- throw new ValidationException(
- s"Given parameters of function '$name' do not match any signature. \n" +
- s"Actual: ${signatureToString(operandTypeInfo)} \n" +
- s"Expected: ${signaturesToString(scalarFunction)}")
- } else {
- false
- }
- } else {
- true
- }
- }
-
- override def isOptional(i: Int): Boolean = false
-
- override def getConsistency: Consistency = Consistency.NONE
-
- }
- }
-
- private[flink] def getOperandTypeInfo(callBinding: SqlCallBinding): Seq[TypeInformation[_]] = {
- val operandTypes = for (i <- 0 until callBinding.getOperandCount)
- yield callBinding.getOperandType(i)
- operandTypes.map { operandType =>
- if (operandType.getSqlTypeName == SqlTypeName.NULL) {
- null
- } else {
- FlinkTypeFactory.toTypeInfo(operandType)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
deleted file mode 100644
index 738238d..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.functions.utils
-
-import com.google.common.base.Predicate
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.sql._
-import org.apache.calcite.sql.`type`._
-import org.apache.calcite.sql.parser.SqlParserPos
-import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction
-import org.apache.calcite.util.Util
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.table.FlinkTypeFactory
-import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl
-
-import scala.collection.JavaConverters._
-import java.util
-
-/**
- * Calcite wrapper for user-defined table functions.
- */
-class TableSqlFunction(
- name: String,
- udtf: TableFunction[_],
- rowTypeInfo: TypeInformation[_],
- returnTypeInference: SqlReturnTypeInference,
- operandTypeInference: SqlOperandTypeInference,
- operandTypeChecker: SqlOperandTypeChecker,
- paramTypes: util.List[RelDataType],
- functionImpl: FlinkTableFunctionImpl[_])
- extends SqlUserDefinedTableFunction(
- new SqlIdentifier(name, SqlParserPos.ZERO),
- returnTypeInference,
- operandTypeInference,
- operandTypeChecker,
- paramTypes,
- functionImpl) {
-
- /**
- * Get the user-defined table function.
- */
- def getTableFunction = udtf
-
- /**
- * Get the type information of the table returned by the table function.
- */
- def getRowTypeInfo = rowTypeInfo
-
- /**
- * Get additional mapping information if the returned table type is a POJO
- * (POJO types have no deterministic field order).
- */
- def getPojoFieldMapping = functionImpl.fieldIndexes
-
-}
-
-object TableSqlFunction {
-
- /**
- * Util function to create a [[TableSqlFunction]].
- *
- * @param name function name (used by SQL parser)
- * @param udtf user-defined table function to be called
- * @param rowTypeInfo the row type information generated by the table function
- * @param typeFactory type factory for converting Flink's between Calcite's types
- * @param functionImpl Calcite table function schema
- * @return [[TableSqlFunction]]
- */
- def apply(
- name: String,
- udtf: TableFunction[_],
- rowTypeInfo: TypeInformation[_],
- typeFactory: FlinkTypeFactory,
- functionImpl: FlinkTableFunctionImpl[_]): TableSqlFunction = {
-
- val argTypes: util.List[RelDataType] = new util.ArrayList[RelDataType]
- val typeFamilies: util.List[SqlTypeFamily] = new util.ArrayList[SqlTypeFamily]
- // derives operands' data types and type families
- functionImpl.getParameters.asScala.foreach{ o =>
- val relType: RelDataType = o.getType(typeFactory)
- argTypes.add(relType)
- typeFamilies.add(Util.first(relType.getSqlTypeName.getFamily, SqlTypeFamily.ANY))
- }
- // derives whether the 'input'th parameter of a method is optional.
- val optional: Predicate[Integer] = new Predicate[Integer]() {
- def apply(input: Integer): Boolean = {
- functionImpl.getParameters.get(input).isOptional
- }
- }
- // create type check for the operands
- val typeChecker: FamilyOperandTypeChecker = OperandTypes.family(typeFamilies, optional)
-
- new TableSqlFunction(
- name,
- udtf,
- rowTypeInfo,
- ReturnTypes.CURSOR,
- InferTypes.explicit(argTypes),
- typeChecker,
- argTypes,
- functionImpl)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
deleted file mode 100644
index 4899691..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.table.functions.utils
-
-import java.lang.reflect.{Method, Modifier}
-import java.sql.{Date, Time, Timestamp}
-
-import com.google.common.primitives.Primitives
-import org.apache.calcite.sql.SqlFunction
-import org.apache.flink.api.common.functions.InvalidTypesException
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.table.{FlinkTypeFactory, TableException, ValidationException}
-import org.apache.flink.api.table.functions.{ScalarFunction, TableFunction, UserDefinedFunction}
-import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl
-import org.apache.flink.util.InstantiationUtil
-
-object UserDefinedFunctionUtils {
-
- /**
- * Instantiates a user-defined function.
- */
- def instantiate[T <: UserDefinedFunction](clazz: Class[T]): T = {
- val constructor = clazz.getDeclaredConstructor()
- constructor.setAccessible(true)
- constructor.newInstance()
- }
-
- /**
- * Checks if a user-defined function can be easily instantiated.
- */
- def checkForInstantiation(clazz: Class[_]): Unit = {
- if (!InstantiationUtil.isPublic(clazz)) {
- throw ValidationException("Function class is not public.")
- }
- else if (!InstantiationUtil.isProperClass(clazz)) {
- throw ValidationException("Function class is no proper class, it is either abstract," +
- " an interface, or a primitive type.")
- }
- else if (InstantiationUtil.isNonStaticInnerClass(clazz)) {
- throw ValidationException("The class is an inner class, but not statically accessible.")
- }
-
- // check for default constructor (can be private)
- clazz
- .getDeclaredConstructors
- .find(_.getParameterTypes.isEmpty)
- .getOrElse(throw ValidationException("Function class needs a default constructor."))
- }
-
- /**
- * Check whether this is a Scala object. It is forbidden to use [[TableFunction]] implemented
- * by a Scala object, since concurrent risks.
- */
- def checkNotSingleton(clazz: Class[_]): Unit = {
- // TODO it is not a good way to check singleton. Maybe improve it further.
- if (clazz.getFields.map(_.getName) contains "MODULE$") {
- throw new ValidationException(
- s"TableFunction implemented by class ${clazz.getCanonicalName} " +
- s"is a Scala object, it is forbidden since concurrent risks.")
- }
- }
-
- // ----------------------------------------------------------------------------------------------
- // Utilities for eval methods
- // ----------------------------------------------------------------------------------------------
-
- /**
- * Returns signatures matching the given signature of [[TypeInformation]].
- * Elements of the signature can be null (act as a wildcard).
- */
- def getSignature(
- function: UserDefinedFunction,
- signature: Seq[TypeInformation[_]])
- : Option[Array[Class[_]]] = {
- // We compare the raw Java classes not the TypeInformation.
- // TypeInformation does not matter during runtime (e.g. within a MapFunction).
- val actualSignature = typeInfoToClass(signature)
- val signatures = getSignatures(function)
-
- signatures
- // go over all signatures and find one matching actual signature
- .find { curSig =>
- // match parameters of signature to actual parameters
- actualSignature.length == curSig.length &&
- curSig.zipWithIndex.forall { case (clazz, i) =>
- parameterTypeEquals(actualSignature(i), clazz)
- }
- }
- }
-
- /**
- * Returns eval method matching the given signature of [[TypeInformation]].
- */
- def getEvalMethod(
- function: UserDefinedFunction,
- signature: Seq[TypeInformation[_]])
- : Option[Method] = {
- // We compare the raw Java classes not the TypeInformation.
- // TypeInformation does not matter during runtime (e.g. within a MapFunction).
- val actualSignature = typeInfoToClass(signature)
- val evalMethods = checkAndExtractEvalMethods(function)
-
- evalMethods
- // go over all eval methods and find one matching
- .find { cur =>
- val signatures = cur.getParameterTypes
- // match parameters of signature to actual parameters
- actualSignature.length == signatures.length &&
- signatures.zipWithIndex.forall { case (clazz, i) =>
- parameterTypeEquals(actualSignature(i), clazz)
- }
- }
- }
-
- /**
- * Extracts "eval" methods and throws a [[ValidationException]] if no implementation
- * can be found.
- */
- def checkAndExtractEvalMethods(function: UserDefinedFunction): Array[Method] = {
- val methods = function
- .getClass
- .getDeclaredMethods
- .filter { m =>
- val modifiers = m.getModifiers
- m.getName == "eval" &&
- Modifier.isPublic(modifiers) &&
- !Modifier.isAbstract(modifiers) &&
- !(function.isInstanceOf[TableFunction[_]] && Modifier.isStatic(modifiers))
- }
-
- if (methods.isEmpty) {
- throw new ValidationException(
- s"Function class '${function.getClass.getCanonicalName}' does not implement at least " +
- s"one method named 'eval' which is public, not abstract and " +
- s"(in case of table functions) not static.")
- } else {
- methods
- }
- }
-
- def getSignatures(function: UserDefinedFunction): Array[Array[Class[_]]] = {
- checkAndExtractEvalMethods(function).map(_.getParameterTypes)
- }
-
- // ----------------------------------------------------------------------------------------------
- // Utilities for SQL functions
- // ----------------------------------------------------------------------------------------------
-
- /**
- * Create [[SqlFunction]] for a [[ScalarFunction]]
- * @param name function name
- * @param function scalar function
- * @param typeFactory type factory
- * @return the ScalarSqlFunction
- */
- def createScalarSqlFunction(
- name: String,
- function: ScalarFunction,
- typeFactory: FlinkTypeFactory)
- : SqlFunction = {
- new ScalarSqlFunction(name, function, typeFactory)
- }
-
- /**
- * Create [[SqlFunction]]s for a [[TableFunction]]'s every eval method
- * @param name function name
- * @param tableFunction table function
- * @param resultType the type information of returned table
- * @param typeFactory type factory
- * @return the TableSqlFunction
- */
- def createTableSqlFunctions(
- name: String,
- tableFunction: TableFunction[_],
- resultType: TypeInformation[_],
- typeFactory: FlinkTypeFactory)
- : Seq[SqlFunction] = {
- val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType)
- val evalMethods = checkAndExtractEvalMethods(tableFunction)
-
- evalMethods.map { method =>
- val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames, method)
- TableSqlFunction(name, tableFunction, resultType, typeFactory, function)
- }
- }
-
- // ----------------------------------------------------------------------------------------------
- // Utilities for scalar functions
- // ----------------------------------------------------------------------------------------------
-
- /**
- * Internal method of [[ScalarFunction#getResultType()]] that does some pre-checking and uses
- * [[TypeExtractor]] as default return type inference.
- */
- def getResultType(
- function: ScalarFunction,
- signature: Array[Class[_]])
- : TypeInformation[_] = {
- // find method for signature
- val evalMethod = checkAndExtractEvalMethods(function)
- .find(m => signature.sameElements(m.getParameterTypes))
- .getOrElse(throw new ValidationException("Given signature is invalid."))
-
- val userDefinedTypeInfo = function.getResultType(signature)
- if (userDefinedTypeInfo != null) {
- userDefinedTypeInfo
- } else {
- try {
- TypeExtractor.getForClass(evalMethod.getReturnType)
- } catch {
- case ite: InvalidTypesException =>
- throw new ValidationException(
- s"Return type of scalar function '${function.getClass.getCanonicalName}' cannot be " +
- s"automatically determined. Please provide type information manually.")
- }
- }
- }
-
- /**
- * Returns the return type of the evaluation method matching the given signature.
- */
- def getResultTypeClass(
- function: ScalarFunction,
- signature: Array[Class[_]])
- : Class[_] = {
- // find method for signature
- val evalMethod = checkAndExtractEvalMethods(function)
- .find(m => signature.sameElements(m.getParameterTypes))
- .getOrElse(throw new IllegalArgumentException("Given signature is invalid."))
- evalMethod.getReturnType
- }
-
- // ----------------------------------------------------------------------------------------------
- // Miscellaneous
- // ----------------------------------------------------------------------------------------------
-
- /**
- * Returns field names and field positions for a given [[TypeInformation]].
- *
- * Field names are automatically extracted for
- * [[org.apache.flink.api.common.typeutils.CompositeType]].
- *
- * @param inputType The TypeInformation to extract the field names and positions from.
- * @return A tuple of two arrays holding the field names and corresponding field positions.
- */
- def getFieldInfo(inputType: TypeInformation[_])
- : (Array[String], Array[Int], Array[TypeInformation[_]]) = {
-
- val fieldNames: Array[String] = inputType match {
- case t: CompositeType[_] => t.getFieldNames
- case a: AtomicType[_] => Array("f0")
- case tpe =>
- throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
- s"Type $tpe lacks explicit field naming")
- }
- val fieldIndexes = fieldNames.indices.toArray
- val fieldTypes: Array[TypeInformation[_]] = fieldNames.map { i =>
- inputType match {
- case t: CompositeType[_] => t.getTypeAt(i).asInstanceOf[TypeInformation[_]]
- case a: AtomicType[_] => a.asInstanceOf[TypeInformation[_]]
- case tpe =>
- throw new TableException(s"Currently only CompositeType and AtomicType are supported.")
- }
- }
- (fieldNames, fieldIndexes, fieldTypes)
- }
-
- /**
- * Prints one signature consisting of classes.
- */
- def signatureToString(signature: Array[Class[_]]): String =
- signature.map { clazz =>
- if (clazz == null) {
- "null"
- } else {
- clazz.getCanonicalName
- }
- }.mkString("(", ", ", ")")
-
- /**
- * Prints one signature consisting of TypeInformation.
- */
- def signatureToString(signature: Seq[TypeInformation[_]]): String = {
- signatureToString(typeInfoToClass(signature))
- }
-
- /**
- * Prints all eval methods signatures of a class.
- */
- def signaturesToString(function: UserDefinedFunction): String = {
- getSignatures(function).map(signatureToString).mkString(", ")
- }
-
- /**
- * Extracts type classes of [[TypeInformation]] in a null-aware way.
- */
- private def typeInfoToClass(typeInfos: Seq[TypeInformation[_]]): Array[Class[_]] =
- typeInfos.map { typeInfo =>
- if (typeInfo == null) {
- null
- } else {
- typeInfo.getTypeClass
- }
- }.toArray
-
-
- /**
- * Compares parameter candidate classes with expected classes. If true, the parameters match.
- * Candidate can be null (acts as a wildcard).
- */
- private def parameterTypeEquals(candidate: Class[_], expected: Class[_]): Boolean =
- candidate == null ||
- candidate == expected ||
- expected.isPrimitive && Primitives.wrap(expected) == candidate ||
- candidate == classOf[Date] && expected == classOf[Int] ||
- candidate == classOf[Time] && expected == classOf[Int] ||
- candidate == classOf[Timestamp] && expected == classOf[Long]
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
deleted file mode 100644
index bdcb22c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/package.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api
-
-/**
- * == Table API ==
- *
- * This package contains the generic part of the Table API. It can be used with Flink Streaming
- * and Flink Batch. From Scala as well as from Java.
- *
- * When using the Table API, as user creates a [[org.apache.flink.api.table.Table]] from
- * a DataSet or DataStream. On this relational operations can be performed. A table can also
- * be converted back to a DataSet or DataStream.
- *
- * Packages [[org.apache.flink.api.scala.table]] and [[org.apache.flink.api.java.table]] contain
- * the language specific part of the API. Refer to these packages for documentation on how
- * the Table API can be used in Java and Scala.
- */
-package object table