You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/16 14:52:55 UTC

[GitHub] twalthr closed pull request #3880: [FLINK-6457] [table] Clean up ScalarFunction and TableFunction interface

twalthr closed pull request #3880: [FLINK-6457] [table] Clean up ScalarFunction and TableFunction interface
URL: https://github.com/apache/flink/pull/3880
 
 
   

This is a PR merged from a forked repository.
Because GitHub does not show the original diff of a merged pull request
from a fork, the diff is reproduced below for the sake of provenance:

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 9f50f0c806a..1aab7ae0993 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -68,6 +68,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.mutable.HashMap
 import _root_.scala.annotation.varargs
+import _root_.scala.util.{Try, Success, Failure}
 
 /**
   * The abstract base class for batch and stream TableEnvironments.
@@ -340,10 +341,11 @@ abstract class TableEnvironment(val config: TableConfig) {
     // check if class could be instantiated
     checkForInstantiation(function.getClass)
 
-    val typeInfo: TypeInformation[_] = if (function.getResultType != null) {
-      function.getResultType
-    } else {
-      implicitly[TypeInformation[T]]
+    val typeInfo: TypeInformation[_] = Try {
+      function.getClass.getDeclaredMethod("getResultType")
+    } match {
+      case Success(m) => m.invoke(function).asInstanceOf[TypeInformation[_]]
+      case Failure(_) => implicitly[TypeInformation[T]]
     }
 
     // register in Table API
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/ScalarFunctionConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/ScalarFunctionConversions.scala
new file mode 100644
index 00000000000..d781e76071d
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/ScalarFunctionConversions.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.table.expressions.{Expression, ScalarFunctionCall}
+import org.apache.flink.table.functions.ScalarFunction
+
+/**
+  * Wrapper that lets a [[ScalarFunction]] be invoked like a regular function in the
+  * Scala Table API, producing a [[ScalarFunctionCall]] expression.
+  *
+  * @param sf the scalar function to wrap
+  */
+class ScalarFunctionConversions(sf: ScalarFunction) {
+
+  /**
+    * Builds the call expression for the wrapped function in the Scala Table API.
+    *
+    * @param params expressions passed as the actual arguments of the function
+    * @return an [[Expression]] in the form of a [[ScalarFunctionCall]]
+    */
+  final def apply(params: Expression*): Expression =
+    ScalarFunctionCall(sf, params)
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
index 692876fe2da..292e241077a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
@@ -24,6 +24,8 @@ import org.apache.flink.table.expressions._
 import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.plan.logical.LogicalTableFunctionCall
 
+import scala.util.{Failure, Success, Try}
+
 /**
   * Holds methods to convert a [[TableFunction]] call in the Scala Table API into a [[Table]].
   *
@@ -39,7 +41,12 @@ class TableFunctionConversions[T](tf: TableFunction[T]) {
     */
   final def apply(args: Expression*)(implicit typeInfo: TypeInformation[T]): Table = {
 
-    val resultType = if (tf.getResultType == null) typeInfo else tf.getResultType
+    val resultType: TypeInformation[_] = Try {
+      tf.getClass.getDeclaredMethod("getResultType")
+    } match {
+      case Success(m) => m.invoke(tf).asInstanceOf[TypeInformation[_]]
+      case Failure(_) => typeInfo
+    }
 
     new Table(
       tableEnv = null, // Table environment will be set later.
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
index e8a201777fa..b9f3aa56bee 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala.DataSet
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
 import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
-import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
 
 import _root_.scala.language.implicitConversions
 
@@ -93,4 +93,8 @@ package object scala extends ImplicitExpressionConversions {
   implicit def tableFunctionCall2Table[T](tf: TableFunction[T]): TableFunctionConversions[T] = {
     new TableFunctionConversions[T](tf)
   }
+
+  /** Lets a [[ScalarFunction]] be applied to expressions via [[ScalarFunctionConversions]]. */
+  implicit def scalarFunction2Call(sf: ScalarFunction): ScalarFunctionConversions =
+    new ScalarFunctionConversions(sf)
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala
index d01cf68478c..9093fe0582a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/ScalarFunction.scala
@@ -18,11 +18,7 @@
 
 package org.apache.flink.table.functions
 
-import org.apache.flink.api.common.functions.InvalidTypesException
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.expressions.{Expression, ScalarFunctionCall}
 
 /**
   * Base class for a user-defined scalar function. A user-defined scalar functions maps zero, one,
@@ -37,64 +33,13 @@ import org.apache.flink.table.expressions.{Expression, ScalarFunctionCall}
   * By default the result type of an evaluation method is determined by Flink's type extraction
   * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
   * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
-  * can be manually defined by overriding [[getResultType()]].
+  * can be manually defined by implementing a function as
+  *
+  * def getResultType(signature: Array[Class[_]]): TypeInformation[_]
   *
   * Internally, the Table/SQL API code generation works with primitive values as much as possible.
   * If a user-defined scalar function should not introduce much overhead during runtime, it is
   * recommended to declare parameters and result types as primitive types instead of their boxed
   * classes. DATE/TIME is equal to int, TIMESTAMP is equal to long.
   */
-abstract class ScalarFunction extends UserDefinedFunction {
-
-  /**
-    * Creates a call to a [[ScalarFunction]] in Scala Table API.
-    *
-    * @param params actual parameters of function
-    * @return [[Expression]] in form of a [[ScalarFunctionCall]]
-    */
-  final def apply(params: Expression*): Expression = {
-    ScalarFunctionCall(this, params)
-  }
-
-  override def toString: String = getClass.getCanonicalName
-
-  // ----------------------------------------------------------------------------------------------
-
-  /**
-    * Returns the result type of the evaluation method with a given signature.
-    *
-    * This method needs to be overriden in case Flink's type extraction facilities are not
-    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
-    * method. Flink's type extraction facilities can handle basic types or
-    * simple POJOs but might be wrong for more complex, custom, or composite types.
-    *
-    * @param signature signature of the method the return type needs to be determined
-    * @return [[TypeInformation]] of result type or null if Flink should determine the type
-    */
-  def getResultType(signature: Array[Class[_]]): TypeInformation[_] = null
-
-  /**
-    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
-    * signature.
-    *
-    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
-    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
-    * By default Flink's type extraction facilities are used for this but might be wrong for
-    * more complex, custom, or composite types.
-    *
-    * @param signature signature of the method the operand types need to be determined
-    * @return [[TypeInformation]] of  operand types
-    */
-  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
-    signature.map { c =>
-      try {
-        TypeExtractor.getForClass(c)
-      } catch {
-        case ite: InvalidTypesException =>
-          throw new ValidationException(
-            s"Parameter types of scalar function '${this.getClass.getCanonicalName}' cannot be " +
-            s"automatically determined. Please provide type information manually.")
-      }
-    }
-  }
-}
+abstract class ScalarFunction extends UserDefinedFunction
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
index 53543494f78..dd3139567f4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
@@ -34,7 +34,9 @@ import org.apache.flink.util.Collector
   * By default the result type of an evaluation method is determined by Flink's type extraction
   * facilities. This is sufficient for basic types or simple POJOs but might be wrong for more
   * complex, custom, or composite types. In these cases [[TypeInformation]] of the result type
-  * can be manually defined by overriding [[getResultType()]].
+  * can be manually defined by implementing a function as
+  *
+  * def getResultType(): TypeInformation[_]
   *
   * Internally, the Table/SQL API code generation works with primitive values as much as possible.
   * If a user-defined table function should not introduce much overhead during runtime, it is
@@ -77,11 +79,6 @@ import org.apache.flink.util.Collector
   * @tparam T The type of the output row
   */
 abstract class TableFunction[T] extends UserDefinedFunction {
-
-  override def toString: String = getClass.getCanonicalName
-
-  // ----------------------------------------------------------------------------------------------
-
   /**
     * Emit an output row.
     *
@@ -91,8 +88,6 @@ abstract class TableFunction[T] extends UserDefinedFunction {
     collector.collect(row)
   }
 
-  // ----------------------------------------------------------------------------------------------
-
   /**
     * The code generated collector used to emit row.
     */
@@ -104,19 +99,4 @@ abstract class TableFunction[T] extends UserDefinedFunction {
   private[flink] final def setCollector(collector: Collector[T]): Unit = {
     this.collector = collector
   }
-
-  // ----------------------------------------------------------------------------------------------
-
-  /**
-    * Returns the result type of the evaluation method with a given signature.
-    *
-    * This method needs to be overriden in case Flink's type extraction facilities are not
-    * sufficient to extract the [[TypeInformation]] based on the return type of the evaluation
-    * method. Flink's type extraction facilities can handle basic types or
-    * simple POJOs but might be wrong for more complex, custom, or composite types.
-    *
-    * @return [[TypeInformation]] of result type or null if Flink should determine the type
-    */
-  def getResultType: TypeInformation[T] = null
-
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
index e9e01eeed79..2ebff44b4ed 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/UserDefinedFunction.scala
@@ -24,6 +24,9 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.serialize
   * or aggregation functions.
   */
 abstract class UserDefinedFunction extends Serializable {
+
+  override def toString: String = getClass.getCanonicalName
+
   /**
     * Setup method for user-defined function. It can be used for initialization work.
     *
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
index bbfa3aa93ef..d22ef88429c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
@@ -108,7 +108,7 @@ object ScalarSqlFunction {
         val foundSignature = getEvalMethodSignature(scalarFunction, operandTypeInfo)
           .getOrElse(throw new ValidationException(s"Operand types of could not be inferred."))
 
-        val inferredTypes = scalarFunction
+        val inferredTypes = UserDefinedFunctionUtils
           .getParameterTypes(foundSignature)
           .map(typeFactory.createTypeFromTypeInfo)
 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index 11174decc48..703e204797a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -38,6 +38,8 @@ import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, Tabl
 import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
 import org.apache.flink.util.InstantiationUtil
 
+import scala.util.{Failure, Success, Try}
+
 object UserDefinedFunctionUtils {
 
   /**
@@ -337,9 +339,15 @@ object UserDefinedFunctionUtils {
       signature: Array[Class[_]])
     : TypeInformation[_] = {
 
-    val userDefinedTypeInfo = function.getResultType(signature)
-    if (userDefinedTypeInfo != null) {
-      userDefinedTypeInfo
+    val userDefinedTypeInfo: Option[TypeInformation[_]] = Try {
+      function.getClass.getDeclaredMethod("getResultType", classOf[Array[Class[_]]])
+    } match {
+      case Success(m) => Some(m.invoke(function, signature).asInstanceOf[TypeInformation[_]])
+      case Failure(_) => None
+    }
+
+    if (userDefinedTypeInfo.isDefined) {
+      userDefinedTypeInfo.get
     } else {
       try {
         TypeExtractor.getForClass(getResultTypeClassOfScalarFunction(function, signature))
@@ -499,4 +507,29 @@ object UserDefinedFunctionUtils {
       }
     }
   }
+
+  /**
+    * Returns [[TypeInformation]] about the operands of the evaluation method with a given
+    * signature.
+    *
+    * In order to perform operand type inference in SQL (especially when NULL is used) it might be
+    * necessary to determine the parameter [[TypeInformation]] of an evaluation method.
+    * Flink's type extraction facilities are used for this; a [[ValidationException]] naming the
+    * offending parameter class is thrown if a type cannot be extracted.
+    *
+    * @param signature signature of the method the operand types need to be determined
+    * @return [[TypeInformation]] of operand types
+    */
+  def getParameterTypes(signature: Array[Class[_]]): Array[TypeInformation[_]] = {
+    signature.map { c =>
+      try {
+        TypeExtractor.getForClass(c)
+      } catch {
+        // Report the parameter class: `this` is the utility object, not the scalar function.
+        case _: InvalidTypesException => throw new ValidationException(
+          s"Parameter type '${c.getCanonicalName}' of scalar function cannot be " +
+            s"automatically determined. Please provide type information manually.")
+      }
+    }
+  }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CompositeFlatteningTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CompositeFlatteningTest.scala
index f5f5ff1fc7f..15300f3461c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CompositeFlatteningTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CompositeFlatteningTest.scala
@@ -140,7 +140,7 @@ object CompositeFlatteningTest {
       TestCaseClass("hello", 42)
     }
 
-    override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
+    def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
       createTypeInformation[TestCaseClass]
     }
   }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
index 528556968bf..62cf74e0c82 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
@@ -104,7 +104,7 @@ object Func10 extends ScalarFunction {
     c
   }
 
-  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
+  def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
     Types.SQL_TIMESTAMP
   }
 }
@@ -120,7 +120,7 @@ object Func12 extends ScalarFunction {
     a
   }
 
-  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
+  def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
     Types.INTERVAL_MILLIS
   }
 }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
index d0ffade253c..4672385faaa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
@@ -68,7 +68,7 @@ class TableFunc2 extends TableFunction[Row] {
     }
   }
 
-  override def getResultType: TypeInformation[Row] = {
+  def getResultType: TypeInformation[Row] = {
     new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
                     BasicTypeInfo.INT_TYPE_INFO)
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services