You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/01/30 20:43:58 UTC

[1/2] beam git commit: Updates places in SDK that creates thread pools.

Repository: beam
Updated Branches:
  refs/heads/python-sdk f29527f68 -> 475707f0f


Updates places in SDK that creates thread pools.

Moves ThreadPool creation to a util function.
Records and resets logging level due to this being reset by  apitools when used with a ThreadPool.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51afc1cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51afc1cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51afc1cc

Branch: refs/heads/python-sdk
Commit: 51afc1ccfe78a0657b5f9bc139d1d4e7938ed672
Parents: f29527f
Author: Chamikara Jayalath <ch...@google.com>
Authored: Sat Jan 28 08:54:33 2017 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jan 30 12:43:37 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/util.py      | 33 ++++++++++++++++++++++
 sdks/python/apache_beam/io/filebasedsource.py | 17 +++--------
 sdks/python/apache_beam/io/fileio.py          | 11 ++------
 3 files changed, 40 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/internal/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/util.py b/sdks/python/apache_beam/internal/util.py
index 2d12d49..5b31e88 100644
--- a/sdks/python/apache_beam/internal/util.py
+++ b/sdks/python/apache_beam/internal/util.py
@@ -17,6 +17,11 @@
 
 """Utility functions used throughout the package."""
 
+import logging
+from multiprocessing.pool import ThreadPool
+import threading
+import weakref
+
 
 class ArgumentPlaceholder(object):
   """A place holder object replacing PValues in argument lists.
@@ -92,3 +97,31 @@ def insert_values_in_args(args, kwargs, values):
       (k, v_iter.next()) if isinstance(v, ArgumentPlaceholder) else (k, v)
       for k, v in sorted(kwargs.iteritems()))
   return (new_args, new_kwargs)
+
+
+def run_using_threadpool(fn_to_execute, inputs, pool_size):
+  """Runs the given function on given inputs using a thread pool.
+
+  Args:
+    fn_to_execute: Function to execute
+    inputs: Inputs on which given function will be executed in parallel.
+    pool_size: Size of thread pool.
+  Returns:
+    Results retrieved after executing the given function on given inputs.
+  """
+
+  # ThreadPool crashes in old versions of Python (< 2.7.5) if created
+  # from a child thread. (http://bugs.python.org/issue10015)
+  if not hasattr(threading.current_thread(), '_children'):
+    threading.current_thread()._children = weakref.WeakKeyDictionary()
+  pool = ThreadPool(min(pool_size, len(inputs)))
+  try:
+    # We record and reset logging level here since 'apitools' library Beam
+    # depends on updates the logging level when used with a threadpool -
+    # https://github.com/google/apitools/issues/141
+    # TODO: Remove this once above issue in 'apitools' is fixed.
+    old_level = logging.getLogger().level
+    return pool.map(fn_to_execute, inputs)
+  finally:
+    pool.terminate()
+    logging.getLogger().setLevel(old_level)

http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 1bfde25..582d673 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -26,11 +26,9 @@ For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
 """
 
 import random
-import threading
-import weakref
-from multiprocessing.pool import ThreadPool
 
 from apache_beam.internal import pickler
+from apache_beam.internal import util
 from apache_beam.io import concat_source
 from apache_beam.io import fileio
 from apache_beam.io import iobase
@@ -158,16 +156,9 @@ class FileBasedSource(iobase.BoundedSource):
       return [fileio.ChannelFactory.size_in_bytes(file_names[0])]
     else:
       if pattern is None:
-        # ThreadPool crashes in old versions of Python (< 2.7.5) if created
-        # from a child thread. (http://bugs.python.org/issue10015)
-        if not hasattr(threading.current_thread(), '_children'):
-          threading.current_thread()._children = weakref.WeakKeyDictionary()
-        pool = ThreadPool(
-            min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names)))
-        try:
-          return pool.map(fileio.ChannelFactory.size_in_bytes, file_names)
-        finally:
-          pool.terminate()
+        return util.run_using_threadpool(
+            fileio.ChannelFactory.size_in_bytes, file_names,
+            MAX_NUM_THREADS_FOR_SIZE_ESTIMATION)
       else:
         file_sizes = fileio.ChannelFactory.size_of_files_in_glob(pattern,
                                                                  file_names)

http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index f67dca9..97cf387 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -22,16 +22,14 @@ import bz2
 import cStringIO
 import glob
 import logging
-from multiprocessing.pool import ThreadPool
 import os
 import re
 import shutil
-import threading
 import time
 import zlib
-import weakref
 
 from apache_beam import coders
+from apache_beam.internal import util
 from apache_beam.io import gcsio
 from apache_beam.io import iobase
 from apache_beam.transforms.display import DisplayDataItem
@@ -663,11 +661,8 @@ class FileSink(iobase.Sink):
           logging.debug('Rename successful: %s -> %s', src, dest)
       return exceptions
 
-    # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a
-    # child thread. (http://bugs.python.org/issue10015)
-    if not hasattr(threading.current_thread(), '_children'):
-      threading.current_thread()._children = weakref.WeakKeyDictionary()
-    exception_batches = ThreadPool(num_threads).map(_rename_batch, batches)
+    exception_batches = util.run_using_threadpool(
+        _rename_batch, batches, num_threads)
 
     all_exceptions = []
     for exceptions in exception_batches:


[2/2] beam git commit: This closes #1866

Posted by da...@apache.org.
This closes #1866


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/475707f0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/475707f0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/475707f0

Branch: refs/heads/python-sdk
Commit: 475707f0ffd7bc82ca78fa3f3c9e78f661478b99
Parents: f29527f 51afc1c
Author: Davor Bonaci <da...@google.com>
Authored: Mon Jan 30 12:43:48 2017 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jan 30 12:43:48 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/util.py      | 33 ++++++++++++++++++++++
 sdks/python/apache_beam/io/filebasedsource.py | 17 +++--------
 sdks/python/apache_beam/io/fileio.py          | 11 ++------
 3 files changed, 40 insertions(+), 21 deletions(-)
----------------------------------------------------------------------