You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/26 03:15:10 UTC

[GitHub] asfgit closed pull request #7152: [FLINK-10958] [table] Add overload support for user defined function

asfgit closed pull request #7152: [FLINK-10958] [table] Add overload support for user defined function
URL: https://github.com/apache/flink/pull/7152
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not show it otherwise once the PR is merged:

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index c9a27036797..4faa6ed05ce 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -151,17 +151,17 @@ object UserDefinedFunctionUtils {
           // match parameters of signature to actual parameters
           methodSignature.length == signatures.length &&
             signatures.zipWithIndex.forall { case (clazz, i) =>
-              parameterTypeEquals(methodSignature(i), clazz)
+              parameterTypeApplicable(methodSignature(i), clazz)
           }
         case cur if cur.isVarArgs =>
           val signatures = cur.getParameterTypes
           methodSignature.zipWithIndex.forall {
             // non-varargs
             case (clazz, i) if i < signatures.length - 1  =>
-              parameterTypeEquals(clazz, signatures(i))
+              parameterTypeApplicable(clazz, signatures(i))
             // varargs
             case (clazz, i) if i >= signatures.length - 1 =>
-              parameterTypeEquals(clazz, signatures.last.getComponentType)
+              parameterTypeApplicable(clazz, signatures.last.getComponentType)
           } || (methodSignature.isEmpty && signatures.length == 1) // empty varargs
     }
 
@@ -171,14 +171,45 @@ object UserDefinedFunctionUtils {
       fixedMethodsCount > 0 && !cur.isVarArgs ||
       fixedMethodsCount == 0 && cur.isVarArgs
     }
+    val maximallySpecific = if (found.length > 1) {
+      implicit val methodOrdering = new scala.Ordering[Method] {
+        override def compare(x: Method, y: Method): Int = {
+          def specificThan(left: Method, right: Method) = {
+            // left parameter type is more specific than right parameter type
+            left.getParameterTypes.zip(right.getParameterTypes).forall {
+              case (leftParameterType, rightParameterType) =>
+                parameterTypeApplicable(leftParameterType, rightParameterType)
+            } &&
+            // non-equal
+            left.getParameterTypes.zip(right.getParameterTypes).exists {
+              case (leftParameterType, rightParameterType) =>
+                !parameterTypeEquals(leftParameterType, rightParameterType)
+            }
+          }
+
+          if (specificThan(x, y)) {
+            1
+          } else if (specificThan(y, x)) {
+            -1
+          } else {
+            0
+          }
+        }
+      }
+
+      val max = found.max
+      found.filter(methodOrdering.compare(max, _) == 0)
+    } else {
+      found
+    }
 
     // check if there is a Scala varargs annotation
