You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@beam.apache.org by ro...@apache.org on 2016/09/20 00:40:16 UTC
[2/2] incubator-beam git commit: Removed unnecessary throttling of rename parallelism.
Removed unnecessary throttling of rename parallelism.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/24bb8f19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/24bb8f19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/24bb8f19
Branch: refs/heads/python-sdk
Commit: 24bb8f19329b3d0c1d0330e0c16c41ab1554684d
Parents: 4b7fe2d
Author: Marian Dvorsky <ma...@google.com>
Authored: Fri Sep 16 10:46:32 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Sep 19 17:39:47 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/io/fileio.py | 9 ++-------
1 file changed, 2 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bb8f19/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index e3d4dae..d640d50 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -693,11 +693,7 @@ class FileSink(iobase.Sink):
The output of this write is a PCollection of all written shards.
"""
- # Approximate number of write results be assigned for each rename thread.
- _WRITE_RESULTS_PER_RENAME_THREAD = 100
-
- # Max number of threads to be used for renaming even if it means each thread
- # will process more write results.
+ # Max number of threads to be used for renaming.
_MAX_RENAME_THREADS = 64
def __init__(self,
@@ -785,8 +781,7 @@ class FileSink(iobase.Sink):
writer_results = sorted(writer_results)
num_shards = len(writer_results)
channel_factory = ChannelFactory()
- min_threads = min(num_shards / FileSink._WRITE_RESULTS_PER_RENAME_THREAD,
- FileSink._MAX_RENAME_THREADS)
+ min_threads = min(num_shards, FileSink._MAX_RENAME_THREADS)
num_threads = max(1, min_threads)
rename_ops = []