You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/05/14 01:30:16 UTC

[beam] branch master updated: [BEAM-562] Add DoFn.setup and DoFn.teardown to Python SDK (#7994)

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4629e82  [BEAM-562] Add DoFn.setup and DoFn.teardown to Python SDK (#7994)
4629e82 is described below

commit 4629e82512ef1606c78cf28a2d66402c3533e23f
Author: Yifan Mai <yi...@google.com>
AuthorDate: Mon May 13 18:30:03 2019 -0700

    [BEAM-562] Add DoFn.setup and DoFn.teardown to Python SDK (#7994)
    
    * [BEAM-562] Add setup and teardown to Python DoFn
---
 sdks/python/apache_beam/runners/common.pxd         |  2 +
 sdks/python/apache_beam/runners/common.py          | 25 ++++++
 .../runners/portability/fn_api_runner.py           |  4 +-
 .../apache_beam/runners/worker/bundle_processor.py |  4 +
 .../apache_beam/runners/worker/operations.pxd      |  1 +
 .../apache_beam/runners/worker/operations.py       | 14 ++++
 .../apache_beam/runners/worker/sdk_worker.py       | 12 +++
 sdks/python/apache_beam/transforms/core.py         | 27 +++++++
 .../apache_beam/transforms/dofn_lifecycle_test.py  | 89 ++++++++++++++++++++++
 9 files changed, 176 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index b28e852..1d87507 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -43,6 +43,8 @@ cdef class DoFnSignature(object):
   cdef public MethodWrapper process_method
   cdef public MethodWrapper start_bundle_method
   cdef public MethodWrapper finish_bundle_method
+  cdef public MethodWrapper setup_lifecycle_method  # wraps DoFn.setup
+  cdef public MethodWrapper teardown_lifecycle_method  # wraps DoFn.teardown
   cdef public MethodWrapper initial_restriction_method
   cdef public MethodWrapper restriction_coder_method
   cdef public MethodWrapper create_tracker_method
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 3bbfd90..cae0d4c 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -210,6 +210,8 @@ class DoFnSignature(object):
     self.process_method = MethodWrapper(do_fn, 'process')
     self.start_bundle_method = MethodWrapper(do_fn, 'start_bundle')
     self.finish_bundle_method = MethodWrapper(do_fn, 'finish_bundle')
+    self.setup_lifecycle_method = MethodWrapper(do_fn, 'setup')
+    self.teardown_lifecycle_method = MethodWrapper(do_fn, 'teardown')

     restriction_provider = self.get_restriction_provider()
     self.initial_restriction_method = (
@@ -356,6 +358,11 @@ class DoFnInvoker(object):
     """
     raise NotImplementedError

+  def invoke_setup(self):
+    """Invokes the DoFn.setup() method.
+    """
+    self.signature.setup_lifecycle_method.method_value()
+
   def invoke_start_bundle(self):
     """Invokes the DoFn.start_bundle() method.
     """
@@ -368,6 +375,11 @@ class DoFnInvoker(object):
     self.output_processor.finish_bundle_outputs(
         self.signature.finish_bundle_method.method_value())

+  def invoke_teardown(self):
+    """Invokes the DoFn.teardown() method.
+    """
+    self.signature.teardown_lifecycle_method.method_value()
+
   def invoke_user_timer(self, timer_spec, key, window, timestamp):
     self.output_processor.process_outputs(
         WindowedValue(None, timestamp, (window,)),
@@ -778,12 +790,25 @@ class DoFnRunner(Receiver):
     except BaseException as exn:
       self._reraise_augmented(exn)

+  def _invoke_lifecycle_method(self, lifecycle_method):
+    try:
+      self.context.set_element(None)  # lifecycle calls have no element
+      lifecycle_method()
+    except BaseException as exn:
+      self._reraise_augmented(exn)
+
+  def setup(self):
+    self._invoke_lifecycle_method(self.do_fn_invoker.invoke_setup)
+
   def start(self):
     self._invoke_bundle_method(self.do_fn_invoker.invoke_start_bundle)

   def finish(self):
     self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)

+  def teardown(self):
+    self._invoke_lifecycle_method(self.do_fn_invoker.invoke_teardown)
+
   def finalize(self):
     self.bundle_finalizer_param.finalize_bundle()

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index bc330b1..5f6e5f7 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -847,7 +847,7 @@ class EmbeddedWorkerHandler(WorkerHandler):
     pass

   def stop_worker(self):
-    pass
+    self.worker.stop()

   def done(self):
     pass
@@ -1134,7 +1134,7 @@ class WorkerHandlerManager(object):
       try:
         controller.close()
       except Exception:
-        logging.info("Error closing controller %s" % controller, exc_info=True)
+        logging.error("Error closing controller %s", controller, exc_info=True)
     self._cached_handlers = {}


diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index b04dabf..c35b646 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -746,6 +746,10 @@ class BundleProcessor(object):
         monitoring_info.labels['TAG'] = actual_output_tags[0]
     return monitoring_info

+  def shutdown(self):  # tear down every operation in this processor
+    for op in self.ops.values():
+      op.teardown()  # NOTE(review): a raising teardown skips the rest
+

 class ExecutionContext(object):
   def __init__(self):
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index 5743dec..68da490 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -68,6 +68,7 @@ cdef class Operation(object):
   cpdef start(self)
   cpdef process(self, WindowedValue windowed_value)
   cpdef finish(self)
+  cpdef teardown(self)  # final call on an operation; see Operation.teardown
   cpdef output(self, WindowedValue windowed_value, int output_index=*)
   cpdef execution_time_monitoring_infos(self, transform_id)
   cpdef user_monitoring_infos(self, transform_id)
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 1484c6e..6a0ef72 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -198,6 +198,9 @@ class Operation(object):
     self.setup_done = False

   def setup(self):
+    """Set up the operation.
+
+    This must be called before any other method of the operation."""
     with self.scoped_start_state:
       self.debug_logging_enabled = logging.getLogger().isEnabledFor(
           logging.DEBUG)
@@ -240,6 +243,12 @@ class Operation(object):
     """Finish operation."""
     pass

+  def teardown(self):
+    """Tear down the operation.
+
+    No other method of this operation should be called after this one."""
+    pass
+
   def reset(self):
     self.metrics_container.reset()

@@ -569,6 +578,7 @@ class DoOperation(Operation):
           state=state,
           user_state_context=self.user_state_context,
           operation_name=self.name_context.metrics_name())
+      self.dofn_runner.setup()  # invokes the user's DoFn.setup()

       self.dofn_receiver = (self.dofn_runner
                             if isinstance(self.dofn_runner, Receiver)
@@ -604,6 +614,10 @@ class DoOperation(Operation):
       if self.user_state_context:
         self.user_state_context.commit()

+  def teardown(self):
+    with self.scoped_finish_state:  # NOTE(review): reuses the finish state
+      self.dofn_runner.teardown()  # invokes the user's DoFn.teardown()
+
   def reset(self):
     super(DoOperation, self).reset()
     for side_input_map in self.side_input_maps:
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 663f224..0c52274 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -311,6 +311,7 @@ class BundleProcessorCache(object):
     return self.active_bundle_processors.get(instruction_id, (None, None))[-1]

   def discard(self, instruction_id):
+    self.active_bundle_processors[instruction_id][1].shutdown()
     del self.active_bundle_processors[instruction_id]

   def release(self, instruction_id):
@@ -318,6 +319,14 @@ class BundleProcessorCache(object):
     processor.reset()
     self.cached_bundle_processors[descriptor_id].append(processor)

+  def shutdown(self):
+    # Iterate over a snapshot: discard() deletes entries from the dict.
+    for instruction_id in list(self.active_bundle_processors):
+      self.discard(instruction_id)
+    for cached_bundle_processors in self.cached_bundle_processors.values():
+      while cached_bundle_processors:
+        cached_bundle_processors.pop().shutdown()
+

 class SdkWorker(object):

@@ -413,6 +422,9 @@ class SdkWorker(object):
           instruction_id=instruction_id,
           error='Instruction not running: %s' % instruction_id)

+  def stop(self):
+    self.bundle_processor_cache.shutdown()
+
   @contextlib.contextmanager
   def maybe_profile(self, instruction_id):
     if self.profiler_factory:
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index fb93c00..ca1fb46 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -472,6 +472,15 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
     """
     raise NotImplementedError

+  def setup(self):
+    """Called to prepare an instance for processing bundles of elements.
+
+    This is a good place to initialize transient in-memory resources, such as
+    network connections. The resources can then be disposed of in
+    ``DoFn.teardown``.
+    """
+    pass
+
   def start_bundle(self):
     """Called before a bundle of elements is processed on a worker.

@@ -486,6 +495,24 @@ class DoFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
     """
     pass

+  def teardown(self):
+    """Called to clean up this instance before it is discarded.
+
+    A runner will do its best to call this method on any given instance to
+    prevent leaks of transient resources; however, there may be situations where
+    this is impossible (e.g. process crash, hardware failure, etc.) or
+    unnecessary (e.g. the pipeline is shutting down and the process is about to
+    be killed anyway, so all transient resources will be released automatically
+    by the OS). In these cases, the call may not happen. It will also not be
+    retried, because in such situations the DoFn instance no longer exists, so
+    there's no instance to retry it on.
+
+    Thus, all work that depends on input elements, and all externally important
+    side effects, must be performed in ``DoFn.process`` or
+    ``DoFn.finish_bundle``.
+    """
+    pass
+
   def get_function_arguments(self, func):
     return get_function_arguments(self, func)

diff --git a/sdks/python/apache_beam/transforms/dofn_lifecycle_test.py b/sdks/python/apache_beam/transforms/dofn_lifecycle_test.py
new file mode 100644
index 0000000..fd3eee6
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/dofn_lifecycle_test.py
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Unit tests for the DoFn lifecycle and bundle methods."""
+
+from __future__ import absolute_import
+
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.testing.test_pipeline import TestPipeline
+
+
+class CallSequenceEnforcingDoFn(beam.DoFn):
+  def __init__(self):
+    self._setup_called = False
+    self._start_bundle_calls = 0
+    self._finish_bundle_calls = 0
+    self._teardown_called = False
+
+  def setup(self):
+    assert not self._setup_called, 'setup should not be called twice'
+    assert self._start_bundle_calls == 0, (
+        'setup should be called before start_bundle')
+    assert self._finish_bundle_calls == 0, (
+        'setup should be called before finish_bundle')
+    assert not self._teardown_called, 'setup should be called before teardown'
+    self._setup_called = True
+
+  def start_bundle(self):
+    assert self._setup_called, 'setup should have been called'
+    assert self._start_bundle_calls == self._finish_bundle_calls, (
+        'there should be as many start_bundle calls as finish_bundle calls')
+    assert not self._teardown_called, 'teardown should not have been called'
+    self._start_bundle_calls += 1
+
+  def process(self, element):
+    assert self._setup_called, 'setup should have been called'
+    assert self._start_bundle_calls > 0, 'start_bundle should have been called'
+    assert self._start_bundle_calls == self._finish_bundle_calls + 1, (
+        'there should be one start_bundle call with no call to finish_bundle')
+    assert not self._teardown_called, 'teardown should not have been called'
+    return [element * element]
+
+  def finish_bundle(self):
+    assert self._setup_called, 'setup should have been called'
+    assert self._start_bundle_calls > 0, 'start_bundle should have been called'
+    assert self._start_bundle_calls == self._finish_bundle_calls + 1, (
+        'there should be one start_bundle call with no call to finish_bundle')
+    assert not self._teardown_called, 'teardown should not have been called'
+    self._finish_bundle_calls += 1
+
+  def teardown(self):
+    assert self._setup_called, 'setup should have been called'
+    assert self._start_bundle_calls == self._finish_bundle_calls, (
+        'there should be as many start_bundle calls as finish_bundle calls')
+    assert not self._teardown_called, 'teardown should not be called twice'
+    self._teardown_called = True
+
+
+@attr('ValidatesRunner')
+class DoFnLifecycleTest(unittest.TestCase):
+  def test_dofn_lifecycle(self):
+    pipeline = TestPipeline()
+    _ = (pipeline
+         | 'Start' >> beam.Create([1, 2, 3])
+         | 'Do' >> beam.ParDo(CallSequenceEnforcingDoFn()))
+    pipeline_result = pipeline.run()
+    pipeline_result.wait_until_finish()
+    # Assumes that the worker runs in the same process as the test.
+
+
+if __name__ == '__main__':
+  unittest.main()