You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/11 11:19:32 UTC
[flink] 02/05: [FLINK-13107][table-planner-blink] Fix Bug when
convert DataType which has primitiveClass as convensionClass to
TypeInformation.
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d3f39d07f5cb4dd4393a517f662329cbde254cdc
Author: beyond1920 <be...@126.com>
AuthorDate: Mon Jul 8 12:02:19 2019 +0800
[FLINK-13107][table-planner-blink] Fix Bug when convert DataType which has primitiveClass as convensionClass to TypeInformation.
---
.../table/runtime/batch/table/CalcITCase.scala | 2 --
.../runtime/batch/table/CorrelateITCase.scala | 4 ---
.../table/runtime/batch/table/JoinITCase.scala | 2 --
.../table/runtime/stream/table/CalcITCase.scala | 2 --
.../table/runtime/stream/table/JoinITCase.scala | 2 --
.../table/types/TypeInfoDataTypeConverter.java | 29 +++++++++++++++++++++-
6 files changed, 28 insertions(+), 13 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 9cf54d6..a8bbd7e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -305,8 +305,6 @@ class CalcITCase extends BatchTestBase {
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
- // TODO
- @Ignore("Type question, should be fixed later.")
@Test
def testUserDefinedScalarFunction() {
registerFunction("hashCode", HashCode)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
index 61132be..250cca5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
@@ -177,8 +177,6 @@ class CorrelateITCase extends BatchTestBase {
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
- // TODO
- @Ignore("Type question, should be fixed later.")
@Test
def testUserDefinedTableFunctionWithScalarFunctionInCondition(): Unit = {
val in = testData.as('a, 'b, 'c)
@@ -353,8 +351,6 @@ class CorrelateITCase extends BatchTestBase {
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
- // TODO
- @Ignore("Type question, should be fixed later.")
@Test
def testTableFunctionCollectorOpenClose(): Unit = {
val t = testData.as('a, 'b, 'c)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
index 44752b0..ea5c275 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
@@ -54,8 +54,6 @@ class JoinITCase extends BatchTestBase {
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
- // TODO
- @Ignore("Type question, should be fixed later.")
@Test
def testJoin1(): Unit = {
val ds1 = CollectionBatchExecTable.getSmall3TupleDataSet(tEnv, "a, b, c")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index 612f735..adeaa8c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -248,8 +248,6 @@ class CalcITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
assertEquals(expected.sorted, sink.getAppendResults.sorted)
}
- // TODO
- @Ignore("Type question, should be fixed later.")
@Test
def testMultipleUserDefinedFunctions(): Unit = {
tEnv.registerFunction("RichFunc1", new RichFunc1)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
index 07373e3..413efa1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
@@ -241,8 +241,6 @@ class JoinITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
// Proctime window output uncertain results, so assert has been ignored here.
}
- // TODO
- @Ignore("Type question, should be fixed later.")
@Test
def testInnerJoin(): Unit = {
val ds1 = env.fromCollection(smallTupleData3).toTable(tEnv, 'a, 'b, 'c)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
index e0c9ebb..26d4dec 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.types;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.MultisetTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
@@ -40,6 +41,10 @@ import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
import org.apache.flink.table.typeutils.DecimalTypeInfo;
import org.apache.flink.table.typeutils.MapViewTypeInfo;
import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
import static org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType;
import static org.apache.flink.table.types.PlannerTypeUtils.isPrimitive;
@@ -61,10 +66,32 @@ import static org.apache.flink.table.types.PlannerTypeUtils.isPrimitive;
*/
@Deprecated
public class TypeInfoDataTypeConverter {
+ private static final Map<String, TypeInformation<?>> primitiveDataTypeTypeInfoMap = new HashMap<>();
+
+ static {
+ addDefaultTypeInfo(boolean.class, Types.BOOLEAN);
+ addDefaultTypeInfo(byte.class, Types.BYTE);
+ addDefaultTypeInfo(short.class, Types.SHORT);
+ addDefaultTypeInfo(int.class, Types.INT);
+ addDefaultTypeInfo(long.class, Types.LONG);
+ addDefaultTypeInfo(float.class, Types.FLOAT);
+ addDefaultTypeInfo(double.class, Types.DOUBLE);
+ }
+
+ private static void addDefaultTypeInfo(Class<?> clazz, TypeInformation<?> typeInformation) {
+ Preconditions.checkArgument(clazz.isPrimitive());
+ primitiveDataTypeTypeInfoMap.put(clazz.getName(), typeInformation);
+ }
public static TypeInformation<?> fromDataTypeToTypeInfo(DataType dataType) {
- LogicalType logicalType = fromDataTypeToLogicalType(dataType);
Class<?> clazz = dataType.getConversionClass();
+ if (clazz.isPrimitive()) {
+ final TypeInformation<?> foundTypeInfo = primitiveDataTypeTypeInfoMap.get(clazz.getName());
+ if (foundTypeInfo != null) {
+ return foundTypeInfo;
+ }
+ }
+ LogicalType logicalType = fromDataTypeToLogicalType(dataType);
switch (logicalType.getTypeRoot()) {
case DECIMAL:
DecimalType decimalType = (DecimalType) logicalType;