You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/09/20 00:40:16 UTC

[2/2] incubator-beam git commit: Removed unnecessary throttling of rename parallelism.

Removed unnecessary throttling of rename parallelism.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/24bb8f19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/24bb8f19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/24bb8f19

Branch: refs/heads/python-sdk
Commit: 24bb8f19329b3d0c1d0330e0c16c41ab1554684d
Parents: 4b7fe2d
Author: Marian Dvorsky <ma...@google.com>
Authored: Fri Sep 16 10:46:32 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Sep 19 17:39:47 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bb8f19/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index e3d4dae..d640d50 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -693,11 +693,7 @@ class FileSink(iobase.Sink):
   The output of this write is a PCollection of all written shards.
   """
 
-  # Approximate number of write results be assigned for each rename thread.
-  _WRITE_RESULTS_PER_RENAME_THREAD = 100
-
-  # Max number of threads to be used for renaming even if it means each thread
-  # will process more write results.
+  # Max number of threads to be used for renaming.
   _MAX_RENAME_THREADS = 64
 
   def __init__(self,
@@ -785,8 +781,7 @@ class FileSink(iobase.Sink):
     writer_results = sorted(writer_results)
     num_shards = len(writer_results)
     channel_factory = ChannelFactory()
-    min_threads = min(num_shards / FileSink._WRITE_RESULTS_PER_RENAME_THREAD,
-                      FileSink._MAX_RENAME_THREADS)
+    min_threads = min(num_shards, FileSink._MAX_RENAME_THREADS)
     num_threads = max(1, min_threads)
 
     rename_ops = []