You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/10/19 06:40:52 UTC
[1/2] incubator-beam git commit: Close threadpools when finished with them
Repository: incubator-beam
Updated Branches:
refs/heads/python-sdk 8e1793caf -> 1260bf779
Close threadpools when finished with them
This avoids building up an arbitrary number of dangling threads,
which can cause issues in testing and is undesirable in production.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/152a828d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/152a828d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/152a828d
Branch: refs/heads/python-sdk
Commit: 152a828dc3b45b88b4b00416012cc11f3b25db00
Parents: 8e1793c
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Oct 18 17:08:18 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Oct 18 23:40:15 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/filebasedsource.py | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/152a828d/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index e067833..931628c 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -122,11 +122,17 @@ class FileBasedSource(iobase.BoundedSource):
@staticmethod
def _estimate_sizes_in_parallel(file_names):
- def _calculate_size_of_file(file_name):
- return fileio.ChannelFactory.size_in_bytes(file_name)
-
- return ThreadPool(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION).map(
- _calculate_size_of_file, file_names)
+ if not file_names:
+ return []
+ elif len(file_names) == 1:
+ return [fileio.ChannelFactory.size_in_bytes(file_names[0])]
+ else:
+ pool = ThreadPool(
+ min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names)))
+ try:
+ return pool.map(fileio.ChannelFactory.size_in_bytes, file_names)
+ finally:
+ pool.terminate()
def split(
self, desired_bundle_size=None, start_position=None, stop_position=None):
[2/2] incubator-beam git commit: Closes #1132
Posted by ro...@apache.org.
Closes #1132
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1260bf77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1260bf77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1260bf77
Branch: refs/heads/python-sdk
Commit: 1260bf779658947d8f62323eefe48b49a4269012
Parents: 8e1793c 152a828
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Oct 18 23:40:16 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Oct 18 23:40:16 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/filebasedsource.py | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------