You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/04/17 19:53:31 UTC

flink git commit: [FLINK-8366] [table] Fix UpsertTableSink tests.

Repository: flink
Updated Branches:
  refs/heads/release-1.5 c7e915fe7 -> bea431f13


[FLINK-8366] [table] Fix UpsertTableSink tests.

This closes #5244.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bea431f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bea431f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bea431f1

Branch: refs/heads/release-1.5
Commit: bea431f131c52f636881e86dee2fb195ab56db9e
Parents: c7e915f
Author: 军长 <he...@alibaba-inc.com>
Authored: Fri Jan 5 16:53:31 2018 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Apr 17 21:53:19 2018 +0200

----------------------------------------------------------------------
 .../table/runtime/stream/table/TableSinkITCase.scala     | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bea431f1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index bda823e..3085c36 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -63,7 +63,7 @@ class TableSinkITCase extends AbstractTestBase {
     val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
     tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
 
-    val results = input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
+    input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
       .where('a < 3 || 'a > 19)
       .select('c, 't, 'b)
       .insertInto("targetTable")
@@ -96,7 +96,7 @@ class TableSinkITCase extends AbstractTestBase {
       .assignAscendingTimestamps(_._2)
       .map(x => x).setParallelism(4) // increase DOP to 4
 
-    val results = input.toTable(tEnv, 'a, 'b.rowtime, 'c)
+    input.toTable(tEnv, 'a, 'b.rowtime, 'c)
       .where('a < 5 || 'a > 17)
       .select('c, 'b)
       .writeToSink(new CsvTableSink(path))
@@ -677,11 +677,10 @@ object RowCollector {
   /** Converts a list of upsert messages into a list of final results. */
   def upsertResults(results: List[JTuple2[JBool, Row]], keys: Array[Int]): List[String] = {
 
-    def getKeys(r: Row): List[String] =
-      keys.foldLeft(List[String]())((k, i) => r.getField(i).toString :: k)
+    def getKeys(r: Row): Row = Row.project(r, keys)
 
-    val upserted = results.foldLeft(Map[String, String]()){ (o: Map[String, String], r) =>
-      val key = getKeys(r.f1).mkString("")
+    val upserted = results.foldLeft(Map[Row, String]()){ (o: Map[Row, String], r) =>
+      val key = getKeys(r.f1)
       if (r.f0) {
         o + (key -> r.f1.toString)
       } else {