-    if (found.isEmpty &&
+    if (maximallySpecific.isEmpty &&
       methods.exists { method =>
         val signatures = method.getParameterTypes
         signatures.zipWithIndex.forall {
           case (clazz, i) if i < signatures.length - 1 =>
-            parameterTypeEquals(methodSignature(i), clazz)
+            parameterTypeApplicable(methodSignature(i), clazz)
           case (clazz, i) if i == signatures.length - 1 =>
             clazz.getName.equals("scala.collection.Seq")
         }
@@ -186,11 +217,11 @@ object UserDefinedFunctionUtils {
       throw new ValidationException(
         s"Scala-style variable arguments in '$methodName' methods are not supported. Please " +
           s"add a @scala.annotation.varargs annotation.")
-    } else if (found.length > 1) {
+    } else if (maximallySpecific.length > 1) {
       throw new ValidationException(
         s"Found multiple '$methodName' methods which match the signature.")
     }
-    found.headOption
+    maximallySpecific.headOption
   }
 
   /**
@@ -719,10 +750,14 @@ object UserDefinedFunctionUtils {
     * Compares parameter candidate classes with expected classes. If true, the parameters match.
     * Candidate can be null (acts as a wildcard).
     */
+  private def parameterTypeApplicable(candidate: Class[_], expected: Class[_]): Boolean =
+    parameterTypeEquals(candidate, expected) ||
+      ((expected != null && expected.isAssignableFrom(candidate)) ||
+        expected.isPrimitive && Primitives.wrap(expected).isAssignableFrom(candidate))
+
   private def parameterTypeEquals(candidate: Class[_], expected: Class[_]): Boolean =
   candidate == null ||
     candidate == expected ||
-    expected == classOf[Object] ||
     expected.isPrimitive && Primitives.wrap(expected) == candidate ||
     // time types
     candidate == classOf[Date] && (expected == classOf[Int] || expected == classOf[JInt])  ||
@@ -730,8 +765,7 @@ object UserDefinedFunctionUtils {
     candidate == classOf[Timestamp] && (expected == classOf[Long] || expected == classOf[JLong]) ||
     // arrays
     (candidate.isArray && expected.isArray &&
-      (candidate.getComponentType == expected.getComponentType ||
-        expected.getComponentType == classOf[Object]))
+      (candidate.getComponentType == expected.getComponentType))
 
   /**
     * Creates a [[LogicalTableFunctionCall]] by parsing a String expression.
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
index 912bb047cba..29de7e0f9d1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -310,6 +310,26 @@ class Func20 extends ScalarFunction {
   }
 }
 
+object Func21 extends ScalarFunction {
+  def eval(p: People): String = {
+    p.name
+  }
+
+  def eval(p: Student): String = {
+    "student#" + p.name
+  }
+}
+
+object Func22 extends ScalarFunction {
+  def eval(a: Array[People]): String = {
+    a.head.name
+  }
+
+  def eval(a: Array[Student]): String = {
+    "student#" + a.head.name
+  }
+}
+
 class SplitUDF(deterministic: Boolean) extends ScalarFunction {
   def eval(x: String, sep: String, index: Int): String = {
     val splits = StringUtils.splitByWholeSeparator(x, sep)
@@ -321,3 +341,9 @@ class SplitUDF(deterministic: Boolean) extends ScalarFunction {
   }
   override def isDeterministic: Boolean = deterministic
 }
+
+class People(val name: String)
+
+class Student(name: String) extends People(name)
+
+class GraduatedStudent(name: String) extends Student(name)
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index 9d29018fdb3..e433742fb67 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.expressions.utils.{Func13, RichFunc1, RichFunc2, SplitUDF}
+import org.apache.flink.table.expressions.utils._
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, UserDefinedFunctionTestUtils}
 import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
@@ -350,4 +350,62 @@ class CalcITCase extends AbstractTestBase {
       "{9=Comment#3}")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testOverload(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val testData = new mutable.MutableList[GraduatedStudent]
+    testData.+=(new GraduatedStudent("Jack#22"))
+    testData.+=(new GraduatedStudent("John#19"))
+    testData.+=(new GraduatedStudent("Anna#44"))
+    testData.+=(new GraduatedStudent("nosharp"))
+
+    val t = env.fromCollection(testData).toTable(tEnv).as('a)
+
+    val result = t.select(Func21('a))
+
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "student#Jack#22",
+      "student#John#19",
+      "student#Anna#44",
+      "student#nosharp"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testOverloadWithArray(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val testData = new mutable.MutableList[Array[GraduatedStudent]]
+    testData.+=(Array(new GraduatedStudent("Jack#22")))
+    testData.+=(Array(new GraduatedStudent("John#19")))
+    testData.+=(Array(new GraduatedStudent("Anna#44")))
+    testData.+=(Array(new GraduatedStudent("nosharp")))
+
+    val t = env.fromCollection(testData).toTable(tEnv).as('a)
+
+    val result = t.select(Func22('a))
+
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "student#Jack#22",
+      "student#John#19",
+      "student#Anna#44",
+      "student#nosharp"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services