Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/05/01 20:32:00 UTC

[jira] [Work logged] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor

     [ https://issues.apache.org/jira/browse/BEAM-8944?focusedWorklogId=429650&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-429650 ]

ASF GitHub Bot logged work on BEAM-8944:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/May/20 20:31
            Start Date: 01/May/20 20:31
    Worklog Time Spent: 10m 
      Work Description: lukecwik opened a new pull request #11590:
URL: https://github.com/apache/beam/pull/11590


   Existing performance suffered from the use of timed waits and from the increased number of "threading" objects being invoked.
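
To make the "timed waits" point concrete, here is a minimal sketch (not the code from this pull request) contrasting a worker that polls a queue with a timeout against one that blocks until work or a shutdown sentinel arrives. The names `timed_wait_worker`, `blocking_worker`, `run` and `_STOP` are hypothetical. On Python 2, which the `Queue.py` and `thread.lock` frames in the profiles suggest was used, a timed `Condition.wait` is implemented as a sleep-and-poll loop, so the timed variant is assumed to be the more expensive one there.

```python
import queue
import threading

_STOP = object()  # hypothetical sentinel that tells the blocking worker to exit


def timed_wait_worker(q, results, idle_timeout=0.05):
    """Waits for work with a timeout and exits after one idle period."""
    while True:
        try:
            item = q.get(timeout=idle_timeout)  # timed wait
        except queue.Empty:
            return  # nothing arrived within the timeout
        results.append(item * 2)


def blocking_worker(q, results):
    """Blocks until work arrives and exits only when it sees the sentinel."""
    while True:
        item = q.get()  # untimed wait
        if item is _STOP:
            return
        results.append(item * 2)


def run(worker, items, use_sentinel):
    q = queue.Queue()
    results = []
    # Enqueue everything first so the timed worker cannot give up early.
    for item in items:
        q.put(item)
    if use_sentinel:
        q.put(_STOP)
    t = threading.Thread(target=worker, args=(q, results))
    t.start()
    t.join()
    return results


timed_results = run(timed_wait_worker, range(5), use_sentinel=False)
blocking_results = run(blocking_worker, range(5), use_sentinel=True)
print(timed_results == blocking_results)
```

Both workers produce the same results; they differ only in how they wait, which is the cost this change targets.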
   
   Using the benchmark from https://issues.apache.org/jira/browse/BEAM-8944?focusedCommentId=17074641&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17074641, performance improved from 5.52s to 1.82s. This is faster than the ThreadPoolExecutor with 12 threads (the default used before) but still slower than the ThreadPoolExecutor with 1 thread.
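
The relative differences can be worked out from the `uses ...` timings reported in the outputs in this description. The snippet below is only arithmetic on those four numbers; the dictionary keys are labels chosen here for illustration.

```python
# Wall-clock timings in seconds, copied from the "uses ..." lines of the
# four benchmark outputs in this description.
timings = {
    'unbounded_before': 5.52247905731,
    'unbounded_after': 1.82196497917,
    'thread_pool_1': 1.03040599823,
    'thread_pool_12': 2.03954315186,
}

# How much faster the new UnboundedThreadPoolExecutor is than the old one.
speedup = timings['unbounded_before'] / timings['unbounded_after']
# How much slower the 12-thread ThreadPoolExecutor is than the new executor.
vs_12_threads = timings['thread_pool_12'] / timings['unbounded_after']
# How much slower the new executor is than the 1-thread ThreadPoolExecutor.
vs_1_thread = timings['unbounded_after'] / timings['thread_pool_1']

print('speedup over previous implementation: %.2fx' % speedup)
print('12-thread pool time relative to new executor: %.2fx' % vs_12_threads)
print('new executor time relative to 1-thread pool: %.2fx' % vs_1_thread)
```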
   
   The prior performance was:
   ```
   <apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x7feee655c8d0> uses 5.52247905731
            584051 function calls in 5.495 seconds
   
      Ordered by: internal time
   
      ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       86250    4.386    0.000    4.386    0.000 {method 'acquire' of 'thread.lock' objects}
       19800    0.150    0.000    4.773    0.000 Queue.py:150(get)
       29698    0.123    0.000    0.365    0.000 threading.py:373(notify)
        9900    0.114    0.000    0.114    0.000 {method '__enter__' of 'thread.lock' objects}
        9899    0.106    0.000    0.463    0.000 thread_pool_executor.py:103(accepted_work)
        9188    0.089    0.000    4.095    0.000 threading.py:309(wait)
        9903    0.078    0.000    0.129    0.000 threading.py:260(__init__)
        9900    0.061    0.000    1.092    0.000 thread_pool_executor.py:133(submit)
        9899    0.055    0.000    0.358    0.000 threading.py:576(set)
       38886    0.044    0.000    0.285    0.000 threading.py:300(_is_owned)
        9900    0.041    0.000    0.186    0.000 _base.py:318(__init__)
       28987    0.023    0.000    0.031    0.000 Queue.py:200(_qsize)
        9900    0.022    0.000    0.030    0.000 threading.py:132(__init__)
       38887    0.021    0.000    0.021    0.000 threading.py:64(_note)
        9899    0.019    0.000    0.163    0.000 threading.py:400(notifyAll)
       19799    0.018    0.000    0.023    0.000 Queue.py:208(_get)
       38887    0.016    0.000    0.016    0.000 {method 'release' of 'thread.lock' objects}
        9903    0.016    0.000    0.145    0.000 threading.py:242(Condition)
       19806    0.014    0.000    0.014    0.000 threading.py:59(__init__)
        9900    0.014    0.000    0.127    0.000 threading.py:285(__enter__)
        9188    0.013    0.000    0.090    0.000 threading.py:297(_acquire_restore)
        9900    0.013    0.000    0.043    0.000 threading.py:114(RLock)
        9900    0.011    0.000    0.011    0.000 thread_pool_executor.py:34(__init__)
       38886    0.011    0.000    0.011    0.000 {len}
        9900    0.010    0.000    0.012    0.000 threading.py:288(__exit__)
        9188    0.008    0.000    0.012    0.000 threading.py:294(_release_save)
       19092    0.006    0.000    0.006    0.000 {thread.allocate_lock}
       19799    0.005    0.000    0.005    0.000 {method 'popleft' of 'collections.deque' objects}
        9188    0.004    0.000    0.004    0.000 {method 'append' of 'list' objects}
        9899    0.003    0.000    0.003    0.000 {method 'remove' of 'list' objects}
        9900    0.002    0.000    0.002    0.000 {method '__exit__' of 'thread.lock' objects}
           1    0.000    0.000    0.000    0.000 {thread.start_new_thread}
           1    0.000    0.000    0.000    0.000 threading.py:647(__init__)
           1    0.000    0.000    0.000    0.000 threading.py:717(start)
           1    0.000    0.000    0.000    0.000 thread_pool_executor.py:58(__init__)
           1    0.000    0.000    0.000    0.000 threading.py:620(_newname)
           1    0.000    0.000    0.000    0.000 _weakrefset.py:83(add)
           1    0.000    0.000    0.000    0.000 threading.py:597(wait)
           2    0.000    0.000    0.000    0.000 threading.py:561(__init__)
           1    0.000    0.000    0.000    0.000 threading.py:1142(currentThread)
           2    0.000    0.000    0.000    0.000 threading.py:542(Event)
           1    0.000    0.000    0.000    0.000 threading.py:700(_set_daemon)
           1    0.000    0.000    0.000    0.000 threading.py:1014(daemon)
           2    0.000    0.000    0.000    0.000 threading.py:570(isSet)
           1    0.000    0.000    0.000    0.000 threading.py:999(daemon)
           1    0.000    0.000    0.000    0.000 {thread.get_ident}
           1    0.000    0.000    0.000    0.000 {method 'add' of 'set' objects}
           1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
   ```
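
For reference, a profile in the format above (sorted by internal time) can be collected with the standard `cProfile` and `pstats` modules. This is a minimal sketch under assumptions: `profile_submits` is a hypothetical helper, and the workload (`hash` on small integers) is a stand-in for the linked benchmark. `cProfile` records only the thread in which it is enabled, which is consistent with the listings being dominated by `submit`, `Queue.get` and lock calls.

```python
import cProfile
import io
import pstats
from concurrent import futures


def profile_submits(executor, n=1000):
    """Profiles n submit() calls on the calling thread and returns the report."""
    profiler = cProfile.Profile()
    profiler.enable()
    pending = [executor.submit(hash, i) for i in range(n)]
    for future in pending:
        future.result()
    profiler.disable()

    out = io.StringIO()
    stats = pstats.Stats(profiler, stream=out)
    # 'tottime' sorts by time spent in the function itself, which pstats
    # labels "internal time" in the report header.
    stats.sort_stats('tottime').print_stats(10)
    return out.getvalue()


with futures.ThreadPoolExecutor(max_workers=1) as executor:
    report = profile_submits(executor)
print(report)
```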
   
   The new performance is:
   ```
   <apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x7f71dd5cacd0> uses 1.82196497917
            504935 function calls in 1.787 seconds
   
      Ordered by: internal time
   
      ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       62233    0.940    0.000    0.940    0.000 {method 'acquire' of 'thread.lock' objects}
        9900    0.142    0.000    1.248    0.000 thread_pool_executor.py:98(submit)
       29698    0.102    0.000    0.364    0.000 threading.py:373(notify)
       19800    0.101    0.000    0.880    0.000 Queue.py:150(get)
        9900    0.068    0.000    0.068    0.000 {method '__enter__' of 'thread.lock' objects}
       40304    0.063    0.000    0.063    0.000 {method 'release' of 'thread.lock' objects}
        9899    0.053    0.000    0.323    0.000 threading.py:479(release)
        9899    0.044    0.000    0.299    0.000 Queue.py:86(qsize)
        9903    0.043    0.000    0.082    0.000 threading.py:260(__init__)
       30407    0.031    0.000    0.221    0.000 threading.py:300(_is_owned)
        9900    0.027    0.000    0.119    0.000 _base.py:318(__init__)
       30407    0.022    0.000    0.027    0.000 Queue.py:200(_qsize)
        9900    0.017    0.000    0.025    0.000 threading.py:132(__init__)
       19799    0.016    0.000    0.019    0.000 Queue.py:208(_get)
        9899    0.015    0.000    0.338    0.000 thread_pool_executor.py:77(assign_work)
       40307    0.015    0.000    0.015    0.000 threading.py:64(_note)
        9900    0.015    0.000    0.019    0.000 threading.py:288(__exit__)
        9903    0.011    0.000    0.093    0.000 threading.py:242(Condition)
       19806    0.010    0.000    0.010    0.000 threading.py:59(__init__)
        9900    0.009    0.000    0.034    0.000 threading.py:114(RLock)
        9900    0.009    0.000    0.077    0.000 threading.py:285(__enter__)
         709    0.007    0.000    0.148    0.000 threading.py:309(wait)
        9900    0.007    0.000    0.007    0.000 thread_pool_executor.py:34(__init__)
       30407    0.005    0.000    0.005    0.000 {len}
        9900    0.004    0.000    0.004    0.000 {method '__exit__' of 'thread.lock' objects}
       10613    0.003    0.000    0.003    0.000 {thread.allocate_lock}
       19799    0.003    0.000    0.003    0.000 {method 'popleft' of 'collections.deque' objects}
        9896    0.002    0.000    0.002    0.000 {method 'remove' of 'list' objects}
         709    0.001    0.000    0.023    0.000 threading.py:297(_acquire_restore)
         709    0.001    0.000    0.002    0.000 threading.py:294(_release_save)
         709    0.000    0.000    0.000    0.000 {method 'append' of 'list' objects}
           1    0.000    0.000    0.000    0.000 {thread.start_new_thread}
           1    0.000    0.000    0.000    0.000 threading.py:717(start)
           1    0.000    0.000    0.000    0.000 threading.py:647(__init__)
           1    0.000    0.000    0.000    0.000 threading.py:620(_newname)
           1    0.000    0.000    0.000    0.000 thread_pool_executor.py:58(__init__)
           1    0.000    0.000    0.000    0.000 threading.py:1142(currentThread)
           1    0.000    0.000    0.000    0.000 _weakrefset.py:83(add)
           1    0.000    0.000    0.000    0.000 threading.py:597(wait)
           1    0.000    0.000    0.000    0.000 threading.py:433(__init__)
           1    0.000    0.000    0.000    0.000 threading.py:700(_set_daemon)
           1    0.000    0.000    0.000    0.000 threading.py:542(Event)
           1    0.000    0.000    0.000    0.000 threading.py:561(__init__)
           1    0.000    0.000    0.000    0.000 threading.py:1014(daemon)
           1    0.000    0.000    0.000    0.000 threading.py:412(Semaphore)
           2    0.000    0.000    0.000    0.000 threading.py:570(isSet)
           1    0.000    0.000    0.000    0.000 threading.py:999(daemon)
           1    0.000    0.000    0.000    0.000 {thread.get_ident}
           1    0.000    0.000    0.000    0.000 {method 'add' of 'set' objects}
           1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
   ```
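
Compared with the prior profile, `threading.py:309(wait)` drops from 9188 calls to 709. The sketch below illustrates one way an unbounded pool can hand work to idle threads without timed waits. It is a hypothetical illustration and not the implementation in this pull request: the class name `TinyUnboundedPool`, the per-worker inbox queues, and the shutdown sentinel are all assumptions made for the example.

```python
import queue
import threading
from concurrent import futures


class TinyUnboundedPool(object):
    """Hypothetical sketch: reuse an idle worker if any, else start a thread."""

    def __init__(self):
        self._lock = threading.Lock()
        self._idle = []     # inboxes of workers currently waiting for work
        self._workers = []  # (thread, inbox) for every worker ever started

    def submit(self, fn, *args):
        future = futures.Future()
        with self._lock:
            if self._idle:
                inbox = self._idle.pop()
            else:
                inbox = queue.Queue()
                thread = threading.Thread(
                    target=self._run, args=(inbox,), daemon=True)
                self._workers.append((thread, inbox))
                thread.start()
        inbox.put((future, fn, args))
        return future

    def _run(self, inbox):
        while True:
            item = inbox.get()  # blocks with no timeout
            if item is None:    # shutdown sentinel
                return
            future, fn, args = item
            try:
                future.set_result(fn(*args))
            except Exception as exc:
                future.set_exception(exc)
            with self._lock:
                self._idle.append(inbox)  # advertise this worker as idle

    def shutdown(self):
        with self._lock:
            workers = list(self._workers)
        for _, inbox in workers:
            inbox.put(None)
        for thread, _ in workers:
            thread.join()


pool = TinyUnboundedPool()
results = [pool.submit(pow, 2, i).result() for i in range(5)]
worker_count = len(pool._workers)
pool.shutdown()
print(results, worker_count)
```

Unlike a real executor, this sketch never retires idle threads and does not guard against `submit` racing with `shutdown`.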
   
   The max_workers=1 ThreadPoolExecutor is:
   ```
   <concurrent.futures.thread.ThreadPoolExecutor object at 0x7f71dd5e7710> uses 1.03040599823
            436503 function calls in 1.011 seconds
   
      Ordered by: internal time
   
      ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       59160    0.462    0.000    0.462    0.000 {method 'acquire' of 'thread.lock' objects}
        9900    0.114    0.000    0.114    0.000 {method '__enter__' of 'thread.lock' objects}
        9900    0.072    0.000    0.537    0.000 thread.py:128(submit)
        9900    0.045    0.000    0.474    0.000 Queue.py:150(get)
        9900    0.034    0.000    0.167    0.000 threading.py:440(acquire)
        9900    0.034    0.000    0.061    0.000 threading.py:260(__init__)
       19800    0.033    0.000    0.112    0.000 threading.py:373(notify)
        9900    0.031    0.000    0.192    0.000 Queue.py:107(put)
        4890    0.024    0.000    0.269    0.000 threading.py:309(wait)
        9900    0.022    0.000    0.092    0.000 _base.py:318(__init__)
       25561    0.017    0.000    0.017    0.000 {method 'release' of 'thread.lock' objects}
       24690    0.016    0.000    0.091    0.000 threading.py:300(_is_owned)
        9900    0.011    0.000    0.016    0.000 threading.py:132(__init__)
       34590    0.010    0.000    0.010    0.000 threading.py:64(_note)
        9900    0.009    0.000    0.176    0.000 thread.py:141(_adjust_thread_count)
        9900    0.009    0.000    0.070    0.000 threading.py:242(Condition)
        9900    0.007    0.000    0.023    0.000 threading.py:114(RLock)
       14790    0.007    0.000    0.009    0.000 Queue.py:200(_qsize)
       19800    0.007    0.000    0.007    0.000 threading.py:59(__init__)
        9900    0.007    0.000    0.008    0.000 Queue.py:208(_get)
        9900    0.006    0.000    0.010    0.000 threading.py:288(__exit__)
        9900    0.006    0.000    0.007    0.000 Queue.py:204(_put)
        9900    0.006    0.000    0.119    0.000 threading.py:285(__enter__)
        9900    0.006    0.000    0.006    0.000 thread.py:52(__init__)
        9900    0.003    0.000    0.003    0.000 {method '__exit__' of 'thread.lock' objects}
       14790    0.003    0.000    0.003    0.000 {thread.allocate_lock}
        4890    0.003    0.000    0.020    0.000 threading.py:297(_acquire_restore)
        4890    0.003    0.000    0.008    0.000 threading.py:294(_release_save)
       14790    0.002    0.000    0.002    0.000 {len}
        9900    0.001    0.000    0.001    0.000 {method 'popleft' of 'collections.deque' objects}
        9900    0.001    0.000    0.001    0.000 {method 'append' of 'collections.deque' objects}
        4890    0.001    0.000    0.001    0.000 {method 'append' of 'list' objects}
         871    0.000    0.000    0.000    0.000 {method 'remove' of 'list' objects}
           1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
   ```
   
   The max_workers=12 ThreadPoolExecutor is:
   ```
   <concurrent.futures.thread.ThreadPoolExecutor object at 0x7f71dd575d90> uses 2.03954315186
            402533 function calls in 2.002 seconds
   
      Ordered by: internal time
   
      ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       44284    0.878    0.000    0.878    0.000 {method 'acquire' of 'thread.lock' objects}
        9902    0.228    0.000    0.228    0.000 {method '__enter__' of 'thread.lock' objects}
        9900    0.207    0.000    1.342    0.000 thread.py:128(submit)
        9900    0.084    0.000    0.660    0.000 Queue.py:150(get)
       19800    0.084    0.000    0.269    0.000 threading.py:373(notify)
        9900    0.080    0.000    0.351    0.000 threading.py:440(acquire)
       30862    0.069    0.000    0.069    0.000 {method 'release' of 'thread.lock' objects}
        9900    0.054    0.000    0.594    0.000 Queue.py:107(put)
        9904    0.054    0.000    0.102    0.000 threading.py:260(__init__)
        9900    0.032    0.000    0.150    0.000 _base.py:318(__init__)
       20971    0.027    0.000    0.144    0.000 threading.py:300(_is_owned)
        9900    0.022    0.000    0.383    0.000 thread.py:141(_adjust_thread_count)
        9900    0.018    0.000    0.028    0.000 threading.py:132(__init__)
       30871    0.017    0.000    0.017    0.000 threading.py:64(_note)
        9904    0.016    0.000    0.118    0.000 threading.py:242(Condition)
        9902    0.015    0.000    0.244    0.000 threading.py:285(__enter__)
        9902    0.014    0.000    0.020    0.000 threading.py:288(__exit__)
       19808    0.013    0.000    0.013    0.000 threading.py:59(__init__)
       11069    0.013    0.000    0.016    0.000 Queue.py:200(_qsize)
        9900    0.013    0.000    0.041    0.000 threading.py:114(RLock)
        1171    0.011    0.000    0.234    0.000 threading.py:309(wait)
        9900    0.011    0.000    0.012    0.000 Queue.py:204(_put)
        9900    0.011    0.000    0.012    0.000 Queue.py:208(_get)
        9900    0.008    0.000    0.008    0.000 thread.py:52(__init__)
        9902    0.007    0.000    0.007    0.000 {method '__exit__' of 'thread.lock' objects}
       11075    0.005    0.000    0.005    0.000 {thread.allocate_lock}
       11071    0.003    0.000    0.003    0.000 {len}
        9891    0.002    0.000    0.002    0.000 {method 'remove' of 'list' objects}
        9900    0.002    0.000    0.002    0.000 {method 'popleft' of 'collections.deque' objects}
        9900    0.002    0.000    0.002    0.000 {method 'append' of 'collections.deque' objects}
        1171    0.001    0.000    0.041    0.000 threading.py:297(_acquire_restore)
        1171    0.001    0.000    0.004    0.000 threading.py:294(_release_save)
        1171    0.000    0.000    0.000    0.000 {method 'append' of 'list' objects}
           2    0.000    0.000    0.000    0.000 {thread.start_new_thread}
           2    0.000    0.000    0.000    0.000 threading.py:647(__init__)
           2    0.000    0.000    0.010    0.005 threading.py:717(start)
           2    0.000    0.000    0.010    0.005 threading.py:597(wait)
           2    0.000    0.000    0.000    0.000 threading.py:700(_set_daemon)
           2    0.000    0.000    0.000    0.000 weakref.py:368(__setitem__)
           2    0.000    0.000    0.000    0.000 threading.py:542(Event)
           2    0.000    0.000    0.000    0.000 threading.py:1142(currentThread)
           2    0.000    0.000    0.000    0.000 threading.py:561(__init__)
           2    0.000    0.000    0.000    0.000 threading.py:1014(daemon)
           2    0.000    0.000    0.000    0.000 {method 'add' of 'set' objects}
           2    0.000    0.000    0.000    0.000 threading.py:999(daemon)
           4    0.000    0.000    0.000    0.000 threading.py:570(isSet)
           2    0.000    0.000    0.000    0.000 {thread.get_ident}
           1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
   ```
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 429650)
    Time Spent: 4.5h  (was: 4h 20m)

> Python SDK harness performance degradation with UnboundedThreadPoolExecutor
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-8944
>                 URL: https://issues.apache.org/jira/browse/BEAM-8944
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-harness
>    Affects Versions: 2.18.0
>            Reporter: Yichi Zhang
>            Priority: Critical
>         Attachments: checkpoint-duration.png, profiling.png, profiling_one_thread.png, profiling_twelve_threads.png
>
>          Time Spent: 4.5h
>  Remaining Estimate: 0h
>
> We are seeing a performance degradation in the Python streaming word count load tests.
>  
> After some investigation, it appears to be caused by replacing the original ThreadPoolExecutor with UnboundedThreadPoolExecutor in the SDK worker. The suspicion is that Python performs worse with more threads on CPU-bound tasks.
>  
> A simple test comparing the performance of the thread pool executors:
>  
> {code:python}
> import queue
> import time
> from concurrent import futures
>
> from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
>
> def test_performance(self):
>   def run_perf(executor):
>     total_number = 1000000
>     q = queue.Queue()
>
>     def task(number):
>       hash(number)
>       # Refill the queue so the submitting loop always has work to pull.
>       q.put(number + 200)
>       return number
>
>     t = time.time()
>     count = 0
>     # Seed the queue with 200 initial work items.
>     for i in range(200):
>       q.put(i)
>     while count < total_number:
>       executor.submit(task, q.get(block=True))
>       count += 1
>     print('%s uses %s' % (executor, time.time() - t))
>
>   with UnboundedThreadPoolExecutor() as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=1) as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=12) as executor:
>     run_perf(executor)
> {code}
> Results:
> <apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x7fab400dbe50> uses 268.160675049
> <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fab40096290> uses 79.904583931
> <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fab400dbe50> uses 191.179054976
> Profiling:
> UnboundedThreadPoolExecutor:
>  !profiling.png! 
> 1 Thread ThreadPoolExecutor:
>  !profiling_one_thread.png! 
> 12 Threads ThreadPoolExecutor:
>  !profiling_twelve_threads.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)