Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/03/04 01:55:18 UTC

[GitHub] [arrow-datafusion] viirya opened a new pull request #1921: Add timeout to can_grow_directly when waiting for the Condvar.

viirya opened a new pull request #1921:
URL: https://github.com/apache/arrow-datafusion/pull/1921


   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes #1920.
   
    # Rationale for this change
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   
   # What changes are included in this PR?
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   # Are there any user-facing changes?
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->
   





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1921: Add timeout to can_grow_directly when waiting for the Condvar.

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1921:
URL: https://github.com/apache/arrow-datafusion/pull/1921#discussion_r820040751



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -340,7 +341,13 @@ impl MemoryManager {
             } else if current < min_per_rqt {
                 // if we cannot acquire at least 1/2n memory, just wait for others
                 // to spill instead of spilling self frequently with limited total mem
-                self.cv.wait(&mut rqt_current_used);
+                let timeout = self
+                    .cv
+                    .wait_for(&mut rqt_current_used, Duration::from_secs(5));

Review comment:
       By introducing tracking consumers, and a `mem_used()` method for all consumers, we require each memory-consuming operator implementation to acquire memory and release it eagerly once it is no longer used.
   For example, the external sorter releases its requester's memory down to zero and transfers it to a tracking consumer.
   
   > How do we know when other consumers will release? If no consumers release?
   
   Each task is processing a finite partition of data. If no consumers release and we get stuck, I prefer to treat it as a potential bug, and we should fix it instead of letting it slip through our fingers. We are not dealing with a black box of operators like UDFs, but with controlled physical operators in the engine. 
   
   Spilling others in the same task won't make the case easier. It has limited scope, and it will also cause chained window operators in a Spark task to degrade performance badly, resulting in more spills and even a crash or triggering another speculative task.
   
   







[GitHub] [arrow-datafusion] viirya commented on pull request #1921: Add timeout to can_grow_directly when waiting for the Condvar.

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #1921:
URL: https://github.com/apache/arrow-datafusion/pull/1921#issuecomment-1060010051


   Thanks @yjshen. Sounds good to me.





[GitHub] [arrow-datafusion] viirya commented on a change in pull request #1921: Add timeout to can_grow_directly when waiting for the Condvar.

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #1921:
URL: https://github.com/apache/arrow-datafusion/pull/1921#discussion_r819795845



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -340,7 +341,13 @@ impl MemoryManager {
             } else if current < min_per_rqt {
                 // if we cannot acquire at least 1/2n memory, just wait for others
                 // to spill instead of spilling self frequently with limited total mem
-                self.cv.wait(&mut rqt_current_used);
+                let timeout = self
+                    .cv
+                    .wait_for(&mut rqt_current_used, Duration::from_secs(5));

Review comment:
       Oh, it's interesting. I'm surprised that you could find the related code so quickly. Are you familiar with the Spark codebase, and maybe work on it too? 😄 
   
   The Spark codebase is quite complicated nowadays. Actually, I had not read the code you linked to. What was in my mind when I looked at `can_grow_directly` is the place where one memory consumer acquires execution memory ([code](https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java#L138)). If the consumer cannot get as much as it needs at first, the manager will actively ask other consumers (in the same task) to do spilling. There is no waiting there. This is actually the better mode, in my mind, for the consumer model, and I plan to make similar changes to this part of DataFusion.
   
   For the code you quoted, I just quickly read it, and it seems to serve a different purpose: it manages memory pools on executors for tasks. So it is reasonable that once a task asks for more memory from a memory pool, and there is not enough memory in the pool while in the 1/2n situation (n is the number of tasks here, not consumers), the task is blocked indefinitely until other tasks free up memory.
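   
   To illustrate, here is a rough Rust sketch of that acquire-then-spill-others flow. All of the names (`TaskMemoryPool`, `MemoryConsumer::spill`) are hypothetical, not the actual Spark or DataFusion API; it only shows the shape of the idea:

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical consumer trait; the real interfaces differ.
trait MemoryConsumer {
    /// Ask the consumer to spill, returning how many bytes it freed.
    fn spill(&self, requested: usize) -> usize;
}

struct TaskMemoryPool {
    capacity: usize,
    used: Mutex<usize>,
    consumers: Mutex<Vec<Arc<dyn MemoryConsumer>>>,
}

impl TaskMemoryPool {
    /// Try to grant `required` bytes. On shortage, actively ask the
    /// other consumers of the same task to spill; the requester spills
    /// itself only as the last resort. No blocking wait anywhere.
    fn acquire(&self, requester: &Arc<dyn MemoryConsumer>, required: usize) -> usize {
        let mut used = self.used.lock().unwrap();
        let mut available = self.capacity.saturating_sub(*used);
        for other in self.consumers.lock().unwrap().iter() {
            if available >= required {
                break;
            }
            if Arc::ptr_eq(other, requester) {
                continue; // skip self for now
            }
            let freed = other.spill(required - available);
            *used = (*used).saturating_sub(freed);
            available += freed;
        }
        if available < required {
            // Last resort: spill the requester itself.
            let freed = requester.spill(required - available);
            *used = (*used).saturating_sub(freed);
            available += freed;
        }
        let granted = required.min(available);
        *used += granted;
        granted
    }
}
```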
   
   
   







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1921: Add timeout to can_grow_directly when waiting for the Condvar.

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1921:
URL: https://github.com/apache/arrow-datafusion/pull/1921#discussion_r820028000



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -340,7 +341,13 @@ impl MemoryManager {
             } else if current < min_per_rqt {
                 // if we cannot acquire at least 1/2n memory, just wait for others
                 // to spill instead of spilling self frequently with limited total mem
-                self.cv.wait(&mut rqt_current_used);
+                let timeout = self
+                    .cv
+                    .wait_for(&mut rqt_current_used, Duration::from_secs(5));

Review comment:
       And yes, the very first memory management design was modeled after Spark. The [initial design doc](https://docs.google.com/document/d/1BT5HH-2sKq-Jxo51PNE6l9NNd_F-FyyYcyC3SKTnkIA/edit#heading=h.ims7dd49jei1) is quite similar to Spark's, with a prototype in https://github.com/yjshen/arrow-datafusion/pull/3. But we got more chances to analyze the OOM and tuning pains in Spark, and gradually moved to the current approach.







[GitHub] [arrow-datafusion] viirya commented on a change in pull request #1921: Add timeout to can_grow_directly when waiting for the Condvar.

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #1921:
URL: https://github.com/apache/arrow-datafusion/pull/1921#discussion_r819360219



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -340,7 +341,13 @@ impl MemoryManager {
             } else if current < min_per_rqt {
                 // if we cannot acquire at least 1/2n memory, just wait for others
                 // to spill instead of spilling self frequently with limited total mem
-                self.cv.wait(&mut rqt_current_used);
+                let timeout = self
+                    .cv
+                    .wait_for(&mut rqt_current_used, Duration::from_secs(5));

Review comment:
       I haven't hit anything like an infinite wait. But as I was looking at the code, it seemed a bit strange to wait forever here.
   
   How long it waits is unpredictable, and it could possibly be a long time. It seems more reasonable to have a timeout to prevent that, even though most of the time we won't reach it.
   
   Yeah, the current value is picked arbitrarily, just to convey the idea. I will make it configurable and use a longer default value.







[GitHub] [arrow-datafusion] viirya commented on a change in pull request #1921: Add timeout to can_grow_directly when waiting for the Condvar.

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #1921:
URL: https://github.com/apache/arrow-datafusion/pull/1921#discussion_r819206633



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -340,7 +341,13 @@ impl MemoryManager {
             } else if current < min_per_rqt {
                 // if we cannot acquire at least 1/2n memory, just wait for others
                 // to spill instead of spilling self frequently with limited total mem
-                self.cv.wait(&mut rqt_current_used);
+                let timeout = self
+                    .cv
+                    .wait_for(&mut rqt_current_used, Duration::from_secs(5));

Review comment:
       Should we make it configurable from `MemoryManagerConfig`?
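   
   For example (a simplified stand-in for the real `MemoryManagerConfig`; the `max_wait_duration` field name and the default value are made up for illustration):

```rust
use std::time::Duration;

/// Simplified sketch; the actual MemoryManagerConfig has other fields.
pub struct MemoryManagerConfig {
    /// Fraction of available memory the manager may hand out.
    pub memory_fraction: f64,
    /// Upper bound on how long `can_grow_directly` blocks waiting for
    /// other consumers to spill; `None` keeps the current infinite wait.
    pub max_wait_duration: Option<Duration>,
}

impl Default for MemoryManagerConfig {
    fn default() -> Self {
        Self {
            memory_fraction: 1.0,
            // A deliberately long default, per the review discussion.
            max_wait_duration: Some(Duration::from_secs(300)),
        }
    }
}
```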







[GitHub] [arrow-datafusion] viirya commented on a change in pull request #1921: Add timeout to can_grow_directly when waiting for the Condvar.

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #1921:
URL: https://github.com/apache/arrow-datafusion/pull/1921#discussion_r820035491



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -340,7 +341,13 @@ impl MemoryManager {
             } else if current < min_per_rqt {
                 // if we cannot acquire at least 1/2n memory, just wait for others
                 // to spill instead of spilling self frequently with limited total mem
-                self.cv.wait(&mut rqt_current_used);
+                let timeout = self
+                    .cv
+                    .wait_for(&mut rqt_current_used, Duration::from_secs(5));

Review comment:
       BTW, I must say that I'm not an expert in the DataFusion codebase, so I'm sorry if I misunderstand some points about DataFusion.







[GitHub] [arrow-datafusion] viirya commented on a change in pull request #1921: Add timeout to can_grow_directly when waiting for the Condvar.

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #1921:
URL: https://github.com/apache/arrow-datafusion/pull/1921#discussion_r820135554



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -340,7 +341,13 @@ impl MemoryManager {
             } else if current < min_per_rqt {
                 // if we cannot acquire at least 1/2n memory, just wait for others
                 // to spill instead of spilling self frequently with limited total mem
-                self.cv.wait(&mut rqt_current_used);
+                let timeout = self
+                    .cv
+                    .wait_for(&mut rqt_current_used, Duration::from_secs(5));

Review comment:
       > Each task is processing a finite partition of data. If no consumers release and we get stuck, I prefer to treat it as a potential bug, and we should fix it instead of letting it slip through our fingers. We are not dealing with a black box of operators like UDFs, but with controlled physical operators in the engine.
   
   I must say that you haven't convinced me (because, basically, you're unable to answer the question), as I don't think the dynamics of the interaction between consumers are well understood here, and they shouldn't be treated like that. It looks like an issue in the model, not a potential bug in some consumers. As it seems I cannot convince you on this point either, the discussion would be endless, and I don't plan to continue it. So I'll close this, leave the code as is, and thank you for the discussion above.







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1921: Add timeout to can_grow_directly when waiting for the Condvar.

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1921:
URL: https://github.com/apache/arrow-datafusion/pull/1921#discussion_r819304060



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -340,7 +341,13 @@ impl MemoryManager {
             } else if current < min_per_rqt {
                 // if we cannot acquire at least 1/2n memory, just wait for others
                 // to spill instead of spilling self frequently with limited total mem
-                self.cv.wait(&mut rqt_current_used);
+                let timeout = self
+                    .cv
+                    .wait_for(&mut rqt_current_used, Duration::from_secs(5));

Review comment:
       Is this wait causing an infinite wait in your query? If so, is there possibly a leak somewhere else that we should fix?
   
   The primary reason it waits forever here for a notify signal is to avoid repeated self-spilling with a small memory share, which produces many spill files and degrades performance. 
   
   If a consumer cannot get at least 1/2n of the total memory, perhaps blocking the thread and yielding computational resources to others is better? Then the huge consumers can progress faster: either they trigger self-spilling or they finish their jobs. 
   
   Working with minimal memory is not ideal because it harms the overall query processing throughput. Even if the repeatedly spilling consumer gets enough memory later, it will spend a lot of effort reading spills and merging partial results. 
   
   We are currently working under the assumption of a batch engine. Maybe one day we can have a customized task scheduler that self-tunes based on CPU and memory usage, and even an adaptive partition-size tuning capability.
   
   Nevertheless, if we must add a timeout, please also print a warning log, as this could be a symptom of a potential bug. And yes, I think making the wait configurable is necessary. Please also choose a longer timeout as the default if possible.
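   
   A minimal sketch of such a bounded wait with a warning, assuming the parking_lot `Condvar` the surrounding code already uses (the timeout value and the log line are placeholders, not decided values):

```rust
use parking_lot::{Condvar, Mutex};
use std::time::Duration;

/// Block until other consumers spill, but never longer than `timeout`.
fn wait_for_spill(cv: &Condvar, current_used: &Mutex<usize>, timeout: Duration) {
    let mut guard = current_used.lock();
    let result = cv.wait_for(&mut guard, timeout);
    if result.timed_out() {
        // Hitting the timeout at all is likely a symptom of a bug
        // elsewhere, so surface it loudly instead of retrying silently.
        eprintln!("warning: still waiting for memory after {timeout:?}");
    }
}
```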







[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1921: Add timeout to can_grow_directly when waiting for the Condvar.

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1921:
URL: https://github.com/apache/arrow-datafusion/pull/1921#discussion_r819384921



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -340,7 +341,13 @@ impl MemoryManager {
             } else if current < min_per_rqt {
                 // if we cannot acquire at least 1/2n memory, just wait for others
                 // to spill instead of spilling self frequently with limited total mem
-                self.cv.wait(&mut rqt_current_used);
+                let timeout = self
+                    .cv
+                    .wait_for(&mut rqt_current_used, Duration::from_secs(5));

Review comment:
       It seems Spark still takes an [infinite wait](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala#L140) in the 1/2n situation. Do you think we will have some new cases to deal with, or limitations in the current design? I'm not implying we should strictly follow Spark's way, since the model is different (for example, we forbid triggering others to spill, and we try to share memory similarly among all consumers), but since you are a Spark committer, I might be asking the right person ☺️







[GitHub] [arrow-datafusion] viirya commented on a change in pull request #1921: Add timeout to can_grow_directly when waiting for the Condvar.

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #1921:
URL: https://github.com/apache/arrow-datafusion/pull/1921#discussion_r820035032



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -340,7 +341,13 @@ impl MemoryManager {
             } else if current < min_per_rqt {
                 // if we cannot acquire at least 1/2n memory, just wait for others
                 // to spill instead of spilling self frequently with limited total mem
-                self.cv.wait(&mut rqt_current_used);
+                let timeout = self
+                    .cv
+                    .wait_for(&mut rqt_current_used, Duration::from_secs(5));

Review comment:
       > And if we block a consumer belonging to a task, we are blocking the task for one partition as well, no?
   
   You don't get my point above. There are two different situations: one is between tasks, and one is between consumers in the same task. In the Spark codebase, a task can be blocked when it cannot acquire the memory it needs, and it will wait indefinitely for other tasks to release memory (i.e., the code you quoted). This makes sense: a task can be blocked due to insufficient memory, and it simply waits for other tasks to finish and release memory. It also doesn't make sense to ask other tasks to spill on behalf of that task.
   
   For consumers in the same task, Spark doesn't block a consumer that cannot acquire its memory; instead, it actively asks other consumers (and also the consumer itself, but only as the last one) to spill and release memory where possible.
   
   The point is not whether to block the consumer but when it will be unblocked. The code here seems to block a consumer and passively wait, indefinitely, for other consumers. How do we know when other consumers will release? What if no consumers release? Then the task is just stuck there. All consumers in the same task share the same goal: finish the task. They should help the others finish their jobs, by spilling and releasing memory when possible if other consumers need it. Maybe there are some concerns I don't see so far, but that's why the current model looks strange to me.
   
   I'm not arguing about the tracking consumers and requesting consumers; that looks clever to me too, so I have no questions about your reply above. But it doesn't seem related to the issue I'm raising in this PR.
   







[GitHub] [arrow-datafusion] yjshen commented on pull request #1921: Add timeout to can_grow_directly when waiting for the Condvar.

Posted by GitBox <gi...@apache.org>.
yjshen commented on pull request #1921:
URL: https://github.com/apache/arrow-datafusion/pull/1921#issuecomment-1059932325


   Hi @viirya, sorry that my comments may have caused some misunderstanding. There is a possibility that the pointed-out infinite wait may happen under certain circumstances, and thanks again for offering the fix. I suggest we keep the infinite hang here as a red light and get an immediate failure once we trigger it; that way we could find the root cause earlier, maybe at its first appearance. We may even need a redesign if no quick fix is possible. Thanks again for your efforts.
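   
   One possible reading of this "red light" suggestion in code, as a fail-fast sketch (the function name and the limit are illustrative only, assuming the parking_lot `Condvar` the code already uses):

```rust
use parking_lot::{Condvar, Mutex};
use std::time::Duration;

/// Fail-fast variant: treat exceeding the wait as a hard error, so the
/// root cause surfaces at its first appearance instead of being retried.
fn wait_or_fail(cv: &Condvar, current_used: &Mutex<usize>, limit: Duration) {
    let mut guard = current_used.lock();
    if cv.wait_for(&mut guard, limit).timed_out() {
        panic!("memory wait exceeded {limit:?}; a consumer may never have released");
    }
}
```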





[GitHub] [arrow-datafusion] yjshen commented on a change in pull request #1921: Add timeout to can_grow_directly when waiting for the Condvar.

Posted by GitBox <gi...@apache.org>.
yjshen commented on a change in pull request #1921:
URL: https://github.com/apache/arrow-datafusion/pull/1921#discussion_r820027341



##########
File path: datafusion/src/execution/memory_manager.rs
##########
@@ -340,7 +341,13 @@ impl MemoryManager {
             } else if current < min_per_rqt {
                 // if we cannot acquire at least 1/2n memory, just wait for others
                 // to spill instead of spilling self frequently with limited total mem
-                self.cv.wait(&mut rqt_current_used);
+                let timeout = self
+                    .cv
+                    .wait_for(&mut rqt_current_used, Duration::from_secs(5));

Review comment:
       And if we block a consumer belonging to a task, we are blocking the task for one partition as well, no? 
   
   > This is actually the better mode, in my mind, for the consumer model, and I plan to make similar changes to this part of DataFusion.
   
   Spilling other consumers of the same task in Spark is not well defined and is also not supported by all consumers (you can search for `trigger != this` to check). That's why I think it brings more complexity than gains, and why we gradually sharpened the current approach: having two kinds of consumers, tracking consumers and requesting consumers, to clarify the memory usage pattern. Besides, Spark also suffers in memory control: users always need to tune its memoryFraction, reserving more memory for the unnamed consumers. 
   
   Take external sort as an example: it is a requesting consumer while it accumulates incoming records (this is where the n in 1/2n comes from; we only count requesting consumers when spilling), and when it has finished sorting and starts to output records, it releases all of its memory and surrenders it to a tracking consumer. That's why I asked whether an infinite wait was actually witnessed in the first place: the case should be rare, and if it happens, there is a big chance it is a bug.
   
   Please refer to https://github.com/apache/arrow-datafusion/issues/587 and the discussions in https://github.com/apache/arrow-datafusion/pull/1526 and https://github.com/apache/arrow-datafusion/pull/1691 for more background if you like. 
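   
   To make that handoff concrete, a rough sketch with made-up counters (the real manager's bookkeeping differs); only the requesting side counts toward the 1/2n fair share:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

struct MemoryBookkeeping {
    requested: AtomicUsize, // spillable reservations of requesting consumers
    tracked: AtomicUsize,   // memory merely tracked, never asked to spill
}

impl MemoryBookkeeping {
    /// Called when, e.g., the external sorter finishes sorting and starts
    /// streaming output: its buffers no longer need a spillable
    /// reservation, so the bytes move from the requesting side to the
    /// tracking side, and waiters on the requesting side can proceed.
    fn transfer_to_tracking(&self, bytes: usize) {
        self.requested.fetch_sub(bytes, Ordering::SeqCst);
        self.tracked.fetch_add(bytes, Ordering::SeqCst);
    }
}
```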







[GitHub] [arrow-datafusion] viirya closed pull request #1921: Add timeout to can_grow_directly when waiting for the Condvar.

Posted by GitBox <gi...@apache.org>.
viirya closed pull request #1921:
URL: https://github.com/apache/arrow-datafusion/pull/1921


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


