Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/05/01 20:32:00 UTC
[jira] [Work logged] (BEAM-8944) Python SDK harness performance degradation with UnboundedThreadPoolExecutor
[ https://issues.apache.org/jira/browse/BEAM-8944?focusedWorklogId=429650&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-429650 ]
ASF GitHub Bot logged work on BEAM-8944:
----------------------------------------
Author: ASF GitHub Bot
Created on: 01/May/20 20:31
Start Date: 01/May/20 20:31
Worklog Time Spent: 10m
Work Description: lukecwik opened a new pull request #11590:
URL: https://github.com/apache/beam/pull/11590
Performance suffered because of the use of timed waits and because of the increased number of `threading` objects being invoked.
Using the benchmark from https://issues.apache.org/jira/browse/BEAM-8944?focusedCommentId=17074641&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17074641
performance improved from 5.52s to 1.82s, which is faster than the ThreadPoolExecutor with 12 threads (the default used before) but still slower than the ThreadPoolExecutor with 1 thread.
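To illustrate the point about timed waits, here is a minimal sketch (not the code from this pull request) that contrasts a worker polling a queue with a timed wait against a worker that blocks until it receives a sentinel. The names `timed_worker`, `blocking_worker`, `run` and `_STOP` are hypothetical and exist only in this example. On Python 2, which produced the profiles below, a timed `Condition.wait` is implemented as a sleep-and-poll loop, which is one plausible reason a timed wait costs more than an untimed one.

```python
import queue
import threading

_STOP = object()  # hypothetical sentinel, used only in this sketch


def timed_worker(q, results, idle_timeout=0.05):
  """Waits for work with a timeout and exits after one idle period."""
  while True:
    try:
      item = q.get(timeout=idle_timeout)
    except queue.Empty:
      return
    results.append(item * 2)


def blocking_worker(q, results):
  """Waits for work without a timeout and exits when it sees the sentinel."""
  while True:
    item = q.get()
    if item is _STOP:
      return
    results.append(item * 2)


def run(worker, items, send_stop):
  """Preloads the queue, runs one worker thread to completion, returns its output."""
  q = queue.Queue()
  results = []
  for item in items:
    q.put(item)
  if send_stop:
    q.put(_STOP)
  thread = threading.Thread(target=worker, args=(q, results))
  thread.start()
  thread.join()
  return results


timed = run(timed_worker, range(100), send_stop=False)
blocking = run(blocking_worker, range(100), send_stop=True)
```

Both workers produce the same results. The difference is only in how an idle worker waits: the timed variant wakes up to check the clock, while the blocking variant sleeps until another thread hands it work or the sentinel.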
The prior performance was:
```
<apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x7feee655c8d0> uses 5.52247905731
584051 function calls in 5.495 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
86250 4.386 0.000 4.386 0.000 {method 'acquire' of 'thread.lock' objects}
19800 0.150 0.000 4.773 0.000 Queue.py:150(get)
29698 0.123 0.000 0.365 0.000 threading.py:373(notify)
9900 0.114 0.000 0.114 0.000 {method '__enter__' of 'thread.lock' objects}
9899 0.106 0.000 0.463 0.000 thread_pool_executor.py:103(accepted_work)
9188 0.089 0.000 4.095 0.000 threading.py:309(wait)
9903 0.078 0.000 0.129 0.000 threading.py:260(__init__)
9900 0.061 0.000 1.092 0.000 thread_pool_executor.py:133(submit)
9899 0.055 0.000 0.358 0.000 threading.py:576(set)
38886 0.044 0.000 0.285 0.000 threading.py:300(_is_owned)
9900 0.041 0.000 0.186 0.000 _base.py:318(__init__)
28987 0.023 0.000 0.031 0.000 Queue.py:200(_qsize)
9900 0.022 0.000 0.030 0.000 threading.py:132(__init__)
38887 0.021 0.000 0.021 0.000 threading.py:64(_note)
9899 0.019 0.000 0.163 0.000 threading.py:400(notifyAll)
19799 0.018 0.000 0.023 0.000 Queue.py:208(_get)
38887 0.016 0.000 0.016 0.000 {method 'release' of 'thread.lock' objects}
9903 0.016 0.000 0.145 0.000 threading.py:242(Condition)
19806 0.014 0.000 0.014 0.000 threading.py:59(__init__)
9900 0.014 0.000 0.127 0.000 threading.py:285(__enter__)
9188 0.013 0.000 0.090 0.000 threading.py:297(_acquire_restore)
9900 0.013 0.000 0.043 0.000 threading.py:114(RLock)
9900 0.011 0.000 0.011 0.000 thread_pool_executor.py:34(__init__)
38886 0.011 0.000 0.011 0.000 {len}
9900 0.010 0.000 0.012 0.000 threading.py:288(__exit__)
9188 0.008 0.000 0.012 0.000 threading.py:294(_release_save)
19092 0.006 0.000 0.006 0.000 {thread.allocate_lock}
19799 0.005 0.000 0.005 0.000 {method 'popleft' of 'collections.deque' objects}
9188 0.004 0.000 0.004 0.000 {method 'append' of 'list' objects}
9899 0.003 0.000 0.003 0.000 {method 'remove' of 'list' objects}
9900 0.002 0.000 0.002 0.000 {method '__exit__' of 'thread.lock' objects}
1 0.000 0.000 0.000 0.000 {thread.start_new_thread}
1 0.000 0.000 0.000 0.000 threading.py:647(__init__)
1 0.000 0.000 0.000 0.000 threading.py:717(start)
1 0.000 0.000 0.000 0.000 thread_pool_executor.py:58(__init__)
1 0.000 0.000 0.000 0.000 threading.py:620(_newname)
1 0.000 0.000 0.000 0.000 _weakrefset.py:83(add)
1 0.000 0.000 0.000 0.000 threading.py:597(wait)
2 0.000 0.000 0.000 0.000 threading.py:561(__init__)
1 0.000 0.000 0.000 0.000 threading.py:1142(currentThread)
2 0.000 0.000 0.000 0.000 threading.py:542(Event)
1 0.000 0.000 0.000 0.000 threading.py:700(_set_daemon)
1 0.000 0.000 0.000 0.000 threading.py:1014(daemon)
2 0.000 0.000 0.000 0.000 threading.py:570(isSet)
1 0.000 0.000 0.000 0.000 threading.py:999(daemon)
1 0.000 0.000 0.000 0.000 {thread.get_ident}
1 0.000 0.000 0.000 0.000 {method 'add' of 'set' objects}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
```
The new performance is:
```
<apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x7f71dd5cacd0> uses 1.82196497917
504935 function calls in 1.787 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
62233 0.940 0.000 0.940 0.000 {method 'acquire' of 'thread.lock' objects}
9900 0.142 0.000 1.248 0.000 thread_pool_executor.py:98(submit)
29698 0.102 0.000 0.364 0.000 threading.py:373(notify)
19800 0.101 0.000 0.880 0.000 Queue.py:150(get)
9900 0.068 0.000 0.068 0.000 {method '__enter__' of 'thread.lock' objects}
40304 0.063 0.000 0.063 0.000 {method 'release' of 'thread.lock' objects}
9899 0.053 0.000 0.323 0.000 threading.py:479(release)
9899 0.044 0.000 0.299 0.000 Queue.py:86(qsize)
9903 0.043 0.000 0.082 0.000 threading.py:260(__init__)
30407 0.031 0.000 0.221 0.000 threading.py:300(_is_owned)
9900 0.027 0.000 0.119 0.000 _base.py:318(__init__)
30407 0.022 0.000 0.027 0.000 Queue.py:200(_qsize)
9900 0.017 0.000 0.025 0.000 threading.py:132(__init__)
19799 0.016 0.000 0.019 0.000 Queue.py:208(_get)
9899 0.015 0.000 0.338 0.000 thread_pool_executor.py:77(assign_work)
40307 0.015 0.000 0.015 0.000 threading.py:64(_note)
9900 0.015 0.000 0.019 0.000 threading.py:288(__exit__)
9903 0.011 0.000 0.093 0.000 threading.py:242(Condition)
19806 0.010 0.000 0.010 0.000 threading.py:59(__init__)
9900 0.009 0.000 0.034 0.000 threading.py:114(RLock)
9900 0.009 0.000 0.077 0.000 threading.py:285(__enter__)
709 0.007 0.000 0.148 0.000 threading.py:309(wait)
9900 0.007 0.000 0.007 0.000 thread_pool_executor.py:34(__init__)
30407 0.005 0.000 0.005 0.000 {len}
9900 0.004 0.000 0.004 0.000 {method '__exit__' of 'thread.lock' objects}
10613 0.003 0.000 0.003 0.000 {thread.allocate_lock}
19799 0.003 0.000 0.003 0.000 {method 'popleft' of 'collections.deque' objects}
9896 0.002 0.000 0.002 0.000 {method 'remove' of 'list' objects}
709 0.001 0.000 0.023 0.000 threading.py:297(_acquire_restore)
709 0.001 0.000 0.002 0.000 threading.py:294(_release_save)
709 0.000 0.000 0.000 0.000 {method 'append' of 'list' objects}
1 0.000 0.000 0.000 0.000 {thread.start_new_thread}
1 0.000 0.000 0.000 0.000 threading.py:717(start)
1 0.000 0.000 0.000 0.000 threading.py:647(__init__)
1 0.000 0.000 0.000 0.000 threading.py:620(_newname)
1 0.000 0.000 0.000 0.000 thread_pool_executor.py:58(__init__)
1 0.000 0.000 0.000 0.000 threading.py:1142(currentThread)
1 0.000 0.000 0.000 0.000 _weakrefset.py:83(add)
1 0.000 0.000 0.000 0.000 threading.py:597(wait)
1 0.000 0.000 0.000 0.000 threading.py:433(__init__)
1 0.000 0.000 0.000 0.000 threading.py:700(_set_daemon)
1 0.000 0.000 0.000 0.000 threading.py:542(Event)
1 0.000 0.000 0.000 0.000 threading.py:561(__init__)
1 0.000 0.000 0.000 0.000 threading.py:1014(daemon)
1 0.000 0.000 0.000 0.000 threading.py:412(Semaphore)
2 0.000 0.000 0.000 0.000 threading.py:570(isSet)
1 0.000 0.000 0.000 0.000 threading.py:999(daemon)
1 0.000 0.000 0.000 0.000 {thread.get_ident}
1 0.000 0.000 0.000 0.000 {method 'add' of 'set' objects}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
```
The max_workers=1 ThreadPoolExecutor is:
```
<concurrent.futures.thread.ThreadPoolExecutor object at 0x7f71dd5e7710> uses 1.03040599823
436503 function calls in 1.011 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
59160 0.462 0.000 0.462 0.000 {method 'acquire' of 'thread.lock' objects}
9900 0.114 0.000 0.114 0.000 {method '__enter__' of 'thread.lock' objects}
9900 0.072 0.000 0.537 0.000 thread.py:128(submit)
9900 0.045 0.000 0.474 0.000 Queue.py:150(get)
9900 0.034 0.000 0.167 0.000 threading.py:440(acquire)
9900 0.034 0.000 0.061 0.000 threading.py:260(__init__)
19800 0.033 0.000 0.112 0.000 threading.py:373(notify)
9900 0.031 0.000 0.192 0.000 Queue.py:107(put)
4890 0.024 0.000 0.269 0.000 threading.py:309(wait)
9900 0.022 0.000 0.092 0.000 _base.py:318(__init__)
25561 0.017 0.000 0.017 0.000 {method 'release' of 'thread.lock' objects}
24690 0.016 0.000 0.091 0.000 threading.py:300(_is_owned)
9900 0.011 0.000 0.016 0.000 threading.py:132(__init__)
34590 0.010 0.000 0.010 0.000 threading.py:64(_note)
9900 0.009 0.000 0.176 0.000 thread.py:141(_adjust_thread_count)
9900 0.009 0.000 0.070 0.000 threading.py:242(Condition)
9900 0.007 0.000 0.023 0.000 threading.py:114(RLock)
14790 0.007 0.000 0.009 0.000 Queue.py:200(_qsize)
19800 0.007 0.000 0.007 0.000 threading.py:59(__init__)
9900 0.007 0.000 0.008 0.000 Queue.py:208(_get)
9900 0.006 0.000 0.010 0.000 threading.py:288(__exit__)
9900 0.006 0.000 0.007 0.000 Queue.py:204(_put)
9900 0.006 0.000 0.119 0.000 threading.py:285(__enter__)
9900 0.006 0.000 0.006 0.000 thread.py:52(__init__)
9900 0.003 0.000 0.003 0.000 {method '__exit__' of 'thread.lock' objects}
14790 0.003 0.000 0.003 0.000 {thread.allocate_lock}
4890 0.003 0.000 0.020 0.000 threading.py:297(_acquire_restore)
4890 0.003 0.000 0.008 0.000 threading.py:294(_release_save)
14790 0.002 0.000 0.002 0.000 {len}
9900 0.001 0.000 0.001 0.000 {method 'popleft' of 'collections.deque' objects}
9900 0.001 0.000 0.001 0.000 {method 'append' of 'collections.deque' objects}
4890 0.001 0.000 0.001 0.000 {method 'append' of 'list' objects}
871 0.000 0.000 0.000 0.000 {method 'remove' of 'list' objects}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
```
The max_workers=12 ThreadPoolExecutor is:
```
<concurrent.futures.thread.ThreadPoolExecutor object at 0x7f71dd575d90> uses 2.03954315186
402533 function calls in 2.002 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
44284 0.878 0.000 0.878 0.000 {method 'acquire' of 'thread.lock' objects}
9902 0.228 0.000 0.228 0.000 {method '__enter__' of 'thread.lock' objects}
9900 0.207 0.000 1.342 0.000 thread.py:128(submit)
9900 0.084 0.000 0.660 0.000 Queue.py:150(get)
19800 0.084 0.000 0.269 0.000 threading.py:373(notify)
9900 0.080 0.000 0.351 0.000 threading.py:440(acquire)
30862 0.069 0.000 0.069 0.000 {method 'release' of 'thread.lock' objects}
9900 0.054 0.000 0.594 0.000 Queue.py:107(put)
9904 0.054 0.000 0.102 0.000 threading.py:260(__init__)
9900 0.032 0.000 0.150 0.000 _base.py:318(__init__)
20971 0.027 0.000 0.144 0.000 threading.py:300(_is_owned)
9900 0.022 0.000 0.383 0.000 thread.py:141(_adjust_thread_count)
9900 0.018 0.000 0.028 0.000 threading.py:132(__init__)
30871 0.017 0.000 0.017 0.000 threading.py:64(_note)
9904 0.016 0.000 0.118 0.000 threading.py:242(Condition)
9902 0.015 0.000 0.244 0.000 threading.py:285(__enter__)
9902 0.014 0.000 0.020 0.000 threading.py:288(__exit__)
19808 0.013 0.000 0.013 0.000 threading.py:59(__init__)
11069 0.013 0.000 0.016 0.000 Queue.py:200(_qsize)
9900 0.013 0.000 0.041 0.000 threading.py:114(RLock)
1171 0.011 0.000 0.234 0.000 threading.py:309(wait)
9900 0.011 0.000 0.012 0.000 Queue.py:204(_put)
9900 0.011 0.000 0.012 0.000 Queue.py:208(_get)
9900 0.008 0.000 0.008 0.000 thread.py:52(__init__)
9902 0.007 0.000 0.007 0.000 {method '__exit__' of 'thread.lock' objects}
11075 0.005 0.000 0.005 0.000 {thread.allocate_lock}
11071 0.003 0.000 0.003 0.000 {len}
9891 0.002 0.000 0.002 0.000 {method 'remove' of 'list' objects}
9900 0.002 0.000 0.002 0.000 {method 'popleft' of 'collections.deque' objects}
9900 0.002 0.000 0.002 0.000 {method 'append' of 'collections.deque' objects}
1171 0.001 0.000 0.041 0.000 threading.py:297(_acquire_restore)
1171 0.001 0.000 0.004 0.000 threading.py:294(_release_save)
1171 0.000 0.000 0.000 0.000 {method 'append' of 'list' objects}
2 0.000 0.000 0.000 0.000 {thread.start_new_thread}
2 0.000 0.000 0.000 0.000 threading.py:647(__init__)
2 0.000 0.000 0.010 0.005 threading.py:717(start)
2 0.000 0.000 0.010 0.005 threading.py:597(wait)
2 0.000 0.000 0.000 0.000 threading.py:700(_set_daemon)
2 0.000 0.000 0.000 0.000 weakref.py:368(__setitem__)
2 0.000 0.000 0.000 0.000 threading.py:542(Event)
2 0.000 0.000 0.000 0.000 threading.py:1142(currentThread)
2 0.000 0.000 0.000 0.000 threading.py:561(__init__)
2 0.000 0.000 0.000 0.000 threading.py:1014(daemon)
2 0.000 0.000 0.000 0.000 {method 'add' of 'set' objects}
2 0.000 0.000 0.000 0.000 threading.py:999(daemon)
4 0.000 0.000 0.000 0.000 threading.py:570(isSet)
2 0.000 0.000 0.000 0.000 {thread.get_ident}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
```
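The four runs above can be compared with a few lines of arithmetic. The sketch below only restates numbers already present in the profiles (the `uses ...` wall-clock values and the `tottime` of the lock `acquire` rows); the dictionary keys and the helper `ratio` are names chosen for this example.

```python
# Wall-clock seconds copied from the "uses ..." line of each run above.
timings = {
    'unbounded_before': 5.52247905731,
    'unbounded_after': 1.82196497917,
    'thread_pool_1': 1.03040599823,
    'thread_pool_12': 2.03954315186,
}


def ratio(slower, faster):
  """Returns how many times longer the `slower` run took than the `faster` run."""
  return timings[slower] / timings[faster]


speedup = ratio('unbounded_before', 'unbounded_after')
vs_12_threads = ratio('thread_pool_12', 'unbounded_after')
vs_1_thread = ratio('unbounded_after', 'thread_pool_1')

# Share of profiled time attributed to lock acquisition
# (tottime of the 'acquire' row divided by the total profiled seconds).
acquire_share_before = 4.386 / 5.495
acquire_share_after = 0.940 / 1.787

print('speedup after the change: %.2fx' % speedup)
print('12-thread pool vs. new executor: %.2fx' % vs_12_threads)
print('new executor vs. 1-thread pool: %.2fx' % vs_1_thread)
print('acquire share: %.0f%% before, %.0f%% after' %
      (100 * acquire_share_before, 100 * acquire_share_after))
```

In these numbers the change is roughly a 3x improvement, and most of the saved time comes out of the lock `acquire` row. These are single runs, so small differences between executors should be read with caution.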
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 429650)
Time Spent: 4.5h (was: 4h 20m)
> Python SDK harness performance degradation with UnboundedThreadPoolExecutor
> ---------------------------------------------------------------------------
>
> Key: BEAM-8944
> URL: https://issues.apache.org/jira/browse/BEAM-8944
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Affects Versions: 2.18.0
> Reporter: Yichi Zhang
> Priority: Critical
> Attachments: checkpoint-duration.png, profiling.png, profiling_one_thread.png, profiling_twelve_threads.png
>
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
> We are seeing a performance degradation for python streaming word count load tests.
>
> After some investigation, it appears to be caused by swapping the original ThreadPoolExecutor to UnboundedThreadPoolExecutor in sdk worker. Suspicion is that python performance is worse with more threads on cpu-bounded tasks.
>
> A simple test for comparing the multiple thread pool executor performance:
>
> {code:python}
> import queue
> import time
> from concurrent import futures
>
> from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
>
>
> def test_performance(self):
>   def run_perf(executor):
>     total_number = 1000000
>     q = queue.Queue()
>
>     def task(number):
>       hash(number)
>       # Re-enqueue a value so the submitting loop always has input available.
>       q.put(number + 200)
>       return number
>
>     t = time.time()
>     count = 0
>     # Seed the queue with 200 items.
>     for i in range(200):
>       q.put(i)
>     while count < total_number:
>       executor.submit(task, q.get(block=True))
>       count += 1
>     print('%s uses %s' % (executor, time.time() - t))
>
>   with UnboundedThreadPoolExecutor() as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=1) as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=12) as executor:
>     run_perf(executor)
> {code}
> Results:
> ```
> <apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x7fab400dbe50> uses 268.160675049
> <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fab40096290> uses 79.904583931
> <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fab400dbe50> uses 191.179054976
> ```
> Profiling:
> UnboundedThreadPoolExecutor:
> !profiling.png!
> 1 Thread ThreadPoolExecutor:
> !profiling_one_thread.png!
> 12 Threads ThreadPoolExecutor:
> !profiling_twelve_threads.png!
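The suspicion in the issue description, that CPU-bound tasks do not benefit from more threads, can be checked with a small standalone experiment. The sketch below is not part of the issue or the pull request. `cpu_task` and `timed_run` are hypothetical helpers, and the workload sizes are arbitrary. It assumes a CPython build with the global interpreter lock, where only one thread executes Python bytecode at a time.

```python
import time
from concurrent import futures


def cpu_task(n):
  """A purely CPU-bound task: sums the squares of 0..n-1."""
  total = 0
  for i in range(n):
    total += i * i
  return total


def timed_run(max_workers, tasks=50, n=20000):
  """Runs `tasks` copies of cpu_task on a pool; returns (results, seconds)."""
  start = time.time()
  with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    results = list(executor.map(cpu_task, [n] * tasks))
  return results, time.time() - start


results_1, elapsed_1 = timed_run(max_workers=1)
results_12, elapsed_12 = timed_run(max_workers=12)
print('1 worker: %.3fs, 12 workers: %.3fs' % (elapsed_1, elapsed_12))
```

The two pools compute identical results. Whether the 12-worker run is slower, and by how much, depends on the interpreter version and the machine, so the printed timings are the thing to inspect rather than a fixed expected value.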
--
This message was sent by Atlassian Jira
(v8.3.4#803005)