You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/02 02:09:06 UTC

[flink] 02/03: [FLINK-13290][table-planner-blink] SinkCodeGenerator should not compare row type field names

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b3b1890ea55b7791cb13d9a17551e48d3cbd567
Author: Jark Wu <im...@gmail.com>
AuthorDate: Tue Jul 30 18:17:01 2019 +0800

    [FLINK-13290][table-planner-blink] SinkCodeGenerator should not compare row type field names
---
 .../table/planner/codegen/SinkCodeGenerator.scala  | 13 +++++-----
 .../runtime/stream/table/TableSinkITCase.scala     | 30 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 6 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index e972ad7..21ea4f6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -34,6 +34,7 @@ import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
 import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.areTypesCompatible
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.types.Row
@@ -108,11 +109,10 @@ object SinkCodeGenerator {
     val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
     var afterIndexModify = inputTerm
     val fieldIndexProcessCode =
-      if (getCompositeTypes(convertOutputType).map(fromTypeInfoToLogicalType) sameElements
-          inputTypeInfo.getFieldTypes.map(fromTypeInfoToLogicalType)) {
+      if (!resultType.isInstanceOf[PojoTypeInfo[_]]) {
         ""
       } else {
-        // field index change (pojo)
+        // field index may change (pojo)
         val mapping = convertOutputType match {
           case ct: CompositeType[_] => ct.getFieldNames.map {
             name =>
@@ -223,9 +223,10 @@ object SinkCodeGenerator {
           case (fieldTypeInfo, i) =>
             val requestedTypeInfo = tt.getTypeAt(i)
             validateFieldType(requestedTypeInfo)
-            if (fromTypeInfoToLogicalType(fieldTypeInfo) !=
-                fromTypeInfoToLogicalType(requestedTypeInfo) &&
-                !requestedTypeInfo.isInstanceOf[GenericTypeInfo[Object]]) {
+            if (!areTypesCompatible(
+              fromTypeInfoToLogicalType(fieldTypeInfo),
+              fromTypeInfoToLogicalType(requestedTypeInfo)) &&
+              !requestedTypeInfo.isInstanceOf[GenericTypeInfo[Object]]) {
               val fieldNames = tt.getFieldNames
               throw new TableException(s"Result field '${fieldNames(i)}' does not match requested" +
                   s" type. Requested: $requestedTypeInfo; Actual: $fieldTypeInfo")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index 0ab3480..c013308 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -151,6 +151,36 @@ class TableSinkITCase extends AbstractTestBase {
     assertEquals(expected, result)
   }
 
+
+  @Test
+  def testAppendSinkWithNestedRow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.getConfig.enableObjectReuse()
+    val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
+
+    val t = env.fromCollection(smallTupleData3)
+      .toTable(tEnv, 'id, 'num, 'text)
+    tEnv.registerTable("src", t)
+
+    val sink = new TestingAppendTableSink()
+    tEnv.registerTableSink(
+      "appendSink",
+      sink.configure(
+        Array[String]("t", "item"),
+        Array[TypeInformation[_]](Types.INT(), Types.ROW(Types.LONG, Types.STRING()))))
+
+    tEnv.sqlUpdate("INSERT INTO appendSink SELECT id, ROW(num, text) FROM src")
+
+    env.execute()
+
+    val result = sink.getAppendResults.sorted
+    val expected = List(
+      "1,1,Hi",
+      "2,2,Hello",
+      "3,2,Hello world").sorted
+    assertEquals(expected, result)
+  }
+
   @Test
   def testAppendSinkOnAppendTableForInnerJoin(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment