You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/11 11:19:32 UTC

[flink] 02/05: [FLINK-13107][table-planner-blink] Fix bug when converting a DataType that has a primitive class as its conversion class to TypeInformation.

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d3f39d07f5cb4dd4393a517f662329cbde254cdc
Author: beyond1920 <be...@126.com>
AuthorDate: Mon Jul 8 12:02:19 2019 +0800

    [FLINK-13107][table-planner-blink] Fix bug when converting a DataType that has a primitive class as its conversion class to TypeInformation.
---
 .../table/runtime/batch/table/CalcITCase.scala     |  2 --
 .../runtime/batch/table/CorrelateITCase.scala      |  4 ---
 .../table/runtime/batch/table/JoinITCase.scala     |  2 --
 .../table/runtime/stream/table/CalcITCase.scala    |  2 --
 .../table/runtime/stream/table/JoinITCase.scala    |  2 --
 .../table/types/TypeInfoDataTypeConverter.java     | 29 +++++++++++++++++++++-
 6 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
index 9cf54d6..a8bbd7e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
@@ -305,8 +305,6 @@ class CalcITCase extends BatchTestBase {
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  // TODO
-  @Ignore("Type question, should be fixed later.")
   @Test
   def testUserDefinedScalarFunction() {
     registerFunction("hashCode", HashCode)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
index 61132be..250cca5 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
@@ -177,8 +177,6 @@ class CorrelateITCase extends BatchTestBase {
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  // TODO
-  @Ignore("Type question, should be fixed later.")
   @Test
   def testUserDefinedTableFunctionWithScalarFunctionInCondition(): Unit = {
     val in = testData.as('a, 'b, 'c)
@@ -353,8 +351,6 @@ class CorrelateITCase extends BatchTestBase {
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  // TODO
-  @Ignore("Type question, should be fixed later.")
   @Test
   def testTableFunctionCollectorOpenClose(): Unit = {
     val t = testData.as('a, 'b, 'c)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
index 44752b0..ea5c275 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/JoinITCase.scala
@@ -54,8 +54,6 @@ class JoinITCase extends BatchTestBase {
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  // TODO
-  @Ignore("Type question, should be fixed later.")
   @Test
   def testJoin1(): Unit = {
     val ds1 = CollectionBatchExecTable.getSmall3TupleDataSet(tEnv, "a, b, c")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index 612f735..adeaa8c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -248,8 +248,6 @@ class CalcITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     assertEquals(expected.sorted, sink.getAppendResults.sorted)
   }
 
-  // TODO
-  @Ignore("Type question, should be fixed later.")
   @Test
   def testMultipleUserDefinedFunctions(): Unit = {
     tEnv.registerFunction("RichFunc1", new RichFunc1)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
index 07373e3..413efa1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/JoinITCase.scala
@@ -241,8 +241,6 @@ class JoinITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode
     // Proctime window output uncertain results, so assert has been ignored here.
   }
 
-  // TODO
-  @Ignore("Type question, should be fixed later.")
   @Test
   def testInnerJoin(): Unit = {
     val ds1 = env.fromCollection(smallTupleData3).toTable(tEnv, 'a, 'b, 'c)
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
index e0c9ebb..26d4dec 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.types;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.MapTypeInfo;
 import org.apache.flink.api.java.typeutils.MultisetTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
@@ -40,6 +41,10 @@ import org.apache.flink.table.typeutils.BinaryStringTypeInfo;
 import org.apache.flink.table.typeutils.DecimalTypeInfo;
 import org.apache.flink.table.typeutils.MapViewTypeInfo;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType;
 import static org.apache.flink.table.types.PlannerTypeUtils.isPrimitive;
@@ -61,10 +66,32 @@ import static org.apache.flink.table.types.PlannerTypeUtils.isPrimitive;
  */
 @Deprecated
 public class TypeInfoDataTypeConverter {
+	private static final Map<String, TypeInformation<?>> primitiveDataTypeTypeInfoMap = new HashMap<>();
+
+	static {
+		addDefaultTypeInfo(boolean.class, Types.BOOLEAN);
+		addDefaultTypeInfo(byte.class, Types.BYTE);
+		addDefaultTypeInfo(short.class, Types.SHORT);
+		addDefaultTypeInfo(int.class, Types.INT);
+		addDefaultTypeInfo(long.class, Types.LONG);
+		addDefaultTypeInfo(float.class, Types.FLOAT);
+		addDefaultTypeInfo(double.class, Types.DOUBLE);
+	}
+
+	private static void addDefaultTypeInfo(Class<?> clazz, TypeInformation<?> typeInformation) {
+		Preconditions.checkArgument(clazz.isPrimitive());
+		primitiveDataTypeTypeInfoMap.put(clazz.getName(), typeInformation);
+	}
 
 	public static TypeInformation<?> fromDataTypeToTypeInfo(DataType dataType) {
-		LogicalType logicalType = fromDataTypeToLogicalType(dataType);
 		Class<?> clazz = dataType.getConversionClass();
+		if (clazz.isPrimitive()) {
+			final TypeInformation<?> foundTypeInfo = primitiveDataTypeTypeInfoMap.get(clazz.getName());
+			if (foundTypeInfo != null) {
+				return foundTypeInfo;
+			}
+		}
+		LogicalType logicalType = fromDataTypeToLogicalType(dataType);
 		switch (logicalType.getTypeRoot()) {
 			case DECIMAL:
 				DecimalType decimalType = (DecimalType) logicalType;