You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/05/15 11:46:38 UTC
flink git commit: [FLINK-6579] [table] Add proper support for BasicArrayTypeInfo
Repository: flink
Updated Branches:
refs/heads/master c86f46cdc -> 3cd95f026
[FLINK-6579] [table] Add proper support for BasicArrayTypeInfo
This closes #3902.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3cd95f02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3cd95f02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3cd95f02
Branch: refs/heads/master
Commit: 3cd95f0264beaea4c542421f962739c03e250f59
Parents: c86f46c
Author: twalthr <tw...@apache.org>
Authored: Mon May 15 11:38:19 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon May 15 13:45:12 2017 +0200
----------------------------------------------------------------------
.../flink/table/codegen/CodeGenerator.scala | 8 +-
.../table/codegen/calls/ScalarOperators.scala | 129 +++++++++++--------
.../apache/flink/table/expressions/array.scala | 11 +-
.../flink/table/expressions/comparison.scala | 7 +-
.../flink/table/typeutils/TypeCheckUtils.scala | 4 +-
.../api/java/batch/TableEnvironmentITCase.java | 44 ++++---
.../java/utils/UserDefinedScalarFunctions.java | 7 +
.../scala/batch/TableEnvironmentITCase.scala | 9 +-
.../flink/table/expressions/ArrayTypeTest.scala | 50 ++++++-
.../UserDefinedScalarFunctionTest.scala | 19 ++-
10 files changed, 194 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3cd95f02/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 036889f..52a9dcd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -29,7 +29,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.fun.SqlStdOperatorTable._
import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.io.GenericInputFormat
-import org.apache.flink.api.common.typeinfo.{AtomicType, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo._
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils._
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
@@ -1522,13 +1522,15 @@ class CodeGenerator(
case ITEM =>
operands.head.resultType match {
- case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] =>
+ case _: ObjectArrayTypeInfo[_, _] |
+ _: BasicArrayTypeInfo[_, _] |
+ _: PrimitiveArrayTypeInfo[_] =>
val array = operands.head
val index = operands(1)
requireInteger(index)
generateArrayElementAt(this, array, index)
- case map: MapTypeInfo[_, _] =>
+ case _: MapTypeInfo[_, _] =>
val key = operands(1)
generateMapGet(this, operands.head, key)
http://git-wip-us.apache.org/repos/asf/flink/blob/3cd95f02/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 0c5baa6..b5ebe51 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -21,7 +21,7 @@ import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY
import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
import org.apache.calcite.util.BuiltInMethod
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo._
import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
import org.apache.flink.table.codegen.CodeGenUtils._
import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
@@ -93,7 +93,8 @@ object ScalarOperators {
generateComparison("==", nullCheck, left, right)
}
// array types
- else if (isArray(left.resultType) && left.resultType == right.resultType) {
+ else if (isArray(left.resultType) &&
+ left.resultType.getTypeClass == right.resultType.getTypeClass) {
generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
(leftTerm, rightTerm) => s"java.util.Arrays.equals($leftTerm, $rightTerm)"
}
@@ -133,7 +134,8 @@ object ScalarOperators {
generateComparison("!=", nullCheck, left, right)
}
// array types
- else if (isArray(left.resultType) && left.resultType == right.resultType) {
+ else if (isArray(left.resultType) &&
+ left.resultType.getTypeClass == right.resultType.getTypeClass) {
generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
(leftTerm, rightTerm) => s"!java.util.Arrays.equals($leftTerm, $rightTerm)"
}
@@ -456,6 +458,11 @@ object ScalarOperators {
case (fromTp, toTp) if fromTp == toTp =>
operand
+ // array identity casting
+ // (e.g. for Integer[] that can be ObjectArrayTypeInfo or BasicArrayTypeInfo)
+ case (fromTp, toTp) if isArray(fromTp) && fromTp.getTypeClass == toTp.getTypeClass =>
+ operand
+
// Date/Time/Timestamp -> String
case (dtt: SqlTimeTypeInfo[_], STRING_TYPE_INFO) =>
generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
@@ -479,13 +486,13 @@ object ScalarOperators {
}
// Object array -> String
- case (_:ObjectArrayTypeInfo[_, _], STRING_TYPE_INFO) =>
+ case (_: ObjectArrayTypeInfo[_, _] | _: BasicArrayTypeInfo[_, _], STRING_TYPE_INFO) =>
generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
(operandTerm) => s"java.util.Arrays.deepToString($operandTerm)"
}
// Primitive array -> String
- case (_:PrimitiveArrayTypeInfo[_], STRING_TYPE_INFO) =>
+ case (_: PrimitiveArrayTypeInfo[_], STRING_TYPE_INFO) =>
generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
(operandTerm) => s"java.util.Arrays.toString($operandTerm)"
}
@@ -792,37 +799,45 @@ object ScalarOperators {
val resultTerm = newName("result")
+ def unboxArrayElement(componentInfo: TypeInformation[_]): GeneratedExpression = {
+ // get boxed array element
+ val resultTypeTerm = boxedTypeTermForTypeInfo(componentInfo)
+
+ val arrayAccessCode = if (codeGenerator.nullCheck) {
+ s"""
+ |${array.code}
+ |${index.code}
+ |$resultTypeTerm $resultTerm = (${array.nullTerm} || ${index.nullTerm}) ?
+ | null : ${array.resultTerm}[${index.resultTerm} - 1];
+ |""".stripMargin
+ } else {
+ s"""
+ |${array.code}
+ |${index.code}
+ |$resultTypeTerm $resultTerm = ${array.resultTerm}[${index.resultTerm} - 1];
+ |""".stripMargin
+ }
+
+ // generate unbox code
+ val unboxing = codeGenerator.generateInputFieldUnboxing(componentInfo, resultTerm)
+
+ unboxing.copy(code =
+ s"""
+ |$arrayAccessCode
+ |${unboxing.code}
+ |""".stripMargin
+ )
+ }
+
array.resultType match {
// unbox object array types
case oati: ObjectArrayTypeInfo[_, _] =>
- // get boxed array element
- val resultTypeTerm = boxedTypeTermForTypeInfo(oati.getComponentInfo)
+ unboxArrayElement(oati.getComponentInfo)
- val arrayAccessCode = if (codeGenerator.nullCheck) {
- s"""
- |${array.code}
- |${index.code}
- |$resultTypeTerm $resultTerm = (${array.nullTerm} || ${index.nullTerm}) ?
- | null : ${array.resultTerm}[${index.resultTerm} - 1];
- |""".stripMargin
- } else {
- s"""
- |${array.code}
- |${index.code}
- |$resultTypeTerm $resultTerm = ${array.resultTerm}[${index.resultTerm} - 1];
- |""".stripMargin
- }
-
- // generate unbox code
- val unboxing = codeGenerator.generateInputFieldUnboxing(oati.getComponentInfo, resultTerm)
-
- unboxing.copy(code =
- s"""
- |$arrayAccessCode
- |${unboxing.code}
- |""".stripMargin
- )
+ // unbox basic array types
+ case bati: BasicArrayTypeInfo[_, _] =>
+ unboxArrayElement(bati.getComponentInfo)
// no unboxing necessary
case pati: PrimitiveArrayTypeInfo[_] =>
@@ -841,6 +856,7 @@ object ScalarOperators {
val resultTerm = newName("result")
val resultType = array.resultType match {
case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+ case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
}
val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
@@ -852,31 +868,38 @@ object ScalarOperators {
s"${array.resultTerm}.length"
}
+ def unboxArrayElement(componentInfo: TypeInformation[_]): String = {
+ // generate unboxing code
+ val unboxing = codeGenerator.generateInputFieldUnboxing(
+ componentInfo,
+ s"${array.resultTerm}[0]")
+
+ s"""
+ |${array.code}
+ |${if (codeGenerator.nullCheck) s"boolean $nullTerm;" else "" }
+ |$resultTypeTerm $resultTerm;
+ |switch ($arrayLengthCode) {
+ | case 0:
+ | ${if (codeGenerator.nullCheck) s"$nullTerm = true;" else "" }
+ | $resultTerm = $defaultValue;
+ | break;
+ | case 1:
+ | ${unboxing.code}
+ | ${if (codeGenerator.nullCheck) s"$nullTerm = ${unboxing.nullTerm};" else "" }
+ | $resultTerm = ${unboxing.resultTerm};
+ | break;
+ | default:
+ | throw new RuntimeException("Array has more than one element.");
+ |}
+ |""".stripMargin
+ }
+
val arrayAccessCode = array.resultType match {
case oati: ObjectArrayTypeInfo[_, _] =>
- // generate unboxing code
- val unboxing = codeGenerator.generateInputFieldUnboxing(
- oati.getComponentInfo,
- s"${array.resultTerm}[0]")
+ unboxArrayElement(oati.getComponentInfo)
- s"""
- |${array.code}
- |${if (codeGenerator.nullCheck) s"boolean $nullTerm;" else "" }
- |$resultTypeTerm $resultTerm;
- |switch ($arrayLengthCode) {
- | case 0:
- | ${if (codeGenerator.nullCheck) s"$nullTerm = true;" else "" }
- | $resultTerm = $defaultValue;
- | break;
- | case 1:
- | ${unboxing.code}
- | ${if (codeGenerator.nullCheck) s"$nullTerm = ${unboxing.nullTerm};" else "" }
- | $resultTerm = ${unboxing.resultTerm};
- | break;
- | default:
- | throw new RuntimeException("Array has more than one element.");
- |}
- |""".stripMargin
+ case bati: BasicArrayTypeInfo[_, _] =>
+ unboxArrayElement(bati.getComponentInfo)
case pati: PrimitiveArrayTypeInfo[_] =>
s"""
http://git-wip-us.apache.org/repos/asf/flink/blob/3cd95f02/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
index 7446228..7211733 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
@@ -22,9 +22,10 @@ import org.apache.calcite.rex.RexNode
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo}
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.typeutils.TypeCheckUtils.isArray
import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
import scala.collection.JavaConverters._
@@ -75,12 +76,13 @@ case class ArrayElementAt(array: Expression, index: Expression) extends Expressi
override private[flink] def resultType = array.resultType match {
case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+ case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
}
override private[flink] def validateInput(): ValidationResult = {
array.resultType match {
- case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] =>
+ case ati: TypeInformation[_] if isArray(ati) =>
if (index.resultType == INT_TYPE_INFO) {
// check for common user mistake
index match {
@@ -114,7 +116,7 @@ case class ArrayCardinality(array: Expression) extends Expression {
override private[flink] def validateInput(): ValidationResult = {
array.resultType match {
- case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => ValidationSuccess
+ case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
case other@_ => ValidationFailure(s"Array expected but was '$other'.")
}
}
@@ -134,12 +136,13 @@ case class ArrayElement(array: Expression) extends Expression {
override private[flink] def resultType = array.resultType match {
case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+ case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
}
override private[flink] def validateInput(): ValidationResult = {
array.resultType match {
- case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => ValidationSuccess
+ case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
case other@_ => ValidationFailure(s"Array expected but was '$other'.")
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3cd95f02/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala
index 0c7e57c..562521b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala
@@ -22,7 +22,8 @@ import org.apache.calcite.sql.SqlOperator
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.tools.RelBuilder
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.table.typeutils.TypeCheckUtils.{isComparable, isNumeric}
+import org.apache.flink.table.typeutils.TypeCheckUtils
+import org.apache.flink.table.typeutils.TypeCheckUtils.{isArray, isComparable, isNumeric}
import org.apache.flink.table.validate._
import scala.collection.JavaConversions._
@@ -56,6 +57,8 @@ case class EqualTo(left: Expression, right: Expression) extends BinaryComparison
(left.resultType, right.resultType) match {
case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
case (lType, rType) if lType == rType => ValidationSuccess
+ case (lType, rType) if isArray(lType) && lType.getTypeClass == rType.getTypeClass =>
+ ValidationSuccess
case (lType, rType) =>
ValidationFailure(s"Equality predicate on incompatible types: $lType and $rType")
}
@@ -70,6 +73,8 @@ case class NotEqualTo(left: Expression, right: Expression) extends BinaryCompari
(left.resultType, right.resultType) match {
case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess
case (lType, rType) if lType == rType => ValidationSuccess
+ case (lType, rType) if isArray(lType) && lType.getTypeClass == rType.getTypeClass =>
+ ValidationSuccess
case (lType, rType) =>
ValidationFailure(s"Inequality predicate on incompatible types: $lType and $rType")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3cd95f02/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index fea8c2a..0676b8a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -69,8 +69,8 @@ object TypeCheckUtils {
def isArray(dataType: TypeInformation[_]): Boolean = dataType match {
case _: ObjectArrayTypeInfo[_, _] |
- _: PrimitiveArrayTypeInfo[_] |
- _: BasicArrayTypeInfo[_, _] => true
+ _: BasicArrayTypeInfo[_, _] |
+ _: PrimitiveArrayTypeInfo[_] => true
case _ => false
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3cd95f02/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index 81c60b4..15c6f8a 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -243,24 +243,25 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
List<SmallPojo> data = new ArrayList<>();
- data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
- data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
- data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
+ data.add(new SmallPojo("Peter", 28, 4000.00, "Sales", new Integer[] {42}));
+ data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering", new Integer[] {}));
+ data.add(new SmallPojo("Lucy", 42, 6000.00, "HR", new Integer[] {1, 2, 3}));
Table table = tableEnv
.fromDataSet(env.fromCollection(data),
"department AS a, " +
"age AS b, " +
"salary AS c, " +
- "name AS d")
- .select("a, b, c, d");
+ "name AS d," +
+ "roles as e")
+ .select("a, b, c, d, e");
DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
List<Row> results = ds.collect();
String expected =
- "Sales,28,4000.0,Peter\n" +
- "Engineering,56,10000.0,Anna\n" +
- "HR,42,6000.0,Lucy\n";
+ "Sales,28,4000.0,Peter,[42]\n" +
+ "Engineering,56,10000.0,Anna,[]\n" +
+ "HR,42,6000.0,Lucy,[1, 2, 3]\n";
compareResultAsText(results, expected);
}
@@ -297,24 +298,25 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
List<SmallPojo> data = new ArrayList<>();
- data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
- data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
- data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
+ data.add(new SmallPojo("Peter", 28, 4000.00, "Sales", new Integer[] {42}));
+ data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering", new Integer[] {}));
+ data.add(new SmallPojo("Lucy", 42, 6000.00, "HR", new Integer[] {1, 2, 3}));
Table table = tableEnv
.fromDataSet(env.fromCollection(data),
"department AS a, " +
"age AS b, " +
"salary AS c, " +
- "name AS d")
- .select("a, b, c, d");
+ "name AS d," +
+ "roles AS e")
+ .select("a, b, c, d, e");
DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
List<SmallPojo2> results = ds.collect();
String expected =
- "Sales,28,4000.0,Peter\n" +
- "Engineering,56,10000.0,Anna\n" +
- "HR,42,6000.0,Lucy\n";
+ "Sales,28,4000.0,Peter,[42]\n" +
+ "Engineering,56,10000.0,Anna,[]\n" +
+ "HR,42,6000.0,Lucy,[1, 2, 3]\n";
compareResultAsText(results, expected);
}
@@ -487,17 +489,19 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
public SmallPojo() { }
- public SmallPojo(String name, int age, double salary, String department) {
+ public SmallPojo(String name, int age, double salary, String department, Integer[] roles) {
this.name = name;
this.age = age;
this.salary = salary;
this.department = department;
+ this.roles = roles;
}
public String name;
public int age;
public double salary;
public String department;
+ public Integer[] roles;
}
@SuppressWarnings("unused")
@@ -580,21 +584,23 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
public SmallPojo2() { }
- public SmallPojo2(String a, int b, double c, String d) {
+ public SmallPojo2(String a, int b, double c, String d, Integer[] e) {
this.a = a;
this.b = b;
this.c = c;
this.d = d;
+ this.e = e;
}
public String a;
public int b;
public double c;
public String d;
+ public Integer[] e;
@Override
public String toString() {
- return a + "," + b + "," + c + "," + d;
+ return a + "," + b + "," + c + "," + d + "," + Arrays.toString(e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3cd95f02/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
index 56f866d..1e5fabe 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.table.api.java.utils;
+import java.util.Arrays;
import org.apache.flink.table.functions.ScalarFunction;
public class UserDefinedScalarFunctions {
@@ -53,4 +54,10 @@ public class UserDefinedScalarFunctions {
}
}
+ public static class JavaFunc4 extends ScalarFunction {
+ public String eval(Integer[] a, String[] b) {
+ return Arrays.toString(a) + " and " + Arrays.toString(b);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3cd95f02/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
index 0c2505a..9e6dd33 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -22,18 +22,17 @@ import java.util
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
+import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
import org.apache.flink.table.api.{TableEnvironment, TableException}
-import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit.Assert.assertTrue
import org.junit._
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
-import org.junit.Assert.assertTrue
import scala.collection.JavaConverters._
http://git-wip-us.apache.org/repos/asf/flink/blob/3cd95f02/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
index 72b5ab8..297c21a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.expressions
import java.sql.Date
-import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row
@@ -248,6 +248,12 @@ class ArrayTypeTest extends ExpressionTestBase {
"f4.at(2).at(2)",
"f4[2][2]",
"null")
+
+ testAllApis(
+ 'f11.at(1),
+ "f11.at(1)",
+ "f11[1]",
+ "1")
}
@Test
@@ -265,6 +271,12 @@ class ArrayTypeTest extends ExpressionTestBase {
"CARDINALITY(f4)",
"null")
+ testAllApis(
+ 'f11.cardinality(),
+ "f11.cardinality()",
+ "CARDINALITY(f11)",
+ "1")
+
// element
testAllApis(
'f9.element(),
@@ -290,6 +302,12 @@ class ArrayTypeTest extends ExpressionTestBase {
"ELEMENT(f4)",
"null")
+ testAllApis(
+ 'f11.element(),
+ "f11.element()",
+ "ELEMENT(f11)",
+ "1")
+
// comparison
testAllApis(
'f2 === 'f5.at(1),
@@ -320,6 +338,30 @@ class ArrayTypeTest extends ExpressionTestBase {
"f2 !== f7",
"f2 <> f7",
"true")
+
+ testAllApis(
+ 'f11 === 'f11,
+ "f11 === f11",
+ "f11 = f11",
+ "true")
+
+ testAllApis(
+ 'f11 === 'f9,
+ "f11 === f9",
+ "f11 = f9",
+ "true")
+
+ testAllApis(
+ 'f11 !== 'f11,
+ "f11 !== f11",
+ "f11 <> f11",
+ "false")
+
+ testAllApis(
+ 'f11 !== 'f9,
+ "f11 !== f9",
+ "f11 <> f9",
+ "false")
}
// ----------------------------------------------------------------------------------------------
@@ -327,7 +369,7 @@ class ArrayTypeTest extends ExpressionTestBase {
case class MyCaseClass(string: String, int: Int)
override def testData: Any = {
- val testData = new Row(11)
+ val testData = new Row(12)
testData.setField(0, null)
testData.setField(1, 42)
testData.setField(2, Array(1, 2, 3))
@@ -339,6 +381,7 @@ class ArrayTypeTest extends ExpressionTestBase {
testData.setField(8, Array(4.0))
testData.setField(9, Array[Integer](1))
testData.setField(10, Array[Integer]())
+ testData.setField(11, Array[Integer](1))
testData
}
@@ -354,7 +397,8 @@ class ArrayTypeTest extends ExpressionTestBase {
PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO,
PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO,
ObjectArrayTypeInfo.getInfoFor(Types.INT),
- ObjectArrayTypeInfo.getInfoFor(Types.INT)
+ ObjectArrayTypeInfo.getInfoFor(Types.INT),
+ BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
).asInstanceOf[TypeInformation[Any]]
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3cd95f02/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
index 91cce0c..1e7d580 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
@@ -21,11 +21,11 @@ package org.apache.flink.table.expressions
import java.sql.{Date, Time, Timestamp}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row
import org.apache.flink.table.api.{Types, ValidationException}
-import org.apache.flink.table.api.java.utils.UserDefinedScalarFunctions.{JavaFunc0, JavaFunc1, JavaFunc2, JavaFunc3}
+import org.apache.flink.table.api.java.utils.UserDefinedScalarFunctions._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.expressions.utils._
import org.apache.flink.table.functions.ScalarFunction
@@ -263,6 +263,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
def testJavaBoxedPrimitives(): Unit = {
val JavaFunc0 = new JavaFunc0()
val JavaFunc1 = new JavaFunc1()
+ val JavaFunc4 = new JavaFunc4()
testAllApis(
JavaFunc0('f8),
@@ -288,6 +289,13 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
"JavaFunc1(Null(TIME), 15, Null(TIMESTAMP))",
"JavaFunc1(NULL, 15, NULL)",
"null and 15 and null")
+
+ testAllApis(
+ JavaFunc4('f10, array("a", "b", "c")),
+ "JavaFunc4(f10, array('a', 'b', 'c'))",
+ "JavaFunc4(f10, array['a', 'b', 'c'])",
+ "[1, 2, null] and [a, b, c]"
+ )
}
@Test
@@ -317,7 +325,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
// ----------------------------------------------------------------------------------------------
override def testData: Any = {
- val testData = new Row(10)
+ val testData = new Row(11)
testData.setField(0, 42)
testData.setField(1, "Test")
testData.setField(2, null)
@@ -328,6 +336,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
testData.setField(7, 12)
testData.setField(8, 1000L)
testData.setField(9, Seq("Hello", "World"))
+ testData.setField(10, Array[Integer](1, 2, null))
testData
}
@@ -342,7 +351,8 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
Types.SQL_TIMESTAMP,
Types.INTERVAL_MONTHS,
Types.INTERVAL_MILLIS,
- TypeInformation.of(classOf[Seq[String]])
+ TypeInformation.of(classOf[Seq[String]]),
+ BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
).asInstanceOf[TypeInformation[Any]]
}
@@ -368,6 +378,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
"JavaFunc1" -> new JavaFunc1,
"JavaFunc2" -> new JavaFunc2,
"JavaFunc3" -> new JavaFunc3,
+ "JavaFunc4" -> new JavaFunc4,
"RichFunc0" -> new RichFunc0,
"RichFunc1" -> new RichFunc1,
"RichFunc2" -> new RichFunc2