Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/27 03:30:37 UTC

[GitHub] [flink] zjffdu opened a new pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

zjffdu opened a new pull request #12346:
URL: https://github.com/apache/flink/pull/12346


   ## What is the purpose of the change
   
   This PR fixes the wrong output of the SQL client's table mode for streaming results.
   
   ## Brief change log
   
   This PR adds a new method `fieldsEquals` to the class `Row` and uses it to compare rows in the SQL client.
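To illustrate the difference between the two comparisons, here is a minimal sketch built on a hypothetical, simplified `SimpleRow` class (not Flink's `org.apache.flink.types.Row`). It assumes, as the review discussion suggests, that row equality also compares the row kind: `equals` checks the change kind and the fields, while `fieldsEquals` checks only the fields. The class and enum names are assumptions made for illustration, and the real `Row.fieldsEquals` in this PR may differ in detail.

```java
import java.util.Arrays;

// Hypothetical container for the sketch; not part of Flink.
final class FieldsEqualsSketch {
    // Mirrors the four values of Flink's RowKind.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Simplified stand-in for a row: a change kind plus field values.
    static final class SimpleRow {
        final Kind kind;
        final Object[] fields;

        SimpleRow(Kind kind, Object... fields) {
            this.kind = kind;
            this.fields = fields;
        }

        // Full equality: the change kind AND every field must match.
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof SimpleRow)) {
                return false;
            }
            SimpleRow other = (SimpleRow) o;
            return kind == other.kind && Arrays.deepEquals(fields, other.fields);
        }

        @Override
        public int hashCode() {
            return 31 * kind.ordinal() + Arrays.deepHashCode(fields);
        }

        // Field-only equality: the change kind is ignored.
        boolean fieldsEquals(SimpleRow other) {
            return other != null && Arrays.deepEquals(fields, other.fields);
        }
    }
}
```

For a stored `(INSERT, "Bob", 1)` row and a retraction `(UPDATE_BEFORE, "Bob", 1)`, `equals` returns `false` while `fieldsEquals` returns `true`.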
   
   
   ## Verifying this change
   
   Verified manually: the output of the following SQL statement is now correct.
   ```
   SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;
   ```
   
   Output
   ```
   Alice                        1
   Greg                         1
   Bob                          2
   ```
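A simplified sketch of why the output was wrong, assuming the legacy `Tuple2<Boolean, Row>` changelog for the query above arrives in the order hard-coded below (the actual emission order may differ). A row is appended when the flag is `true`; when it is `false`, the most recent matching row is removed, loosely modeled on `processDelete`. This is not the actual `MaterializedCollectStreamResult` logic, and all names are hypothetical. With a kind-aware comparison, the `UPDATE_BEFORE` retraction for `Bob 1` never matches the stored `INSERT` row, so a stale `Bob 1` stays in the table.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical container for the sketch; not part of Flink.
final class MaterializeSketch {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class SimpleRow {
        final Kind kind;
        final Object[] fields;
        SimpleRow(Kind kind, Object... fields) { this.kind = kind; this.fields = fields; }
        // Kind-aware comparison (the old behavior).
        boolean fullEquals(SimpleRow o) { return kind == o.kind && Arrays.deepEquals(fields, o.fields); }
        // Field-only comparison (the fix).
        boolean fieldsEquals(SimpleRow o) { return Arrays.deepEquals(fields, o.fields); }
        @Override public String toString() { return fields[0] + " " + fields[1]; }
    }

    // One message in the legacy (flag, row) shape: true = add, false = retract.
    static final class Change {
        final boolean add;
        final SimpleRow row;
        Change(boolean add, SimpleRow row) { this.add = add; this.row = row; }
    }

    // Applies the changelog to an in-memory table; `matches` decides
    // which stored row a retraction refers to.
    static List<String> materialize(List<Change> changes, BiPredicate<SimpleRow, SimpleRow> matches) {
        List<SimpleRow> table = new ArrayList<>();
        for (Change c : changes) {
            if (c.add) {
                table.add(c.row);
            } else {
                // Search backwards and drop the first matching row.
                for (int i = table.size() - 1; i >= 0; i--) {
                    if (matches.test(table.get(i), c.row)) {
                        table.remove(i);
                        break;
                    }
                }
            }
        }
        List<String> out = new ArrayList<>();
        for (SimpleRow r : table) {
            out.add(r.toString());
        }
        return out;
    }

    // Assumed changelog for the GROUP BY query above.
    static List<Change> exampleChangelog() {
        return Arrays.asList(
                new Change(true, new SimpleRow(Kind.INSERT, "Bob", 1L)),
                new Change(true, new SimpleRow(Kind.INSERT, "Alice", 1L)),
                new Change(true, new SimpleRow(Kind.INSERT, "Greg", 1L)),
                new Change(false, new SimpleRow(Kind.UPDATE_BEFORE, "Bob", 1L)),
                new Change(true, new SimpleRow(Kind.UPDATE_AFTER, "Bob", 2L)));
    }
}
```

Under these assumptions, the kind-aware comparison yields `Bob 1, Alice 1, Greg 1, Bob 2`, while the field-only comparison yields `Alice 1, Greg 1, Bob 2`, the same rows as the corrected output above.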
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] zjffdu commented on a change in pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
zjffdu commented on a change in pull request #12346:
URL: https://github.com/apache/flink/pull/12346#discussion_r434371197



##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
##########
@@ -224,7 +224,7 @@ private void processDelete(Row row) {
 		}
 
 		for (int i = startSearchPos; i >= validRowPosition; i--) {
-			if (materializedTable.get(i).equals(row)) {
+			if (materializedTable.get(i).fieldsEquals(row)) {
 				materializedTable.remove(i);
 				rowPositionCache.remove(row);

Review comment:
       @godfreyhe Is #12199 going to be merged into 1.11? To me this is an API change, so it would be better to stabilize it.







[GitHub] [flink] godfreyhe commented on a change in pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12346:
URL: https://github.com/apache/flink/pull/12346#discussion_r434378233



##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
##########
@@ -224,7 +224,7 @@ private void processDelete(Row row) {
 		}
 
 		for (int i = startSearchPos; i >= validRowPosition; i--) {
-			if (materializedTable.get(i).equals(row)) {
+			if (materializedTable.get(i).fieldsEquals(row)) {
 				materializedTable.remove(i);
 				rowPositionCache.remove(row);

Review comment:
       @zjffdu, I want to merge https://github.com/apache/flink/pull/12199 into 1.11, but 1.11 has been in feature freeze since last week. This PR will only be merged into master.







[GitHub] [flink] twalthr commented on a change in pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #12346:
URL: https://github.com/apache/flink/pull/12346#discussion_r436510285



##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
##########
@@ -224,7 +224,7 @@ private void processDelete(Row row) {
 		}
 
 		for (int i = startSearchPos; i >= validRowPosition; i--) {
-			if (materializedTable.get(i).equals(row)) {
+			if (materializedTable.get(i).fieldsEquals(row)) {
 				materializedTable.remove(i);
 				rowPositionCache.remove(row);

Review comment:
       No, just remove the flag that is encoded in `Row` and purely depend on the `Boolean` of `Tuple2`.







[GitHub] [flink] twalthr commented on a change in pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #12346:
URL: https://github.com/apache/flink/pull/12346#discussion_r434346939



##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
##########
@@ -224,7 +224,7 @@ private void processDelete(Row row) {
 		}
 
 		for (int i = startSearchPos; i >= validRowPosition; i--) {
-			if (materializedTable.get(i).equals(row)) {
+			if (materializedTable.get(i).fieldsEquals(row)) {
 				materializedTable.remove(i);
 				rowPositionCache.remove(row);

Review comment:
       @zjffdu I looked into this topic again in depth. I don't think your fix is enough, because `rowPositionCache.remove(row);` also needs to ignore the row kind. I wonder if we can fix the problem in a different layer. @godfreyhe how about we erase the row kind for `Tuple2<Boolean, Row>`? In the future, we should have a `tableEnv.toDataStream(ChangelogMode): Row` so that the `Tuple2` case will not be needed. Until then, we should not encode the deletion twice, in both the tuple and the row. What do you think?
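The point about `rowPositionCache` can be sketched the same way. Assuming the cache is a hash map keyed by the row (an assumption; the field's actual type is not shown in the diff), a key whose `equals`/`hashCode` include the change kind cannot be found by a retraction carrying a different kind, while a key built from the field values alone can. `KindedRow`, `fieldsKey` and the helper methods are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical container for the sketch; not part of Flink.
final class PositionCacheSketch {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Row whose equals/hashCode include the change kind.
    static final class KindedRow {
        final Kind kind;
        final Object[] fields;
        KindedRow(Kind kind, Object... fields) { this.kind = kind; this.fields = fields; }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof KindedRow)) {
                return false;
            }
            KindedRow r = (KindedRow) o;
            return kind == r.kind && Arrays.deepEquals(fields, r.fields);
        }

        @Override
        public int hashCode() {
            return Objects.hash(kind, Arrays.deepHashCode(fields));
        }

        // Kind-free key: List.equals/hashCode compare the elements only.
        List<Object> fieldsKey() {
            return Arrays.asList(fields);
        }
    }

    // Returns true if the retraction found and removed the cached position.
    static boolean removeWithKindedKey(KindedRow stored, KindedRow retraction) {
        Map<KindedRow, Integer> cache = new HashMap<>();
        cache.put(stored, 0);
        return cache.remove(retraction) != null;
    }

    static boolean removeWithFieldsKey(KindedRow stored, KindedRow retraction) {
        Map<List<Object>, Integer> cache = new HashMap<>();
        cache.put(stored.fieldsKey(), 0);
        return cache.remove(retraction.fieldsKey()) != null;
    }
}
```

Under these assumptions, `removeWithKindedKey` returns `false` for a stored `INSERT` row and an `UPDATE_BEFORE` retraction with the same fields, while `removeWithFieldsKey` returns `true`.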







[GitHub] [flink] zjffdu commented on pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
zjffdu commented on pull request #12346:
URL: https://github.com/apache/flink/pull/12346#issuecomment-634055640









[GitHub] [flink] twalthr commented on a change in pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #12346:
URL: https://github.com/apache/flink/pull/12346#discussion_r436704980



##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
##########
@@ -224,7 +224,7 @@ private void processDelete(Row row) {
 		}
 
 		for (int i = startSearchPos; i >= validRowPosition; i--) {
-			if (materializedTable.get(i).equals(row)) {
+			if (materializedTable.get(i).fieldsEquals(row)) {
 				materializedTable.remove(i);
 				rowPositionCache.remove(row);

Review comment:
       Sorry, maybe I was not specific enough. I didn't mean to remove the field; I meant resetting the value of `RowKind`. All rows in a returned `Tuple2<Boolean, Row>` should have `Row.ofKind(RowKind.INSERT, ...)`, and users should rely on the boolean flag instead. I hope this makes it clearer. Otherwise, I can also assign this ticket to myself to speed things up.
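A minimal sketch of the suggested conversion, assuming a changelog row is turned into the legacy `(flag, row)` pair in a single place: the flag is derived from the kind (`INSERT`/`UPDATE_AFTER` become additions, `UPDATE_BEFORE`/`DELETE` become retractions) and the row's kind is reset to `INSERT`. `Map.Entry` stands in for Flink's `Tuple2`, and `SimpleRow`/`toLegacyTuple` are hypothetical; this is not the `MapFunction` referred to in the thread.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Map;

// Hypothetical container for the sketch; not part of Flink.
final class EraseKindSketch {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class SimpleRow {
        final Kind kind;
        final Object[] fields;
        SimpleRow(Kind kind, Object... fields) { this.kind = kind; this.fields = fields; }
    }

    // Stand-in for building Tuple2<Boolean, Row>: key = flag, value = row.
    static Map.Entry<Boolean, SimpleRow> toLegacyTuple(SimpleRow row) {
        // Additions map to true; UPDATE_BEFORE and DELETE map to false.
        boolean isAdd = row.kind == Kind.INSERT || row.kind == Kind.UPDATE_AFTER;
        // Copy the fields into a row whose kind is always INSERT, so the
        // boolean flag is the only place the change type is encoded.
        SimpleRow erased = new SimpleRow(Kind.INSERT, Arrays.copyOf(row.fields, row.fields.length));
        return new AbstractMap.SimpleImmutableEntry<>(isAdd, erased);
    }
}
```

With the kind erased, two rows with the same fields compare equal regardless of whether they arrived as an addition or a retraction, which is what the SQL client's materialization needs.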







[GitHub] [flink] twalthr commented on a change in pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #12346:
URL: https://github.com/apache/flink/pull/12346#discussion_r434498685



##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
##########
@@ -224,7 +224,7 @@ private void processDelete(Row row) {
 		}
 
 		for (int i = startSearchPos; i >= validRowPosition; i--) {
-			if (materializedTable.get(i).equals(row)) {
+			if (materializedTable.get(i).fieldsEquals(row)) {
 				materializedTable.remove(i);
 				rowPositionCache.remove(row);

Review comment:
       Then we should just erase the change flag in `Tuple2<Boolean, Row>` as the fix for 1.11. @zjffdu can you update this PR?







[GitHub] [flink] twalthr commented on a change in pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
twalthr commented on a change in pull request #12346:
URL: https://github.com/apache/flink/pull/12346#discussion_r436568215



##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
##########
@@ -224,7 +224,7 @@ private void processDelete(Row row) {
 		}
 
 		for (int i = startSearchPos; i >= validRowPosition; i--) {
-			if (materializedTable.get(i).equals(row)) {
+			if (materializedTable.get(i).fieldsEquals(row)) {
 				materializedTable.remove(i);
 				rowPositionCache.remove(row);

Review comment:
       This field is valid in most locations. Only the code paths that generate `Tuple2<Boolean, Row>` should be touched, not the other, non-legacy paths. Where is the `Tuple2<>` created? There is some `MapFunction` in the code base doing this.







[GitHub] [flink] twalthr closed pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
twalthr closed pull request #12346:
URL: https://github.com/apache/flink/pull/12346


   







[GitHub] [flink] flinkbot commented on pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #12346:
URL: https://github.com/apache/flink/pull/12346#issuecomment-634057278









[GitHub] [flink] godfreyhe commented on a change in pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12346:
URL: https://github.com/apache/flink/pull/12346#discussion_r434354823



##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
##########
@@ -224,7 +224,7 @@ private void processDelete(Row row) {
 		}
 
 		for (int i = startSearchPos; i >= validRowPosition; i--) {
-			if (materializedTable.get(i).equals(row)) {
+			if (materializedTable.get(i).fieldsEquals(row)) {
 				materializedTable.remove(i);
 				rowPositionCache.remove(row);

Review comment:
       @twalthr for now, we can erase the row kind for `Tuple2<Boolean, Row>`. After https://github.com/apache/flink/pull/12199 is merged, I want to refactor the select logic in the SQL client. Then `Tuple2` is no longer needed, and we can use `Row` for all cases.







[GitHub] [flink] zjffdu commented on a change in pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
zjffdu commented on a change in pull request #12346:
URL: https://github.com/apache/flink/pull/12346#discussion_r437868981



##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
##########
@@ -224,7 +224,7 @@ private void processDelete(Row row) {
 		}
 
 		for (int i = startSearchPos; i >= validRowPosition; i--) {
-			if (materializedTable.get(i).equals(row)) {
+			if (materializedTable.get(i).fieldsEquals(row)) {
 				materializedTable.remove(i);
 				rowPositionCache.remove(row);

Review comment:
       ping @twalthr 







[GitHub] [flink] zjffdu commented on a change in pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
zjffdu commented on a change in pull request #12346:
URL: https://github.com/apache/flink/pull/12346#discussion_r435045007



##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
##########
@@ -224,7 +224,7 @@ private void processDelete(Row row) {
 		}
 
 		for (int i = startSearchPos; i >= validRowPosition; i--) {
-			if (materializedTable.get(i).equals(row)) {
+			if (materializedTable.get(i).fieldsEquals(row)) {
 				materializedTable.remove(i);
 				rowPositionCache.remove(row);

Review comment:
       @twalthr Do you mean to change `Tuple2<Boolean, Row>` to `Row`?







[GitHub] [flink] zjffdu commented on a change in pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
zjffdu commented on a change in pull request #12346:
URL: https://github.com/apache/flink/pull/12346#discussion_r436532416



##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
##########
@@ -224,7 +224,7 @@ private void processDelete(Row row) {
 		}
 
 		for (int i = startSearchPos; i >= validRowPosition; i--) {
-			if (materializedTable.get(i).equals(row)) {
+			if (materializedTable.get(i).fieldsEquals(row)) {
 				materializedTable.remove(i);
 				rowPositionCache.remove(row);

Review comment:
       @twalthr Is it safe to do that? I see many places that use this field.
   ![image](https://user-images.githubusercontent.com/164491/84008773-e2060f00-a9a4-11ea-8f10-0b239a183d49.png)
   
   







[GitHub] [flink] flinkbot edited a comment on pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #12346:
URL: https://github.com/apache/flink/pull/12346#issuecomment-634065440









[GitHub] [flink] zjffdu commented on a change in pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
zjffdu commented on a change in pull request #12346:
URL: https://github.com/apache/flink/pull/12346#discussion_r436745610



##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
##########
@@ -224,7 +224,7 @@ private void processDelete(Row row) {
 		}
 
 		for (int i = startSearchPos; i >= validRowPosition; i--) {
-			if (materializedTable.get(i).equals(row)) {
+			if (materializedTable.get(i).fieldsEquals(row)) {
 				materializedTable.remove(i);
 				rowPositionCache.remove(row);

Review comment:
       @twalthr Thanks for the clarification, I have updated the PR.







[GitHub] [flink] zjffdu commented on a change in pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
zjffdu commented on a change in pull request #12346:
URL: https://github.com/apache/flink/pull/12346#discussion_r436577175



##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
##########
@@ -224,7 +224,7 @@ private void processDelete(Row row) {
 		}
 
 		for (int i = startSearchPos; i >= validRowPosition; i--) {
-			if (materializedTable.get(i).equals(row)) {
+			if (materializedTable.get(i).fieldsEquals(row)) {
 				materializedTable.remove(i);
 				rowPositionCache.remove(row);

Review comment:
       Sorry @twalthr, I don't understand. If this field is valid and necessary, how can I remove it from `Row`?










[GitHub] [flink] twalthr commented on pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
twalthr commented on pull request #12346:
URL: https://github.com/apache/flink/pull/12346#issuecomment-641854266


   Thanks @zjffdu. I will add a test and merge this.








[GitHub] [flink] godfreyhe commented on a change in pull request #12346: [FLINK-17944][sql-client] Wrong output in sql client's table mode

Posted by GitBox <gi...@apache.org>.
godfreyhe commented on a change in pull request #12346:
URL: https://github.com/apache/flink/pull/12346#discussion_r434354823



##########
File path: flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/result/MaterializedCollectStreamResult.java
##########
@@ -224,7 +224,7 @@ private void processDelete(Row row) {
 		}
 
 		for (int i = startSearchPos; i >= validRowPosition; i--) {
-			if (materializedTable.get(i).equals(row)) {
+			if (materializedTable.get(i).fieldsEquals(row)) {
 				materializedTable.remove(i);
 				rowPositionCache.remove(row);

Review comment:
       @twalthr for now, we can erase the row kind for `Tuple2<Boolean, Row>`. After https://github.com/apache/flink/pull/12199 is merged, I want to refactor the select logic in the SQL client based on `TableResult#collect()`. Then `Tuple2` is no longer needed, and we can use `Row` for all cases.
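Once the SQL client consumes `Row` directly, the boolean flag becomes unnecessary, because the kind alone says whether a row is added or retracted. A rough sketch of such kind-driven materialization, with hypothetical names and a simplified table of field arrays (it does not use `TableResult#collect()` or any Flink type):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical container for the sketch; not part of Flink.
final class RowKindMaterializeSketch {
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class SimpleRow {
        final Kind kind;
        final Object[] fields;
        SimpleRow(Kind kind, Object... fields) { this.kind = kind; this.fields = fields; }
    }

    // Applies one changelog row to the materialized table using only the
    // row's own kind (no separate boolean flag).
    static void apply(List<Object[]> table, SimpleRow row) {
        switch (row.kind) {
            case INSERT:
            case UPDATE_AFTER:
                table.add(row.fields);
                break;
            case UPDATE_BEFORE:
            case DELETE:
                // Remove the most recent row with equal field values; the kind is ignored.
                for (int i = table.size() - 1; i >= 0; i--) {
                    if (Arrays.deepEquals(table.get(i), row.fields)) {
                        table.remove(i);
                        break;
                    }
                }
                break;
        }
    }
}
```

Because the stored entries are plain field arrays, there is no kind left to mismatch during a retraction.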



