You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/11/03 09:46:19 UTC

flink git commit: [FLINK-6226] [table] Add tests for UDFs with Byte, Short, and Float arguments.

Repository: flink
Updated Branches:
  refs/heads/release-1.3 4c6b6c29d -> 310817035


[FLINK-6226] [table] Add tests for UDFs with Byte, Short, and Float arguments.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/31081703
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/31081703
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/31081703

Branch: refs/heads/release-1.3
Commit: 310817035fef1d843b37e00ccd4a32efffaad3dc
Parents: 4c6b6c2
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Nov 2 21:10:03 2017 +0100
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Nov 3 10:41:57 2017 +0100

----------------------------------------------------------------------
 .../UserDefinedScalarFunctionTest.scala         | 28 ++++++++++++++++++--
 .../utils/UserDefinedScalarFunctions.scala      |  6 +++++
 .../DataSetUserDefinedFunctionITCase.scala      | 23 +++++++++++++++-
 .../table/utils/UserDefinedTableFunctions.scala | 12 ++++++++-
 4 files changed, 65 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/31081703/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
index 56cdf3c..df83f7c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
@@ -48,6 +48,24 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
       "43")
 
     testAllApis(
+      Func1('f11),
+      "Func1(f11)",
+      "Func1(f11)",
+      "4")
+
+    testAllApis(
+      Func1('f12),
+      "Func1(f12)",
+      "Func1(f12)",
+      "4")
+
+    testAllApis(
+      Func1('f13),
+      "Func1(f13)",
+      "Func1(f13)",
+      "4.0")
+
+    testAllApis(
       Func2('f0, 'f1, 'f3),
       "Func2(f0, f1, f3)",
       "Func2(f0, f1, f3)",
@@ -360,7 +378,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
   // ----------------------------------------------------------------------------------------------
 
   override def testData: Any = {
-    val testData = new Row(11)
+    val testData = new Row(14)
     testData.setField(0, 42)
     testData.setField(1, "Test")
     testData.setField(2, null)
@@ -372,6 +390,9 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
     testData.setField(8, 1000L)
     testData.setField(9, Seq("Hello", "World"))
     testData.setField(10, Array[Integer](1, 2, null))
+    testData.setField(11, 3.toByte)
+    testData.setField(12, 3.toShort)
+    testData.setField(13, 3.toFloat)
     testData
   }
 
@@ -387,7 +408,10 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
       Types.INTERVAL_MONTHS,
       Types.INTERVAL_MILLIS,
       TypeInformation.of(classOf[Seq[String]]),
-      BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
+      BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO,
+      Types.BYTE,
+      Types.SHORT,
+      Types.FLOAT
     ).asInstanceOf[TypeInformation[Any]]
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/31081703/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
index 5285569..9535cdf 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
@@ -41,6 +41,12 @@ object Func1 extends ScalarFunction {
   def eval(index: Integer): Integer = {
     index + 1
   }
+
+  def eval(b: Byte): Byte = (b + 1).toByte
+
+  def eval(s: Short): Short = (s + 1).toShort
+
+  def eval(f: Float): Float = f + 1
 }
 
 object Func2 extends ScalarFunction {

http://git-wip-us.apache.org/repos/asf/flink/blob/31081703/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
index 1860d3c..9755596 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.table.api.java.utils.UserDefinedTableFunctions.JavaTableFunc0
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.api.{TableEnvironment, Types, ValidationException}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsClusterTestBase
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
@@ -229,6 +229,27 @@ class DataSetUserDefinedFunctionITCase(
   }
 
   @Test
+  def testByteShortFloatArguments(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    val in = testData(env).toTable(tableEnv).as('a, 'b, 'c)
+    val tFunc = new TableFunc4
+
+    val result = in
+      .select('a.cast(Types.BYTE) as 'a, 'a.cast(Types.SHORT) as 'b, 'b.cast(Types.FLOAT) as 'c)
+      .join(tFunc('a, 'b, 'c) as ('a2, 'b2, 'c2))
+      .toDataSet[Row]
+
+    val results = result.collect()
+    val expected = Seq(
+      "1,1,1.0,Byte=1,Short=1,Float=1.0",
+      "2,2,2.0,Byte=2,Short=2,Float=2.0",
+      "3,3,2.0,Byte=3,Short=3,Float=2.0",
+      "4,4,3.0,Byte=4,Short=4,Float=3.0").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testUserDefinedTableFunctionWithParameter(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env)

http://git-wip-us.apache.org/repos/asf/flink/blob/31081703/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
index d0ffade..e1af23b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/UserDefinedTableFunctions.scala
@@ -22,7 +22,7 @@ import java.lang.Boolean
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.tuple.Tuple3
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.{Types, ValidationException}
 import org.apache.flink.table.functions.{FunctionContext, TableFunction}
 import org.apache.flink.types.Row
 import org.junit.Assert
@@ -109,6 +109,16 @@ class TableFunc3(data: String, conf: Map[String, String]) extends TableFunction[
   }
 }
 
+class TableFunc4 extends TableFunction[Row] {
+  def eval(b: Byte, s: Short, f: Float): Unit = {
+    collect(Row.of("Byte=" + b, "Short=" + s, "Float=" + f))
+  }
+
+  override def getResultType: TypeInformation[Row] = {
+    new RowTypeInfo(Types.STRING, Types.STRING, Types.STRING)
+  }
+}
+
 class HierarchyTableFunction extends SplittableTableFunction[Boolean, Integer] {
   def eval(user: String) {
     if (user.contains("#")) {