You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/05/14 01:30:16 UTC
[beam] branch master updated: [BEAM-562] Add DoFn.setup and
DoFn.teardown to Python SDK (#7994)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4629e82 [BEAM-562] Add DoFn.setup and DoFn.teardown to Python SDK (#7994)
4629e82 is described below
commit 4629e82512ef1606c78cf28a2d66402c3533e23f
Author: Yifan Mai <yi...@google.com>
AuthorDate: Mon May 13 18:30:03 2019 -0700
[BEAM-562] Add DoFn.setup and DoFn.teardown to Python SDK (#7994)
* [BEAM-562] Add setup and teardown to Python DoFn
---
sdks/python/apache_beam/runners/common.pxd | 2 +
sdks/python/apache_beam/runners/common.py | 25 ++++++
.../runners/portability/fn_api_runner.py | 4 +-
.../apache_beam/runners/worker/bundle_processor.py | 4 +
.../apache_beam/runners/worker/operations.pxd | 1 +
.../apache_beam/runners/worker/operations.py | 14 ++++
.../apache_beam/runners/worker/sdk_worker.py | 12 +++
sdks/python/apache_beam/transforms/core.py | 27 +++++++
.../apache_beam/transforms/dofn_lifecycle_test.py | 89 ++++++++++++++++++++++
9 files changed, 176 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index b28e852..1d87507 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -43,6 +43,8 @@ cdef class DoFnSignature(object):
cdef public MethodWrapper process_method
cdef public MethodWrapper start_bundle_method
cdef public MethodWrapper finish_bundle_method
+ cdef public MethodWrapper setup_lifecycle_method
+ cdef public MethodWrapper teardown_lifecycle_method
cdef public MethodWrapper initial_restriction_method
cdef public MethodWrapper restriction_coder_method
cdef public MethodWrapper create_tracker_method
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 3bbfd90..cae0d4c 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -210,6 +210,8 @@ class DoFnSignature(object):
self.process_method = MethodWrapper(do_fn, 'process')
self.start_bundle_method = MethodWrapper(do_fn, 'start_bundle')
self.finish_bundle_method = MethodWrapper(do_fn, 'finish_bundle')
+ self.setup_lifecycle_method = MethodWrapper(do_fn, 'setup')
+ self.teardown_lifecycle_method = MethodWrapper(do_fn, 'teardown')
restriction_provider = self.get_restriction_provider()
self.initial_restriction_method = (
@@ -356,6 +358,11 @@ class DoFnInvoker(object):
"""
raise NotImplementedError
+ def invoke_setup(self):
+ """Invokes the DoFn.setup() method.
+ """
+ self.signature.setup_lifecycle_method.method_value()
+
def invoke_start_bundle(self):
"""Invokes the DoFn.start_bundle() method.
"""
@@ -368,6 +375,11 @@ class DoFnInvoker(object):
self.output_processor.finish_bundle_outputs(
self.signature.finish_bundle_method.method_value())
+ def invoke_teardown(self):
+ """Invokes the DoFn.teardown() method.
+ """
+ self.signature.teardown_lifecycle_method.method_value()
+
def invoke_user_timer(self, timer_spec, key, window, timestamp):
self.output_processor.process_outputs(
WindowedValue(None, timestamp, (window,)),
@@ -778,12 +790,25 @@ class DoFnRunner(Receiver):
except BaseException as exn:
self._reraise_augmented(exn)
+ def _invoke_lifecycle_method(self, lifecycle_method):
+ try:
+ self.context.set_element(None)
+ lifecycle_method()
+ except BaseException as exn:
+ self._reraise_augmented(exn)
+
+ def setup(self):
+ self._invoke_lifecycle_method(self.do_fn_invoker.invoke_setup)
+
def start(self):
self._invoke_bundle_method(self.do_fn_invoker.invoke_start_bundle)
def finish(self):
self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
+ def teardown(self):
+ self._invoke_lifecycle_method(self.do_fn_invoker.invoke_teardown)
+
def finalize(self):
self.bundle_finalizer_param.finalize_bundle()
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index bc330b1..5f6e5f7 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -847,7 +847,7 @@ class EmbeddedWorkerHandler(WorkerHandler):
pass
def stop_worker(self):
- pass
+ self.worker.stop()
def done(self):
pass
@@ -1134,7 +1134,7 @@ class WorkerHandlerManager(object):
try:
controller.close()
except Exception:
- logging.info("Error closing controller %s" % controller, exc_info=True)
+ logging.error("Error closing controller %s", controller, exc_info=True)
self._cached_handlers = {}
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index b04dabf..c35b646 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -746,6 +746,10 @@ class BundleProcessor(object):
monitoring_info.labels['TAG'] = actual_output_tags[0]
return monitoring_info
+ def shutdown(self):
+ for op in self.ops.values():
+ op.teardown()
+
class ExecutionContext(object):
def __init__(self):
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index 5743dec..68da490 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -68,6 +68,7 @@ cdef class Operation(object):
cpdef start(self)
cpdef process(self, WindowedValue windowed_value)
cpdef finish(self)
+ cpdef teardown(self)
cpdef output(self, WindowedValue windowed_value, int output_index=*)
cpdef execution_time_monitoring_infos(self, transform_id)
cpdef user_monitoring_infos(self, transform_id)
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 1484c6e..6a0ef72 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -198,6 +198,9 @@ class Operation(object):
self.setup_done = False
def setup(self):
+ """Set up operation.
+
+ This must be called before any other methods of the operation."""
with self.scoped_start_state:
self.debug_logging_enabled = logging.getLogger().isEnabledFor(
logging.DEBUG)
@@ -240,6 +243,12 @@ class Operation(object):
"""Finish operation."""
pass
+ def teardown(self):
+ """Tear down operation.
+
+ No other methods of this operation should be called after this."""
+ pass
+
def reset(self):
self.metrics_container.reset()
@@ -569,6 +578,7 @@ class DoOperation(Operation):
state=state,
user_state_context=self.user_state_context,
operation_name=self.name_context.metrics_name())
+ self.dofn_runner.setup()
self.dofn_receiver = (self.dofn_runner
if isinstance(self.dofn_runner, Receiver)
@@ -604,6 +614,10 @@ class DoOperation(Operation):
if self.user_state_context:
self.user_state_context.commit()
+ def teardown(self):
+ with self.scoped_finish_state:
+ self.dofn_runner.teardown()
+
def reset(self):
super(DoOperation, self).reset()
for side_input_map in self.side_input_maps:
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 663f224..0c52274 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -311,6 +311,7 @@ class BundleProcessorCache(object):
return self.active_bundle_processors.get(instruction_id, (None, None))[-1]
def discard(self, instruction_id):
+ self.active_bundle_processors[instruction_id][1].shutdown()
del self.active_bundle_processors[instruction_id]
def release(self, instruction_id):
@@ -318,6 +319,14 @@ class BundleProcessorCache(object):
processor.reset()
self.cached_bundle_processors[descriptor_id].append(processor)
+ def shutdown(self):
+ for instruction_id in list(self.active_bundle_processors):
+ self.active_bundle_processors[instruction_id][1].shutdown()
+ del self.active_bundle_processors[instruction_id]
+ for cached_bundle_processors in self.cached_bundle_processors.values():
+ while cached_bundle_processors:
+ cached_bundle_processors.pop().shutdown()
+
class SdkWorker(object):
@@ -413,6 +422,9 @@ class SdkWorker(object):
instruction_id=instruction_id,
error='Instruction not running: %s' % instruction_id)
+ def stop(self):
+ self.bundle_processor_cache.shutdown()
+
@contextlib.contextmanager
def maybe_profile(self, instruction_id):
if self.profiler_factory:
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index fb93c00..ca1fb46 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -472,6 +472,15 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
"""
raise NotImplementedError
+ def setup(self):
+ """Called to prepare an instance for processing bundles of elements.
+
+ This is a good place to initialize transient in-memory resources, such as
+ network connections. The resources can then be disposed of in
+ ``DoFn.teardown``.
+ """
+ pass
+
def start_bundle(self):
"""Called before a bundle of elements is processed on a worker.
@@ -486,6 +495,24 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
"""
pass
+ def teardown(self):
+ """Called to clean up this instance before it is discarded.
+
+ A runner will do its best to call this method on any given instance to
+ prevent leaks of transient resources; however, there may be situations where
+ this is impossible (e.g. process crash, hardware failure, etc.) or
+ unnecessary (e.g. the pipeline is shutting down and the process is about to
+ be killed anyway, so all transient resources will be released automatically
+ by the OS). In these cases, the call may not happen. It will also not be
+ retried, because in such situations the DoFn instance no longer exists, so
+ there's no instance to retry it on.
+
+ Thus, all work that depends on input elements, and all externally important
+ side effects, must be performed in ``DoFn.process`` or
+ ``DoFn.finish_bundle``.
+ """
+ pass
+
def get_function_arguments(self, func):
return get_function_arguments(self, func)
diff --git a/sdks/python/apache_beam/transforms/dofn_lifecycle_test.py b/sdks/python/apache_beam/transforms/dofn_lifecycle_test.py
new file mode 100644
index 0000000..fd3eee6
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/dofn_lifecycle_test.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Unit tests for DoFn lifecycle and bundle methods."""
+
+from __future__ import absolute_import
+
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+
+
+class CallSequenceEnforcingDoFn(beam.DoFn):
+ def __init__(self):
+ self._setup_called = False
+ self._start_bundle_calls = 0
+ self._finish_bundle_calls = 0
+ self._teardown_called = False
+
+ def setup(self):
+ assert not self._setup_called, 'setup should not be called twice'
+ assert self._start_bundle_calls == 0, \
+ 'setup should be called before start_bundle'
+ assert self._finish_bundle_calls == 0, \
+ 'setup should be called before finish_bundle'
+ assert not self._teardown_called, 'setup should be called before teardown'
+ self._setup_called = True
+
+ def start_bundle(self):
+ assert self._setup_called, 'setup should have been called'
+ assert self._start_bundle_calls == self._finish_bundle_calls, \
+ 'there should be as many start_bundle calls as finish_bundle calls'
+ assert not self._teardown_called, 'teardown should not have been called'
+ self._start_bundle_calls += 1
+
+ def process(self, element):
+ assert self._setup_called, 'setup should have been called'
+ assert self._start_bundle_calls > 0, 'start_bundle should have been called'
+ assert self._start_bundle_calls == self._finish_bundle_calls + 1, \
+ 'there should be one start_bundle call with no call to finish_bundle'
+ assert not self._teardown_called, 'teardown should not have been called'
+ return [element * element]
+
+ def finish_bundle(self):
+ assert self._setup_called, 'setup should have been called'
+ assert self._start_bundle_calls > 0, 'start_bundle should have been called'
+ assert self._start_bundle_calls == self._finish_bundle_calls + 1, \
+ 'there should be one start_bundle call with no call to finish_bundle'
+ assert not self._teardown_called, 'teardown should not have been called'
+ self._finish_bundle_calls += 1
+
+ def teardown(self):
+ assert self._setup_called, 'setup should have been called'
+ assert self._start_bundle_calls == self._finish_bundle_calls, \
+ 'there should be as many start_bundle calls as finish_bundle calls'
+ assert not self._teardown_called, 'teardown should not be called twice'
+ self._teardown_called = True
+
+
+@attr('ValidatesRunner')
+class DoFnLifecycleTest(unittest.TestCase):
+ def test_dofn_lifecycle(self):
+ p = TestPipeline()
+ _ = (p
+ | 'Start' >> beam.Create([1, 2, 3])
+ | 'Do' >> beam.ParDo(CallSequenceEnforcingDoFn()))
+ result = p.run()
+ result.wait_until_finish()
+ # Assumes that the worker is run in the same process as the test.
+
+
+if __name__ == '__main__':
+ unittest.main()