You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:47:07 UTC
[38/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala
deleted file mode 100644
index d41e9a7..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FloorCeilCallGen.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import java.lang.reflect.Method
-
-import org.apache.calcite.avatica.util.TimeUnitRange
-import org.apache.calcite.avatica.util.TimeUnitRange.{MONTH, YEAR}
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{BIG_DEC_TYPE_INFO, DOUBLE_TYPE_INFO, FLOAT_TYPE_INFO}
-import org.apache.flink.api.table.codegen.CodeGenUtils.{getEnum, primitiveTypeTermForTypeInfo, qualifyMethod}
-import org.apache.flink.api.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
-import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
-
-/**
-  * Generates floor/ceil function calls.
-  *
-  * With a single operand the call is arithmetic: FLOAT, DOUBLE and BIG_DEC operands are
-  * handled by the inherited [[MultiTypeMethodCallGen]] logic, any other operand is returned
-  * unchanged. With two operands the call is temporal: the second operand is resolved to a
-  * [[TimeUnitRange]]; YEAR and MONTH are delegated to `temporalMethod`, every other unit is
-  * handled by calling `arithmeticMethod` with the multiplier of the unit's start unit.
-  *
-  * @param arithmeticMethod method used for arithmetic calls and for temporal units other
-  *                         than YEAR and MONTH
-  * @param temporalMethod   method used for the YEAR and MONTH units; must be defined as soon
-  *                         as one of these units is requested
-  */
-class FloorCeilCallGen(
-    arithmeticMethod: Method,
-    temporalMethod: Option[Method] = None)
-  extends MultiTypeMethodCallGen(arithmeticMethod) {
-
-  /**
-    * @param codeGenerator code generator that supplies the null-check setting
-    * @param operands      one operand (arithmetic) or two operands (value and time unit)
-    * @return expression holding the generated floor/ceil call
-    */
-  override def generate(
-      codeGenerator: CodeGenerator,
-      operands: Seq[GeneratedExpression])
-    : GeneratedExpression = operands.size match {
-    // arithmetic
-    case 1 =>
-      operands.head.resultType match {
-        case FLOAT_TYPE_INFO | DOUBLE_TYPE_INFO | BIG_DEC_TYPE_INFO =>
-          super.generate(codeGenerator, operands)
-        case _ =>
-          operands.head // no floor/ceil necessary
-      }
-
-    // temporal
-    case 2 =>
-      val operand = operands.head
-      val unit = getEnum(operands(1)).asInstanceOf[TimeUnitRange]
-      val internalType = primitiveTypeTermForTypeInfo(operand.resultType)
-
-      generateCallIfArgsNotNull(codeGenerator.nullCheck, operand.resultType, operands) {
-        (terms) =>
-          unit match {
-            case YEAR | MONTH =>
-              // report a missing temporal method with a descriptive error instead of the
-              // bare NoSuchElementException raised by Option.get
-              val method = temporalMethod.getOrElse(
-                throw new org.apache.flink.api.table.codegen.CodeGenException(
-                  s"No temporal method available for time unit '$unit'."))
-              // the temporal method takes the unit first and the value second
-              s"""
-                |($internalType) ${qualifyMethod(method)}(${terms(1)}, ${terms.head})
-                |""".stripMargin
-            case _ =>
-              s"""
-                |${qualifyMethod(arithmeticMethod)}(
-                | ($internalType) ${terms.head},
-                | ($internalType) ${unit.startUnit.multiplier.intValue()})
-                |""".stripMargin
-          }
-      }
-
-    // any other operand count used to surface as a scala.MatchError
-    case n =>
-      throw new org.apache.flink.api.table.codegen.CodeGenException(
-        s"Floor/ceil expects one or two operands but got $n.")
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FunctionGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FunctionGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FunctionGenerator.scala
deleted file mode 100644
index 9b144ba..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/FunctionGenerator.scala
+++ /dev/null
@@ -1,369 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import java.lang.reflect.Method
-
-import org.apache.calcite.avatica.util.TimeUnitRange
-import org.apache.calcite.sql.SqlOperator
-import org.apache.calcite.sql.fun.SqlStdOperatorTable._
-import org.apache.calcite.sql.fun.SqlTrimFunction
-import org.apache.calcite.util.BuiltInMethod
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.GenericTypeInfo
-import org.apache.flink.api.table.functions.utils.{TableSqlFunction, ScalarSqlFunction}
-
-import scala.collection.mutable
-
-/**
- * Global hub for user-defined and built-in advanced SQL functions.
- */
-object FunctionGenerator {
-
- // Registry of call generators for built-in functions, keyed by the SQL operator together
- // with the operand types it was registered for. Filled by the addSqlFunction* calls below
- // and queried by getCallGenerator.
- private val sqlFunctions: mutable.Map[(SqlOperator, Seq[TypeInformation[_]]), CallGenerator] =
- mutable.Map()
-
- // ----------------------------------------------------------------------------------------------
- // String functions
- // ----------------------------------------------------------------------------------------------
-
- // SUBSTRING with three operands
- addSqlFunctionMethod(
- SUBSTRING,
- Seq(STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO),
- STRING_TYPE_INFO,
- BuiltInMethod.SUBSTRING.method)
-
- // SUBSTRING with two operands; registered with the same method as the three-operand form
- addSqlFunctionMethod(
- SUBSTRING,
- Seq(STRING_TYPE_INFO, INT_TYPE_INFO),
- STRING_TYPE_INFO,
- BuiltInMethod.SUBSTRING.method)
-
- // TRIM uses a dedicated call generator instead of a plain method call
- addSqlFunction(
- TRIM,
- Seq(new GenericTypeInfo(classOf[SqlTrimFunction.Flag]), STRING_TYPE_INFO, STRING_TYPE_INFO),
- new TrimCallGen())
-
- addSqlFunctionMethod(
- CHAR_LENGTH,
- Seq(STRING_TYPE_INFO),
- INT_TYPE_INFO,
- BuiltInMethod.CHAR_LENGTH.method)
-
- // CHARACTER_LENGTH is registered with the same method as CHAR_LENGTH
- addSqlFunctionMethod(
- CHARACTER_LENGTH,
- Seq(STRING_TYPE_INFO),
- INT_TYPE_INFO,
- BuiltInMethod.CHAR_LENGTH.method)
-
- addSqlFunctionMethod(
- UPPER,
- Seq(STRING_TYPE_INFO),
- STRING_TYPE_INFO,
- BuiltInMethod.UPPER.method)
-
- addSqlFunctionMethod(
- LOWER,
- Seq(STRING_TYPE_INFO),
- STRING_TYPE_INFO,
- BuiltInMethod.LOWER.method)
-
- addSqlFunctionMethod(
- INITCAP,
- Seq(STRING_TYPE_INFO),
- STRING_TYPE_INFO,
- BuiltInMethod.INITCAP.method)
-
- addSqlFunctionMethod(
- LIKE,
- Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
- BOOLEAN_TYPE_INFO,
- BuiltInMethod.LIKE.method)
-
- // three-operand LIKE, mapped to BuiltInMethods.LIKE_WITH_ESCAPE
- addSqlFunctionMethod(
- LIKE,
- Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO),
- BOOLEAN_TYPE_INFO,
- BuiltInMethods.LIKE_WITH_ESCAPE)
-
- // NOT LIKE: the LIKE method wrapped in a NotCallGenerator (see addSqlFunctionNotMethod)
- addSqlFunctionNotMethod(
- NOT_LIKE,
- Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
- BuiltInMethod.LIKE.method)
-
- addSqlFunctionMethod(
- SIMILAR_TO,
- Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
- BOOLEAN_TYPE_INFO,
- BuiltInMethod.SIMILAR.method)
-
- // three-operand SIMILAR TO, mapped to BuiltInMethods.SIMILAR_WITH_ESCAPE
- addSqlFunctionMethod(
- SIMILAR_TO,
- Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO),
- BOOLEAN_TYPE_INFO,
- BuiltInMethods.SIMILAR_WITH_ESCAPE)
-
- // NOT SIMILAR TO: the SIMILAR method wrapped in a NotCallGenerator
- addSqlFunctionNotMethod(
- NOT_SIMILAR_TO,
- Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
- BuiltInMethod.SIMILAR.method)
-
- addSqlFunctionMethod(
- POSITION,
- Seq(STRING_TYPE_INFO, STRING_TYPE_INFO),
- INT_TYPE_INFO,
- BuiltInMethod.POSITION.method)
-
- // OVERLAY with three operands
- addSqlFunctionMethod(
- OVERLAY,
- Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, INT_TYPE_INFO),
- STRING_TYPE_INFO,
- BuiltInMethod.OVERLAY.method)
-
- // OVERLAY with four operands; registered with the same method as the three-operand form
- addSqlFunctionMethod(
- OVERLAY,
- Seq(STRING_TYPE_INFO, STRING_TYPE_INFO, INT_TYPE_INFO, INT_TYPE_INFO),
- STRING_TYPE_INFO,
- BuiltInMethod.OVERLAY.method)
-
- // ----------------------------------------------------------------------------------------------
- // Arithmetic functions
- // ----------------------------------------------------------------------------------------------
-
- addSqlFunctionMethod(
- LOG10,
- Seq(DOUBLE_TYPE_INFO),
- DOUBLE_TYPE_INFO,
- BuiltInMethods.LOG10)
-
- addSqlFunctionMethod(
- LN,
- Seq(DOUBLE_TYPE_INFO),
- DOUBLE_TYPE_INFO,
- BuiltInMethods.LN)
-
- addSqlFunctionMethod(
- EXP,
- Seq(DOUBLE_TYPE_INFO),
- DOUBLE_TYPE_INFO,
- BuiltInMethods.EXP)
-
- addSqlFunctionMethod(
- POWER,
- Seq(DOUBLE_TYPE_INFO, DOUBLE_TYPE_INFO),
- DOUBLE_TYPE_INFO,
- BuiltInMethods.POWER)
-
- // POWER with a decimal second operand uses a separate method
- addSqlFunctionMethod(
- POWER,
- Seq(DOUBLE_TYPE_INFO, BIG_DEC_TYPE_INFO),
- DOUBLE_TYPE_INFO,
- BuiltInMethods.POWER_DEC)
-
- // ABS goes through MultiTypeMethodCallGen, which takes the result type from the first
- // operand instead of a fixed return type
- addSqlFunction(
- ABS,
- Seq(DOUBLE_TYPE_INFO),
- new MultiTypeMethodCallGen(BuiltInMethods.ABS))
-
- addSqlFunction(
- ABS,
- Seq(BIG_DEC_TYPE_INFO),
- new MultiTypeMethodCallGen(BuiltInMethods.ABS_DEC))
-
- // single-operand (arithmetic) FLOOR/CEIL; no temporal method is supplied here
- addSqlFunction(
- FLOOR,
- Seq(DOUBLE_TYPE_INFO),
- new FloorCeilCallGen(BuiltInMethod.FLOOR.method))
-
- addSqlFunction(
- FLOOR,
- Seq(BIG_DEC_TYPE_INFO),
- new FloorCeilCallGen(BuiltInMethod.FLOOR.method))
-
- addSqlFunction(
- CEIL,
- Seq(DOUBLE_TYPE_INFO),
- new FloorCeilCallGen(BuiltInMethod.CEIL.method))
-
- addSqlFunction(
- CEIL,
- Seq(BIG_DEC_TYPE_INFO),
- new FloorCeilCallGen(BuiltInMethod.CEIL.method))
-
- // ----------------------------------------------------------------------------------------------
- // Temporal functions
- // ----------------------------------------------------------------------------------------------
-
- // EXTRACT is registered for both a LONG and a DATE second operand with the same method
- addSqlFunctionMethod(
- EXTRACT_DATE,
- Seq(new GenericTypeInfo(classOf[TimeUnitRange]), LONG_TYPE_INFO),
- LONG_TYPE_INFO,
- BuiltInMethod.UNIX_DATE_EXTRACT.method)
-
- addSqlFunctionMethod(
- EXTRACT_DATE,
- Seq(new GenericTypeInfo(classOf[TimeUnitRange]), SqlTimeTypeInfo.DATE),
- LONG_TYPE_INFO,
- BuiltInMethod.UNIX_DATE_EXTRACT.method)
-
- // Two-operand (temporal) FLOOR/CEIL: FloorCeilCallGen calls the second (temporal) method
- // for the YEAR and MONTH units and the first (arithmetic) method for every other unit.
- addSqlFunction(
- FLOOR,
- Seq(SqlTimeTypeInfo.DATE, new GenericTypeInfo(classOf[TimeUnitRange])),
- new FloorCeilCallGen(
- BuiltInMethod.FLOOR.method,
- Some(BuiltInMethod.UNIX_DATE_FLOOR.method)))
-
- // NOTE(review): TIME is registered with the UNIX_DATE_* method, same as DATE - confirm
- // that this is intended.
- addSqlFunction(
- FLOOR,
- Seq(SqlTimeTypeInfo.TIME, new GenericTypeInfo(classOf[TimeUnitRange])),
- new FloorCeilCallGen(
- BuiltInMethod.FLOOR.method,
- Some(BuiltInMethod.UNIX_DATE_FLOOR.method)))
-
- addSqlFunction(
- FLOOR,
- Seq(SqlTimeTypeInfo.TIMESTAMP, new GenericTypeInfo(classOf[TimeUnitRange])),
- new FloorCeilCallGen(
- BuiltInMethod.FLOOR.method,
- Some(BuiltInMethod.UNIX_TIMESTAMP_FLOOR.method)))
-
- addSqlFunction(
- CEIL,
- Seq(SqlTimeTypeInfo.DATE, new GenericTypeInfo(classOf[TimeUnitRange])),
- new FloorCeilCallGen(
- BuiltInMethod.CEIL.method,
- Some(BuiltInMethod.UNIX_DATE_CEIL.method)))
-
- // NOTE(review): as for FLOOR, TIME reuses the UNIX_DATE_* method - confirm.
- addSqlFunction(
- CEIL,
- Seq(SqlTimeTypeInfo.TIME, new GenericTypeInfo(classOf[TimeUnitRange])),
- new FloorCeilCallGen(
- BuiltInMethod.CEIL.method,
- Some(BuiltInMethod.UNIX_DATE_CEIL.method)))
-
- addSqlFunction(
- CEIL,
- Seq(SqlTimeTypeInfo.TIMESTAMP, new GenericTypeInfo(classOf[TimeUnitRange])),
- new FloorCeilCallGen(
- BuiltInMethod.CEIL.method,
- Some(BuiltInMethod.UNIX_TIMESTAMP_CEIL.method)))
-
- // Zero-operand time point functions; they differ only in the result type and the
- // `local` flag passed to CurrentTimePointCallGen.
- addSqlFunction(
- CURRENT_DATE,
- Seq(),
- new CurrentTimePointCallGen(SqlTimeTypeInfo.DATE, local = false))
-
- addSqlFunction(
- CURRENT_TIME,
- Seq(),
- new CurrentTimePointCallGen(SqlTimeTypeInfo.TIME, local = false))
-
- addSqlFunction(
- CURRENT_TIMESTAMP,
- Seq(),
- new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = false))
-
- addSqlFunction(
- LOCALTIME,
- Seq(),
- new CurrentTimePointCallGen(SqlTimeTypeInfo.TIME, local = true))
-
- addSqlFunction(
- LOCALTIMESTAMP,
- Seq(),
- new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
-
- // ----------------------------------------------------------------------------------------------
-
-  /**
-    * Looks up the [[CallGenerator]] that produces the code for a call of the given
-    * [[SqlOperator]].
-    *
-    * User-defined scalar and table functions always get a fresh generator. Built-in functions
-    * are resolved from the registry: first by an exact match on operator and operand types,
-    * then by the first registration of the same operator and arity whose operand types are
-    * all [[BasicTypeInfo]]s that equal the actual types or that the actual types may be
-    * auto-cast to.
-    *
-    * @param sqlOperator SQL operator (might be overloaded)
-    * @param operandTypes actual operand types
-    * @param resultType expected return type
-    * @return the matching [[CallGenerator]], or None if no built-in registration fits
-    */
-  def getCallGenerator(
-      sqlOperator: SqlOperator,
-      operandTypes: Seq[TypeInformation[_]],
-      resultType: TypeInformation[_])
-    : Option[CallGenerator] = {
-
-    // true if a registration with these operand types can serve the actual operand types
-    def accepts(registered: Seq[TypeInformation[_]]): Boolean =
-      registered.length == operandTypes.length &&
-        registered.zip(operandTypes).forall {
-          case (expected: BasicTypeInfo[_], actual: BasicTypeInfo[_]) =>
-            actual.shouldAutocastTo(expected) || expected == actual
-          case _ =>
-            false
-        }
-
-    sqlOperator match {
-      // user-defined scalar function
-      case scalar: ScalarSqlFunction =>
-        Some(new ScalarFunctionCallGen(scalar.getScalarFunction, operandTypes, resultType))
-
-      // user-defined table function
-      case table: TableSqlFunction =>
-        Some(new TableFunctionCallGen(table.getTableFunction, operandTypes, resultType))
-
-      // built-in scalar function: exact match first, compatible registration second
-      case _ =>
-        sqlFunctions.get((sqlOperator, operandTypes)).orElse {
-          sqlFunctions.collectFirst {
-            case ((operator, registered), generator)
-              if operator == sqlOperator && accepts(registered) => generator
-          }
-        }
-    }
-  }
-
- // ----------------------------------------------------------------------------------------------
-
-  /**
-    * Registers a built-in function that is implemented by a plain method call.
-    *
-    * @param sqlOperator  operator to register
-    * @param operandTypes operand types of this registration
-    * @param returnType   result type of the generated call
-    * @param method       method invoked by the generated code
-    */
-  private def addSqlFunctionMethod(
-      sqlOperator: SqlOperator,
-      operandTypes: Seq[TypeInformation[_]],
-      returnType: TypeInformation[_],
-      method: Method)
-    : Unit =
-    addSqlFunction(sqlOperator, operandTypes, new MethodCallGen(returnType, method))
-
-  /**
-    * Registers a built-in function as the negation of a boolean method call.
-    *
-    * @param sqlOperator  operator to register
-    * @param operandTypes operand types of this registration
-    * @param method       boolean method whose result is inverted by a [[NotCallGenerator]]
-    */
-  private def addSqlFunctionNotMethod(
-      sqlOperator: SqlOperator,
-      operandTypes: Seq[TypeInformation[_]],
-      method: Method)
-    : Unit = {
-    val negated = new NotCallGenerator(new MethodCallGen(BOOLEAN_TYPE_INFO, method))
-    addSqlFunction(sqlOperator, operandTypes, negated)
-  }
-
-  /**
-    * Stores a call generator in the registry; an existing entry for the same operator and
-    * operand types is replaced.
-    *
-    * @param sqlOperator   operator to register
-    * @param operandTypes  operand types of this registration
-    * @param callGenerator generator that produces the code for the call
-    */
-  private def addSqlFunction(
-      sqlOperator: SqlOperator,
-      operandTypes: Seq[TypeInformation[_]],
-      callGenerator: CallGenerator)
-    : Unit = {
-    val key = (sqlOperator, operandTypes)
-    sqlFunctions.update(key, callGenerator)
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGen.scala
deleted file mode 100644
index 376f54a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MethodCallGen.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import java.lang.reflect.Method
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.codegen.CodeGenUtils.qualifyMethod
-import org.apache.flink.api.table.codegen.calls.CallGenerator.generateCallIfArgsNotNull
-import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
-
-/**
-  * [[CallGenerator]] that emits an invocation of a fixed [[java.lang.reflect.Method]].
-  *
-  * @param returnType result type of the generated expression
-  * @param method     method that the generated code calls with all operand terms, in order
-  */
-class MethodCallGen(returnType: TypeInformation[_], method: Method) extends CallGenerator {
-
-  override def generate(
-      codeGenerator: CodeGenerator,
-      operands: Seq[GeneratedExpression])
-    : GeneratedExpression =
-    generateCallIfArgsNotNull(codeGenerator.nullCheck, returnType, operands) { args =>
-      // all operand terms become the argument list of the qualified method
-      val argumentList = args.mkString(", ")
-      s"""
-        |${qualifyMethod(method)}($argumentList)
-        |""".stripMargin
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MultiTypeMethodCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MultiTypeMethodCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MultiTypeMethodCallGen.scala
deleted file mode 100644
index e9e8f18..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/MultiTypeMethodCallGen.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import java.lang.reflect.Method
-
-import org.apache.flink.api.table.codegen.calls.CallGenerator._
-import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
-
-/**
- * Generates a function call that calls a method which returns the same type that it
- * takes as first argument.
- *
- * @param method method that the generated code invokes; it is referenced by the canonical
- *               name of its declaring class followed by the method name
- */
-class MultiTypeMethodCallGen(method: Method) extends CallGenerator {
-
- /**
-  * Builds the call expression for the given operands.
-  *
-  * @param codeGenerator code generator that supplies the null-check setting
-  * @param operands operand expressions; the first one determines the result type, so the
-  *                 sequence must not be empty
-  * @return expression holding the generated method call
-  */
- override def generate(
- codeGenerator: CodeGenerator,
- operands: Seq[GeneratedExpression])
- : GeneratedExpression = {
- // the result type is taken from the first operand rather than from the method
- generateCallIfArgsNotNull(codeGenerator.nullCheck, operands.head.resultType, operands) {
- (operandResultTerms) =>
- // emits <declaring class>.<method name>(<operand terms>); unlike MethodCallGen the
- // qualified name is assembled here instead of through qualifyMethod
- s"""
- |${method.getDeclaringClass.getCanonicalName}.
- | ${method.getName}(${operandResultTerms.mkString(", ")})
- """.stripMargin
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/NotCallGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/NotCallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/NotCallGenerator.scala
deleted file mode 100644
index 5cd358a..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/NotCallGenerator.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import org.apache.flink.api.table.codegen.calls.ScalarOperators.generateNot
-import org.apache.flink.api.table.codegen.{GeneratedExpression, CodeGenerator}
-
-/**
-  * Wraps another [[CallGenerator]] and negates the boolean expression it produces.
-  *
-  * @param callGenerator generator whose result is inverted
-  */
-class NotCallGenerator(callGenerator: CallGenerator) extends CallGenerator {
-
-  override def generate(
-      codeGenerator: CodeGenerator,
-      operands: Seq[GeneratedExpression])
-    : GeneratedExpression = {
-    // negation honours the null-check setting of the code generator
-    val negate: GeneratedExpression => GeneratedExpression =
-      generateNot(codeGenerator.nullCheck, _)
-    negate(callGenerator.generate(codeGenerator, operands))
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctionCallGen.scala
deleted file mode 100644
index b6ef8ad..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctionCallGen.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.codegen.CodeGenUtils._
-import org.apache.flink.api.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
-import org.apache.flink.api.table.functions.ScalarFunction
-import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._
-
-/**
-  * Generates a call to the `eval` method of a user-defined [[ScalarFunction]].
-  *
-  * @param scalarFunction user-defined [[ScalarFunction]] that might be overloaded
-  * @param signature actual signature with which the function is called
-  * @param returnType result type expected by the caller; it selects the type term of the
-  *                   result variable and drives the conversion of the result
-  */
-class ScalarFunctionCallGen(
-    scalarFunction: ScalarFunction,
-    signature: Seq[TypeInformation[_]],
-    returnType: TypeInformation[_])
-  extends CallGenerator {
-
-  /**
-    * @param codeGenerator code generator used for boxing, unboxing and function reuse
-    * @param operands      operand expressions, paired with the parameters of the signature
-    * @return expression that evaluates the operands, calls `eval` and converts the result
-    * @throws CodeGenException if the function has no signature matching `signature`
-    */
-  override def generate(
-      codeGenerator: CodeGenerator,
-      operands: Seq[GeneratedExpression])
-    : GeneratedExpression = {
-    // resolve the eval() overload and the class of its result
-    val evalSignature = getSignature(scalarFunction, signature) match {
-      case Some(found) => found
-      case None => throw new CodeGenException("No matching signature found.")
-    }
-    val evalResultClass = getResultTypeClass(scalarFunction, evalSignature)
-    val primitiveResult = evalResultClass.isPrimitive
-
-    // primitive parameters take the operand as is, all others receive a boxed operand
-    // (output boxing) that is null whenever the operand is null
-    val arguments = evalSignature.zip(operands).map {
-      case (paramClass, operandExpr) if paramClass.isPrimitive =>
-        operandExpr
-      case (_, operandExpr) =>
-        val boxedTypeTerm = boxedTypeTermForTypeInfo(operandExpr.resultType)
-        val boxed = codeGenerator.generateOutputFieldBoxing(operandExpr)
-        val nullableTerm =
-          if (!codeGenerator.nullCheck) {
-            boxed.resultTerm
-          } else {
-            s"${boxed.nullTerm} ? null : ($boxedTypeTerm) ${boxed.resultTerm}"
-          }
-        boxed.copy(resultTerm = nullableTerm)
-    }
-
-    // generate function call
-    val functionReference = codeGenerator.addReusableFunction(scalarFunction)
-    val resultTypeTerm =
-      if (primitiveResult) primitiveTypeTermForTypeInfo(returnType)
-      else boxedTypeTermForTypeInfo(returnType)
-    val resultTerm = newName("result")
-    val argumentCode = arguments.map(_.code).mkString("\n")
-    val argumentTerms = arguments.map(_.resultTerm).mkString(", ")
-    val functionCallCode =
-      s"""
-        |$argumentCode
-        |$resultTypeTerm $resultTerm = $functionReference.eval(
-        | $argumentTerms);
-        |""".stripMargin
-
-    // convert result of function to internal representation (input unboxing)
-    val converted =
-      if (primitiveResult) {
-        codeGenerator.generateNonNullLiteral(returnType, resultTerm)
-      } else {
-        codeGenerator.generateInputFieldUnboxing(returnType, resultTerm)
-      }
-
-    // the call has to run before the conversion code that reads its result
-    converted.copy(code =
-      s"""
-        |$functionCallCode
-        |${converted.code}
-        |""".stripMargin
-    )
-  }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
deleted file mode 100644
index 330e2fe..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
+++ /dev/null
@@ -1,1025 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.codegen.calls
-
-import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY
-import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
-import org.apache.calcite.util.BuiltInMethod
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
-import org.apache.flink.api.table.codegen.CodeGenUtils._
-import org.apache.flink.api.table.codegen.{CodeGenerator, CodeGenException, GeneratedExpression}
-import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo
-import org.apache.flink.api.table.typeutils.TypeCheckUtils._
-
-object ScalarOperators {
-
- /**
-  * Generates code that concatenates two string expressions with Java's `+` operator.
-  */
- def generateStringConcatOperator(
-     nullCheck: Boolean,
-     left: GeneratedExpression,
-     right: GeneratedExpression)
-   : GeneratedExpression = {
-   val concat: (String, String) => String = (lhs, rhs) => s"$lhs + $rhs"
-   generateOperatorIfNotNull(nullCheck, STRING_TYPE_INFO, left, right)(concat)
- }
-
- /**
-  * Generates code for a binary arithmetic operation. Both operands are first cast to
-  * `resultType`; decimal results are computed through the matching `BigDecimal` method,
-  * all other results through the plain Java operator followed by a cast to the result type.
-  */
- def generateArithmeticOperator(
-     operator: String,
-     nullCheck: Boolean,
-     resultType: TypeInformation[_],
-     left: GeneratedExpression,
-     right: GeneratedExpression)
-   : GeneratedExpression = {
-   val castLeft = numericCasting(left.resultType, resultType)
-   val castRight = numericCasting(right.resultType, resultType)
-   val primitiveTerm = primitiveTypeTermForTypeInfo(resultType)
-
-   val arithmetic: (String, String) => String =
-     if (isDecimal(resultType)) {
-       (l, r) => s"${castLeft(l)}.${arithOpToDecMethod(operator)}(${castRight(r)})"
-     } else {
-       (l, r) => s"($primitiveTerm) (${castLeft(l)} $operator ${castRight(r)})"
-     }
-
-   generateOperatorIfNotNull(nullCheck, resultType, left, right)(arithmetic)
- }
-
- /**
-  * Generates code for a unary `+` or `-`. Decimal operands are negated via `negate()` or
-  * passed through unchanged; every other operand is prefixed with the operator itself.
-  */
- def generateUnaryArithmeticOperator(
-     operator: String,
-     nullCheck: Boolean,
-     resultType: TypeInformation[_],
-     operand: GeneratedExpression)
-   : GeneratedExpression = {
-   generateUnaryOperatorIfNotNull(nullCheck, resultType, operand) { term =>
-     val decimalOperand = isDecimal(operand.resultType)
-     operator match {
-       case "-" if decimalOperand => s"$term.negate()"
-       case "+" if decimalOperand => term
-       case _ => s"$operator($term)"
-     }
-   }
- }
-
- /**
-  * Generates an equality check. Numeric, temporal and comparable operands are delegated to
-  * [[generateComparison]], arrays are compared with `java.util.Arrays.equals` and all
-  * remaining reference types with `equals()`.
-  */
- def generateEquals(
-     nullCheck: Boolean,
-     left: GeneratedExpression,
-     right: GeneratedExpression)
-   : GeneratedExpression = {
-   val leftType = left.resultType
-   val rightType = right.resultType
-   def sameType: Boolean = leftType == rightType
-
-   if ((isNumeric(leftType) && isNumeric(rightType)) || (isTemporal(leftType) && sameType)) {
-     // numeric types, or temporal types of the same kind
-     generateComparison("==", nullCheck, left, right)
-   } else if (isArray(leftType) && sameType) {
-     // array types
-     generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
-       (l, r) => s"java.util.Arrays.equals($l, $r)"
-     }
-   } else if (isComparable(leftType) && sameType) {
-     // comparable types of same type
-     generateComparison("==", nullCheck, left, right)
-   } else {
-     // non comparable types: call equals() on whichever side is a reference
-     val equalsCall: (String, String) => String =
-       if (isReference(left)) {
-         (l, r) => s"$l.equals($r)"
-       } else if (isReference(right)) {
-         (l, r) => s"$r.equals($l)"
-       } else {
-         throw new CodeGenException(
-           s"Incomparable types: ${left.resultType} and ${right.resultType}")
-       }
-     generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right)(equalsCall)
-   }
- }
-
- /**
-  * Generates an inequality check. Numeric, temporal and comparable operands are delegated
-  * to [[generateComparison]], arrays are compared with a negated `java.util.Arrays.equals`
-  * and all remaining reference types with a negated `equals()`.
-  */
- def generateNotEquals(
-     nullCheck: Boolean,
-     left: GeneratedExpression,
-     right: GeneratedExpression)
-   : GeneratedExpression = {
-   val leftType = left.resultType
-   val rightType = right.resultType
-   def sameType: Boolean = leftType == rightType
-
-   if ((isNumeric(leftType) && isNumeric(rightType)) || (isTemporal(leftType) && sameType)) {
-     // numeric types, or temporal types of the same kind
-     generateComparison("!=", nullCheck, left, right)
-   } else if (isArray(leftType) && sameType) {
-     // array types
-     generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
-       (l, r) => s"!java.util.Arrays.equals($l, $r)"
-     }
-   } else if (isComparable(leftType) && sameType) {
-     // comparable types
-     generateComparison("!=", nullCheck, left, right)
-   } else {
-     // non-comparable types: negate equals() of whichever side is a reference
-     val notEqualsCall: (String, String) => String =
-       if (isReference(left)) {
-         (l, r) => s"!($l.equals($r))"
-       } else if (isReference(right)) {
-         (l, r) => s"!($r.equals($l))"
-       } else {
-         throw new CodeGenException(
-           s"Incomparable types: ${left.resultType} and ${right.resultType}")
-       }
-     generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right)(notEqualsCall)
-   }
- }
-
- /**
-  * Generates the comparison of two expressions with the given Java operator. Supported are
-  * numeric operands (decimals are compared via `compareTo`), temporal, boolean and
-  * comparable operands of the same type; any other combination raises a
-  * [[CodeGenException]].
-  */
- def generateComparison(
-     operator: String,
-     nullCheck: Boolean,
-     left: GeneratedExpression,
-     right: GeneratedExpression)
-   : GeneratedExpression = {
-   val leftType = left.resultType
-   val rightType = right.resultType
-   def sameType: Boolean = leftType == rightType
-
-   val comparison: (String, String) => String =
-     if (isDecimal(leftType) && isNumeric(rightType)) {
-       // left is decimal or both sides are decimal
-       (l, r) => {
-         val castRight = numericCasting(rightType, leftType)
-         s"$l.compareTo(${castRight(r)}) $operator 0"
-       }
-     } else if (isNumeric(leftType) && isDecimal(rightType)) {
-       // right is decimal
-       (l, r) => {
-         val castLeft = numericCasting(leftType, rightType)
-         s"${castLeft(l)}.compareTo($r) $operator 0"
-       }
-     } else if (isNumeric(leftType) && isNumeric(rightType)) {
-       // both sides are numeric
-       (l, r) => s"$l $operator $r"
-     } else if (isTemporal(leftType) && sameType) {
-       // both sides are temporal of same type
-       (l, r) => s"$l $operator $r"
-     } else if (isBoolean(leftType) && sameType) {
-       // both sides are boolean: only (in)equality is meaningful
-       if (operator == "==" || operator == "!=") {
-         (l, r) => s"$l $operator $r"
-       } else {
-         throw new CodeGenException(s"Unsupported boolean comparison '$operator'.")
-       }
-     } else if (isComparable(leftType) && sameType) {
-       // both sides are same comparable type
-       (l, r) => s"$l.compareTo($r) $operator 0"
-     } else {
-       throw new CodeGenException(
-         s"Incomparable types: ${left.resultType} and ${right.resultType}")
-     }
-
-   generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right)(comparison)
- }
-
- /**
-  * Generates an `IS NULL` check for the given operand.
-  *
-  * With null checking enabled the result is the operand's null term. Without null checking,
-  * reference operands are compared against `null` and all other operands yield `false`.
-  * The check itself is never null, so the returned null term is always declared as `false`
-  * in the generated code; callers may therefore reference it in every mode.
-  *
-  * @param nullCheck whether the operand carries a usable null term
-  * @param operand expression to test
-  * @return boolean expression holding the outcome of the check
-  */
- def generateIsNull(
-     nullCheck: Boolean,
-     operand: GeneratedExpression)
-   : GeneratedExpression = {
-   val resultTerm = newName("result")
-   val nullTerm = newName("isNull")
-
-   val isNullExpr =
-     if (nullCheck) {
-       operand.nullTerm
-     } else if (isReference(operand)) {
-       s"${operand.resultTerm} == null"
-     } else {
-       "false"
-     }
-
-   // the null term is declared in every branch so that the returned expression never
-   // refers to a variable missing from the generated code
-   val operatorCode =
-     s"""
-       |${operand.code}
-       |boolean $resultTerm = $isNullExpr;
-       |boolean $nullTerm = false;
-       |""".stripMargin
-
-   GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
- }
-
- /**
-  * Generates an `IS NOT NULL` check for the given operand.
-  *
-  * With null checking enabled the result is the negated null term of the operand. Without
-  * null checking, reference operands are compared against `null` and all other operands
-  * yield `true`. The check itself is never null, so the returned null term is always
-  * declared as `false` in the generated code; callers may reference it in every mode.
-  *
-  * @param nullCheck whether the operand carries a usable null term
-  * @param operand expression to test
-  * @return boolean expression holding the outcome of the check
-  */
- def generateIsNotNull(
-     nullCheck: Boolean,
-     operand: GeneratedExpression)
-   : GeneratedExpression = {
-   val resultTerm = newName("result")
-   val nullTerm = newName("isNull")
-
-   val isNotNullExpr =
-     if (nullCheck) {
-       s"!${operand.nullTerm}"
-     } else if (isReference(operand)) {
-       s"${operand.resultTerm} != null"
-     } else {
-       "true"
-     }
-
-   // the null term is declared in every branch so that the returned expression never
-   // refers to a variable missing from the generated code
-   val operatorCode =
-     s"""
-       |${operand.code}
-       |boolean $resultTerm = $isNotNullExpr;
-       |boolean $nullTerm = false;
-       |""".stripMargin
-
-   GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
- }
-
- /**
-  * Generates a logical AND of two boolean expressions. With null checking enabled the
-  * generated code follows three-valued logic, otherwise a plain Java `&&` is emitted.
-  */
- def generateAnd(
-     nullCheck: Boolean,
-     left: GeneratedExpression,
-     right: GeneratedExpression)
-   : GeneratedExpression = {
-   val andResult = newName("result")
-   val andIsNull = newName("isNull")
-
-   val andCode = if (!nullCheck) {
-     s"""
-       |${left.code}
-       |${right.code}
-       |boolean $andResult = ${left.resultTerm} && ${right.resultTerm};
-       |""".stripMargin
-   } else {
-     // Three-valued logic (Unknown is represented by the null term):
-     //   both sides known   -> ordinary two-valued AND
-     //   True    AND Unknown -> Unknown
-     //   False   AND Unknown -> False
-     //   Unknown AND True    -> Unknown
-     //   Unknown AND False   -> False
-     //   Unknown AND Unknown -> Unknown
-     s"""
-       |${left.code}
-       |${right.code}
-       |boolean $andResult;
-       |boolean $andIsNull;
-       |if (!${left.nullTerm} && !${right.nullTerm}) {
-       | $andResult = ${left.resultTerm} && ${right.resultTerm};
-       | $andIsNull = false;
-       |}
-       |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
-       | $andResult = false;
-       | $andIsNull = true;
-       |}
-       |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
-       | $andResult = false;
-       | $andIsNull = false;
-       |}
-       |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
-       | $andResult = false;
-       | $andIsNull = true;
-       |}
-       |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
-       | $andResult = false;
-       | $andIsNull = false;
-       |}
-       |else {
-       | $andResult = false;
-       | $andIsNull = true;
-       |}
-       |""".stripMargin
-   }
-
-   GeneratedExpression(andResult, andIsNull, andCode, BOOLEAN_TYPE_INFO)
- }
-
- /**
-  * Generates a logical OR of two boolean expressions. With null checking enabled the
-  * generated code follows three-valued logic, otherwise a plain Java `||` is emitted.
-  */
- def generateOr(
-     nullCheck: Boolean,
-     left: GeneratedExpression,
-     right: GeneratedExpression)
-   : GeneratedExpression = {
-   val orResult = newName("result")
-   val orIsNull = newName("isNull")
-
-   val orCode = if (!nullCheck) {
-     s"""
-       |${left.code}
-       |${right.code}
-       |boolean $orResult = ${left.resultTerm} || ${right.resultTerm};
-       |""".stripMargin
-   } else {
-     // Three-valued logic (Unknown is represented by the null term):
-     //   both sides known   -> ordinary two-valued OR
-     //   True    OR Unknown -> True
-     //   False   OR Unknown -> Unknown
-     //   Unknown OR True    -> True
-     //   Unknown OR False   -> Unknown
-     //   Unknown OR Unknown -> Unknown
-     s"""
-       |${left.code}
-       |${right.code}
-       |boolean $orResult;
-       |boolean $orIsNull;
-       |if (!${left.nullTerm} && !${right.nullTerm}) {
-       | $orResult = ${left.resultTerm} || ${right.resultTerm};
-       | $orIsNull = false;
-       |}
-       |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
-       | $orResult = true;
-       | $orIsNull = false;
-       |}
-       |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
-       | $orResult = false;
-       | $orIsNull = true;
-       |}
-       |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
-       | $orResult = true;
-       | $orIsNull = false;
-       |}
-       |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
-       | $orResult = false;
-       | $orIsNull = true;
-       |}
-       |else {
-       | $orResult = false;
-       | $orIsNull = true;
-       |}
-       |""".stripMargin
-   }
-
-   GeneratedExpression(orResult, orIsNull, orCode, BOOLEAN_TYPE_INFO)
- }
-
- /**
-  * Generates a logical negation. Under three-valued logic a known operand is negated as in
-  * two-valued logic, while an Unknown operand stays Unknown.
-  */
- def generateNot(
-     nullCheck: Boolean,
-     operand: GeneratedExpression)
-   : GeneratedExpression = {
-   val negate: String => String = term => s"!($term)"
-   generateUnaryOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, operand)(negate)
- }
-
- /**
-  * Generates an `IS TRUE` check; the resulting expression is never null.
-  */
- def generateIsTrue(operand: GeneratedExpression): GeneratedExpression = {
-   // unknown is always false by default, so the operand's result term can be used as is
-   val isTrueTerm = operand.resultTerm
-   GeneratedExpression(isTrueTerm, GeneratedExpression.NEVER_NULL, operand.code, BOOLEAN_TYPE_INFO)
- }
-
- /**
-  * Generates an `IS NOT TRUE` check; the resulting expression is never null.
-  */
- def generateIsNotTrue(operand: GeneratedExpression): GeneratedExpression = {
-   // unknown is always false by default, so negating the result term is sufficient
-   val isNotTrueTerm = s"(!${operand.resultTerm})"
-   GeneratedExpression(
-     isNotTrueTerm, GeneratedExpression.NEVER_NULL, operand.code, BOOLEAN_TYPE_INFO)
- }
-
- /**
-  * Generates an `IS FALSE` check: true only if the operand is neither true nor null.
-  * The resulting expression is never null.
-  */
- def generateIsFalse(operand: GeneratedExpression): GeneratedExpression = {
-   val isFalseTerm = s"(!${operand.resultTerm} && !${operand.nullTerm})"
-   GeneratedExpression(
-     isFalseTerm, GeneratedExpression.NEVER_NULL, operand.code, BOOLEAN_TYPE_INFO)
- }
-
- /**
-  * Generates an `IS NOT FALSE` check: true if the operand is true or null.
-  * The resulting expression is never null.
-  */
- def generateIsNotFalse(operand: GeneratedExpression): GeneratedExpression = {
-   val isNotFalseTerm = s"(${operand.resultTerm} || ${operand.nullTerm})"
-   GeneratedExpression(
-     isNotFalseTerm, GeneratedExpression.NEVER_NULL, operand.code, BOOLEAN_TYPE_INFO)
- }
-
- /**
-  * Generates code that casts `operand` to `targetType`. The cases are tried in order, so
-  * the more specific source/target combinations must stay ahead of the generic ones.
-  * Combinations that are not listed raise a [[CodeGenException]].
-  */
- def generateCast(
-     nullCheck: Boolean,
-     operand: GeneratedExpression,
-     targetType: TypeInformation[_])
-   : GeneratedExpression = {
-
-   // every converting cast is a null-aware unary operation that yields the target type
-   def castWith(expr: String => String): GeneratedExpression =
-     generateUnaryOperatorIfNotNull(nullCheck, targetType, operand)(expr)
-
-   // qualified reference to the milliseconds-per-day constant used by the generated code
-   def millisPerDay: String = s"${classOf[DateTimeUtils].getCanonicalName}.MILLIS_PER_DAY"
-
-   (operand.resultType, targetType) match {
-     // identity casting
-     case (from, to) if from == to =>
-       operand
-
-     // Date/Time/Timestamp -> String
-     case (dtt: SqlTimeTypeInfo[_], STRING_TYPE_INFO) =>
-       castWith(term => s"${internalToTimePointCode(dtt, term)}.toString()")
-
-     // Interval Months -> String
-     case (TimeIntervalTypeInfo.INTERVAL_MONTHS, STRING_TYPE_INFO) =>
-       val method = qualifyMethod(BuiltInMethod.INTERVAL_YEAR_MONTH_TO_STRING.method)
-       val timeUnitRange = qualifyEnum(TimeUnitRange.YEAR_TO_MONTH)
-       castWith(term => s"$method($term, $timeUnitRange)")
-
-     // Interval Millis -> String (milli second precision)
-     case (TimeIntervalTypeInfo.INTERVAL_MILLIS, STRING_TYPE_INFO) =>
-       val method = qualifyMethod(BuiltInMethod.INTERVAL_DAY_TIME_TO_STRING.method)
-       val timeUnitRange = qualifyEnum(TimeUnitRange.DAY_TO_SECOND)
-       castWith(term => s"$method($term, $timeUnitRange, 3)")
-
-     // Object array -> String
-     case (_: ObjectArrayTypeInfo[_, _], STRING_TYPE_INFO) =>
-       castWith(term => s"java.util.Arrays.deepToString($term)")
-
-     // Primitive array -> String
-     case (_: PrimitiveArrayTypeInfo[_], STRING_TYPE_INFO) =>
-       castWith(term => s"java.util.Arrays.toString($term)")
-
-     // * (not Date/Time/Timestamp) -> String
-     case (_, STRING_TYPE_INFO) =>
-       castWith(term => s""" "" + $term""")
-
-     // * -> Character
-     case (_, CHAR_TYPE_INFO) =>
-       throw new CodeGenException("Character type not supported.")
-
-     // String -> NUMERIC TYPE (not Character), Boolean
-     case (STRING_TYPE_INFO, _: NumericTypeInfo[_])
-        | (STRING_TYPE_INFO, BOOLEAN_TYPE_INFO) =>
-       val wrapperClass = targetType.getTypeClass.getCanonicalName
-       castWith(term => s"$wrapperClass.valueOf($term)")
-
-     // String -> BigDecimal
-     case (STRING_TYPE_INFO, BIG_DEC_TYPE_INFO) =>
-       val wrapperClass = targetType.getTypeClass.getCanonicalName
-       castWith(term => s"new $wrapperClass($term)")
-
-     // String -> Date
-     case (STRING_TYPE_INFO, SqlTimeTypeInfo.DATE) =>
-       castWith(term => s"${qualifyMethod(BuiltInMethod.STRING_TO_DATE.method)}($term)")
-
-     // String -> Time
-     case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIME) =>
-       castWith(term => s"${qualifyMethod(BuiltInMethod.STRING_TO_TIME.method)}($term)")
-
-     // String -> Timestamp
-     case (STRING_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP) =>
-       castWith(term => s"${qualifyMethod(BuiltInMethod.STRING_TO_TIMESTAMP.method)}($term)")
-
-     // Boolean -> NUMERIC TYPE
-     case (BOOLEAN_TYPE_INFO, nti: NumericTypeInfo[_]) =>
-       val targetTypeTerm = primitiveTypeTermForTypeInfo(nti)
-       castWith(term => s"($targetTypeTerm) ($term ? 1 : 0)")
-
-     // Boolean -> BigDecimal
-     case (BOOLEAN_TYPE_INFO, BIG_DEC_TYPE_INFO) =>
-       castWith(term => s"$term ? java.math.BigDecimal.ONE : java.math.BigDecimal.ZERO")
-
-     // NUMERIC TYPE -> Boolean
-     case (_: NumericTypeInfo[_], BOOLEAN_TYPE_INFO) =>
-       castWith(term => s"$term != 0")
-
-     // BigDecimal -> Boolean
-     case (BIG_DEC_TYPE_INFO, BOOLEAN_TYPE_INFO) =>
-       castWith(term => s"$term.compareTo(java.math.BigDecimal.ZERO) != 0")
-
-     // NUMERIC TYPE, BigDecimal -> NUMERIC TYPE, BigDecimal
-     case (_: NumericTypeInfo[_], _: NumericTypeInfo[_])
-        | (BIG_DEC_TYPE_INFO, _: NumericTypeInfo[_])
-        | (_: NumericTypeInfo[_], BIG_DEC_TYPE_INFO) =>
-       val operandCasting = numericCasting(operand.resultType, targetType)
-       castWith(term => operandCasting(term))
-
-     // Date -> Timestamp
-     case (SqlTimeTypeInfo.DATE, SqlTimeTypeInfo.TIMESTAMP) =>
-       castWith(term => s"$term * $millisPerDay")
-
-     // Timestamp -> Date
-     case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.DATE) =>
-       val targetTypeTerm = primitiveTypeTermForTypeInfo(targetType)
-       castWith(term => s"($targetTypeTerm) ($term / $millisPerDay)")
-
-     // Time -> Timestamp
-     case (SqlTimeTypeInfo.TIME, SqlTimeTypeInfo.TIMESTAMP) =>
-       castWith(term => term)
-
-     // Timestamp -> Time
-     case (SqlTimeTypeInfo.TIMESTAMP, SqlTimeTypeInfo.TIME) =>
-       val targetTypeTerm = primitiveTypeTermForTypeInfo(targetType)
-       castWith(term => s"($targetTypeTerm) ($term % $millisPerDay)")
-
-     // internal temporal casting: the generated terms are reused, only the type changes
-     //   Date -> Integer, Time -> Integer, Timestamp -> Long
-     //   Integer -> Date, Integer -> Time, Long -> Timestamp
-     //   Integer -> Interval Months, Long -> Interval Millis
-     //   Interval Months -> Integer, Interval Millis -> Long
-     case (SqlTimeTypeInfo.DATE, INT_TYPE_INFO)
-        | (SqlTimeTypeInfo.TIME, INT_TYPE_INFO)
-        | (SqlTimeTypeInfo.TIMESTAMP, LONG_TYPE_INFO)
-        | (INT_TYPE_INFO, SqlTimeTypeInfo.DATE)
-        | (INT_TYPE_INFO, SqlTimeTypeInfo.TIME)
-        | (LONG_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP)
-        | (INT_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MONTHS)
-        | (LONG_TYPE_INFO, TimeIntervalTypeInfo.INTERVAL_MILLIS)
-        | (TimeIntervalTypeInfo.INTERVAL_MONTHS, INT_TYPE_INFO)
-        | (TimeIntervalTypeInfo.INTERVAL_MILLIS, LONG_TYPE_INFO) =>
-       internalExprCasting(operand, targetType)
-
-     // internal reinterpretation of temporal types
-     //   Date, Time, Interval Months -> Long
-     case (SqlTimeTypeInfo.DATE, LONG_TYPE_INFO)
-        | (SqlTimeTypeInfo.TIME, LONG_TYPE_INFO)
-        | (TimeIntervalTypeInfo.INTERVAL_MONTHS, LONG_TYPE_INFO) =>
-       internalExprCasting(operand, targetType)
-
-     case (from, to) =>
-       throw new CodeGenException(s"Unsupported cast from '$from' to '$to'.")
-   }
- }
-
- /**
-  * Generates a nested if/else from a flat operand list. Starting at index `i` the operands
-  * are read as (condition, value) pairs and the last operand is the else value. Every
-  * value is cast to `resultType`.
-  */
- def generateIfElse(
-     nullCheck: Boolean,
-     operands: Seq[GeneratedExpression],
-     resultType: TypeInformation[_],
-     i: Int = 0)
-   : GeneratedExpression = {
-   val lastIndex = operands.size - 1
-
-   if (i == lastIndex) {
-     // else part
-     generateCast(nullCheck, operands(i), resultType)
-   } else {
-     // the condition must be boolean; it is not checked for null, instead its default
-     // value is used, thus null is false
-     val condition = operands(i)
-     requireBoolean(condition)
-     val thenBranch = generateCast(nullCheck, operands(i + 1), resultType)
-     val elseBranch = generateIfElse(nullCheck, operands, resultType, i + 2)
-
-     val ifResult = newName("result")
-     val ifIsNull = newName("isNull")
-     val typeTerm = primitiveTypeTermForTypeInfo(resultType)
-
-     val ifElseCode = if (!nullCheck) {
-       s"""
-         |${condition.code}
-         |$typeTerm $ifResult;
-         |if (${condition.resultTerm}) {
-         | ${thenBranch.code}
-         | $ifResult = ${thenBranch.resultTerm};
-         |}
-         |else {
-         | ${elseBranch.code}
-         | $ifResult = ${elseBranch.resultTerm};
-         |}
-         |""".stripMargin
-     } else {
-       s"""
-         |${condition.code}
-         |$typeTerm $ifResult;
-         |boolean $ifIsNull;
-         |if (${condition.resultTerm}) {
-         | ${thenBranch.code}
-         | $ifResult = ${thenBranch.resultTerm};
-         | $ifIsNull = ${thenBranch.nullTerm};
-         |}
-         |else {
-         | ${elseBranch.code}
-         | $ifResult = ${elseBranch.resultTerm};
-         | $ifIsNull = ${elseBranch.nullTerm};
-         |}
-         |""".stripMargin
-     }
-
-     GeneratedExpression(ifResult, ifIsNull, ifElseCode, resultType)
-   }
- }
-
- /**
-  * Generates addition (`plus = true`) or subtraction of temporal operands: two intervals
-  * of the same type, or a date/time/timestamp on the left and an interval on the right.
-  * Any other combination raises a [[CodeGenException]].
-  */
- def generateTemporalPlusMinus(
-     plus: Boolean,
-     nullCheck: Boolean,
-     left: GeneratedExpression,
-     right: GeneratedExpression)
-   : GeneratedExpression = {
-
-   val sign = if (plus) "+" else "-"
-
-   // null-aware binary operation on the two operands yielding the given type
-   def combine(resultType: TypeInformation[_])(expr: (String, String) => String)
-     : GeneratedExpression =
-     generateOperatorIfNotNull(nullCheck, resultType, left, right)(expr)
-
-   // month intervals are applied through the ADD_MONTHS built-in with a signed amount
-   val shiftMonths: (String, String) => String =
-     (l, r) => s"${qualifyMethod(BuiltInMethod.ADD_MONTHS.method)}($l, $sign($r))"
-
-   (left.resultType, right.resultType) match {
-     case (lType: TimeIntervalTypeInfo[_], rType: TimeIntervalTypeInfo[_]) if lType == rType =>
-       generateArithmeticOperator(sign, nullCheck, lType, left, right)
-
-     case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
-       combine(SqlTimeTypeInfo.DATE) {
-         (l, r) => s"$l $sign ((int) ($r / ${MILLIS_PER_DAY}L))"
-       }
-
-     case (SqlTimeTypeInfo.DATE, TimeIntervalTypeInfo.INTERVAL_MONTHS) =>
-       combine(SqlTimeTypeInfo.DATE)(shiftMonths)
-
-     case (SqlTimeTypeInfo.TIME, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
-       combine(SqlTimeTypeInfo.TIME) {
-         (l, r) => s"$l $sign ((int) ($r))"
-       }
-
-     case (SqlTimeTypeInfo.TIMESTAMP, TimeIntervalTypeInfo.INTERVAL_MILLIS) =>
-       combine(SqlTimeTypeInfo.TIMESTAMP) {
-         (l, r) => s"$l $sign $r"
-       }
-
-     case (SqlTimeTypeInfo.TIMESTAMP, TimeIntervalTypeInfo.INTERVAL_MONTHS) =>
-       combine(SqlTimeTypeInfo.TIMESTAMP)(shiftMonths)
-
-     case _ =>
-       throw new CodeGenException("Unsupported temporal arithmetic.")
-   }
- }
-
- /**
-  * Generates a unary plus (`plus = true`) or minus on an interval; the result keeps the
-  * operand's type.
-  */
- def generateUnaryIntervalPlusMinus(
-     plus: Boolean,
-     nullCheck: Boolean,
-     operand: GeneratedExpression)
-   : GeneratedExpression = {
-   val sign = if (plus) "+" else "-"
-   val intervalType = operand.resultType
-   generateUnaryArithmeticOperator(sign, nullCheck, intervalType, operand)
- }
-
- /**
-  * Generates code that stores the given element expressions in a reusable array.
-  *
-  * Elements of object arrays are boxed so that null values can be represented; elements
-  * of primitive arrays are stored as they are. The resulting expression is never null.
-  *
-  * @param codeGenerator generator providing the reusable array and the boxing code
-  * @param resultType type of the array to create
-  * @param elements expressions producing the array elements, in order
-  * @return expression whose result term is the array
-  * @throws CodeGenException if `resultType` is neither an object nor a primitive array type
-  */
- def generateArray(
-     codeGenerator: CodeGenerator,
-     resultType: TypeInformation[_],
-     elements: Seq[GeneratedExpression])
-   : GeneratedExpression = {
-   val arrayTerm = codeGenerator.addReusableArray(resultType.getTypeClass, elements.size)
-
-   val boxedElements: Seq[GeneratedExpression] = resultType match {
-
-     case oati: ObjectArrayTypeInfo[_, _] =>
-       // we box the elements to also represent null values
-       val boxedTypeTerm = boxedTypeTermForTypeInfo(oati.getComponentInfo)
-
-       elements.map { e =>
-         val boxedExpr = codeGenerator.generateOutputFieldBoxing(e)
-         val exprOrNull: String = if (codeGenerator.nullCheck) {
-           s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
-         } else {
-           boxedExpr.resultTerm
-         }
-         boxedExpr.copy(resultTerm = exprOrNull)
-       }
-
-     // no boxing necessary
-     case _: PrimitiveArrayTypeInfo[_] => elements
-
-     // report a code generation error instead of failing with a bare MatchError
-     case other =>
-       throw new CodeGenException(s"Unsupported array type: '$other'.")
-   }
-
-   val code = boxedElements
-     .zipWithIndex
-     .map { case (element, idx) =>
-       s"""
-         |${element.code}
-         |$arrayTerm[$idx] = ${element.resultTerm};
-         |""".stripMargin
-     }
-     .mkString("\n")
-
-   GeneratedExpression(arrayTerm, GeneratedExpression.NEVER_NULL, code, resultType)
- }
-
- /**
-  * Generates code that reads the array element at a one-based index (the generated code
-  * subtracts 1 before accessing the Java array).
-  *
-  * Elements of object arrays are read in boxed form and then unboxed; elements of
-  * primitive arrays are read directly.
-  *
-  * @param codeGenerator generator providing the null-check setting and the unboxing code
-  * @param array expression producing the array
-  * @param index expression producing the one-based position
-  * @return expression producing the selected element
-  * @throws CodeGenException if `array` is neither an object nor a primitive array
-  */
- def generateArrayElementAt(
-     codeGenerator: CodeGenerator,
-     array: GeneratedExpression,
-     index: GeneratedExpression)
-   : GeneratedExpression = {
-
-   val resultTerm = newName("result")
-
-   array.resultType match {
-
-     // unbox object array types
-     case oati: ObjectArrayTypeInfo[_, _] =>
-       // get boxed array element
-       val resultTypeTerm = boxedTypeTermForTypeInfo(oati.getComponentInfo)
-
-       val arrayAccessCode = if (codeGenerator.nullCheck) {
-         s"""
-           |${array.code}
-           |${index.code}
-           |$resultTypeTerm $resultTerm = (${array.nullTerm} || ${index.nullTerm}) ?
-           | null : ${array.resultTerm}[${index.resultTerm} - 1];
-           |""".stripMargin
-       } else {
-         s"""
-           |${array.code}
-           |${index.code}
-           |$resultTypeTerm $resultTerm = ${array.resultTerm}[${index.resultTerm} - 1];
-           |""".stripMargin
-       }
-
-       // generate unbox code
-       val unboxing = codeGenerator.generateInputFieldUnboxing(oati.getComponentInfo, resultTerm)
-
-       unboxing.copy(code =
-         s"""
-           |$arrayAccessCode
-           |${unboxing.code}
-           |""".stripMargin
-       )
-
-     // no unboxing necessary
-     case pati: PrimitiveArrayTypeInfo[_] =>
-       generateOperatorIfNotNull(codeGenerator.nullCheck, pati.getComponentType, array, index) {
-         (leftTerm, rightTerm) => s"$leftTerm[$rightTerm - 1]"
-       }
-
-     // report a code generation error instead of failing with a bare MatchError
-     case other =>
-       throw new CodeGenException(s"Unsupported array type: '$other'.")
-   }
- }
-
- /**
-  * Generates code that extracts the sole element of an array.
-  *
-  * The generated code switches on the array length (a null array counts as length 0 when
-  * null checking is enabled): length 0 yields the default value (and null under null
-  * checking), length 1 yields the element, and any other length makes the generated code
-  * throw a `RuntimeException`.
-  *
-  * @param codeGenerator generator providing the null-check setting and the unboxing code
-  * @param array expression producing the array
-  * @return expression producing the single element
-  * @throws CodeGenException if `array` is neither an object nor a primitive array
-  */
- def generateArrayElement(
-     codeGenerator: CodeGenerator,
-     array: GeneratedExpression)
-   : GeneratedExpression = {
-
-   val nullTerm = newName("isNull")
-   val resultTerm = newName("result")
-   val resultType = array.resultType match {
-     case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
-     case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
-     // report a code generation error instead of failing with a bare MatchError
-     case other => throw new CodeGenException(s"Unsupported array type: '$other'.")
-   }
-   val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
-   val defaultValue = primitiveDefaultValue(resultType)
-
-   val arrayLengthCode = if (codeGenerator.nullCheck) {
-     s"${array.nullTerm} ? 0 : ${array.resultTerm}.length"
-   } else {
-     s"${array.resultTerm}.length"
-   }
-
-   // other array types were already rejected when the result type was determined
-   val arrayAccessCode = array.resultType match {
-     case oati: ObjectArrayTypeInfo[_, _] =>
-       // generate unboxing code
-       val unboxing = codeGenerator.generateInputFieldUnboxing(
-         oati.getComponentInfo,
-         s"${array.resultTerm}[0]")
-
-       s"""
-         |${array.code}
-         |${if (codeGenerator.nullCheck) s"boolean $nullTerm;" else "" }
-         |$resultTypeTerm $resultTerm;
-         |switch ($arrayLengthCode) {
-         | case 0:
-         | ${if (codeGenerator.nullCheck) s"$nullTerm = true;" else "" }
-         | $resultTerm = $defaultValue;
-         | break;
-         | case 1:
-         | ${unboxing.code}
-         | ${if (codeGenerator.nullCheck) s"$nullTerm = ${unboxing.nullTerm};" else "" }
-         | $resultTerm = ${unboxing.resultTerm};
-         | break;
-         | default:
-         | throw new RuntimeException("Array has more than one element.");
-         |}
-         |""".stripMargin
-
-     case pati: PrimitiveArrayTypeInfo[_] =>
-       s"""
-         |${array.code}
-         |${if (codeGenerator.nullCheck) s"boolean $nullTerm;" else "" }
-         |$resultTypeTerm $resultTerm;
-         |switch ($arrayLengthCode) {
-         | case 0:
-         | ${if (codeGenerator.nullCheck) s"$nullTerm = true;" else "" }
-         | $resultTerm = $defaultValue;
-         | break;
-         | case 1:
-         | ${if (codeGenerator.nullCheck) s"$nullTerm = false;" else "" }
-         | $resultTerm = ${array.resultTerm}[0];
-         | break;
-         | default:
-         | throw new RuntimeException("Array has more than one element.");
-         |}
-         |""".stripMargin
-   }
-
-   GeneratedExpression(resultTerm, nullTerm, arrayAccessCode, resultType)
- }
-
- /**
-  * Generates code that returns the number of elements of an array as an integer.
-  */
- def generateArrayCardinality(
-     nullCheck: Boolean,
-     array: GeneratedExpression)
-   : GeneratedExpression = {
-   // the term handed to the function is the array's own result term
-   val length: String => String = arrayTerm => s"$arrayTerm.length"
-   generateUnaryOperatorIfNotNull(nullCheck, INT_TYPE_INFO, array)(length)
- }
-
- // ----------------------------------------------------------------------------------------------
-
- /**
-  * Wraps a unary operation. With null checking enabled the operation is only applied to a
-  * non-null operand; a null operand yields the default value of the result type and a
-  * null flag of `true`. Without null checking the operation is applied unconditionally
-  * and no null flag is declared in the generated code.
-  */
- private def generateUnaryOperatorIfNotNull(
-     nullCheck: Boolean,
-     resultType: TypeInformation[_],
-     operand: GeneratedExpression)
-     (expr: (String) => String)
-   : GeneratedExpression = {
-   val result = newName("result")
-   val isNull = newName("isNull")
-   val typeTerm = primitiveTypeTermForTypeInfo(resultType)
-   val fallback = primitiveDefaultValue(resultType)
-   // the operation is rendered exactly once, whichever template is chosen below
-   val operation = expr(operand.resultTerm)
-
-   val code = if (!nullCheck) {
-     s"""
-       |${operand.code}
-       |$typeTerm $result = $operation;
-       |""".stripMargin
-   } else {
-     s"""
-       |${operand.code}
-       |$typeTerm $result;
-       |boolean $isNull;
-       |if (!${operand.nullTerm}) {
-       | $result = $operation;
-       | $isNull = false;
-       |}
-       |else {
-       | $result = $fallback;
-       | $isNull = true;
-       |}
-       |""".stripMargin
-   }
-
-   GeneratedExpression(result, isNull, code, resultType)
- }
-
- /**
-  * Wraps a binary operation. With null checking enabled the result is null as soon as one
-  * operand is null, in which case the default value of the result type is assigned.
-  * Without null checking the operation is applied unconditionally and no null flag is
-  * declared in the generated code.
-  */
- private def generateOperatorIfNotNull(
-     nullCheck: Boolean,
-     resultType: TypeInformation[_],
-     left: GeneratedExpression,
-     right: GeneratedExpression)
-     (expr: (String, String) => String)
-   : GeneratedExpression = {
-   val result = newName("result")
-   val isNull = newName("isNull")
-   val typeTerm = primitiveTypeTermForTypeInfo(resultType)
-   val fallback = primitiveDefaultValue(resultType)
-   // the operation is rendered exactly once, whichever template is chosen below
-   val operation = expr(left.resultTerm, right.resultTerm)
-
-   val code = if (!nullCheck) {
-     s"""
-       |${left.code}
-       |${right.code}
-       |$typeTerm $result = $operation;
-       |""".stripMargin
-   } else {
-     s"""
-       |${left.code}
-       |${right.code}
-       |boolean $isNull = ${left.nullTerm} || ${right.nullTerm};
-       |$typeTerm $result;
-       |if ($isNull) {
-       | $result = $fallback;
-       |}
-       |else {
-       | $result = $operation;
-       |}
-       |""".stripMargin
-   }
-
-   GeneratedExpression(result, isNull, code, resultType)
- }
-
- /**
-  * Re-labels an expression with another type; result term, null term and code are reused
-  * unchanged.
-  */
- private def internalExprCasting(
-     expr: GeneratedExpression,
-     typeInfo: TypeInformation[_])
-   : GeneratedExpression = {
-   expr.copy(resultType = typeInfo)
- }
-
- /**
-  * Maps an arithmetic operator symbol to the name of the `BigDecimal` method used for it
-  * in generated code; unknown operators raise a [[CodeGenException]].
-  */
- private def arithOpToDecMethod(operator: String): String = {
-   val decimalMethods = Map(
-     "+" -> "add",
-     "-" -> "subtract",
-     "*" -> "multiply",
-     "/" -> "divide",
-     "%" -> "remainder")
-
-   decimalMethods.getOrElse(
-     operator,
-     throw new CodeGenException("Unsupported decimal arithmetic operator."))
- }
-
- /**
-  * Returns a function that wraps an operand term in the code needed to convert it from
-  * `operandType` to `resultType`. Handles identical types, primitive numeric types and
-  * decimals; any other combination raises a [[CodeGenException]].
-  */
- private def numericCasting(
-     operandType: TypeInformation[_],
-     resultType: TypeInformation[_])
-   : (String) => String = {
-
-   // name of the decimal accessor that produces the given primitive target type
-   def decToPrimMethod(targetType: TypeInformation[_]): String = targetType match {
-     case BYTE_TYPE_INFO => "byteValueExact"
-     case SHORT_TYPE_INFO => "shortValueExact"
-     case INT_TYPE_INFO => "intValueExact"
-     case LONG_TYPE_INFO => "longValueExact"
-     case FLOAT_TYPE_INFO => "floatValue"
-     case DOUBLE_TYPE_INFO => "doubleValue"
-     case _ => throw new CodeGenException("Unsupported decimal casting type.")
-   }
-
-   val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
-   val operandIsDecimal = isDecimal(operandType)
-   val resultIsDecimal = isDecimal(resultType)
-
-   if (operandType == resultType) {
-     // no casting necessary
-     term => term
-   } else if (resultIsDecimal && !operandIsDecimal && isNumeric(operandType)) {
-     // result type is decimal but numeric operand is not
-     term => s"java.math.BigDecimal.valueOf((${superPrimitive(operandType)}) $term)"
-   } else if (!resultIsDecimal && operandIsDecimal && isNumeric(resultType)) {
-     // numeric result type is not decimal but operand is
-     term => s"$term.${decToPrimMethod(resultType)}()"
-   } else if (!operandIsDecimal && !resultIsDecimal
-       && isNumeric(operandType) && isNumeric(resultType)) {
-     // result type and operand type are numeric but not decimal
-     term => s"(($resultTypeTerm) $term)"
-   } else {
-     throw new CodeGenException(s"Unsupported casting from $operandType to $resultType.")
-   }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala
deleted file mode 100644
index 37e70e4..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.codegen.CodeGenUtils._
-import org.apache.flink.api.table.codegen.GeneratedExpression.NEVER_NULL
-import org.apache.flink.api.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
-import org.apache.flink.api.table.functions.TableFunction
-import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._
-
-/**
- * Generates a call to user-defined [[TableFunction]].
- *
- * @param tableFunction user-defined [[TableFunction]] that might be overloaded
- * @param signature actual signature with which the function is called
- * @param returnType actual return type required by the surrounding
- */
-class TableFunctionCallGen(
- tableFunction: TableFunction[_],
- signature: Seq[TypeInformation[_]],
- returnType: TypeInformation[_])
- extends CallGenerator {
-
- override def generate(
- codeGenerator: CodeGenerator,
- operands: Seq[GeneratedExpression])
- : GeneratedExpression = {
- // determine function signature
- val matchingSignature = getSignature(tableFunction, signature)
- .getOrElse(throw new CodeGenException("No matching signature found."))
-
- // convert parameters for function (output boxing)
- val parameters = matchingSignature
- .zip(operands)
- .map { case (paramClass, operandExpr) =>
- if (paramClass.isPrimitive) {
- operandExpr
- } else {
- val boxedTypeTerm = boxedTypeTermForTypeInfo(operandExpr.resultType)
- val boxedExpr = codeGenerator.generateOutputFieldBoxing(operandExpr)
- val exprOrNull: String = if (codeGenerator.nullCheck) {
- s"${boxedExpr.nullTerm} ? null : ($boxedTypeTerm) ${boxedExpr.resultTerm}"
- } else {
- boxedExpr.resultTerm
- }
- boxedExpr.copy(resultTerm = exprOrNull)
- }
- }
-
- // generate function call
- val functionReference = codeGenerator.addReusableFunction(tableFunction)
- val functionCallCode =
- s"""
- |${parameters.map(_.code).mkString("\n")}
- |$functionReference.clear();
- |$functionReference.eval(${parameters.map(_.resultTerm).mkString(", ")});
- |""".stripMargin
-
- // has no result
- GeneratedExpression(
- functionReference,
- NEVER_NULL,
- functionCallCode,
- returnType)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGen.scala
deleted file mode 100644
index 678016b..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGen.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen.calls
-
-import org.apache.calcite.sql.fun.SqlTrimFunction.Flag.{BOTH, LEADING, TRAILING}
-import org.apache.calcite.util.BuiltInMethod
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO
-import org.apache.flink.api.table.codegen.CodeGenUtils._
-import org.apache.flink.api.table.codegen.calls.CallGenerator._
-import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
-
-/**
- * Generates a TRIM function call.
- *
- * First operand: trim mode (see [[org.apache.calcite.sql.fun.SqlTrimFunction.Flag]])
- * Second operand: String to be removed
- * Third operand: String to be trimmed
- */
-class TrimCallGen extends CallGenerator {
-
- override def generate(
- codeGenerator: CodeGenerator,
- operands: Seq[GeneratedExpression])
- : GeneratedExpression = {
- generateCallIfArgsNotNull(codeGenerator.nullCheck, STRING_TYPE_INFO, operands) {
- (terms) =>
- val leading = compareEnum(terms.head, BOTH) || compareEnum(terms.head, LEADING)
- val trailing = compareEnum(terms.head, BOTH) || compareEnum(terms.head, TRAILING)
- s"""
- |${qualifyMethod(BuiltInMethod.TRIM.method)}(
- | $leading, $trailing, ${terms(1)}, ${terms(2)})
- |""".stripMargin
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
deleted file mode 100644
index bb52ad8..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/generated.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.codegen
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-/**
- * Describes a generated expression.
- *
- * @param resultTerm term to access the result of the expression
- * @param nullTerm boolean term that indicates if expression is null
- * @param code code necessary to produce resultTerm and nullTerm
- * @param resultType type of the resultTerm
- */
-case class GeneratedExpression(
- resultTerm: String,
- nullTerm: String,
- code: String,
- resultType: TypeInformation[_])
-
-object GeneratedExpression {
- val ALWAYS_NULL = "true"
- val NEVER_NULL = "false"
- val NO_CODE = ""
-}
-
-case class GeneratedFunction[T](name: String, returnType: TypeInformation[Any], code: String)
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala
deleted file mode 100644
index b69ac1c..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-package object codegen {
- // Used in ExpressionCodeGenerator because Scala 2.10 reflection is not thread safe. We might
- // have several parallel expression operators in one TaskManager, therefore we need to guard
- // these operations.
- object ReflectionLock
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
deleted file mode 100644
index 0a100dd..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/exceptions.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table
-
-/**
- * Exception for all errors occurring during expression parsing.
- */
-case class ExpressionParserException(msg: String) extends RuntimeException(msg)
-
-/**
- * Exception for all errors occurring during sql parsing.
- */
-case class SqlParserException(
- msg: String,
- cause: Throwable)
- extends RuntimeException(msg, cause) {
-
- def this(msg: String) = this(msg, null)
-
-}
-
-/**
- * General Exception for all errors during table handling.
- */
-case class TableException(
- msg: String,
- cause: Throwable)
- extends RuntimeException(msg, cause) {
-
- def this(msg: String) = this(msg, null)
-
-}
-
-object TableException {
- def apply(msg: String): TableException = new TableException(msg)
-}
-
-/**
- * Exception for all errors occurring during validation phase.
- */
-case class ValidationException(
- msg: String,
- cause: Throwable)
- extends RuntimeException(msg, cause) {
-
- def this(msg: String) = this(msg, null)
-
-}
-
-object ValidationException {
- def apply(msg: String): ValidationException = new ValidationException(msg)
-}
-
-/**
- * Exception for unwanted method calling on unresolved expression.
- */
-case class UnresolvedException(msg: String) extends RuntimeException(msg)
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
deleted file mode 100644
index c284bd3..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/Expression.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.tools.RelBuilder
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.trees.TreeNode
-import org.apache.flink.api.table.validate.{ValidationResult, ValidationSuccess}
-
-abstract class Expression extends TreeNode[Expression] {
- /**
- * Returns the [[TypeInformation]] for evaluating this expression.
- * It is sometimes not available until the expression is valid.
- */
- private[flink] def resultType: TypeInformation[_]
-
- /**
- * One pass validation of the expression tree in post order.
- */
- private[flink] lazy val valid: Boolean = childrenValid && validateInput().isSuccess
-
- private[flink] def childrenValid: Boolean = children.forall(_.valid)
-
- /**
- * Check input data types, inputs number or other properties specified by this expression.
- * Return `ValidationSuccess` if it pass the check,
- * or `ValidationFailure` with supplement message explaining the error.
- * Note: we should only call this method until `childrenValid == true`
- */
- private[flink] def validateInput(): ValidationResult = ValidationSuccess
-
- /**
- * Convert Expression to its counterpart in Calcite, i.e. RexNode
- */
- private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
- throw new UnsupportedOperationException(
- s"${this.getClass.getName} cannot be transformed to RexNode"
- )
-
- private[flink] def checkEquals(other: Expression): Boolean = {
- if (this.getClass != other.getClass) {
- false
- } else {
- def checkEquality(elements1: Seq[Any], elements2: Seq[Any]): Boolean = {
- elements1.length == elements2.length && elements1.zip(elements2).forall {
- case (e1: Expression, e2: Expression) => e1.checkEquals(e2)
- case (t1: Seq[_], t2: Seq[_]) => checkEquality(t1, t2)
- case (i1, i2) => i1 == i2
- }
- }
- val elements1 = this.productIterator.toSeq
- val elements2 = other.productIterator.toSeq
- checkEquality(elements1, elements2)
- }
- }
-}
-
-abstract class BinaryExpression extends Expression {
- private[flink] def left: Expression
- private[flink] def right: Expression
- private[flink] def children = Seq(left, right)
-}
-
-abstract class UnaryExpression extends Expression {
- private[flink] def child: Expression
- private[flink] def children = Seq(child)
-}
-
-abstract class LeafExpression extends Expression {
- private[flink] val children = Nil
-}