You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/01/30 20:43:58 UTC
[1/2] beam git commit: Updates places in the SDK that create thread
pools.
Repository: beam
Updated Branches:
refs/heads/python-sdk f29527f68 -> 475707f0f
Updates places in the SDK that create thread pools.
Moves ThreadPool creation to a util function.
Records and restores the root logging level, since apitools resets it when used with a ThreadPool.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/51afc1cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/51afc1cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/51afc1cc
Branch: refs/heads/python-sdk
Commit: 51afc1ccfe78a0657b5f9bc139d1d4e7938ed672
Parents: f29527f
Author: Chamikara Jayalath <ch...@google.com>
Authored: Sat Jan 28 08:54:33 2017 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jan 30 12:43:37 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/internal/util.py | 33 ++++++++++++++++++++++
sdks/python/apache_beam/io/filebasedsource.py | 17 +++--------
sdks/python/apache_beam/io/fileio.py | 11 ++------
3 files changed, 40 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/internal/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/util.py b/sdks/python/apache_beam/internal/util.py
index 2d12d49..5b31e88 100644
--- a/sdks/python/apache_beam/internal/util.py
+++ b/sdks/python/apache_beam/internal/util.py
@@ -17,6 +17,11 @@
"""Utility functions used throughout the package."""
+import logging
+from multiprocessing.pool import ThreadPool
+import threading
+import weakref
+
class ArgumentPlaceholder(object):
"""A place holder object replacing PValues in argument lists.
@@ -92,3 +97,31 @@ def insert_values_in_args(args, kwargs, values):
(k, v_iter.next()) if isinstance(v, ArgumentPlaceholder) else (k, v)
for k, v in sorted(kwargs.iteritems()))
return (new_args, new_kwargs)
+
+
def run_using_threadpool(fn_to_execute, inputs, pool_size):
  """Runs the given function on given inputs using a thread pool.

  Args:
    fn_to_execute: Function to execute on each input.
    inputs: Iterable of inputs on which the given function will be executed
      in parallel. It is materialized into a list before use.
    pool_size: Maximum size of the thread pool. The pool actually created has
      min(pool_size, number of inputs) threads; pool_size must be at least 1
      when inputs is non-empty.
  Returns:
    A list with the results of executing the given function on each input,
    in the same order as the inputs. An empty list is returned for empty
    inputs without creating a pool.
  Raises:
    Any exception raised by fn_to_execute is propagated to the caller (after
    the pool is terminated and the logging level is restored).
  """
  inputs = list(inputs)
  if not inputs:
    # ThreadPool(0) raises ValueError, so never build a pool for no work.
    return []

  # ThreadPool crashes in old versions of Python (< 2.7.5) if created
  # from a child thread. (http://bugs.python.org/issue10015)
  if not hasattr(threading.current_thread(), '_children'):
    threading.current_thread()._children = weakref.WeakKeyDictionary()

  # We record and restore the root logging level here since the 'apitools'
  # library that Beam depends on updates the logging level when used with a
  # threadpool - https://github.com/google/apitools/issues/141
  # The level is captured before the pool exists and outside the try block
  # so that the finally clause can always restore it.
  # TODO: Remove this once above issue in 'apitools' is fixed.
  old_level = logging.getLogger().level
  pool = ThreadPool(min(pool_size, len(inputs)))
  try:
    return pool.map(fn_to_execute, inputs)
  finally:
    pool.terminate()
    logging.getLogger().setLevel(old_level)
http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 1bfde25..582d673 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -26,11 +26,9 @@ For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
"""
import random
-import threading
-import weakref
-from multiprocessing.pool import ThreadPool
from apache_beam.internal import pickler
+from apache_beam.internal import util
from apache_beam.io import concat_source
from apache_beam.io import fileio
from apache_beam.io import iobase
@@ -158,16 +156,9 @@ class FileBasedSource(iobase.BoundedSource):
return [fileio.ChannelFactory.size_in_bytes(file_names[0])]
else:
if pattern is None:
- # ThreadPool crashes in old versions of Python (< 2.7.5) if created
- # from a child thread. (http://bugs.python.org/issue10015)
- if not hasattr(threading.current_thread(), '_children'):
- threading.current_thread()._children = weakref.WeakKeyDictionary()
- pool = ThreadPool(
- min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names)))
- try:
- return pool.map(fileio.ChannelFactory.size_in_bytes, file_names)
- finally:
- pool.terminate()
+ return util.run_using_threadpool(
+ fileio.ChannelFactory.size_in_bytes, file_names,
+ MAX_NUM_THREADS_FOR_SIZE_ESTIMATION)
else:
file_sizes = fileio.ChannelFactory.size_of_files_in_glob(pattern,
file_names)
http://git-wip-us.apache.org/repos/asf/beam/blob/51afc1cc/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index f67dca9..97cf387 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -22,16 +22,14 @@ import bz2
import cStringIO
import glob
import logging
-from multiprocessing.pool import ThreadPool
import os
import re
import shutil
-import threading
import time
import zlib
-import weakref
from apache_beam import coders
+from apache_beam.internal import util
from apache_beam.io import gcsio
from apache_beam.io import iobase
from apache_beam.transforms.display import DisplayDataItem
@@ -663,11 +661,8 @@ class FileSink(iobase.Sink):
logging.debug('Rename successful: %s -> %s', src, dest)
return exceptions
- # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a
- # child thread. (http://bugs.python.org/issue10015)
- if not hasattr(threading.current_thread(), '_children'):
- threading.current_thread()._children = weakref.WeakKeyDictionary()
- exception_batches = ThreadPool(num_threads).map(_rename_batch, batches)
+ exception_batches = util.run_using_threadpool(
+ _rename_batch, batches, num_threads)
all_exceptions = []
for exceptions in exception_batches:
[2/2] beam git commit: This closes #1866
Posted by da...@apache.org.
This closes #1866
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/475707f0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/475707f0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/475707f0
Branch: refs/heads/python-sdk
Commit: 475707f0ffd7bc82ca78fa3f3c9e78f661478b99
Parents: f29527f 51afc1c
Author: Davor Bonaci <da...@google.com>
Authored: Mon Jan 30 12:43:48 2017 -0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jan 30 12:43:48 2017 -0800
----------------------------------------------------------------------
sdks/python/apache_beam/internal/util.py | 33 ++++++++++++++++++++++
sdks/python/apache_beam/io/filebasedsource.py | 17 +++--------
sdks/python/apache_beam/io/fileio.py | 11 ++------
3 files changed, 40 insertions(+), 21 deletions(-)
----------------------------------------------------------------------