You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2020/06/10 09:44:59 UTC

[flink] branch release-1.11 updated: [FLINK-17944][sql-client] Wrong output in SQL Client's table mode

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new d36a2b3  [FLINK-17944][sql-client] Wrong output in SQL Client's table mode
d36a2b3 is described below

commit d36a2b3f02e9e31ab61d296ae76c3cffd48b4569
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue May 26 22:01:39 2020 +0800

    [FLINK-17944][sql-client] Wrong output in SQL Client's table mode
    
    This is a temporary workaround until we no longer use Tuple2<Boolean, Row>
    to represent changelogs.
    
    This closes #12346.
---
 .../local/result/MaterializedCollectStreamResult.java     |  5 +++++
 .../local/result/MaterializedCollectStreamResultTest.java | 15 ++++++++-------
 2 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
index 40dd676..2aec400 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 
 import java.net.InetAddress;
 import java.util.ArrayList;
@@ -186,6 +187,10 @@ public class MaterializedCollectStreamResult<C> extends CollectStreamResult<C> i
 	@Override
 	protected void processRecord(Tuple2<Boolean, Row> change) {
 		synchronized (resultLock) {
+			// Always set the RowKind to INSERT so that rows compare equal regardless of their original kind;
+			// the Boolean of Tuple2<Boolean, Row> alone determines whether the change is an insert or a delete.
+			change.f1.setKind(RowKind.INSERT);
+
 			// insert
 			if (change.f0) {
 				processInsert(change.f1);
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
index 0abd0d5..e31372d 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResultTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 
 import org.junit.Test;
 
@@ -57,10 +58,10 @@ public class MaterializedCollectStreamResultTest {
 
 			result.isRetrieving = true;
 
-			result.processRecord(Tuple2.of(true, Row.of("A", 1)));
-			result.processRecord(Tuple2.of(true, Row.of("B", 1)));
-			result.processRecord(Tuple2.of(true, Row.of("A", 1)));
-			result.processRecord(Tuple2.of(true, Row.of("C", 2)));
+			result.processRecord(Tuple2.of(true, Row.ofKind(RowKind.INSERT, "A", 1)));
+			result.processRecord(Tuple2.of(true, Row.ofKind(RowKind.INSERT, "B", 1)));
+			result.processRecord(Tuple2.of(true, Row.ofKind(RowKind.INSERT, "A", 1)));
+			result.processRecord(Tuple2.of(true, Row.ofKind(RowKind.INSERT, "C", 2)));
 
 			assertEquals(TypedResult.payload(4), result.snapshot(1));
 
@@ -69,7 +70,7 @@ public class MaterializedCollectStreamResultTest {
 			assertEquals(Collections.singletonList(Row.of("A", 1)), result.retrievePage(3));
 			assertEquals(Collections.singletonList(Row.of("C", 2)), result.retrievePage(4));
 
-			result.processRecord(Tuple2.of(false, Row.of("A", 1)));
+			result.processRecord(Tuple2.of(false, Row.ofKind(RowKind.UPDATE_BEFORE, "A", 1)));
 
 			assertEquals(TypedResult.payload(3), result.snapshot(1));
 
@@ -77,8 +78,8 @@ public class MaterializedCollectStreamResultTest {
 			assertEquals(Collections.singletonList(Row.of("B", 1)), result.retrievePage(2));
 			assertEquals(Collections.singletonList(Row.of("C", 2)), result.retrievePage(3));
 
-			result.processRecord(Tuple2.of(false, Row.of("C", 2)));
-			result.processRecord(Tuple2.of(false, Row.of("A", 1)));
+			result.processRecord(Tuple2.of(false, Row.ofKind(RowKind.UPDATE_BEFORE, "C", 2)));
+			result.processRecord(Tuple2.of(false, Row.ofKind(RowKind.UPDATE_BEFORE, "A", 1)));
 
 			assertEquals(TypedResult.payload(1), result.snapshot(1));