You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2018/04/17 19:53:31 UTC
flink git commit: [FLINK-8366] [table] Fix UpsertTableSink tests.
Repository: flink
Updated Branches:
refs/heads/release-1.5 c7e915fe7 -> bea431f13
[FLINK-8366] [table] Fix UpsertTableSink tests.
This closes #5244.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bea431f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bea431f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bea431f1
Branch: refs/heads/release-1.5
Commit: bea431f131c52f636881e86dee2fb195ab56db9e
Parents: c7e915f
Author: 军长 <he...@alibaba-inc.com>
Authored: Fri Jan 5 16:53:31 2018 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Apr 17 21:53:19 2018 +0200
----------------------------------------------------------------------
.../table/runtime/stream/table/TableSinkITCase.scala | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bea431f1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
index bda823e..3085c36 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala
@@ -63,7 +63,7 @@ class TableSinkITCase extends AbstractTestBase {
val sink = new MemoryTableSinkUtil.UnsafeMemoryAppendTableSink
tEnv.registerTableSink("targetTable", fieldNames, fieldTypes, sink)
- val results = input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
+ input.toTable(tEnv, 'a, 'b, 'c, 't.rowtime)
.where('a < 3 || 'a > 19)
.select('c, 't, 'b)
.insertInto("targetTable")
@@ -96,7 +96,7 @@ class TableSinkITCase extends AbstractTestBase {
.assignAscendingTimestamps(_._2)
.map(x => x).setParallelism(4) // increase DOP to 4
- val results = input.toTable(tEnv, 'a, 'b.rowtime, 'c)
+ input.toTable(tEnv, 'a, 'b.rowtime, 'c)
.where('a < 5 || 'a > 17)
.select('c, 'b)
.writeToSink(new CsvTableSink(path))
@@ -677,11 +677,10 @@ object RowCollector {
/** Converts a list of upsert messages into a list of final results. */
def upsertResults(results: List[JTuple2[JBool, Row]], keys: Array[Int]): List[String] = {
- def getKeys(r: Row): List[String] =
- keys.foldLeft(List[String]())((k, i) => r.getField(i).toString :: k)
+ def getKeys(r: Row): Row = Row.project(r, keys)
- val upserted = results.foldLeft(Map[String, String]()){ (o: Map[String, String], r) =>
- val key = getKeys(r.f1).mkString("")
+ val upserted = results.foldLeft(Map[Row, String]()){ (o: Map[Row, String], r) =>
+ val key = getKeys(r.f1)
if (r.f0) {
o + (key -> r.f1.toString)
} else {