Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/15 06:17:30 UTC

[GitHub] [spark] viirya opened a new pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

viirya opened a new pull request #35521:
URL: https://github.com/apache/spark/pull/35521


   
   ### What changes were proposed in this pull request?
   
   This change proposes to change how streaming `dropDuplicates` handles keys in its state. Currently, if there is no watermark column among the dedup keys, the states are kept in the operator's state store and are never removed. This change proposes to remove states that have expired according to the watermark.
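   
   For illustration only, here is a minimal sketch of the query shape this targets (the source, sink, and column names are made up; only the `withWatermark` + `dropDuplicates` combination matters). The dedup key does not include the watermarked column, so today every distinct `userId` stays in the state store forever:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   object DedupStateGrowth {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().appName("dedup-state-growth").getOrCreate()
       import spark.implicits._
   
       // Any streaming source with an event-time column works; the rate source
       // is just a convenient stand-in.
       val events = spark.readStream
         .format("rate")
         .option("rowsPerSecond", "10")
         .load()
         .withColumnRenamed("timestamp", "eventTime")
         .withColumn("userId", ($"value" % 1000000).cast("string"))
   
       val deduped = events
         .withWatermark("eventTime", "10 minutes")
         // The watermark column is NOT part of the dedup keys, so currently no
         // state row is ever evicted. This PR proposes evicting a key's state
         // once it falls behind the watermark.
         .dropDuplicates("userId")
   
       deduped.writeStream.format("console").outputMode("append").start().awaitTermination()
     }
   }
   ```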
   
   ### Why are the changes needed?
   
   Currently, streaming `dropDuplicates` simply stores all states forever if none of the dedup keys carries a watermark.
   
   For a streaming stateful operator this is counterintuitive behavior, compared with other stateful operations such as streaming joins.
   
   It also means the number of state rows grows without bound, as we have observed in a real application. Adding a time column to the dedup keys doesn't make sense either, because we don't really deduplicate rows by [key, time] but only by [key].
   
   A more reasonable streaming dedup would remove out-of-watermark states when the input data (not the key) has a watermark. Note that, unfortunately, this is a behavior change for streaming queries with a dedup operation.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. The state value format will change for streaming dedup, and evicting out-of-watermark states is a behavior change as well.
   
   ### How was this patch tested?
   
   Added a test.




[GitHub] [spark] HeartSaVioR edited a comment on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1039946618


   > How could it happen? IIUC, the watermark predicate should be: watermark column <= current watermark (the max event time seen in the last batch?). When there are no out-of-order events, isn't an input row's watermark column always > the current watermark (i.e., the watermark predicate is false)? Why would it be evicted immediately? Won't it be evicted in the next batch?
   
   Yeah, my bad, I oversimplified the explanation. The row is effectively evicted in the next batch, because we calculate the watermark at the end of each micro-batch. But how does it help if the state row only sticks around for a single batch?
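   
   To make that concrete, roughly (this is just the arithmetic, simplified from the real trigger logic, not actual Spark code):
   
   ```scala
   object WatermarkPerBatch {
     // Simplified model: the watermark used by micro-batch N is derived from the
     // max event time observed up to the end of batch N-1, minus the delay.
     def watermarkForNextBatch(maxEventTimeSeenMs: Long, delayMs: Long): Long =
       maxEventTimeSeenMs - delayMs
   
     def main(args: Array[String]): Unit = {
       val delayMs = 0L  // "perfect" watermark, no delay allowance
       val noonMs  = java.sql.Timestamp.valueOf("2022-02-15 12:00:00").getTime
   
       // A key first seen at 12:00 is stored in batch N. When batch N+1 runs,
       // the watermark has already advanced to 12:00, so that state row is
       // eligible for eviction: it only protects against duplicates for one batch.
       val watermarkForBatchNPlus1 = watermarkForNextBatch(noonMs, delayMs)
       assert(watermarkForBatchNPlus1 == noonMs)
     }
   }
   ```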
   
   We have to think carefully about when we can evict the state row safely. Suppose duplicated input rows keep coming, each carrying a timestamp. With event time semantics and a given watermark gap we can deduplicate a group of input rows, but once the state row is evicted we will produce a new output and put it back into state, which makes the output nondeterministic. How long the events keep being deduplicated depends on the event time value of the first event.
   
   We still need to try hard to keep the output of a batch query and a streaming query the same. Letting state grow indefinitely gives the same output (unless users rely on columns other than those specified in dropDuplicates), so we need to start from there and find a way to deal with the state growth while minimizing the impact on the output, i.e., in a tolerable way.




[GitHub] [spark] HeartSaVioR edited a comment on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1051552062


   >> Suppose we have an event E2 timestamped 12:00 as input, and there was an earlier event E1 timestamped 11:50.
   
   > With a TTL, it is also possible that E1 has been evicted before E2 is processed; it depends on the TTL value.
   
   No one would set the TTL to less than 10 minutes, as that wouldn't effectively deduplicate events. It will be something like several hours (or even days - [the example in Flink's blog post uses 7 days](https://flink.apache.org/2019/05/19/state-ttl.html)), and that doesn't hurt much, since state grows with the cardinality of the grouping keys used for deduplication.
   
   




[GitHub] [spark] HeartSaVioR commented on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1039946618


   > How could it happen? IIUC, the watermark predicate should be: watermark column <= current watermark (the max event time seen in the last batch?). When there are no out-of-order events, isn't an input row's watermark column always > the current watermark (i.e., the watermark predicate is false)? Why would it be evicted immediately? Won't it be evicted in the next batch?
   
   Yeah, my bad, I oversimplified the explanation. The row is effectively evicted in the next batch, because we calculate the watermark at the end of each micro-batch. But how does it help if the state row only sticks around for a single batch?
   
   We have to think carefully about when we can evict the state row safely. Suppose duplicated input rows keep coming, each carrying a timestamp. With event time semantics and a given watermark gap we can deduplicate a group of input rows, but once the state row is evicted we will produce a new output and put it back into state, which makes the output nondeterministic. How long the events keep being deduplicated depends on the event time value of the first event.
   
   We still need to try hard to keep the output of a batch query and a streaming query the same. Letting state grow indefinitely gives the same output (unless users rely on columns other than those specified in dropDuplicates), so we need to start from there and find a way to deal with the state growth while minimizing the impact on the output.




[GitHub] [spark] HeartSaVioR commented on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1051550441


   No, they are quite different.
   
   Suppose we have just seen event A at event time 12:00. Is the goal of deduplication to remove duplicated events with an event time "before 12:00"? No. The main goal of deduplication is to deduplicate "future events" (and "older events as well", since we allow late events).
   
   Ideally, we would deduplicate "all" older events and all future events, but the former requires an infinite watermark gap (or an unbounded state size), and the latter requires an unbounded state size. While the threshold for the former can be defined via the watermark gap, the threshold for the latter should be defined via a TTL, not the watermark gap.
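   
   A tiny worked example of the two thresholds (the numbers are arbitrary, and the "coverage" is only approximate since the watermark advances with the data):
   
   ```scala
   object DedupThresholds {
     def main(args: Array[String]): Unit = {
       val minute = 60L * 1000L
   
       // First occurrence of a key at event time 12:00 (minutes since midnight,
       // just for readability).
       val firstSeenMs = 12 * 60 * minute
   
       // The watermark gap bounds how LATE an older duplicate may still arrive
       // and be dropped: with a 10-minute gap, duplicates back to roughly 11:50
       // are still covered by state.
       val watermarkGapMs  = 10 * minute
       val oldestCoveredMs = firstSeenMs - watermarkGapMs   // ~11:50
   
       // A TTL bounds how far into the FUTURE duplicates are dropped: with a
       // 30-minute TTL, duplicates up to roughly 12:30 are still covered.
       val ttlMs           = 30 * minute
       val latestCoveredMs = firstSeenMs + ttlMs            // ~12:30
   
       println(s"late-side coverage down to minute ${oldestCoveredMs / minute}")
       println(s"future-side coverage up to minute ${latestCoveredMs / minute}")
     }
   }
   ```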




[GitHub] [spark] HeartSaVioR edited a comment on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1039922686


   I agree with the problem description, but I'd like to see a more thoughtful solution.
   
   Suppose we have a perfect watermark with no delay allowance and no event arrives out of order; then streaming dedup will effectively do no deduplication, because it will register the row in the state and evict it immediately. This happens if you use a watermark but treat it with the semantics of "processing time".
   
   Personally, for this case, applying a TTL to the state row makes more sense to me. If we don't want to require a watermark for this functionality, we will end up using wall/processing time for the TTL, which can lead to nondeterministic results, but setting the TTL to a large interval like 2 hours would be acceptable if we tolerate that behavior. If we do require a watermark so that the TTL works against the event time column, we may even be able to produce deterministic results, except for late events.
   
   Reference:
   https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/select-distinct/
   https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-state-ttl
   
   Note that Flink only supports processing time (wall clock) semantics for state TTL, if I understand correctly.
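   
   Purely as a sketch of what the user-facing knob could look like if Spark went the TTL route - the config key and its semantics below do NOT exist, they are just to make the idea concrete:
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   object TtlConfigSketch {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder()
         .appName("ttl-dedup-sketch")
         // HYPOTHETICAL config: keep dedup state rows for 2 hours; nothing like
         // this exists in Spark today.
         .config("spark.sql.streaming.dropDuplicates.stateTTL", "2 hours")
         .getOrCreate()
       import spark.implicits._
   
       val deduped = spark.readStream
         .format("rate").load()
         .withColumn("userId", ($"value" % 1000).cast("string"))
         // The public API stays identical; the TTL would be driven by the config.
         .dropDuplicates("userId")
   
       deduped.writeStream.format("console").outputMode("append").start().awaitTermination()
     }
   }
   ```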






[GitHub] [spark] viirya commented on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1052459861


   Well, that's one point of view. With a watermark, it can be interpreted as: an event is there to deduplicate "future events" and "late older events". The event will be evicted based on the watermark gap. Before it is evicted, if any newer event with the same deduplication keys comes in, it can replace the previous event and continue to deduplicate future events.




[GitHub] [spark] HeartSaVioR commented on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1039922686


   I agree with the problem description, but I'd like to see a more thoughtful solution.
   
   Suppose we have a perfect watermark with no delay allowance and no event arrives out of order; then streaming dedup will effectively do no deduplication, because it will register the row in the state and evict it immediately.
   
   Personally, for this case, applying a TTL to the state row makes more sense to me. If we don't want to require a watermark for this functionality, we will end up using wall/processing time for the TTL, which can lead to nondeterministic results, but setting the TTL to a large interval like 2 hours would be acceptable if we tolerate that behavior. If we do require a watermark so that the TTL works against the event time column, we may even be able to produce deterministic results, except for late events.




[GitHub] [spark] HeartSaVioR edited a comment on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1043728126


   The semantics of the watermark gap are an allowance for "late events". The semantics of the TTL here are an allowance for "events in the near future". The two guarantees point in quite opposite directions.
   
   Suppose we have an event E2 timestamped 12:00 as input, and there was an earlier event E1 timestamped 11:50. When E2 is processed, the availability of E1 depends entirely on how far the watermark has advanced. E1 may have been evicted already, which leads E2 to be emitted as a new output. E1 may also have been retained, depending on the watermark gap and the advance of the watermark. None of that is covered by the guarantee of the watermark.
   
   With a TTL and event time semantics, if the TTL is set to 30 minutes, the semantics of the watermark guarantee that E1 is still available when E2 is processed; the earliest time E1 can be evicted is 12:20. It is possible that E1 lives longer than the TTL (for a specific single batch) and deduplicates more events than we expect (the watermark guarantee is one-way, as documented in the SS guide) - we could make it strict (by adding comparison logic), or leave the loose guarantee as it is.
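   
   Spelling out the arithmetic of that example (illustration only; times are minutes since midnight):
   
   ```scala
   object TtlEvictionExample {
     def main(args: Array[String]): Unit = {
       val e1EventTime  = 11 * 60 + 50               // E1 at 11:50
       val ttlMinutes   = 30
       val e1Expiration = e1EventTime + ttlMinutes   // 12:20
   
       // E2 arrives at 12:00. If E2 is admitted at all (i.e. not dropped as a
       // late event), the watermark at that point is at most 12:00, which is
       // before E1's expiration at 12:20, so E1 is still in state and E2 is
       // deduplicated. The earliest point E1 can be evicted is 12:20.
       val e2EventTime = 12 * 60
       assert(e2EventTime < e1Expiration)
       println(s"E1 expires at minute $e1Expiration, E2 arrives at minute $e2EventTime")
     }
   }
   ```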




[GitHub] [spark] HeartSaVioR commented on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1057624236


   My point is: don't explain the behavior of a public API in technical terms; explain the behavior from the end user's perspective and make sure it makes sense. If you cannot explain how things will work "consistently" from the end users' point of view, it is going to fail.




[GitHub] [spark] viirya edited a comment on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
viirya edited a comment on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1039933470


   > Suppose we have a perfect watermark with no delay allowance and no event arrives out of order; then streaming dedup will effectively do no deduplication, because it will register the row in the state and evict it immediately. This happens if you use a watermark but treat it with the semantics of "processing time".
   
   How could it happen? IIUC, the watermark predicate should be: watermark column <= current watermark (the max event time seen in the last batch?). When there are no out-of-order events, isn't an input row's watermark column always > the current watermark (i.e., the watermark predicate is false)? Why would it be evicted immediately? Won't it be evicted in the next batch?
   
   TTL may be a solution here. It's just that watermarks seem more commonly used in Structured Streaming operators; do we have any stateful operators with a TTL? Or do we need to introduce a state TTL mechanism for this?
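   
   For context on the existing toolbox: the closest thing today is probably the timeout support in `flatMapGroupsWithState`. A rough, hand-rolled sketch of TTL-style dedup with it (the schema, the 2-hour value, and the rate source are just for illustration):
   
   ```scala
   import org.apache.spark.sql.SparkSession
   import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
   
   case class Event(userId: String, payload: String)
   
   object HandRolledTtlDedup {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder().appName("hand-rolled-ttl-dedup").getOrCreate()
       import spark.implicits._
   
       val events = spark.readStream
         .format("rate").load()
         .select(($"value" % 1000).cast("string").as("userId"),
                 $"value".cast("string").as("payload"))
         .as[Event]
   
       val deduped = events
         .groupByKey(_.userId)
         .flatMapGroupsWithState[Boolean, Event](
             OutputMode.Update, GroupStateTimeout.ProcessingTimeTimeout) {
           (_: String, rows: Iterator[Event], state: GroupState[Boolean]) =>
             if (state.hasTimedOut) {
               state.remove()          // TTL expired: forget the key
               Iterator.empty
             } else if (state.exists) {
               Iterator.empty          // seen within the TTL window: deduplicate
             } else {
               state.update(true)                   // first occurrence: remember it ...
               state.setTimeoutDuration("2 hours")  // ... for a 2-hour TTL
               rows.take(1)                         // and emit the key once
             }
         }
   
       deduped.writeStream.format("console").outputMode("update").start().awaitTermination()
     }
   }
   ```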








[GitHub] [spark] viirya commented on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1055115143


   Closing this in favor of the TTL approach. It might take some time (if any) to work on it.




[GitHub] [spark] HeartSaVioR commented on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1052837080


   It guarantees nothing about future events, no? If you don't agree with my point, please elaborate on how it "semantically" guarantees old events and future events, under various watermarks (lagging, perfect).
   
   Please focus on how your proposal makes sense semantically from the user's side. How to implement it should be considered after that.




[GitHub] [spark] HeartSaVioR commented on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1051552062


   >> Suppose we have an event E2 timestamped 12:00 as input, and there was an earlier event E1 timestamped 11:50.
   
   > With a TTL, it is also possible that E1 has been evicted before E2 is processed; it depends on the TTL value.
   
   No one would set the TTL to less than 10 minutes, as that wouldn't effectively deduplicate events. It will be something like several hours (or even days), and that doesn't hurt much, since state grows with the cardinality of the grouping keys used for deduplication.




[GitHub] [spark] HeartSaVioR edited a comment on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1039922686


   I agree with the problem description, but I'd like to see a more thoughtful solution.
   
   Suppose we have a perfect watermark with no delay allowance and no event arrives out of order; then streaming dedup will effectively do no deduplication, because it will register the row in the state and evict it immediately. This happens if you use a watermark but treat it with the semantics of "processing time".
   
   Personally, for this case, applying a TTL to the state row makes more sense to me. If we don't want to require a watermark for this functionality, we will end up using wall/processing time for the TTL, which can lead to nondeterministic results, but setting the TTL to a large interval like 2 hours would be acceptable if we tolerate that behavior. If we do require a watermark so that the TTL works against the event time column, we may even be able to produce deterministic results, except for late events.




[GitHub] [spark] HeartSaVioR edited a comment on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1039922686


   I agree with the problem description, but I'd like to see a more thoughtful solution.
   
   Suppose we have a perfect watermark with no delay allowance and no event arrives out of order; then streaming dedup will effectively do no deduplication, because it will register the row in the state and evict it immediately. This happens if you use a watermark but treat it with the semantics of "processing time".
   
   Personally, for this case, applying a TTL to the state row makes more sense to me. If we don't want to require a watermark for this functionality, we will end up using wall/processing time for the TTL, which can lead to nondeterministic results, but setting the TTL to a large interval like 2 hours would be acceptable if we tolerate that behavior. If we do require a watermark so that the TTL works against the event time column, we may even be able to produce deterministic results, except for late events.
   
   Reference:
   https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/select-distinct/
   https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-state-ttl
   
   Note that Flink only supports processing time (wall clock) semantics for state TTL, if I understand correctly.




[GitHub] [spark] viirya commented on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1039933470


   > Suppose we have a perfect watermark with no delay allowance and no event arrives out of order; then streaming dedup will effectively do no deduplication, because it will register the row in the state and evict it immediately. This happens if you use a watermark but treat it with the semantics of "processing time".
   
   How could it happen? IIUC, the watermark predicate should be: watermark column <= current watermark (the max event time seen in the last batch?). When there are no out-of-order events, isn't an input row's watermark column always > the current watermark (i.e., the watermark predicate is false)? Why would it be evicted immediately? Won't it be evicted in the next batch?
   
   TTL may be a solution here. It's just that watermarks seem more commonly used in Structured Streaming operators; do we have any stateful operators with a TTL?




[GitHub] [spark] HeartSaVioR commented on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1039951780


   > TTL may be a solution here. It's just that watermarks seem more commonly used in Structured Streaming operators; do we have any stateful operators with a TTL? Or do we need to introduce a state TTL mechanism for this?
   
   This depends on whether we want to bring the functionality in globally, or only specifically for dropDuplicates.
   
   If we assume it is only for dropDuplicates, I can roughly sketch the high-level idea (DISCLAIMER: not guaranteed to work).
   
   Applying a TTL with event time could be thought of as updating the event time of the state row to the maximum event time among the duplicated rows seen so far, plus the specified TTL. Since we have to keep the API the same between batch and streaming, it may need to be a config instead of a parameter of the API. I don't like that, but keeping the same API between batch and streaming is the top-level concern.
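   
   A rough sketch of that update rule (pure logic only, no state store plumbing; every name here is made up for illustration):
   
   ```scala
   // Hypothetical state value for one dedup key: the expiration timestamp, i.e.
   // (max event time among the duplicates seen so far) + TTL.
   final case class DedupState(expirationMs: Long)
   
   object DedupTtlUpdate {
     // Called for every input row of a key; returns the updated state and whether
     // the row should be emitted (true only when no live state existed).
     def onRow(current: Option[DedupState],
               rowEventTimeMs: Long,
               ttlMs: Long): (DedupState, Boolean) = current match {
       case Some(s) =>
         // Duplicate: extend the expiration, like a session window that never merges.
         (DedupState(math.max(s.expirationMs, rowEventTimeMs + ttlMs)), false)
       case None =>
         // First occurrence: emit the row and start the TTL clock.
         (DedupState(rowEventTimeMs + ttlMs), true)
     }
   
     // Eviction check run against the watermark at the end of a micro-batch.
     def shouldEvict(s: DedupState, watermarkMs: Long): Boolean =
       s.expirationMs <= watermarkMs
   
     def main(args: Array[String]): Unit = {
       val ttl = 30L * 60 * 1000
       val (s1, emit1) = onRow(None, rowEventTimeMs = 0L, ttlMs = ttl)
       val (s2, emit2) = onRow(Some(s1), rowEventTimeMs = 10L * 60 * 1000, ttlMs = ttl)
       println(s"first row emitted: $emit1, duplicate emitted: $emit2, expires at ${s2.expirationMs} ms")
     }
   }
   ```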
   
   I'm not pushing my idea aggressively. I'm definitely open to a better idea that makes sense semantically.






[GitHub] [spark] HeartSaVioR commented on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1043728126


   The semantics of the watermark gap are an allowance for "late events". The semantics of the TTL here are an allowance for "events in the near future". The two guarantees point in quite opposite directions.
   
   Suppose we have an event E2 timestamped 12:00 as input, and there was an earlier event E1 timestamped 11:50. When E2 is processed, the availability of E1 depends entirely on how far the watermark has advanced. E1 may have been evicted already, which leads E2 to be emitted as a new output. That is not covered by the guarantee of the watermark.
   
   With a TTL and event time semantics, if the TTL is set to 30 minutes, the semantics of the watermark guarantee that E1 is still available when E2 is processed; the earliest time E1 can be evicted is 12:20. It is possible that E1 lives longer than the TTL (for a specific single batch) and deduplicates more events than we expect (the watermark guarantee is one-way, as documented in the SS guide) - we could make it strict (by adding comparison logic), or leave the loose guarantee as it is.




[GitHub] [spark] viirya commented on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1055096906


   What's "guarantees the old events and future events"? The sentence is meaningless from my point of view. I think I already express clearly my point of view. I agree that TTL makes more sense on deduplication for users so I support the TTL approach. In case if you still want to argue, I'm open to discuss in emails. If not, let's focus on how to move forward and fix the issue.




[GitHub] [spark] viirya closed pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
viirya closed pull request #35521:
URL: https://github.com/apache/spark/pull/35521


   




[GitHub] [spark] HeartSaVioR edited a comment on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1039951780


   > TTL may be a solution here. It's just that watermarks seem more commonly used in Structured Streaming operators; do we have any stateful operators with a TTL? Or do we need to introduce a state TTL mechanism for this?
   
   This depends on whether we want to bring the functionality in globally, or only specifically for dropDuplicates.
   
   If we assume it is only for dropDuplicates, I can roughly sketch the high-level idea (DISCLAIMER: not guaranteed to work).
   
   Applying a TTL with event time could be thought of as updating the event time of the state row to the maximum event time among the duplicated rows seen so far, plus the specified TTL. (Yes, this is very similar to a session window.) Since we have to keep the API the same between batch and streaming, it may need to be a config instead of a parameter of the API. I don't like that, but keeping the same API between batch and streaming is the top-level concern.
   
   I'm not pushing my idea aggressively. I'm definitely open to a better idea that makes sense semantically.




[GitHub] [spark] HeartSaVioR edited a comment on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
HeartSaVioR edited a comment on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1039951780


   > TTL may be a solution here. It's just that watermarks seem more commonly used in Structured Streaming operators; do we have any stateful operators with a TTL? Or do we need to introduce a state TTL mechanism for this?
   
   This depends on whether we want to bring the functionality in globally, or only specifically for dropDuplicates.
   
   If we assume it is only for dropDuplicates, I can roughly sketch the high-level idea (DISCLAIMER: not guaranteed to work).
   
   Applying a TTL with event time could be thought of as updating the event time of the state row to the maximum event time among the duplicated rows seen so far, plus the specified TTL. (Yes, this is very similar to a session window, except that we don't merge windows.) Since we have to keep the API the same between batch and streaming, it may need to be a config instead of a parameter of the API. I don't like that, but keeping the same API between batch and streaming is the top-level concern.
   
   I'm not pushing my idea aggressively. I'm definitely open to a better idea that makes sense semantically.




[GitHub] [spark] viirya commented on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1039899629


   I'm not sure whether this can reach a consensus, actually, because it is a behavior change. But the current behavior looks a bit weird to me, and we have seen state keep growing in a production application because of it, so I would like to propose this change.




[GitHub] [spark] viirya commented on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1042730217


   > We have to think carefully about when we can evict the state row safely. Suppose duplicated input rows keep coming, each carrying a timestamp. With event time semantics and a given watermark gap we can deduplicate a group of input rows, but once the state row is evicted we will produce a new output and put it back into state, which makes the output nondeterministic. How long the events keep being deduplicated depends on the event time value of the first event.
   
   This is a good point. Even with a watermark, can't we also update the event time in the state store to the maximum event time among the duplicated rows seen so far? The difference is then which one decides when to evict the state row: the TTL or the watermark.
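   
   To make the comparison concrete (illustrative only): both variants can refresh the stored event time on every duplicate, and they differ only in the predicate used at eviction time.
   
   ```scala
   object EvictionPredicates {
     // Both variants can keep "max event time among duplicates so far" in state.
   
     // Watermark-gap eviction: the row leaves state once the watermark passes
     // the (refreshed) event time itself.
     def evictByWatermark(maxEventTimeMs: Long, watermarkMs: Long): Boolean =
       maxEventTimeMs <= watermarkMs
   
     // TTL eviction: the row leaves state once the watermark passes the
     // (refreshed) event time plus the TTL.
     def evictByTtl(maxEventTimeMs: Long, ttlMs: Long, watermarkMs: Long): Boolean =
       maxEventTimeMs + ttlMs <= watermarkMs
   }
   ```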
   
   
   
   
   
   




[GitHub] [spark] viirya commented on pull request #35521: [SPARK-38212][SS] Remove out-of-watermark states for streaming dedup

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #35521:
URL: https://github.com/apache/spark/pull/35521#issuecomment-1051416556


   Isn't the difference here what the eviction behavior depends on? One is the watermark gap and the other is the TTL. With a TTL, it is also possible that E1 has been evicted before E2 is processed; it depends on the TTL value. In the watermark gap case, it depends on the watermark gap value.
   
   But I agree that a TTL is much easier to reason about for deduplication. I might need to take some time to revise this into a TTL-based approach.
   
   

