You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/02 02:09:06 UTC
[flink] 02/03: [FLINK-13290][table-planner-blink] SinkCodeGenerator
should not compare row type field names
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5b3b1890ea55b7791cb13d9a17551e48d3cbd567
Author: Jark Wu <im...@gmail.com>
AuthorDate: Tue Jul 30 18:17:01 2019 +0800
[FLINK-13290][table-planner-blink] SinkCodeGenerator should not compare row type field names
---
.../table/planner/codegen/SinkCodeGenerator.scala | 13 +++++-----
.../runtime/stream/table/TableSinkITCase.scala | 30 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 6 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index e972ad7..21ea4f6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -34,6 +34,7 @@ import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory
import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo
import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.areTypesCompatible
import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
import org.apache.flink.types.Row
@@ -108,11 +109,10 @@ object SinkCodeGenerator {
val inputTerm = CodeGenUtils.DEFAULT_INPUT1_TERM
var afterIndexModify = inputTerm
val fieldIndexProcessCode =
- if (getCompositeTypes(convertOutputType).map(fromTypeInfoToLogicalType) sameElements
- inputTypeInfo.getFieldTypes.map(fromTypeInfoToLogicalType)) {
+ if (!resultType.isInstanceOf[PojoTypeInfo[_]]) {
""
} else {
- // field index change (pojo)
+ // field index may change (pojo)
val mapping = convertOutputType match {
case ct: CompositeType[_] => ct.getFieldNames.map {
name =>
@@ -223,9 +223,10 @@ object SinkCodeGenerator {
case (fieldTypeInfo, i) =>
val requestedTypeInfo = tt.getTypeAt(i)
validateFieldType(requestedTypeInfo)
- if (fromTypeInfoToLogicalType(fieldTypeInfo) !=
- fromTypeInfoToLogicalType(requestedTypeInfo) &&
- !requestedTypeInfo.isInstanceOf[GenericTypeInfo[Object]]) {
+ if (!areTypesCompatible(
+ fromTypeInfoToLogicalType(fieldTypeInfo),
+ fromTypeInfoToLogicalType(requestedTypeInfo)) &&
+ !requestedTypeInfo.isInstanceOf[GenericTypeInfo[Object]]) {
val fieldNames = tt.getFieldNames
throw new TableException(s"Result field '${fieldNames(i)}' does not match requested" +
s" type. Requested: $requestedTypeInfo; Actual: $fieldTypeInfo")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
index 0ab3480..c013308 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableSinkITCase.scala
@@ -151,6 +151,36 @@ class TableSinkITCase extends AbstractTestBase {
assertEquals(expected, result)
}
+
+ @Test
+ def testAppendSinkWithNestedRow(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.getConfig.enableObjectReuse()
+ val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
+
+ val t = env.fromCollection(smallTupleData3)
+ .toTable(tEnv, 'id, 'num, 'text)
+ tEnv.registerTable("src", t)
+
+ val sink = new TestingAppendTableSink()
+ tEnv.registerTableSink(
+ "appendSink",
+ sink.configure(
+ Array[String]("t", "item"),
+ Array[TypeInformation[_]](Types.INT(), Types.ROW(Types.LONG, Types.STRING()))))
+
+ tEnv.sqlUpdate("INSERT INTO appendSink SELECT id, ROW(num, text) FROM src")
+
+ env.execute()
+
+ val result = sink.getAppendResults.sorted
+ val expected = List(
+ "1,1,Hi",
+ "2,2,Hello",
+ "3,2,Hello world").sorted
+ assertEquals(expected, result)
+ }
+
@Test
def testAppendSinkOnAppendTableForInnerJoin(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment