You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/24 08:01:47 UTC
[flink] branch release-1.11 updated:
[FLINK-18632][table-planner-blink] Assign the missing RowKind when
toRetractStream with POJO type
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new f8b98cb [FLINK-18632][table-planner-blink] Assign the missing RowKind when toRetractStream with POJO type
f8b98cb is described below
commit f8b98cbc4e10e69ff0ea08388224f4994de474d7
Author: lzy3261944 <50...@qq.com>
AuthorDate: Fri Jul 24 15:20:59 2020 +0800
[FLINK-18632][table-planner-blink] Assign the missing RowKind when toRetractStream with POJO type
Co-authored-by: luoziyu <zi...@narvii.com>
This closes #12955
---
.../table/planner/codegen/SinkCodeGenerator.scala | 1 +
.../stream/sql/StreamTableEnvironmentITCase.scala | 23 ++++++++++++++++++++++
2 files changed, 24 insertions(+)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index efe906b..f46bd7c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -94,6 +94,7 @@ object SinkCodeGenerator {
afterIndexModify = CodeGenUtils.newName("afterIndexModify")
s"""
|${conversion.code}
+ |${conversion.resultTerm}.setRowKind(${inputTerm}.getRowKind());
|${classOf[RowData].getCanonicalName} $afterIndexModify = ${conversion.resultTerm};
|""".stripMargin
case _ =>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
index 3248965..1282aba 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
@@ -114,4 +114,27 @@ class StreamTableEnvironmentITCase extends StreamingTestBase {
"(true,Person{name='Jack', age=3})")
assertEquals(expected.sorted, sink.getResults.sorted)
}
+
+ @Test
+ def testRetractMsgWithPojoType(): Unit = {
+ val orders = env.fromCollection(Seq(
+ new Order(1L, new ProductItem("beer", 10L), 1),
+ new Order(1L, new ProductItem("beer", 10L), 2)
+ ))
+
+ val table = tEnv.fromDataStream(orders, 'user, 'product, 'amount)
+
+ val sink = new StringSink[(Boolean, Order)]()
+ tEnv.sqlQuery(s"""|SELECT user, product, sum(amount) as amount
+ |FROM $table
+ |GROUP BY user, product
+ |""".stripMargin).toRetractStream[Order].addSink(sink)
+ env.execute()
+
+ val expected = List(
+ "(true,Order{user=1, product='Product{name='beer', id=10}', amount=1})",
+ "(false,Order{user=1, product='Product{name='beer', id=10}', amount=1})",
+ "(true,Order{user=1, product='Product{name='beer', id=10}', amount=3})")
+ assertEquals(expected.sorted, sink.getResults.sorted)
+ }
}