You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/07/24 08:01:47 UTC

[flink] branch release-1.11 updated: [FLINK-18632][table-planner-blink] Assign the missing RowKind when toRetractStream with POJO type

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new f8b98cb  [FLINK-18632][table-planner-blink] Assign the missing RowKind when toRetractStream with POJO type
f8b98cb is described below

commit f8b98cbc4e10e69ff0ea08388224f4994de474d7
Author: lzy3261944 <50...@qq.com>
AuthorDate: Fri Jul 24 15:20:59 2020 +0800

    [FLINK-18632][table-planner-blink] Assign the missing RowKind when toRetractStream with POJO type
    
    Co-authored-by: luoziyu <zi...@narvii.com>
    
    This closes #12955
---
 .../table/planner/codegen/SinkCodeGenerator.scala  |  1 +
 .../stream/sql/StreamTableEnvironmentITCase.scala  | 23 ++++++++++++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
index efe906b..f46bd7c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/SinkCodeGenerator.scala
@@ -94,6 +94,7 @@ object SinkCodeGenerator {
         afterIndexModify = CodeGenUtils.newName("afterIndexModify")
         s"""
            |${conversion.code}
+           |${conversion.resultTerm}.setRowKind(${inputTerm}.getRowKind());
            |${classOf[RowData].getCanonicalName} $afterIndexModify = ${conversion.resultTerm};
            |""".stripMargin
       case _ =>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
index 3248965..1282aba 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
@@ -114,4 +114,27 @@ class StreamTableEnvironmentITCase extends StreamingTestBase {
       "(true,Person{name='Jack', age=3})")
     assertEquals(expected.sorted, sink.getResults.sorted)
   }
+
+  @Test
+  def testRetractMsgWithPojoType(): Unit = {
+    val orders = env.fromCollection(Seq(
+      new Order(1L, new ProductItem("beer", 10L), 1),
+      new Order(1L, new ProductItem("beer", 10L), 2)
+    ))
+
+    val table = tEnv.fromDataStream(orders, 'user, 'product, 'amount)
+
+    val sink = new StringSink[(Boolean, Order)]()
+    tEnv.sqlQuery(s"""|SELECT user, product, sum(amount) as amount
+                      |FROM $table
+                      |GROUP BY user, product
+                      |""".stripMargin).toRetractStream[Order].addSink(sink)
+    env.execute()
+
+    val expected = List(
+      "(true,Order{user=1, product='Product{name='beer', id=10}', amount=1})",
+      "(false,Order{user=1, product='Product{name='beer', id=10}', amount=1})",
+      "(true,Order{user=1, product='Product{name='beer', id=10}', amount=3})")
+    assertEquals(expected.sorted, sink.getResults.sorted)
+  }
 }