Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/29 12:09:18 UTC

[GitHub] [flink] lincoln-lil opened a new pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

lincoln-lil opened a new pull request #17605:
URL: https://github.com/apache/flink/pull/17605


   ## What is the purpose of the change
   
   Fix the exception thrown by `UpdatableTopNFunction` when an input record loses monotonicity on the sort key field.
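   The first diff hunk quoted in the review thread below adds a `lenient` flag (`true` by default) whose comment says that records hitting a "non-exist" state error are skipped instead of failing the job, plus a converter used "for logging only". The sketch below is a minimal, hypothetical illustration of that skip-or-fail choice. `LenientLookupDemo`, its `HashMap` standing in for keyed state, and the log message are assumptions for illustration, not the code in this PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

public class LenientLookupDemo {
    private static final Logger LOG = Logger.getLogger(LenientLookupDemo.class.getName());

    // Mirrors the intent of the patch's `lenient` flag: skip the record instead of failing.
    private final boolean lenient;
    // Hypothetical stand-in for keyed state: row key -> sort key.
    private final Map<String, Integer> rowKeyToSortKey = new HashMap<>();
    private int skippedRecords = 0;

    public LenientLookupDemo(boolean lenient) {
        this.lenient = lenient;
    }

    public void put(String rowKey, int sortKey) {
        rowKeyToSortKey.put(rowKey, sortKey);
    }

    // Returns the stored sort key, or null when the row is missing and lenient mode skips it.
    public Integer lookup(String rowKey) {
        Integer sortKey = rowKeyToSortKey.get(rowKey);
        if (sortKey != null) {
            return sortKey;
        }
        if (lenient) {
            // Log the problem record and move on instead of throwing.
            LOG.warning("Row key " + rowKey + " not found in state; skipping the record.");
            skippedRecords++;
            return null;
        }
        // Strict mode: surface the state inconsistency as an exception.
        throw new IllegalStateException("Failed to find row key " + rowKey + " in the buffer.");
    }

    public int skippedRecords() {
        return skippedRecords;
    }

    public static void main(String[] args) {
        LenientLookupDemo lenientMode = new LenientLookupDemo(true);
        lenientMode.put("a", 10);
        System.out.println(lenientMode.lookup("a"));       // 10
        System.out.println(lenientMode.lookup("missing")); // null, plus a warning log line
        try {
            new LenientLookupDemo(false).lookup("missing");
        } catch (IllegalStateException e) {
            System.out.println("strict mode failed: " + e.getMessage());
        }
    }
}
```

   Skipping keeps the job running at the cost of silently dropping an update, which is why the sketch counts and logs every skipped record.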
   
   ## Verifying this change
   
   This change is already covered by `RankHarnessTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
       - Dependencies (does it add or upgrade a dependency): (no)
       - The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
       - The serializers: (no)
       - The runtime per-record code paths (performance sensitive): (no)
       - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
       - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25641",
       "triggerID" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25643",
       "triggerID" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653",
       "triggerID" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653",
       "triggerID" : "955692359",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 2e9cea2efeb7e9f4722275a20f8551f27833fd95 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot commented on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 10ca9cf225390e479797cac385c89c3b45eb2cb1 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25641",
       "triggerID" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 10ca9cf225390e479797cac385c89c3b45eb2cb1 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25641) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25641",
       "triggerID" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25643",
       "triggerID" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653",
       "triggerID" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653",
       "triggerID" : "955692359",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0056d2cc65fb1d0ff971c0394a32fd65972ff148",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25838",
       "triggerID" : "0056d2cc65fb1d0ff971c0394a32fd65972ff148",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0056d2cc65fb1d0ff971c0394a32fd65972ff148",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25838",
       "triggerID" : "958731075",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 0056d2cc65fb1d0ff971c0394a32fd65972ff148 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25838) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25641",
       "triggerID" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25643",
       "triggerID" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653",
       "triggerID" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653",
       "triggerID" : "955692359",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0056d2cc65fb1d0ff971c0394a32fd65972ff148",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25838",
       "triggerID" : "0056d2cc65fb1d0ff971c0394a32fd65972ff148",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0056d2cc65fb1d0ff971c0394a32fd65972ff148",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25838",
       "triggerID" : "958731075",
       "triggerType" : "MANUAL"
     } ]
   }-->
   ## CI report:
   
   * 0056d2cc65fb1d0ff971c0394a32fd65972ff148 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25838) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] lincoln-lil commented on a change in pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r741602604



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
##########
@@ -75,6 +77,13 @@
     private final InternalTypeInfo<RowData> rowKeyType;
     private final long cacheSize;
 
+    // flag to skip records with non-exist error instead to fail, true by default.
+    private final boolean lenient = true;
+
+    // data converter for logging only.
+    private final DataStructureConverter rowConverter;

Review comment:
       Good catch! `transient` is better.
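       The review comment this replies to is not included in this archive. As a general illustration, not the patch itself: Java serialization skips `transient` fields, so a logging-only helper marked `transient` is not written out with the function and comes back as `null` after deserialization. It therefore has to be created again (for example in an `open()`-style lifecycle method) and cannot remain `final`. In this sketch `TopNLikeFunction` and the `StringBuilder` standing in for the row converter are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class TransientFieldDemo {

    // Hypothetical stand-in for a function object that is serialized and shipped to tasks.
    public static class TopNLikeFunction implements Serializable {
        private static final long serialVersionUID = 1L;

        // Regular field: written to and restored from the serialized form.
        private final long cacheSize;

        // Logging-only helper: skipped by Java serialization, so it is null after
        // deserialization until open() rebuilds it. It cannot be final for that reason.
        private transient StringBuilder logHelper;

        public TopNLikeFunction(long cacheSize) {
            this.cacheSize = cacheSize;
        }

        public void open() {
            logHelper = new StringBuilder();
        }

        public long cacheSize() {
            return cacheSize;
        }

        public boolean isOpened() {
            return logHelper != null;
        }
    }

    // Serializes and deserializes an object, roughly what happens when a function is shipped.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        TopNLikeFunction original = new TopNLikeFunction(10_000L);
        original.open();
        TopNLikeFunction copy = roundTrip(original);
        System.out.println(copy.cacheSize()); // 10000
        System.out.println(copy.isOpened());  // false: the transient field came back as null
        copy.open();
        System.out.println(copy.isOpened());  // true
    }
}
```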

##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
##########
@@ -312,6 +355,46 @@ private void processElementWithRowNumber(RowData inputRow, Collector<RowData> ou
                 "Failed to find the sortKey, rowkey in the buffer. This should never happen");
     }
 
+    private void emitRecordsWithRowNumberIgnoreStateError(
+            RowData newSortKey,
+            RowData newRow,
+            int newRank,
+            RowData oldSortKey,
+            RankRow oldRow,
+            int oldRank,
+            Collector<RowData> out) {
+        Iterator<Map.Entry<RowData, Collection<RowData>>> iterator = buffer.entrySet().iterator();
+        int currentRank = 0;
+        RowData currentRow = null;
+        RowData prevRow = null;
+
+        while (iterator.hasNext() && currentRank <= newRank) {
+            Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
+            RowData curSortKey = entry.getKey();
+            Collection<RowData> rowKeys = entry.getValue();
+            Iterator<RowData> rowKeyIter = rowKeys.iterator();
+            while (rowKeyIter.hasNext()) {
+                RowData rowKey = rowKeyIter.next();
+                currentRank += 1;
+                currentRow = rowKeyMap.get(rowKey).row;
+                if (oldRank <= currentRank) {
+                    if (currentRank == oldRank) {
+                        checkArgument(0 == sortKeyComparator.compare(curSortKey, oldSortKey));

Review comment:
       This check is wrong here, and after thinking it over it is also unnecessary, so I'll remove it.








[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023









[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25641",
       "triggerID" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25643",
       "triggerID" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c208976e1df1e97ab7951395e8be0c1f205a170a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25643) 
   * 2e9cea2efeb7e9f4722275a20f8551f27833fd95 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] lincoln-lil commented on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-958731075









[GitHub] [flink] JingsongLi merged pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #17605:
URL: https://github.com/apache/flink/pull/17605


   





[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25641",
       "triggerID" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25643",
       "triggerID" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 10ca9cf225390e479797cac385c89c3b45eb2cb1 Azure: [CANCELED](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25641) 
   * c208976e1df1e97ab7951395e8be0c1f205a170a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25643) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] lincoln-lil edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
lincoln-lil edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954909178


    Some tests failed; I'll fix them.





[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25641",
       "triggerID" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25643",
       "triggerID" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653",
       "triggerID" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2e9cea2efeb7e9f4722275a20f8551f27833fd95 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] tsreaper commented on a change in pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r741595404



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
##########
@@ -312,6 +355,46 @@ private void processElementWithRowNumber(RowData inputRow, Collector<RowData> ou
                 "Failed to find the sortKey, rowkey in the buffer. This should never happen");
     }
 
+    private void emitRecordsWithRowNumberIgnoreStateError(
+            RowData newSortKey,
+            RowData newRow,
+            int newRank,
+            RowData oldSortKey,
+            RankRow oldRow,
+            int oldRank,
+            Collector<RowData> out) {
+        Iterator<Map.Entry<RowData, Collection<RowData>>> iterator = buffer.entrySet().iterator();
+        int currentRank = 0;
+        RowData currentRow = null;
+        RowData prevRow = null;
+
+        while (iterator.hasNext() && currentRank <= newRank) {
+            Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
+            RowData curSortKey = entry.getKey();
+            Collection<RowData> rowKeys = entry.getValue();
+            Iterator<RowData> rowKeyIter = rowKeys.iterator();
+            while (rowKeyIter.hasNext()) {
+                RowData rowKey = rowKeyIter.next();
+                currentRank += 1;
+                currentRow = rowKeyMap.get(rowKey).row;
+                if (oldRank <= currentRank) {
+                    if (currentRank == oldRank) {
+                        checkArgument(0 == sortKeyComparator.compare(curSortKey, oldSortKey));

Review comment:
       Why is this true? It might be that the `oldSortKey` is unique and after the change it does not exist anymore.
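       A small model of the scenario described above, assuming an `ORDER BY score DESC` ranking and a buffer that already reflects the update. Row `b` is the only row with sort key 80 (rank 2); an update lowers it to 60, so it moves down to rank 3 (the rank-increasing direction that the quoted loop, bounded by `newRank` and checking at `oldRank`, appears to handle). Sort key 80 then no longer exists, and the entry found at `oldRank` has a different key, so the removed `checkArgument` would fail. The `TreeMap` buffer and helper names are hypothetical, not Flink's `TopNBuffer` API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OldSortKeyCheckDemo {

    // Hypothetical, simplified buffer: sort key -> row keys sharing it, iterated in rank order.
    static void put(TreeMap<Integer, List<String>> buf, int sortKey, String rowKey) {
        buf.computeIfAbsent(sortKey, k -> new ArrayList<>()).add(rowKey);
    }

    static void remove(TreeMap<Integer, List<String>> buf, int sortKey, String rowKey) {
        List<String> rows = buf.get(sortKey);
        if (rows == null) {
            return;
        }
        rows.remove(rowKey);
        if (rows.isEmpty()) {
            buf.remove(sortKey); // a unique sort key vanishes from the buffer entirely
        }
    }

    // Walks the buffer in rank order, similar to the quoted loop, and returns the sort key
    // at a 1-based rank, or null if the buffer holds fewer rows than that.
    static Integer sortKeyAtRank(TreeMap<Integer, List<String>> buf, int rank) {
        int currentRank = 0;
        for (Map.Entry<Integer, List<String>> entry : buf.entrySet()) {
            currentRank += entry.getValue().size();
            if (currentRank >= rank) {
                return entry.getKey();
            }
        }
        return null;
    }

    // ORDER BY score DESC: a(90), b(80), c(70); then b drops from 80 to 60.
    static TreeMap<Integer, List<String>> exampleBufferAfterUpdate() {
        TreeMap<Integer, List<String>> buf = new TreeMap<>(Comparator.reverseOrder());
        put(buf, 90, "a");
        put(buf, 80, "b"); // b is the only row with sort key 80, at rank 2
        put(buf, 70, "c");
        remove(buf, 80, "b");
        put(buf, 60, "b"); // b moves from rank 2 down to rank 3
        return buf;
    }

    public static void main(String[] args) {
        int oldSortKey = 80;
        int oldRank = 2;
        Integer keyAtOldRank = sortKeyAtRank(exampleBufferAfterUpdate(), oldRank);
        System.out.println("sort key at old rank: " + keyAtOldRank); // 70
        // The removed check required this comparison to succeed.
        System.out.println("matches oldSortKey: " + Integer.valueOf(oldSortKey).equals(keyAtOldRank)); // false
    }
}
```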







[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   ## CI report:
   
   * 0056d2cc65fb1d0ff971c0394a32fd65972ff148 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25838) 
   * c16aa625ec99f0e29bc9d768f027c9ee993b1b25 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] JingsongLi merged pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
JingsongLi merged pull request #17605:
URL: https://github.com/apache/flink/pull/17605


   





[GitHub] [flink] flinkbot commented on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot commented on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954692477


   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 10ca9cf225390e479797cac385c89c3b45eb2cb1 (Fri Oct 29 12:11:53 UTC 2021)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
    * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-24704).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work.
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.
   
   The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   ## CI report:
   
   * 2e9cea2efeb7e9f4722275a20f8551f27833fd95 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653) 
   * 0056d2cc65fb1d0ff971c0394a32fd65972ff148 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25838) 
   





[GitHub] [flink] lincoln-lil commented on a change in pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r742502276



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
##########
@@ -312,6 +357,37 @@ private void processElementWithRowNumber(RowData inputRow, Collector<RowData> ou
                 "Failed to find the sortKey, rowkey in the buffer. This should never happen");
     }
 
+    private void emitRecordsWithRowNumberIgnoreStateError(
+            RowData newRow, int newRank, RankRow oldRow, int oldRank, Collector<RowData> out) {
+        Iterator<Map.Entry<RowData, Collection<RowData>>> iterator = buffer.entrySet().iterator();
+        int currentRank = 0;
+        RowData currentRow = null;
+        RowData prevRow = null;
+
+        while (iterator.hasNext() && currentRank <= newRank) {
+            Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
+            Collection<RowData> rowKeys = entry.getValue();
+            Iterator<RowData> rowKeyIter = rowKeys.iterator();
+            while (rowKeyIter.hasNext()) {
+                RowData rowKey = rowKeyIter.next();
+                currentRank += 1;
+                currentRow = rowKeyMap.get(rowKey).row;
+                if (oldRank <= currentRank) {
+                    if (currentRank == oldRank) {
+                        collectUpdateBefore(out, oldRow.row, oldRank);
+                    } else {
+                        collectUpdateBefore(out, prevRow, currentRank);
+                        collectUpdateAfter(out, prevRow, currentRank - 1);
+                        if (currentRank == newRank) {
+                            collectUpdateAfter(out, newRow, currentRank);
+                        }
+                    }
+                }
+                prevRow = currentRow;
+            }
+        }
+    }

Review comment:
       Cool! It's simpler and more readable. One small change: emit the UB (UPDATE_BEFORE) of the old row before the subsequent changes.
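   The sketch below shows one way the order of the retractions can matter. It replays `-U row #rank` / `+U row #rank` messages against a rank -> row view and rejects any retraction that does not match the row currently stored at that rank. Both the message format and this strict consumer are assumptions for illustration, not Flink's actual changelog handling. The data loosely mirrors the harness test in this PR: rows `a`..`f`, with `b` dropping from 90 to 10. It also assumes `b` ranks ahead of `c` on the tie.

   ```java
   import java.util.List;
   import java.util.TreeMap;

   final class RankViewReplay {

       /**
        * Applies "-U row #rank" / "+U row #rank" messages to a copy of a rank -> row view.
        * A retraction must match the row currently stored at that rank, otherwise it is rejected.
        */
       static TreeMap<Integer, String> replay(TreeMap<Integer, String> view, List<String> messages) {
           TreeMap<Integer, String> result = new TreeMap<>(view);
           for (String msg : messages) {
               String[] parts = msg.split(" "); // e.g. ["-U", "b:90", "#2"]
               String row = parts[1];
               int rank = Integer.parseInt(parts[2].substring(1));
               if (parts[0].equals("-U")) {
                   if (!row.equals(result.get(rank))) {
                       throw new IllegalStateException("retraction does not match current row: " + msg);
                   }
                   result.remove(rank);
               } else {
                   result.put(rank, row);
               }
           }
           return result;
       }
   }
   ```

   Replaying the sequence that retracts `b:90 #2` first ends with `{1=a:100, 2=c:90, 3=d:80, 4=e:80, 5=f:70, 6=b:10}`. If that retraction is instead emitted after the shifted rows, rank 2 already holds `c:90` and this strict consumer rejects it.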







[GitHub] [flink] lincoln-lil commented on a change in pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r742500845



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
##########
@@ -312,6 +357,37 @@ private void processElementWithRowNumber(RowData inputRow, Collector<RowData> ou
                 "Failed to find the sortKey, rowkey in the buffer. This should never happen");
     }
 
+    private void emitRecordsWithRowNumberIgnoreStateError(
+            RowData newRow, int newRank, RankRow oldRow, int oldRank, Collector<RowData> out) {
+        Iterator<Map.Entry<RowData, Collection<RowData>>> iterator = buffer.entrySet().iterator();
+        int currentRank = 0;
+        RowData currentRow = null;
+        RowData prevRow = null;
+
+        while (iterator.hasNext() && currentRank <= newRank) {
+            Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
+            Collection<RowData> rowKeys = entry.getValue();
+            Iterator<RowData> rowKeyIter = rowKeys.iterator();
+            while (rowKeyIter.hasNext()) {
+                RowData rowKey = rowKeyIter.next();
+                currentRank += 1;
+                currentRow = rowKeyMap.get(rowKey).row;
+                if (oldRank <= currentRank) {
+                    if (currentRank == oldRank) {
+                        collectUpdateBefore(out, oldRow.row, oldRank);
+                    } else {
+                        collectUpdateBefore(out, prevRow, currentRank);
+                        collectUpdateAfter(out, prevRow, currentRank - 1);
+                        if (currentRank == newRank) {
+                            collectUpdateAfter(out, newRow, currentRank);
+                        }
+                    }
+                }

Review comment:
       Good catch! I will add a test case to cover it.







[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   ## CI report:
   
   * 2e9cea2efeb7e9f4722275a20f8551f27833fd95 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   ## CI report:
   
   * 0056d2cc65fb1d0ff971c0394a32fd65972ff148 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25838) 
   





[GitHub] [flink] tsreaper commented on a change in pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r742497844



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
##########
@@ -312,6 +357,37 @@ private void processElementWithRowNumber(RowData inputRow, Collector<RowData> ou
                 "Failed to find the sortKey, rowkey in the buffer. This should never happen");
     }
 
+    private void emitRecordsWithRowNumberIgnoreStateError(
+            RowData newRow, int newRank, RankRow oldRow, int oldRank, Collector<RowData> out) {
+        Iterator<Map.Entry<RowData, Collection<RowData>>> iterator = buffer.entrySet().iterator();
+        int currentRank = 0;
+        RowData currentRow = null;
+        RowData prevRow = null;
+
+        while (iterator.hasNext() && currentRank <= newRank) {
+            Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
+            Collection<RowData> rowKeys = entry.getValue();
+            Iterator<RowData> rowKeyIter = rowKeys.iterator();
+            while (rowKeyIter.hasNext()) {
+                RowData rowKey = rowKeyIter.next();
+                currentRank += 1;
+                currentRow = rowKeyMap.get(rowKey).row;
+                if (oldRank <= currentRank) {
+                    if (currentRank == oldRank) {
+                        collectUpdateBefore(out, oldRow.row, oldRank);
+                    } else {
+                        collectUpdateBefore(out, prevRow, currentRank);
+                        collectUpdateAfter(out, prevRow, currentRank - 1);
+                        if (currentRank == newRank) {
+                            collectUpdateAfter(out, newRow, currentRank);
+                        }
+                    }
+                }
+                prevRow = currentRow;
+            }
+        }
+    }

Review comment:
       This algorithm seems awkward to me. Consider changing it to:
   ```java
   while (iterator.hasNext() && currentRank < newRank) {
       // ...
       while (rowKeyIter.hasNext()) {
           // ...
           if (oldRank <= currentRank) {
               collectUpdateBefore(out, currentRow, currentRank);
               collectUpdateAfter(out, currentRow, currentRank - 1);
           }
       }
   }
   collectUpdateBefore(out, oldRow.row, oldRank);
   collectUpdateAfter(out, newRow, newRank);
   ```
   so that there is no need for `prevRow`. It is misleading to see `prevRow` and `currentRank` sent within the same message.
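   Here is a self-contained sketch of a loop without `prevRow`. It rests on several simplifying assumptions:
   - The buffer is a `TreeMap` from sort key to rows, not Flink's `TopNBuffer`, and it already reflects the update.
   - Ranks are 1-based.
   - The row moves down (`newRank > oldRank`).

   Each row at positions `[oldRank, newRank)` of the updated buffer moved up by one, so it can be retracted at `currentRank + 1` and re-emitted at `currentRank`. The old row's UB goes out before the shifted rows and the new row's UA goes out last. The class name and the string message format are hypothetical.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   final class RankShiftSketch {

       /** Emits the changelog for a row that moved from oldRank down to newRank. */
       static List<String> emitShift(
               TreeMap<Integer, List<String>> buffer, // sort key -> rows, already updated
               String oldRow, int oldRank,
               String newRow, int newRank) {
           List<String> out = new ArrayList<>();
           // Retract the moved row at its old rank first.
           out.add("-U " + oldRow + " #" + oldRank);
           int currentRank = 0;
           outer:
           for (Map.Entry<Integer, List<String>> entry : buffer.entrySet()) {
               for (String row : entry.getValue()) {
                   currentRank += 1;
                   if (currentRank >= newRank) {
                       // Position newRank holds the moved row itself; nothing after it changes.
                       break outer;
                   }
                   if (currentRank >= oldRank) {
                       // This row moved up by one: it used to sit at currentRank + 1.
                       out.add("-U " + row + " #" + (currentRank + 1));
                       out.add("+U " + row + " #" + currentRank);
                   }
               }
           }
           // Emit the moved row at its new rank last.
           out.add("+U " + newRow + " #" + newRank);
           return out;
       }
   }
   ```

   For the harness scenario (`b` moving from rank 2 to rank 6, assuming `b` ranked ahead of `c` on the tie), the output runs in three steps:
   - `-U b:90 #2` first.
   - Then a `-U`/`+U` pair for each of `c`, `d`, `e`, and `f`.
   - Finally `+U b:10 #6`.

   The `break outer` also stops the inner loop at `newRank`, so rows that share a sort key with the moved row but sort after it are left untouched.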







[GitHub] [flink] JingsongLi commented on a change in pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r741566910



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
##########
@@ -75,6 +77,13 @@
     private final InternalTypeInfo<RowData> rowKeyType;
     private final long cacheSize;
 
+    // flag to skip records with non-exist error instead to fail, true by default.
+    private final boolean lenient = true;
+
+    // data converter for logging only.
+    private final DataStructureConverter rowConverter;

Review comment:
       This will break compatibility. We can just introduce two transient fields.
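   Here is a minimal sketch of one reading of this suggestion. It assumes the concern is the function's constructor and serialized form. The logging-only helpers stay out of that state by being declared `transient` and initialized at runtime, in an `open()`-style method. `TopNFunctionLike` is a hypothetical stand-in, not Flink's `UpdatableTopNFunction`. A `Function<Object, String>` stands in for the `DataStructureConverter`.

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
   import java.util.function.Function;

   final class TransientFieldsSketch {

       /** Hypothetical serializable function; only cacheSize is part of its serialized form. */
       static class TopNFunctionLike implements Serializable {
           private static final long serialVersionUID = 1L;

           private final long cacheSize;

           // Runtime-only helpers: skipped by Java serialization and rebuilt in open().
           private transient boolean lenient;
           private transient Function<Object, String> rowConverter;

           TopNFunctionLike(long cacheSize) {
               this.cacheSize = cacheSize;
           }

           void open() {
               lenient = true;
               rowConverter = String::valueOf;
           }

           boolean isLenient() { return lenient; }
           boolean hasConverter() { return rowConverter != null; }
           long cacheSize() { return cacheSize; }
       }

       /** Serializes and deserializes an object, as happens when a function is shipped. */
       @SuppressWarnings("unchecked")
       static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
           ByteArrayOutputStream bytes = new ByteArrayOutputStream();
           try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
               out.writeObject(obj);
           }
           try (ObjectInputStream in =
                   new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
               return (T) in.readObject();
           }
       }
   }
   ```

   After a serialization round trip, the `transient` fields come back as `false`/`null` while `cacheSize` survives. That is why they have to be (re)initialized at runtime rather than in the constructor.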

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala
##########
@@ -220,4 +221,75 @@ class RankHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode) {
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
     testHarness.close()
   }
+
+  @Test
+  def testUpdateRankWithRowNumber(): Unit = {
+    val data = new mutable.MutableList[(String, Int, Int)]
+    val t = env.fromCollection(data).toTable(tEnv, 'word, 'cnt, 'type)
+    tEnv.createTemporaryView("T", t)
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1))
+
+    val sql =
+      """
+        |SELECT word, cnt, rank_num
+        |FROM (
+        |  SELECT word, cnt,
+        |      ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
+        |  FROM (
+        |     select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by word, type
+        |   )
+        |  )
+        |WHERE rank_num <= 6
+      """.stripMargin
+
+    val t1 = tEnv.sqlQuery(sql)
+
+    val testHarness = createHarnessTester(
+      t1.toRetractStream[Row],
+      "Rank(strategy=[UpdateFastStrategy")
+    val assertor = new RowDataHarnessAssertor(
+      Array(
+        DataTypes.STRING().getLogicalType,
+        DataTypes.INT().getLogicalType,
+        DataTypes.INT().getLogicalType,
+        DataTypes.BIGINT().getLogicalType))
+
+    testHarness.open()
+
+    testHarness.processElement(binaryRecord(INSERT, "a", 1: JInt, 100: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "b", 1: JInt, 90: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "c", 1: JInt, 90: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "d", 1: JInt, 80: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "e", 1: JInt, 80: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "f", 1: JInt, 70: JInt))
+
+    testHarness.processElement(binaryRecord(UPDATE_AFTER, "b", 1: JInt, 10: JInt))
+

Review comment:
       Leaving just one blank line here is OK.







[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   ## CI report:
   
   * 0056d2cc65fb1d0ff971c0394a32fd65972ff148 Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25838) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023









[GitHub] [flink] tsreaper commented on a change in pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r742497844



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
##########
@@ -312,6 +357,37 @@ private void processElementWithRowNumber(RowData inputRow, Collector<RowData> ou
                 "Failed to find the sortKey, rowkey in the buffer. This should never happen");
     }
 
+    private void emitRecordsWithRowNumberIgnoreStateError(
+            RowData newRow, int newRank, RankRow oldRow, int oldRank, Collector<RowData> out) {
+        Iterator<Map.Entry<RowData, Collection<RowData>>> iterator = buffer.entrySet().iterator();
+        int currentRank = 0;
+        RowData currentRow = null;
+        RowData prevRow = null;
+
+        while (iterator.hasNext() && currentRank <= newRank) {
+            Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
+            Collection<RowData> rowKeys = entry.getValue();
+            Iterator<RowData> rowKeyIter = rowKeys.iterator();
+            while (rowKeyIter.hasNext()) {
+                RowData rowKey = rowKeyIter.next();
+                currentRank += 1;
+                currentRow = rowKeyMap.get(rowKey).row;
+                if (oldRank <= currentRank) {
+                    if (currentRank == oldRank) {
+                        collectUpdateBefore(out, oldRow.row, oldRank);
+                    } else {
+                        collectUpdateBefore(out, prevRow, currentRank);
+                        collectUpdateAfter(out, prevRow, currentRank - 1);
+                        if (currentRank == newRank) {
+                            collectUpdateAfter(out, newRow, currentRank);
+                        }
+                    }
+                }
+                prevRow = currentRow;
+            }
+        }
+    }

Review comment:
       This algorithm seems awkward to me. Consider rewriting it as:
   ```java
   while (iterator.hasNext() && currentRank < newRank) {
       // ...
       while (rowKeyIter.hasNext()) {
           // ...
           if (oldRank <= currentRank) {
               collectUpdateBefore(out, currentRow, currentRank + 1);
               collectUpdateAfter(out, currentRow, currentRank);
           }
       }
   }
   collectUpdateBefore(out, oldRow.row, oldRank);
   collectUpdateAfter(out, newRow, newRank);
   ```
   so that `prevRow` is no longer needed. It is misleading to see `prevRow` and `currentRank` sent in the same message.
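Here is a minimal sketch of the suggested emission order. It assumes the updated row moved to a larger rank (`oldRank <= newRank`) and that rows with equal sort keys keep insertion order. It uses strings instead of `RowData`; `emitRankShift` and the `-U`/`+U` notation are hypothetical. The outline above leaves one detail implicit: the shifting loop has to stop before it reaches `newRank`, because the row found there is the updated row itself. If the inner loop over rows sharing a sort key is not bounded, that row would be retracted and re-emitted with a wrong rank. The example data follows `testUpdateRankWithRowNumber`, where `b` drops from 90 to 10.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RankShiftSketch {
    // Emits the changelog for a row that moved from oldRank to newRank (oldRank <= newRank).
    // newOrder is the buffer content after the update, already sorted; ranks are 1-based.
    static List<String> emitRankShift(
            List<String> newOrder, String oldRow, int oldRank, String newRow, int newRank) {
        List<String> out = new ArrayList<>();
        int currentRank = 0;
        for (String currentRow : newOrder) {
            currentRank += 1;
            if (currentRank >= newRank) {
                break; // stop before the updated row's own new position
            }
            if (currentRank >= oldRank) {
                // this row moved up by one: retract its old rank, emit its new one
                out.add("-U(" + currentRow + ", " + (currentRank + 1) + ")");
                out.add("+U(" + currentRow + ", " + currentRank + ")");
            }
        }
        out.add("-U(" + oldRow + ", " + oldRank + ")");
        out.add("+U(" + newRow + ", " + newRank + ")");
        return out;
    }

    public static void main(String[] args) {
        List<String> newOrder = Arrays.asList("a100", "c90", "d80", "e80", "f70", "b10");
        emitRankShift(newOrder, "b90", 2, "b10", 6).forEach(System.out::println);
    }
}
```

With `oldRank == newRank` the loop emits no shift messages and only the final UPDATE_BEFORE/UPDATE_AFTER pair is sent.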







[GitHub] [flink] tsreaper commented on a change in pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r742492674



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
##########
@@ -312,6 +357,37 @@ private void processElementWithRowNumber(RowData inputRow, Collector<RowData> ou
                 "Failed to find the sortKey, rowkey in the buffer. This should never happen");
     }
 
+    private void emitRecordsWithRowNumberIgnoreStateError(
+            RowData newRow, int newRank, RankRow oldRow, int oldRank, Collector<RowData> out) {
+        Iterator<Map.Entry<RowData, Collection<RowData>>> iterator = buffer.entrySet().iterator();
+        int currentRank = 0;
+        RowData currentRow = null;
+        RowData prevRow = null;
+
+        while (iterator.hasNext() && currentRank <= newRank) {
+            Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
+            Collection<RowData> rowKeys = entry.getValue();
+            Iterator<RowData> rowKeyIter = rowKeys.iterator();
+            while (rowKeyIter.hasNext()) {
+                RowData rowKey = rowKeyIter.next();
+                currentRank += 1;
+                currentRow = rowKeyMap.get(rowKey).row;
+                if (oldRank <= currentRank) {
+                    if (currentRank == oldRank) {
+                        collectUpdateBefore(out, oldRow.row, oldRank);
+                    } else {
+                        collectUpdateBefore(out, prevRow, currentRank);
+                        collectUpdateAfter(out, prevRow, currentRank - 1);
+                        if (currentRank == newRank) {
+                            collectUpdateAfter(out, newRow, currentRank);
+                        }
+                    }
+                }

Review comment:
       What if `newSortKey > oldSortKey` but `oldRank == currentRank`? The UPDATE_AFTER message will be lost. Please also add a test for this case after fixing it.
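A minimal simulation of the branch structure under review shows the issue in its simplest form. It uses strings instead of `RowData`, one row per sort key, and a hypothetical `emitAsReviewed` name. In the example, the updated row sorts later than before but keeps rank 2 and is the last row in the buffer. The `currentRank == oldRank` branch emits only the UPDATE_BEFORE, and the loop ends before any UPDATE_AFTER is produced.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LostUpdateAfterDemo {
    // Mirrors the branches of the loop under review; each element of newOrder is one
    // sort-key entry holding a single row, in the buffer's order after the update.
    static List<String> emitAsReviewed(
            List<String> newOrder, String oldRow, int oldRank, String newRow, int newRank) {
        List<String> out = new ArrayList<>();
        int currentRank = 0;
        String prevRow = null;
        for (String currentRow : newOrder) {
            if (currentRank > newRank) {
                break; // outer-loop condition: currentRank <= newRank
            }
            currentRank += 1;
            if (oldRank <= currentRank) {
                if (currentRank == oldRank) {
                    out.add("-U(" + oldRow + ", " + oldRank + ")");
                } else {
                    out.add("-U(" + prevRow + ", " + currentRank + ")");
                    out.add("+U(" + prevRow + ", " + (currentRank - 1) + ")");
                    if (currentRank == newRank) {
                        out.add("+U(" + newRow + ", " + currentRank + ")");
                    }
                }
            }
            prevRow = currentRow;
        }
        return out;
    }

    public static void main(String[] args) {
        // b moves from 95 to 92 but stays at rank 2 as the last row: no +U is produced.
        System.out.println(emitAsReviewed(Arrays.asList("a100", "b92"), "b95", 2, "b92", 2));
        // prints [-U(b95, 2)]
    }
}
```

If another row follows the updated one, the `else` branch instead retracts the updated row at rank 3, a rank it never held.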







[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   ## CI report:
   
   * 10ca9cf225390e479797cac385c89c3b45eb2cb1 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25641) 
   * c208976e1df1e97ab7951395e8be0c1f205a170a UNKNOWN
   





[GitHub] [flink] lincoln-lil commented on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954909178


   The tests fail because the runtime context is not initialized; I'll fix it.





[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   ## CI report:
   
   * c16aa625ec99f0e29bc9d768f027c9ee993b1b25 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25905) 
   





[GitHub] [flink] lincoln-lil commented on a change in pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r741603426



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
##########
@@ -312,6 +355,46 @@ private void processElementWithRowNumber(RowData inputRow, Collector<RowData> ou
                 "Failed to find the sortKey, rowkey in the buffer. This should never happen");
     }
 
+    private void emitRecordsWithRowNumberIgnoreStateError(
+            RowData newSortKey,
+            RowData newRow,
+            int newRank,
+            RowData oldSortKey,
+            RankRow oldRow,
+            int oldRank,
+            Collector<RowData> out) {
+        Iterator<Map.Entry<RowData, Collection<RowData>>> iterator = buffer.entrySet().iterator();
+        int currentRank = 0;
+        RowData currentRow = null;
+        RowData prevRow = null;
+
+        while (iterator.hasNext() && currentRank <= newRank) {
+            Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
+            RowData curSortKey = entry.getKey();
+            Collection<RowData> rowKeys = entry.getValue();
+            Iterator<RowData> rowKeyIter = rowKeys.iterator();
+            while (rowKeyIter.hasNext()) {
+                RowData rowKey = rowKeyIter.next();
+                currentRank += 1;
+                currentRow = rowKeyMap.get(rowKey).row;
+                if (oldRank <= currentRank) {
+                    if (currentRank == oldRank) {
+                        checkArgument(0 == sortKeyComparator.compare(curSortKey, oldSortKey));

Review comment:
       This check is wrong, and after thinking it over it is also unnecessary, so I'll remove it.







[GitHub] [flink] tsreaper commented on a change in pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r741595404



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
##########
@@ -312,6 +355,46 @@ private void processElementWithRowNumber(RowData inputRow, Collector<RowData> ou
                 "Failed to find the sortKey, rowkey in the buffer. This should never happen");
     }
 
+    private void emitRecordsWithRowNumberIgnoreStateError(
+            RowData newSortKey,
+            RowData newRow,
+            int newRank,
+            RowData oldSortKey,
+            RankRow oldRow,
+            int oldRank,
+            Collector<RowData> out) {
+        Iterator<Map.Entry<RowData, Collection<RowData>>> iterator = buffer.entrySet().iterator();
+        int currentRank = 0;
+        RowData currentRow = null;
+        RowData prevRow = null;
+
+        while (iterator.hasNext() && currentRank <= newRank) {
+            Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
+            RowData curSortKey = entry.getKey();
+            Collection<RowData> rowKeys = entry.getValue();
+            Iterator<RowData> rowKeyIter = rowKeys.iterator();
+            while (rowKeyIter.hasNext()) {
+                RowData rowKey = rowKeyIter.next();
+                currentRank += 1;
+                currentRow = rowKeyMap.get(rowKey).row;
+                if (oldRank <= currentRank) {
+                    if (currentRank == oldRank) {
+                        checkArgument(0 == sortKeyComparator.compare(curSortKey, oldSortKey));

Review comment:
       Why is this true? It might be that the `oldSortKey` is unique and after the change it does not exist anymore.







[GitHub] [flink] lincoln-lil commented on a change in pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r742502402



##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala
##########
@@ -220,4 +221,74 @@ class RankHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode) {
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
     testHarness.close()
   }
+
+  @Test
+  def testUpdateRankWithRowNumber(): Unit = {

Review comment:
       Thanks for your review, I'll update it.







[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   ## CI report:
   
   * c208976e1df1e97ab7951395e8be0c1f205a170a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25643) 
   





[GitHub] [flink] lincoln-lil commented on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-958731075


   @flinkbot run azure





[GitHub] [flink] JingsongLi commented on a change in pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r741566910



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
##########
@@ -75,6 +77,13 @@
     private final InternalTypeInfo<RowData> rowKeyType;
     private final long cacheSize;
 
+    // flag to skip records with non-exist error instead to fail, true by default.
+    private final boolean lenient = true;
+
+    // data converter for logging only.
+    private final DataStructureConverter rowConverter;

Review comment:
       This will break compatibility. We can simply introduce two transient fields instead.
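Here is a plain-Java sketch of the transient-field suggestion. It assumes the compatibility concern is the function's Java-serialized form. The class, its `open()` method and the `StringBuilder` standing in for the logging-only converter are all hypothetical, not Flink's API. Transient fields are skipped when the object is serialized, so adding them does not change what is written. Deserialization does not run the class's constructor or field initializers, so such fields come back with default values and have to be rebuilt when the function is opened.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldsSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Existing serialized state (unchanged).
    private final long cacheSize;

    // Hypothetical new fields: transient, so they are not part of the serialized form.
    private transient boolean lenient;
    private transient StringBuilder logFormatter; // stand-in for a logging-only converter

    public TransientFieldsSketch(long cacheSize) {
        this.cacheSize = cacheSize;
    }

    // Stand-in for the function's open(): rebuild transient state after deserialization.
    public void open() {
        this.lenient = true;
        this.logFormatter = new StringBuilder();
    }

    public boolean isLenient() { return lenient; }
    public boolean hasLogFormatter() { return logFormatter != null; }
    public long getCacheSize() { return cacheSize; }

    // Serializes and deserializes the object with standard Java serialization.
    static TransientFieldsSketch roundTrip(TransientFieldsSketch f) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bytes)) {
                oos.writeObject(f);
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (TransientFieldsSketch) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        TransientFieldsSketch f = new TransientFieldsSketch(10_000L);
        f.open();
        TransientFieldsSketch copy = roundTrip(f);
        // prints "10000 false false": transient fields are defaults until open() runs again
        System.out.println(copy.getCacheSize() + " " + copy.isLenient() + " " + copy.hasLogFormatter());
        copy.open();
        System.out.println(copy.isLenient() + " " + copy.hasLogFormatter()); // prints "true true"
    }
}
```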

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala
##########
@@ -220,4 +221,75 @@ class RankHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode) {
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
     testHarness.close()
   }
+
+  @Test
+  def testUpdateRankWithRowNumber(): Unit = {
+    val data = new mutable.MutableList[(String, Int, Int)]
+    val t = env.fromCollection(data).toTable(tEnv, 'word, 'cnt, 'type)
+    tEnv.createTemporaryView("T", t)
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1))
+
+    val sql =
+      """
+        |SELECT word, cnt, rank_num
+        |FROM (
+        |  SELECT word, cnt,
+        |      ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
+        |  FROM (
+        |     select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by word, type
+        |   )
+        |  )
+        |WHERE rank_num <= 6
+      """.stripMargin
+
+    val t1 = tEnv.sqlQuery(sql)
+
+    val testHarness = createHarnessTester(
+      t1.toRetractStream[Row],
+      "Rank(strategy=[UpdateFastStrategy")
+    val assertor = new RowDataHarnessAssertor(
+      Array(
+        DataTypes.STRING().getLogicalType,
+        DataTypes.INT().getLogicalType,
+        DataTypes.INT().getLogicalType,
+        DataTypes.BIGINT().getLogicalType))
+
+    testHarness.open()
+
+    testHarness.processElement(binaryRecord(INSERT, "a", 1: JInt, 100: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "b", 1: JInt, 90: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "c", 1: JInt, 90: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "d", 1: JInt, 80: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "e", 1: JInt, 80: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "f", 1: JInt, 70: JInt))
+
+    testHarness.processElement(binaryRecord(UPDATE_AFTER, "b", 1: JInt, 10: JInt))
+

Review comment:
       Keeping just one blank line here is OK.







[GitHub] [flink] tsreaper commented on a change in pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r742494069



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
##########
@@ -312,6 +357,37 @@ private void processElementWithRowNumber(RowData inputRow, Collector<RowData> ou
                 "Failed to find the sortKey, rowkey in the buffer. This should never happen");
     }
 
+    private void emitRecordsWithRowNumberIgnoreStateError(
+            RowData newRow, int newRank, RankRow oldRow, int oldRank, Collector<RowData> out) {
+        Iterator<Map.Entry<RowData, Collection<RowData>>> iterator = buffer.entrySet().iterator();
+        int currentRank = 0;
+        RowData currentRow = null;
+        RowData prevRow = null;
+
+        while (iterator.hasNext() && currentRank <= newRank) {
+            Map.Entry<RowData, Collection<RowData>> entry = iterator.next();
+            Collection<RowData> rowKeys = entry.getValue();
+            Iterator<RowData> rowKeyIter = rowKeys.iterator();
+            while (rowKeyIter.hasNext()) {
+                RowData rowKey = rowKeyIter.next();
+                currentRank += 1;
+                currentRow = rowKeyMap.get(rowKey).row;
+                if (oldRank <= currentRank) {
+                    if (currentRank == oldRank) {
+                        collectUpdateBefore(out, oldRow.row, oldRank);
+                    } else {
+                        collectUpdateBefore(out, prevRow, currentRank);
+                        collectUpdateAfter(out, prevRow, currentRank - 1);
+                        if (currentRank == newRank) {
+                            collectUpdateAfter(out, newRow, currentRank);
+                        }
+                    }
+                }

Review comment:
       A more appropriate solution seems to be not to send any message when `oldRank == currentRank`.







[GitHub] [flink] lincoln-lil commented on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-955692359


   @flinkbot run azure





[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   ## CI report:
   
   * 2e9cea2efeb7e9f4722275a20f8551f27833fd95 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653) 
   





[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25641",
       "triggerID" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25643",
       "triggerID" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 10ca9cf225390e479797cac385c89c3b45eb2cb1 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25641) 
   * c208976e1df1e97ab7951395e8be0c1f205a170a Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25643) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25641",
       "triggerID" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25643",
       "triggerID" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653",
       "triggerID" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * c208976e1df1e97ab7951395e8be0c1f205a170a Azure: [FAILURE](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25643) 
   * 2e9cea2efeb7e9f4722275a20f8551f27833fd95 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25641",
       "triggerID" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25643",
       "triggerID" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653",
       "triggerID" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653",
       "triggerID" : "955692359",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0056d2cc65fb1d0ff971c0394a32fd65972ff148",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25838",
       "triggerID" : "0056d2cc65fb1d0ff971c0394a32fd65972ff148",
       "triggerType" : "PUSH"
     }, {
       "hash" : "0056d2cc65fb1d0ff971c0394a32fd65972ff148",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25838",
       "triggerID" : "958731075",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "c16aa625ec99f0e29bc9d768f027c9ee993b1b25",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25905",
       "triggerID" : "c16aa625ec99f0e29bc9d768f027c9ee993b1b25",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0056d2cc65fb1d0ff971c0394a32fd65972ff148 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25838) 
   * c16aa625ec99f0e29bc9d768f027c9ee993b1b25 Azure: [PENDING](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25905) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] tsreaper commented on a change in pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
tsreaper commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r742501383



##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala
##########
@@ -220,4 +221,74 @@ class RankHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode) {
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
     testHarness.close()
   }
+
+  @Test
+  def testUpdateRankWithRowNumber(): Unit = {

Review comment:
       Add more test scenarios:
   1. Calculate top 5 when there are more than 5 candidates.
   2. The sort key drops but the ranking does not change.
   3. The sort key drops, but not to the last rank.
   4. Calculate top 5 with 7 candidates, where the record previously at rank 3 drops to rank 6 (but it is still "rank 5").
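   The expected results for these scenarios can be worked out by hand before writing the harness test. Below is a toy, in-memory Java model of `ROW_NUMBER()`-style top-N ranking, sketched under stated assumptions: the class `ToyTopN` is hypothetical and is not Flink's `UpdatableTopNFunction`. It only computes which rows belong in the top 5 before and after a sort-key drop and says nothing about the retraction messages the operator emits. Ties are broken by key purely for determinism; the real operator's tie order may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of ROW_NUMBER() top-N ranking; not Flink's UpdatableTopNFunction.
public class ToyTopN {

    // Keys of the top-n rows, ordered by score descending.
    // Ties are broken by key (an assumption made only for determinism).
    static List<String> topN(Map<String, Integer> scores, int n) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(scores.entrySet());
        entries.sort((x, y) -> {
            int byScore = Integer.compare(y.getValue(), x.getValue());
            return byScore != 0 ? byScore : x.getKey().compareTo(y.getKey());
        });
        List<String> result = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            result.add(entries.get(i).getKey());
        }
        return result;
    }

    // Seven candidates for a top-5 query (scenario 1: more candidates than N).
    static Map<String, Integer> candidates() {
        Map<String, Integer> m = new HashMap<>();
        m.put("a", 100);
        m.put("b", 90);
        m.put("c", 80);
        m.put("d", 70);
        m.put("e", 60);
        m.put("f", 50);
        m.put("g", 40);
        return m;
    }

    public static void main(String[] args) {
        System.out.println(topN(candidates(), 5)); // [a, b, c, d, e]

        // Scenario 2: "b" drops from 90 to 85; the ranking is unchanged.
        Map<String, Integer> s2 = candidates();
        s2.put("b", 85);
        System.out.println(topN(s2, 5)); // [a, b, c, d, e]

        // Scenario 3: "c" drops from 80 to 65, moving from rank 3 to rank 4 (not the last rank).
        Map<String, Integer> s3 = candidates();
        s3.put("c", 65);
        System.out.println(topN(s3, 5)); // [a, b, d, c, e]

        // Scenario 4: "c" drops from 80 to 45, to position 6; "f" enters the top 5.
        Map<String, Integer> s4 = candidates();
        s4.put("c", 45);
        System.out.println(topN(s4, 5)); // [a, b, d, e, f]
    }
}
```

   The expected lists could then be compared against the rows the harness collects; asserting the exact changelog is left to the real test.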







[GitHub] [flink] lincoln-lil commented on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-958731075


   @flinkbot run azure





[GitHub] [flink] JingsongLi commented on a change in pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
JingsongLi commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r741566910



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
##########
@@ -75,6 +77,13 @@
     private final InternalTypeInfo<RowData> rowKeyType;
     private final long cacheSize;
 
+    // flag to skip records with non-exist error instead to fail, true by default.
+    private final boolean lenient = true;
+
+    // data converter for logging only.
+    private final DataStructureConverter rowConverter;

Review comment:
       This will break compatibility. We can just introduce two transient fields.

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala
##########
@@ -220,4 +221,75 @@ class RankHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode) {
     assertor.assertOutputEqualsSorted("result mismatch", expectedOutput, result)
     testHarness.close()
   }
+
+  @Test
+  def testUpdateRankWithRowNumber(): Unit = {
+    val data = new mutable.MutableList[(String, Int, Int)]
+    val t = env.fromCollection(data).toTable(tEnv, 'word, 'cnt, 'type)
+    tEnv.createTemporaryView("T", t)
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1))
+
+    val sql =
+      """
+        |SELECT word, cnt, rank_num
+        |FROM (
+        |  SELECT word, cnt,
+        |      ROW_NUMBER() OVER (PARTITION BY type ORDER BY cnt DESC) as rank_num
+        |  FROM (
+        |     select word, type, sum(cnt) filter (where cnt > 0) cnt from T group by word, type
+        |   )
+        |  )
+        |WHERE rank_num <= 6
+      """.stripMargin
+
+    val t1 = tEnv.sqlQuery(sql)
+
+    val testHarness = createHarnessTester(
+      t1.toRetractStream[Row],
+      "Rank(strategy=[UpdateFastStrategy")
+    val assertor = new RowDataHarnessAssertor(
+      Array(
+        DataTypes.STRING().getLogicalType,
+        DataTypes.INT().getLogicalType,
+        DataTypes.INT().getLogicalType,
+        DataTypes.BIGINT().getLogicalType))
+
+    testHarness.open()
+
+    testHarness.processElement(binaryRecord(INSERT, "a", 1: JInt, 100: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "b", 1: JInt, 90: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "c", 1: JInt, 90: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "d", 1: JInt, 80: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "e", 1: JInt, 80: JInt))
+    testHarness.processElement(binaryRecord(INSERT, "f", 1: JInt, 70: JInt))
+
+    testHarness.processElement(binaryRecord(UPDATE_AFTER, "b", 1: JInt, 10: JInt))
+

Review comment:
       Leaving just one blank line here is OK.







[GitHub] [flink] flinkbot edited a comment on pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on pull request #17605:
URL: https://github.com/apache/flink/pull/17605#issuecomment-954693023


   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25641",
       "triggerID" : "10ca9cf225390e479797cac385c89c3b45eb2cb1",
       "triggerType" : "PUSH"
     }, {
       "hash" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25643",
       "triggerID" : "c208976e1df1e97ab7951395e8be0c1f205a170a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653",
       "triggerID" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "triggerType" : "PUSH"
     }, {
       "hash" : "2e9cea2efeb7e9f4722275a20f8551f27833fd95",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653",
       "triggerID" : "955692359",
       "triggerType" : "MANUAL"
     }, {
       "hash" : "0056d2cc65fb1d0ff971c0394a32fd65972ff148",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0056d2cc65fb1d0ff971c0394a32fd65972ff148",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 2e9cea2efeb7e9f4722275a20f8551f27833fd95 Azure: [SUCCESS](https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=25653) 
   * 0056d2cc65fb1d0ff971c0394a32fd65972ff148 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>





[GitHub] [flink] lincoln-lil commented on a change in pull request #17605: [FLINK-24704][table] Fix exception when the input record loses monotonicity on the sort key field of UpdatableTopNFunction

Posted by GitBox <gi...@apache.org>.
lincoln-lil commented on a change in pull request #17605:
URL: https://github.com/apache/flink/pull/17605#discussion_r741602604



##########
File path: flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/UpdatableTopNFunction.java
##########
@@ -75,6 +77,13 @@
     private final InternalTypeInfo<RowData> rowKeyType;
     private final long cacheSize;
 
+    // flag to skip records with non-exist error instead to fail, true by default.
+    private final boolean lenient = true;
+
+    // data converter for logging only.
+    private final DataStructureConverter rowConverter;

Review comment:
       Good catch! Making them transient is better.
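   The transient-field suggestion follows the general Java serialization pattern: a `transient` field is left out of an object's serialized form and comes back with its default value (`false`/`null`) after deserialization, so it has to be re-initialized at runtime, for example in `open()`. A minimal sketch under those assumptions; `TransientFieldSketch` is a hypothetical class, not Flink code, and a `StringBuilder` stands in for the logging-only `DataStructureConverter`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch; not Flink's UpdatableTopNFunction.
public class TransientFieldSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // Regular field: part of the serialized form.
    private final long cacheSize;

    // Runtime-only fields: excluded from the serialized form, restored as false/null.
    private transient boolean lenient;
    private transient StringBuilder debugLog; // stand-in for a logging-only converter

    public TransientFieldSketch(long cacheSize) {
        this.cacheSize = cacheSize;
    }

    // Re-initializes the transient fields, in the spirit of a rich function's open().
    public void open() {
        lenient = true;
        debugLog = new StringBuilder();
    }

    public long cacheSize() {
        return cacheSize;
    }

    public boolean isOpened() {
        return debugLog != null;
    }

    // When lenient, logs and skips a record missing from state (returns false); otherwise throws.
    public boolean handleMissingKey(String key) {
        if (!lenient) {
            throw new IllegalStateException("Record not found in state: " + key);
        }
        debugLog.append("skipped ").append(key).append('\n');
        return false;
    }

    // Serializes and deserializes an object with plain Java serialization.
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        TransientFieldSketch original = new TransientFieldSketch(10_000L);
        original.open();

        TransientFieldSketch copy = roundTrip(original);
        System.out.println(copy.cacheSize()); // 10000
        System.out.println(copy.isOpened());  // false: transient fields were not serialized

        copy.open();
        System.out.println(copy.handleMissingKey("b")); // false: skipped instead of failing
    }
}
```

   Note that the transient fields drop `final`: a field that is re-initialized in `open()` after deserialization cannot be final.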



