Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/01 20:31:33 UTC

[GitHub] [beam] lukecwik opened a new pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

lukecwik opened a new pull request #11590:
URL: https://github.com/apache/beam/pull/11590


   Existing performance suffered because of the use of timed waits and because of the large number of "threading" objects being created and invoked.
   
   Using the benchmark from https://issues.apache.org/jira/browse/BEAM-8944?focusedCommentId=17074641&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17074641
   
   This improves performance from 5.52s down to 1.82s, which is faster than the ThreadPoolExecutor with 12 threads (the default used before) but still slower than the ThreadPoolExecutor with 1 thread.
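   
   A minimal sketch of how such a measurement could be reproduced (the task body and the count of 9,900 tasks are assumptions taken from the `submit` call counts in the profiles below; the actual benchmark is in the linked JIRA comment):
   ```python
   import cProfile
   import time
   
   from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
   
   
   def run_benchmark(executor, num_tasks=9900):
     # Submit a batch of trivial tasks and block until every future completes.
     start = time.time()
     futures = [executor.submit(lambda: None) for _ in range(num_tasks)]
     for future in futures:
       future.result()
     print('%s uses %s' % (executor, time.time() - start))
   
   
   executor = UnboundedThreadPoolExecutor()
   cProfile.runctx('run_benchmark(executor)', globals(), locals(), sort='tottime')
   ```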
   
   The prior performance was:
   ```
   <apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x7feee655c8d0> uses 5.52247905731
            584051 function calls in 5.495 seconds
   
      Ordered by: internal time
   
      ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       86250    4.386    0.000    4.386    0.000 {method 'acquire' of 'thread.lock' objects}
       19800    0.150    0.000    4.773    0.000 Queue.py:150(get)
       29698    0.123    0.000    0.365    0.000 threading.py:373(notify)
        9900    0.114    0.000    0.114    0.000 {method '__enter__' of 'thread.lock' objects}
        9899    0.106    0.000    0.463    0.000 thread_pool_executor.py:103(accepted_work)
        9188    0.089    0.000    4.095    0.000 threading.py:309(wait)
        9903    0.078    0.000    0.129    0.000 threading.py:260(__init__)
        9900    0.061    0.000    1.092    0.000 thread_pool_executor.py:133(submit)
        9899    0.055    0.000    0.358    0.000 threading.py:576(set)
       38886    0.044    0.000    0.285    0.000 threading.py:300(_is_owned)
        9900    0.041    0.000    0.186    0.000 _base.py:318(__init__)
       28987    0.023    0.000    0.031    0.000 Queue.py:200(_qsize)
        9900    0.022    0.000    0.030    0.000 threading.py:132(__init__)
       38887    0.021    0.000    0.021    0.000 threading.py:64(_note)
        9899    0.019    0.000    0.163    0.000 threading.py:400(notifyAll)
       19799    0.018    0.000    0.023    0.000 Queue.py:208(_get)
       38887    0.016    0.000    0.016    0.000 {method 'release' of 'thread.lock' objects}
        9903    0.016    0.000    0.145    0.000 threading.py:242(Condition)
       19806    0.014    0.000    0.014    0.000 threading.py:59(__init__)
        9900    0.014    0.000    0.127    0.000 threading.py:285(__enter__)
        9188    0.013    0.000    0.090    0.000 threading.py:297(_acquire_restore)
        9900    0.013    0.000    0.043    0.000 threading.py:114(RLock)
        9900    0.011    0.000    0.011    0.000 thread_pool_executor.py:34(__init__)
       38886    0.011    0.000    0.011    0.000 {len}
        9900    0.010    0.000    0.012    0.000 threading.py:288(__exit__)
        9188    0.008    0.000    0.012    0.000 threading.py:294(_release_save)
       19092    0.006    0.000    0.006    0.000 {thread.allocate_lock}
       19799    0.005    0.000    0.005    0.000 {method 'popleft' of 'collections.deque' objects}
        9188    0.004    0.000    0.004    0.000 {method 'append' of 'list' objects}
        9899    0.003    0.000    0.003    0.000 {method 'remove' of 'list' objects}
        9900    0.002    0.000    0.002    0.000 {method '__exit__' of 'thread.lock' objects}
           1    0.000    0.000    0.000    0.000 {thread.start_new_thread}
           1    0.000    0.000    0.000    0.000 threading.py:647(__init__)
           1    0.000    0.000    0.000    0.000 threading.py:717(start)
           1    0.000    0.000    0.000    0.000 thread_pool_executor.py:58(__init__)
           1    0.000    0.000    0.000    0.000 threading.py:620(_newname)
           1    0.000    0.000    0.000    0.000 _weakrefset.py:83(add)
           1    0.000    0.000    0.000    0.000 threading.py:597(wait)
           2    0.000    0.000    0.000    0.000 threading.py:561(__init__)
           1    0.000    0.000    0.000    0.000 threading.py:1142(currentThread)
           2    0.000    0.000    0.000    0.000 threading.py:542(Event)
           1    0.000    0.000    0.000    0.000 threading.py:700(_set_daemon)
           1    0.000    0.000    0.000    0.000 threading.py:1014(daemon)
           2    0.000    0.000    0.000    0.000 threading.py:570(isSet)
           1    0.000    0.000    0.000    0.000 threading.py:999(daemon)
           1    0.000    0.000    0.000    0.000 {thread.get_ident}
           1    0.000    0.000    0.000    0.000 {method 'add' of 'set' objects}
           1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
   ```
   
   The new performance is:
   ```
   <apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x7f71dd5cacd0> uses 1.82196497917
            504935 function calls in 1.787 seconds
   
      Ordered by: internal time
   
      ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       62233    0.940    0.000    0.940    0.000 {method 'acquire' of 'thread.lock' objects}
        9900    0.142    0.000    1.248    0.000 thread_pool_executor.py:98(submit)
       29698    0.102    0.000    0.364    0.000 threading.py:373(notify)
       19800    0.101    0.000    0.880    0.000 Queue.py:150(get)
        9900    0.068    0.000    0.068    0.000 {method '__enter__' of 'thread.lock' objects}
       40304    0.063    0.000    0.063    0.000 {method 'release' of 'thread.lock' objects}
        9899    0.053    0.000    0.323    0.000 threading.py:479(release)
        9899    0.044    0.000    0.299    0.000 Queue.py:86(qsize)
        9903    0.043    0.000    0.082    0.000 threading.py:260(__init__)
       30407    0.031    0.000    0.221    0.000 threading.py:300(_is_owned)
        9900    0.027    0.000    0.119    0.000 _base.py:318(__init__)
       30407    0.022    0.000    0.027    0.000 Queue.py:200(_qsize)
        9900    0.017    0.000    0.025    0.000 threading.py:132(__init__)
       19799    0.016    0.000    0.019    0.000 Queue.py:208(_get)
        9899    0.015    0.000    0.338    0.000 thread_pool_executor.py:77(assign_work)
       40307    0.015    0.000    0.015    0.000 threading.py:64(_note)
        9900    0.015    0.000    0.019    0.000 threading.py:288(__exit__)
        9903    0.011    0.000    0.093    0.000 threading.py:242(Condition)
       19806    0.010    0.000    0.010    0.000 threading.py:59(__init__)
        9900    0.009    0.000    0.034    0.000 threading.py:114(RLock)
        9900    0.009    0.000    0.077    0.000 threading.py:285(__enter__)
         709    0.007    0.000    0.148    0.000 threading.py:309(wait)
        9900    0.007    0.000    0.007    0.000 thread_pool_executor.py:34(__init__)
       30407    0.005    0.000    0.005    0.000 {len}
        9900    0.004    0.000    0.004    0.000 {method '__exit__' of 'thread.lock' objects}
       10613    0.003    0.000    0.003    0.000 {thread.allocate_lock}
       19799    0.003    0.000    0.003    0.000 {method 'popleft' of 'collections.deque' objects}
        9896    0.002    0.000    0.002    0.000 {method 'remove' of 'list' objects}
         709    0.001    0.000    0.023    0.000 threading.py:297(_acquire_restore)
         709    0.001    0.000    0.002    0.000 threading.py:294(_release_save)
         709    0.000    0.000    0.000    0.000 {method 'append' of 'list' objects}
           1    0.000    0.000    0.000    0.000 {thread.start_new_thread}
           1    0.000    0.000    0.000    0.000 threading.py:717(start)
           1    0.000    0.000    0.000    0.000 threading.py:647(__init__)
           1    0.000    0.000    0.000    0.000 threading.py:620(_newname)
           1    0.000    0.000    0.000    0.000 thread_pool_executor.py:58(__init__)
           1    0.000    0.000    0.000    0.000 threading.py:1142(currentThread)
           1    0.000    0.000    0.000    0.000 _weakrefset.py:83(add)
           1    0.000    0.000    0.000    0.000 threading.py:597(wait)
           1    0.000    0.000    0.000    0.000 threading.py:433(__init__)
           1    0.000    0.000    0.000    0.000 threading.py:700(_set_daemon)
           1    0.000    0.000    0.000    0.000 threading.py:542(Event)
           1    0.000    0.000    0.000    0.000 threading.py:561(__init__)
           1    0.000    0.000    0.000    0.000 threading.py:1014(daemon)
           1    0.000    0.000    0.000    0.000 threading.py:412(Semaphore)
           2    0.000    0.000    0.000    0.000 threading.py:570(isSet)
           1    0.000    0.000    0.000    0.000 threading.py:999(daemon)
           1    0.000    0.000    0.000    0.000 {thread.get_ident}
           1    0.000    0.000    0.000    0.000 {method 'add' of 'set' objects}
           1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
   ```
   
   The max_workers=1 ThreadPoolExecutor is:
   ```
   <concurrent.futures.thread.ThreadPoolExecutor object at 0x7f71dd5e7710> uses 1.03040599823
            436503 function calls in 1.011 seconds
   
      Ordered by: internal time
   
      ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       59160    0.462    0.000    0.462    0.000 {method 'acquire' of 'thread.lock' objects}
        9900    0.114    0.000    0.114    0.000 {method '__enter__' of 'thread.lock' objects}
        9900    0.072    0.000    0.537    0.000 thread.py:128(submit)
        9900    0.045    0.000    0.474    0.000 Queue.py:150(get)
        9900    0.034    0.000    0.167    0.000 threading.py:440(acquire)
        9900    0.034    0.000    0.061    0.000 threading.py:260(__init__)
       19800    0.033    0.000    0.112    0.000 threading.py:373(notify)
        9900    0.031    0.000    0.192    0.000 Queue.py:107(put)
        4890    0.024    0.000    0.269    0.000 threading.py:309(wait)
        9900    0.022    0.000    0.092    0.000 _base.py:318(__init__)
       25561    0.017    0.000    0.017    0.000 {method 'release' of 'thread.lock' objects}
       24690    0.016    0.000    0.091    0.000 threading.py:300(_is_owned)
        9900    0.011    0.000    0.016    0.000 threading.py:132(__init__)
       34590    0.010    0.000    0.010    0.000 threading.py:64(_note)
        9900    0.009    0.000    0.176    0.000 thread.py:141(_adjust_thread_count)
        9900    0.009    0.000    0.070    0.000 threading.py:242(Condition)
        9900    0.007    0.000    0.023    0.000 threading.py:114(RLock)
       14790    0.007    0.000    0.009    0.000 Queue.py:200(_qsize)
       19800    0.007    0.000    0.007    0.000 threading.py:59(__init__)
        9900    0.007    0.000    0.008    0.000 Queue.py:208(_get)
        9900    0.006    0.000    0.010    0.000 threading.py:288(__exit__)
        9900    0.006    0.000    0.007    0.000 Queue.py:204(_put)
        9900    0.006    0.000    0.119    0.000 threading.py:285(__enter__)
        9900    0.006    0.000    0.006    0.000 thread.py:52(__init__)
        9900    0.003    0.000    0.003    0.000 {method '__exit__' of 'thread.lock' objects}
       14790    0.003    0.000    0.003    0.000 {thread.allocate_lock}
        4890    0.003    0.000    0.020    0.000 threading.py:297(_acquire_restore)
        4890    0.003    0.000    0.008    0.000 threading.py:294(_release_save)
       14790    0.002    0.000    0.002    0.000 {len}
        9900    0.001    0.000    0.001    0.000 {method 'popleft' of 'collections.deque' objects}
        9900    0.001    0.000    0.001    0.000 {method 'append' of 'collections.deque' objects}
        4890    0.001    0.000    0.001    0.000 {method 'append' of 'list' objects}
         871    0.000    0.000    0.000    0.000 {method 'remove' of 'list' objects}
           1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
   ```
   
   The max_workers=12 ThreadPoolExecutor is:
   ```
   <concurrent.futures.thread.ThreadPoolExecutor object at 0x7f71dd575d90> uses 2.03954315186
            402533 function calls in 2.002 seconds
   
      Ordered by: internal time
   
      ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       44284    0.878    0.000    0.878    0.000 {method 'acquire' of 'thread.lock' objects}
        9902    0.228    0.000    0.228    0.000 {method '__enter__' of 'thread.lock' objects}
        9900    0.207    0.000    1.342    0.000 thread.py:128(submit)
        9900    0.084    0.000    0.660    0.000 Queue.py:150(get)
       19800    0.084    0.000    0.269    0.000 threading.py:373(notify)
        9900    0.080    0.000    0.351    0.000 threading.py:440(acquire)
       30862    0.069    0.000    0.069    0.000 {method 'release' of 'thread.lock' objects}
        9900    0.054    0.000    0.594    0.000 Queue.py:107(put)
        9904    0.054    0.000    0.102    0.000 threading.py:260(__init__)
        9900    0.032    0.000    0.150    0.000 _base.py:318(__init__)
       20971    0.027    0.000    0.144    0.000 threading.py:300(_is_owned)
        9900    0.022    0.000    0.383    0.000 thread.py:141(_adjust_thread_count)
        9900    0.018    0.000    0.028    0.000 threading.py:132(__init__)
       30871    0.017    0.000    0.017    0.000 threading.py:64(_note)
        9904    0.016    0.000    0.118    0.000 threading.py:242(Condition)
        9902    0.015    0.000    0.244    0.000 threading.py:285(__enter__)
        9902    0.014    0.000    0.020    0.000 threading.py:288(__exit__)
       19808    0.013    0.000    0.013    0.000 threading.py:59(__init__)
       11069    0.013    0.000    0.016    0.000 Queue.py:200(_qsize)
        9900    0.013    0.000    0.041    0.000 threading.py:114(RLock)
        1171    0.011    0.000    0.234    0.000 threading.py:309(wait)
        9900    0.011    0.000    0.012    0.000 Queue.py:204(_put)
        9900    0.011    0.000    0.012    0.000 Queue.py:208(_get)
        9900    0.008    0.000    0.008    0.000 thread.py:52(__init__)
        9902    0.007    0.000    0.007    0.000 {method '__exit__' of 'thread.lock' objects}
       11075    0.005    0.000    0.005    0.000 {thread.allocate_lock}
       11071    0.003    0.000    0.003    0.000 {len}
        9891    0.002    0.000    0.002    0.000 {method 'remove' of 'list' objects}
        9900    0.002    0.000    0.002    0.000 {method 'popleft' of 'collections.deque' objects}
        9900    0.002    0.000    0.002    0.000 {method 'append' of 'collections.deque' objects}
        1171    0.001    0.000    0.041    0.000 threading.py:297(_acquire_restore)
        1171    0.001    0.000    0.004    0.000 threading.py:294(_release_save)
        1171    0.000    0.000    0.000    0.000 {method 'append' of 'list' objects}
           2    0.000    0.000    0.000    0.000 {thread.start_new_thread}
           2    0.000    0.000    0.000    0.000 threading.py:647(__init__)
           2    0.000    0.000    0.010    0.005 threading.py:717(start)
           2    0.000    0.000    0.010    0.005 threading.py:597(wait)
           2    0.000    0.000    0.000    0.000 threading.py:700(_set_daemon)
           2    0.000    0.000    0.000    0.000 weakref.py:368(__setitem__)
           2    0.000    0.000    0.000    0.000 threading.py:542(Event)
           2    0.000    0.000    0.000    0.000 threading.py:1142(currentThread)
           2    0.000    0.000    0.000    0.000 threading.py:561(__init__)
           2    0.000    0.000    0.000    0.000 threading.py:1014(daemon)
           2    0.000    0.000    0.000    0.000 {method 'add' of 'set' objects}
           2    0.000    0.000    0.000    0.000 threading.py:999(daemon)
           4    0.000    0.000    0.000    0.000 threading.py:570(isSet)
           2    0.000    0.000    0.000    0.000 {thread.get_ident}
           1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
   ```
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   XLang | --- | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) 
   Portable | --- | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] mxm commented on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-625085646


   The load test can't work properly here yet because it depends on additional code changes in #11558. So this is good to go.





[GitHub] [beam] mxm commented on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-624742582


   Run Python Load Tests ParDo Flink Streaming





[GitHub] [beam] pabloem commented on a change in pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11590:
URL: https://github.com/apache/beam/pull/11590#discussion_r420983316



##########
File path: sdks/python/apache_beam/utils/thread_pool_executor.py
##########
@@ -137,35 +101,33 @@ def submit(self, fn, *args, **kwargs):
     """
     future = _base.Future()
     work_item = _WorkItem(future, fn, args, kwargs)
-    try:
-      # Keep trying to get an idle worker from the queue until we find one
-      # that accepts the work.
-      while not self._idle_worker_queue.get(
-          block=False).accepted_work(work_item):
-        pass
-      return future
-    except queue.Empty:
-      with self._lock:
-        if self._shutdown:
-          raise RuntimeError(
-              'Cannot schedule new tasks after thread pool '
-              'has been shutdown.')
-
-        worker = _Worker(
-            self._idle_worker_queue,
-            self._permitted_thread_age_in_seconds,
-            work_item)
+    with self._lock:
+      if self._shutdown:
+        raise RuntimeError(
+            'Cannot schedule new tasks after thread pool has been shutdown.')
+      try:
+        self._idle_worker_queue.get(block=False).assign_work(work_item)
+
+        # If we have more idle threads then the max allowed, shutdown a thread.
+        if self._idle_worker_queue.qsize() > self._max_idle_threads:
+          try:
+            self._idle_worker_queue.get(block=False).shutdown()

Review comment:
       sounds good then!







[GitHub] [beam] lukecwik commented on a change in pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11590:
URL: https://github.com/apache/beam/pull/11590#discussion_r420894920



##########
File path: sdks/python/apache_beam/utils/thread_pool_executor.py
##########
@@ -137,35 +101,33 @@ def submit(self, fn, *args, **kwargs):
     """
     future = _base.Future()
     work_item = _WorkItem(future, fn, args, kwargs)
-    try:
-      # Keep trying to get an idle worker from the queue until we find one
-      # that accepts the work.
-      while not self._idle_worker_queue.get(
-          block=False).accepted_work(work_item):
-        pass
-      return future
-    except queue.Empty:
-      with self._lock:
-        if self._shutdown:
-          raise RuntimeError(
-              'Cannot schedule new tasks after thread pool '
-              'has been shutdown.')
-
-        worker = _Worker(
-            self._idle_worker_queue,
-            self._permitted_thread_age_in_seconds,
-            work_item)
+    with self._lock:
+      if self._shutdown:
+        raise RuntimeError(
+            'Cannot schedule new tasks after thread pool has been shutdown.')
+      try:
+        self._idle_worker_queue.get(block=False).assign_work(work_item)
+
+        # If we have more idle threads then the max allowed, shutdown a thread.
+        if self._idle_worker_queue.qsize() > self._max_idle_threads:
+          try:
+            self._idle_worker_queue.get(block=False).shutdown()

Review comment:
       I was thinking removing one at a time to reduce the rate at which we kill threads would "average" out the thread creation/death rate better but I have no data to support this hunch.
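   
   A hedged sketch of the two trimming policies under discussion (the helper names are hypothetical; in the PR the one-at-a-time check lives inline in `submit()`):
   ```python
   import queue  # Named Queue on Python 2; the Beam module refers to it as `queue`.
   
   
   def trim_one_idle_worker(idle_worker_queue, max_idle_threads):
     # Policy in this PR: retire at most one idle worker per submit() call,
     # spreading thread teardown out over time.
     if idle_worker_queue.qsize() > max_idle_threads:
       try:
         idle_worker_queue.get(block=False).shutdown()
       except queue.Empty:
         pass
   
   
   def trim_idle_workers_to_cap(idle_worker_queue, max_idle_threads):
     # Alternative raised in review: drain the idle queue down to the cap,
     # retiring several workers in one submit() call.
     while idle_worker_queue.qsize() > max_idle_threads:
       try:
         idle_worker_queue.get(block=False).shutdown()
       except queue.Empty:
         break
   ```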







[GitHub] [beam] mxm commented on a change in pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
mxm commented on a change in pull request #11590:
URL: https://github.com/apache/beam/pull/11590#discussion_r420926934



##########
File path: sdks/python/apache_beam/utils/thread_pool_executor.py
##########
@@ -137,35 +101,33 @@ def submit(self, fn, *args, **kwargs):
     """
     future = _base.Future()
     work_item = _WorkItem(future, fn, args, kwargs)
-    try:
-      # Keep trying to get an idle worker from the queue until we find one
-      # that accepts the work.
-      while not self._idle_worker_queue.get(
-          block=False).accepted_work(work_item):
-        pass
-      return future
-    except queue.Empty:
-      with self._lock:
-        if self._shutdown:
-          raise RuntimeError(
-              'Cannot schedule new tasks after thread pool '
-              'has been shutdown.')
-
-        worker = _Worker(
-            self._idle_worker_queue,
-            self._permitted_thread_age_in_seconds,
-            work_item)
+    with self._lock:
+      if self._shutdown:
+        raise RuntimeError(
+            'Cannot schedule new tasks after thread pool has been shutdown.')
+      try:
+        self._idle_worker_queue.get(block=False).assign_work(work_item)
+
+        # If we have more idle threads then the max allowed, shutdown a thread.
+        if self._idle_worker_queue.qsize() > self._max_idle_threads:
+          try:
+            self._idle_worker_queue.get(block=False).shutdown()

Review comment:
       Removing one thread at a time seems sensible to me.







[GitHub] [beam] udim commented on a change in pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
udim commented on a change in pull request #11590:
URL: https://github.com/apache/beam/pull/11590#discussion_r424774779



##########
File path: sdks/python/apache_beam/utils/thread_pool_executor.py
##########
@@ -137,35 +101,33 @@ def submit(self, fn, *args, **kwargs):
     """
     future = _base.Future()
     work_item = _WorkItem(future, fn, args, kwargs)
-    try:
-      # Keep trying to get an idle worker from the queue until we find one
-      # that accepts the work.
-      while not self._idle_worker_queue.get(
-          block=False).accepted_work(work_item):
-        pass
-      return future
-    except queue.Empty:
-      with self._lock:
-        if self._shutdown:
-          raise RuntimeError(
-              'Cannot schedule new tasks after thread pool '
-              'has been shutdown.')
-
-        worker = _Worker(
-            self._idle_worker_queue,
-            self._permitted_thread_age_in_seconds,
-            work_item)
+    with self._lock:
+      if self._shutdown:
+        raise RuntimeError(
+            'Cannot schedule new tasks after thread pool has been shutdown.')
+      try:
+        self._idle_worker_queue.get(block=False).assign_work(work_item)
+
+        # If we have more idle threads then the max allowed, shutdown a thread.
+        if self._idle_worker_queue.qsize() > self._max_idle_threads:
+          try:
+            self._idle_worker_queue.get(block=False).shutdown()

Review comment:
       Sorry, I missed that it was a WeakSet. Multiple shutdowns look ok as well.







[GitHub] [beam] mxm commented on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-624567650


   Run Python Load Tests ParDo Flink Streaming





[GitHub] [beam] mxm commented on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-625087599


   Jira is down for maintenance at the moment. Feel free to resolve: https://issues.apache.org/jira/browse/BEAM-8944





[GitHub] [beam] mxm commented on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-624055135


   Run Python Load Tests ParDo Flink Streaming





[GitHub] [beam] lukecwik commented on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-622555561


   R: @mxm @pabloem 
   CC: @robertwb 





[GitHub] [beam] pabloem commented on a change in pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #11590:
URL: https://github.com/apache/beam/pull/11590#discussion_r420468171



##########
File path: sdks/python/apache_beam/utils/thread_pool_executor.py
##########
@@ -137,35 +101,33 @@ def submit(self, fn, *args, **kwargs):
     """
     future = _base.Future()
     work_item = _WorkItem(future, fn, args, kwargs)
-    try:
-      # Keep trying to get an idle worker from the queue until we find one
-      # that accepts the work.
-      while not self._idle_worker_queue.get(
-          block=False).accepted_work(work_item):
-        pass
-      return future
-    except queue.Empty:
-      with self._lock:
-        if self._shutdown:
-          raise RuntimeError(
-              'Cannot schedule new tasks after thread pool '
-              'has been shutdown.')
-
-        worker = _Worker(
-            self._idle_worker_queue,
-            self._permitted_thread_age_in_seconds,
-            work_item)
+    with self._lock:
+      if self._shutdown:
+        raise RuntimeError(
+            'Cannot schedule new tasks after thread pool has been shutdown.')
+      try:
+        self._idle_worker_queue.get(block=False).assign_work(work_item)
+
+        # If we have more idle threads then the max allowed, shutdown a thread.
+        if self._idle_worker_queue.qsize() > self._max_idle_threads:
+          try:
+            self._idle_worker_queue.get(block=False).shutdown()

Review comment:
       should we remove a total of `self._max_idle_threads - self._idle_worker_queue.qsize()` workers rather than just one?
   IIUC, this is the only point (besides shutdown) where workers are removed, so maybe yes?







[GitHub] [beam] lukecwik commented on a change in pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11590:
URL: https://github.com/apache/beam/pull/11590#discussion_r424755373



##########
File path: sdks/python/apache_beam/utils/thread_pool_executor.py
##########
@@ -137,35 +101,33 @@ def submit(self, fn, *args, **kwargs):
     """
     future = _base.Future()
     work_item = _WorkItem(future, fn, args, kwargs)
-    try:
-      # Keep trying to get an idle worker from the queue until we find one
-      # that accepts the work.
-      while not self._idle_worker_queue.get(
-          block=False).accepted_work(work_item):
-        pass
-      return future
-    except queue.Empty:
-      with self._lock:
-        if self._shutdown:
-          raise RuntimeError(
-              'Cannot schedule new tasks after thread pool '
-              'has been shutdown.')
-
-        worker = _Worker(
-            self._idle_worker_queue,
-            self._permitted_thread_age_in_seconds,
-            work_item)
+    with self._lock:
+      if self._shutdown:
+        raise RuntimeError(
+            'Cannot schedule new tasks after thread pool has been shutdown.')
+      try:
+        self._idle_worker_queue.get(block=False).assign_work(work_item)
+
+        # If we have more idle threads then the max allowed, shutdown a thread.
+        if self._idle_worker_queue.qsize() > self._max_idle_threads:
+          try:
+            self._idle_worker_queue.get(block=False).shutdown()

Review comment:
       `self._workers` is a weakref set so when the thread dies it should automatically be removed from the set. Calling `shutdown` multiple times should be a non-issue as well for each worker as all it does is make the worker eligible to wake up.
   
   Do you have any more details as to why you think this is an issue?
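   
   A small standalone sketch of the `weakref.WeakSet` behaviour described above (the `Worker` class here is a stand-in, not Beam's `_Worker`):
   ```python
   import weakref
   
   
   class Worker(object):
     pass
   
   
   workers = weakref.WeakSet()
   worker = Worker()
   workers.add(worker)
   print(len(workers))  # 1: the worker is still strongly referenced.
   
   # Drop the last strong reference, as happens once the worker thread exits.
   del worker
   print(len(workers))  # 0 in CPython: the entry is removed automatically.
   ```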







[GitHub] [beam] mxm commented on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-624725658


   Run Seed Job





[GitHub] [beam] mxm commented on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-624767259


   Run Python Load Tests ParDo Flink Streaming





[GitHub] [beam] mxm commented on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-623378063


   Thanks @lukecwik! This looks very promising. I'm going to test this out with one of our applications which experienced problems with the dynamic thread allocation.





[GitHub] [beam] udim commented on a change in pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
udim commented on a change in pull request #11590:
URL: https://github.com/apache/beam/pull/11590#discussion_r424735741



##########
File path: sdks/python/apache_beam/utils/thread_pool_executor.py
##########
@@ -137,35 +101,33 @@ def submit(self, fn, *args, **kwargs):
     """
     future = _base.Future()
     work_item = _WorkItem(future, fn, args, kwargs)
-    try:
-      # Keep trying to get an idle worker from the queue until we find one
-      # that accepts the work.
-      while not self._idle_worker_queue.get(
-          block=False).accepted_work(work_item):
-        pass
-      return future
-    except queue.Empty:
-      with self._lock:
-        if self._shutdown:
-          raise RuntimeError(
-              'Cannot schedule new tasks after thread pool '
-              'has been shutdown.')
-
-        worker = _Worker(
-            self._idle_worker_queue,
-            self._permitted_thread_age_in_seconds,
-            work_item)
+    with self._lock:
+      if self._shutdown:
+        raise RuntimeError(
+            'Cannot schedule new tasks after thread pool has been shutdown.')
+      try:
+        self._idle_worker_queue.get(block=False).assign_work(work_item)
+
+        # If we have more idle threads then the max allowed, shutdown a thread.
+        if self._idle_worker_queue.qsize() > self._max_idle_threads:
+          try:
+            self._idle_worker_queue.get(block=False).shutdown()

Review comment:
       Also remove the shutdown worker from `self._workers`?







[GitHub] [beam] mxm removed a comment on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
mxm removed a comment on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-624567650


   Run Python Load Tests ParDo Flink Streaming





[GitHub] [beam] mxm commented on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-624560179


   Run Python Load Tests ParDo Flink Streaming





[GitHub] [beam] pabloem commented on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-624798501


   LGTM





[GitHub] [beam] mxm edited a comment on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
mxm edited a comment on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-624725658


   Run Seed Job
   
   edit: Whoops, wrong PR. I meant to run it on #11558 to test this PR.





[GitHub] [beam] mxm removed a comment on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
mxm removed a comment on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-624587849


   Run Python Load Tests ParDo Flink Streaming





[GitHub] [beam] lukecwik commented on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
lukecwik commented on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-624329537


   Run Python Load Tests ParDo Flink Streaming





[GitHub] [beam] mxm commented on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-624587849


   Run Python Load Tests ParDo Flink Streaming





[GitHub] [beam] mxm commented on pull request #11590: [BEAM-8944] Improve UnboundedThreadPoolExecutor performance

Posted by GitBox <gi...@apache.org>.
mxm commented on pull request #11590:
URL: https://github.com/apache/beam/pull/11590#issuecomment-624854053


   Seems like the streaming test is timing out when it wasn't before. Not sure if that's related to the changes here. I'll check.

