Posted to commits@beam.apache.org by ro...@apache.org on 2017/12/01 01:36:06 UTC

[beam] branch master updated: Various worker cleanups. (#4195)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f7517d  Various worker cleanups. (#4195)
8f7517d is described below

commit 8f7517d462b1c7a1c2a3a6e3e563898fd3f669b4
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Thu Nov 30 18:36:02 2017 -0700

    Various worker cleanups. (#4195)
    
    Various worker cleanups.
---
 sdks/python/apache_beam/runners/common.py          | 79 +++-------------------
 .../apache_beam/runners/direct/bundle_factory.py   |  6 +-
 .../runners/direct/transform_evaluator.py          |  5 +-
 .../runners/portability/fn_api_runner.py           | 60 +++++++---------
 .../apache_beam/runners/worker/bundle_processor.py | 42 +-----------
 .../apache_beam/runners/worker/operations.py       | 15 ++--
 6 files changed, 50 insertions(+), 157 deletions(-)

diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 64abe41..08ddf65 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -319,15 +319,9 @@ class DoFnRunner(Receiver):
                kwargs,
                side_inputs,
                windowing,
-               context=None,
                tagged_receivers=None,
-               logger=None,
                step_name=None,
-               # Preferred alternative to logger
-               # TODO(robertwb): Remove once all runners are updated.
                logging_context=None,
-               # Preferred alternative to context
-               # TODO(robertwb): Remove once all runners are updated.
                state=None,
                scoped_metrics_container=None):
     """Initializes a DoFnRunner.
@@ -338,45 +332,31 @@ class DoFnRunner(Receiver):
       kwargs: keyword side input arguments (static and placeholder), if any
       side_inputs: list of sideinput.SideInputMaps for deferred side inputs
       windowing: windowing properties of the output PCollection(s)
-      context: a DoFnContext to use (deprecated)
       tagged_receivers: a dict of tag name to Receiver objects
-      logger: a logging module (deprecated)
       step_name: the name of this step
       logging_context: a LoggingContext object
       state: handle for accessing DoFn state
       scoped_metrics_container: Context switcher for metrics container
     """
-    self.scoped_metrics_container = (scoped_metrics_container
-                                     or ScopedMetricsContainer())
-    self.step_name = step_name
-
     # Need to support multiple iterations.
     side_inputs = list(side_inputs)
 
-    if logging_context:
-      self.logging_context = logging_context
-    else:
-      self.logging_context = get_logging_context(logger, step_name=step_name)
-
-    # TODO(sourabh): Deprecate the use of context
-    if state:
-      assert context is None
-      context = DoFnContext(step_name, state=state)
-    else:
-      assert context is not None
-      context = context
-
-    self.context = context
+    self.scoped_metrics_container = (
+        scoped_metrics_container or ScopedMetricsContainer())
+    self.step_name = step_name
+    self.logging_context = logging_context or LoggingContext()
+    self.context = DoFnContext(step_name, state=state)
 
     do_fn_signature = DoFnSignature(fn)
 
     # Optimize for the common case.
-    main_receivers = as_receiver(tagged_receivers[None])
+    main_receivers = tagged_receivers[None]
     output_processor = _OutputProcessor(
         windowing.windowfn, main_receivers, tagged_receivers)
 
     self.do_fn_invoker = DoFnInvoker.create_invoker(
-        output_processor, do_fn_signature, context, side_inputs, args, kwargs)
+        output_processor, do_fn_signature, self.context,
+        side_inputs, args, kwargs)
 
   def receive(self, windowed_value):
     self.process(windowed_value)
@@ -479,7 +459,7 @@ class _OutputProcessor(object):
       if tag is None:
         self.main_receivers.receive(windowed_value)
       else:
-        self.tagged_receivers[tag].output(windowed_value)
+        self.tagged_receivers[tag].receive(windowed_value)
 
   def start_bundle_outputs(self, results):
     """Validate that start_bundle does not output any elements"""
@@ -514,7 +494,7 @@ class _OutputProcessor(object):
       if tag is None:
         self.main_receivers.receive(windowed_value)
       else:
-        self.tagged_receivers[tag].output(windowed_value)
+        self.tagged_receivers[tag].receive(windowed_value)
 
 
 class _NoContext(WindowFn.AssignContext):
@@ -586,42 +566,3 @@ class DoFnContext(object):
       raise AttributeError('windows not accessible in this context')
     else:
       return self.windowed_value.windows
-
-
-# TODO(robertwb): Remove all these adapters once service is updated out.
-class _LoggingContextAdapter(LoggingContext):
-
-  def __init__(self, underlying):
-    self.underlying = underlying
-
-  def enter(self):
-    self.underlying.enter()
-
-  def exit(self):
-    self.underlying.exit()
-
-
-def get_logging_context(maybe_logger, **kwargs):
-  if maybe_logger:
-    maybe_context = maybe_logger.PerThreadLoggingContext(**kwargs)
-    if isinstance(maybe_context, LoggingContext):
-      return maybe_context
-    return _LoggingContextAdapter(maybe_context)
-  return LoggingContext()
-
-
-class _ReceiverAdapter(Receiver):
-
-  def __init__(self, underlying):
-    self.underlying = underlying
-
-  def receive(self, windowed_value):
-    self.underlying.output(windowed_value)
-
-
-def as_receiver(maybe_receiver):
-  """For internal use only; no backwards-compatibility guarantees."""
-
-  if isinstance(maybe_receiver, Receiver):
-    return maybe_receiver
-  return _ReceiverAdapter(maybe_receiver)
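
With the adapters above deleted, anything wired into a DoFnRunner as a
receiver must expose receive() itself rather than the legacy output()
method. A minimal sketch of a conforming receiver, assuming only the
Receiver base class defined in this same module (PrintingReceiver is a
hypothetical name, not part of the commit):

    from apache_beam.runners.common import Receiver

    class PrintingReceiver(Receiver):
      """Toy receiver that prints each element it is handed."""

      def receive(self, windowed_value):
        # Runners now call receive() directly; there is no longer an
        # as_receiver() shim to adapt objects that only define output().
        print(windowed_value.value)

The same contract is what the direct-runner changes below implement on
_Bundle and _InMemoryReceiver.
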
diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py
index 0182b4c..942d282 100644
--- a/sdks/python/apache_beam/runners/direct/bundle_factory.py
+++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py
@@ -20,6 +20,7 @@
 from __future__ import absolute_import
 
 from apache_beam import pvalue
+from apache_beam.runners import common
 from apache_beam.utils.windowed_value import WindowedValue
 
 
@@ -47,7 +48,7 @@ class BundleFactory(object):
 
 
 # a bundle represents a unit of work that will be processed by a transform.
-class _Bundle(object):
+class _Bundle(common.Receiver):
   """Part of a PCollection with output elements.
 
   Part of a PCollection. Elements are output to a bundle, which will cause them
@@ -185,6 +186,9 @@ class _Bundle(object):
   def output(self, element):
     self.add(element)
 
+  def receive(self, element):
+    self.add(element)
+
   def commit(self, synchronized_processing_time):
     """Commits this bundle.
 
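Keeping output() alongside the new receive() lets _Bundle satisfy the
common.Receiver contract without breaking callers that still use the older
entry point; both delegate to add(). A compressed sketch of that
dual-interface pattern (ListBundle is a hypothetical stand-in, not Beam
code):

    from apache_beam.runners import common

    class ListBundle(common.Receiver):
      def __init__(self):
        self._elements = []

      def add(self, element):
        self._elements.append(element)

      def output(self, element):   # legacy entry point, kept for old callers
        self.add(element)

      def receive(self, element):  # the common.Receiver contract
        self.add(element)
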
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 2f3ac4f..4c3e7f1 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -28,6 +28,7 @@ from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam.internal import pickler
 from apache_beam.options.pipeline_options import TypeOptions
+from apache_beam.runners import common
 from apache_beam.runners.common import DoFnRunner
 from apache_beam.runners.common import DoFnState
 from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite  # pylint: disable=protected-access
@@ -490,14 +491,14 @@ class _TaggedReceivers(dict):
     def output(self, element):
       pass
 
-  class _InMemoryReceiver(object):
+  class _InMemoryReceiver(common.Receiver):
     """Buffers undeclared outputs to the given dictionary."""
 
     def __init__(self, target, tag):
       self._target = target
       self._tag = tag
 
-    def output(self, element):
+    def receive(self, element):
       self._target[self._tag].append(element)
 
   def __missing__(self, key):
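
_TaggedReceivers is a dict subclass whose __missing__ hook (the method shown
in context above) lazily creates a receiver the first time an undeclared
output tag is looked up. A self-contained sketch of the same dict.__missing__
pattern (TaggedBuffers is illustrative only, not Beam code):

    class TaggedBuffers(dict):
      """First lookup of an unknown tag creates and caches a buffer."""

      def __missing__(self, tag):
        buffer = self[tag] = []
        return buffer

    buffers = TaggedBuffers()
    buffers['undeclared'].append('element')  # no KeyError: creates the list
    print(buffers)  # {'undeclared': ['element']}
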
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 674d523..e40faa5 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -33,7 +33,6 @@ from apache_beam.coders import registry
 from apache_beam.coders.coder_impl import create_InputStream
 from apache_beam.coders.coder_impl import create_OutputStream
 from apache_beam.internal import pickler
-from apache_beam.io import iobase
 from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
@@ -100,29 +99,6 @@ def streaming_rpc_handler(cls, method_name):
   return StreamingRpcHandler()
 
 
-class OldeSourceSplittableDoFn(beam.DoFn):
-  """A DoFn that reads and emits an entire source.
-  """
-
-  # TODO(robertwb): Make this a full SDF with progress splitting, etc.
-  def process(self, source):
-    if isinstance(source, iobase.SourceBundle):
-      for value in source.source.read(source.source.get_range_tracker(
-          source.start_position, source.stop_position)):
-        yield value
-    else:
-      # Dataflow native source
-      with source.reader() as reader:
-        for value in reader:
-          yield value
-
-
-# See DataflowRunner._pardo_fn_data
-OLDE_SOURCE_SPLITTABLE_DOFN_DATA = pickler.dumps(
-    (OldeSourceSplittableDoFn(), (), {}, [],
-     beam.transforms.core.Windowing(GlobalWindows())))
-
-
 class _GroupingBuffer(object):
   """Used to accumulate groupded (shuffled) results."""
   def __init__(self, pre_grouped_coder, post_grouped_coder, windowing):
@@ -134,22 +110,32 @@ class _GroupingBuffer(object):
 
   def append(self, elements_data):
     input_stream = create_InputStream(elements_data)
+    coder_impl = self._pre_grouped_coder.get_impl()
+    key_coder_impl = self._key_coder.get_impl()
+    # TODO(robertwb): We could optimize this even more by using a
+    # window-dropping coder for the data plane.
+    is_trivial_windowing = self._windowing.is_default()
     while input_stream.size() > 0:
-      windowed_key_value = self._pre_grouped_coder.get_impl(
-          ).decode_from_stream(input_stream, True)
-      key = windowed_key_value.value[0]
-      windowed_value = windowed_key_value.with_value(
-          windowed_key_value.value[1])
-      self._table[self._key_coder.encode(key)].append(windowed_value)
+      windowed_key_value = coder_impl.decode_from_stream(input_stream, True)
+      key, value = windowed_key_value.value
+      self._table[key_coder_impl.encode(key)].append(
+          value if is_trivial_windowing
+          else windowed_key_value.with_value(value))
 
   def __iter__(self):
     output_stream = create_OutputStream()
-    trigger_driver = trigger.create_trigger_driver(self._windowing, True)
+    if self._windowing.is_default():
+      globally_window = GlobalWindows.windowed_value(None).with_value
+      windowed_key_values = lambda key, values: [globally_window((key, values))]
+    else:
+      trigger_driver = trigger.create_trigger_driver(self._windowing, True)
+      windowed_key_values = trigger_driver.process_entire_key
+    coder_impl = self._post_grouped_coder.get_impl()
+    key_coder_impl = self._key_coder.get_impl()
     for encoded_key, windowed_values in self._table.items():
-      key = self._key_coder.decode(encoded_key)
-      for wkvs in trigger_driver.process_entire_key(key, windowed_values):
-        self._post_grouped_coder.get_impl().encode_to_stream(
-            wkvs, output_stream, True)
+      key = key_coder_impl.decode(encoded_key)
+      for wkvs in windowed_key_values(key, windowed_values):
+        coder_impl.encode_to_stream(wkvs, output_stream, True)
     return iter([output_stream.get()])
 
 
@@ -867,9 +853,9 @@ class FnApiRunner(runner.PipelineRunner):
               self.data_plane_handler.inverse()))
 
     def push(self, request):
-      logging.info('CONTROL REQUEST %s', request)
+      logging.debug('CONTROL REQUEST %s', request)
       response = self.worker.do_instruction(request)
-      logging.info('CONTROL RESPONSE %s', response)
+      logging.debug('CONTROL RESPONSE %s', response)
       self._responses.append(response)
 
     def pull(self):
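
Two optimizations drive the _GroupingBuffer rewrite: coder .get_impl()
lookups are hoisted out of the per-element loops, and the trigger driver is
bypassed entirely when the windowing is the default global windowing. The
fast path leans on WindowedValue.with_value, which stamps a new payload onto
an existing timestamp-and-window template instead of rebuilding the metadata
for every element. A small sketch of that trick (the values are arbitrary):

    from apache_beam.transforms.window import GlobalWindows

    # Build the globally-windowed template once; with_value reuses its
    # timestamp and window, swapping in a new payload per call.
    stamp = GlobalWindows.windowed_value(None).with_value

    wv = stamp(('key', [1, 2, 3]))
    print(wv.value)    # ('key', [1, 2, 3])
    print(wv.windows)  # a single GlobalWindow

The logging change in push() is independent: per-request control-plane
logging moves from INFO to DEBUG so that large pipelines do not flood the
logs.
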
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 689eab7..0c46b81 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -36,7 +36,6 @@ from apache_beam.io import iobase
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners import pipeline_context
-from apache_beam.runners.dataflow.native_io import iobase as native_iobase
 from apache_beam.runners.worker import operation_specs
 from apache_beam.runners.worker import operations
 from apache_beam.transforms import sideinputs
@@ -71,6 +70,7 @@ class RunnerIOOperation(operations.Operation):
     super(RunnerIOOperation, self).__init__(
         operation_name, None, counter_factory, state_sampler)
     self.windowed_coder = windowed_coder
+    self.windowed_coder_impl = windowed_coder.get_impl()
     self.step_name = step_name
     # target represents the consumer for the bytes in the data plane for a
     # DataInputOperation or a producer of these bytes for a DataOutputOperation.
@@ -89,7 +89,7 @@ class DataOutputOperation(RunnerIOOperation):
     self.output_stream = output_stream
 
   def process(self, windowed_value):
-    self.windowed_coder.get_impl().encode_to_stream(
+    self.windowed_coder_impl.encode_to_stream(
         windowed_value, self.output_stream, True)
 
   def finish(self):
@@ -118,47 +118,11 @@ class DataInputOperation(RunnerIOOperation):
   def process_encoded(self, encoded_windowed_values):
     input_stream = coder_impl.create_InputStream(encoded_windowed_values)
     while input_stream.size() > 0:
-      decoded_value = self.windowed_coder.get_impl().decode_from_stream(
+      decoded_value = self.windowed_coder_impl.decode_from_stream(
           input_stream, True)
       self.output(decoded_value)
 
 
-# TODO(robertwb): Revise side input API to not be in terms of native sources.
-# This will enable lookups, but there's an open question as to how to handle
-# custom sources without forcing intermediate materialization.  This seems very
-# related to the desire to inject key and window preserving [Splittable]DoFns
-# into the view computation.
-class SideInputSource(native_iobase.NativeSource,
-                      native_iobase.NativeSourceReader):
-  """A 'source' for reading side inputs via state API calls.
-  """
-
-  def __init__(self, state_handler, state_key, coder):
-    self._state_handler = state_handler
-    self._state_key = state_key
-    self._coder = coder
-
-  def reader(self):
-    return self
-
-  @property
-  def returns_windowed_values(self):
-    return True
-
-  def __enter__(self):
-    return self
-
-  def __exit__(self, *exn_info):
-    pass
-
-  def __iter__(self):
-    # TODO(robertwb): Support pagination.
-    input_stream = coder_impl.create_InputStream(
-        self._state_handler.Get(self._state_key).data)
-    while input_stream.size() > 0:
-      yield self._coder.get_impl().decode_from_stream(input_stream, True)
-
-
 class StateBackedSideInputMap(object):
   def __init__(self, state_handler, transform_id, tag, side_input_data):
     self._state_handler = state_handler
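
Caching windowed_coder.get_impl() once in the RunnerIOOperation constructor
mirrors the _GroupingBuffer change: encode_to_stream and decode_from_stream
run once per element, so the impl lookup should not. A rough sketch of the
round trip these two operations perform, assuming a WindowedValueCoder
wrapping a VarIntCoder as a stand-in for the real wire coder:

    from apache_beam.coders import coders
    from apache_beam.coders.coder_impl import create_InputStream
    from apache_beam.coders.coder_impl import create_OutputStream
    from apache_beam.transforms.window import GlobalWindows

    coder_impl = coders.WindowedValueCoder(coders.VarIntCoder()).get_impl()

    out = create_OutputStream()          # what DataOutputOperation fills
    for i in range(3):
      coder_impl.encode_to_stream(GlobalWindows.windowed_value(i), out, True)

    inp = create_InputStream(out.get())  # what DataInputOperation drains
    while inp.size() > 0:
      print(coder_impl.decode_from_stream(inp, True).value)  # 0, 1, 2
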
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 6b5f024..c245655 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -78,9 +78,6 @@ class ConsumerSet(Receiver):
     self.output_index = output_index
     self.coder = coder
 
-  def output(self, windowed_value):  # For old SDKs.
-    self.receive(windowed_value)
-
   def receive(self, windowed_value):
     self.update_counters_start(windowed_value)
     for consumer in self.consumers:
@@ -326,9 +323,6 @@ class DoOperation(Operation):
       state = common.DoFnState(self.counter_factory)
       state.step_name = self.step_name
 
-      # TODO(silviuc): What is the proper label here? PCollection being
-      # processed?
-      context = common.DoFnContext('label', state=state)
       # Tag to output index map used to dispatch the side output values emitted
       # by the DoFn function to the appropriate receivers. The main output is
       # tagged with None and is associated with its corresponding index.
@@ -352,9 +346,12 @@ class DoOperation(Operation):
           self.side_input_maps = []
 
       self.dofn_runner = common.DoFnRunner(
-          fn, args, kwargs, self.side_input_maps,
-          window_fn, context, self.tagged_receivers,
-          logger, self.step_name,
+          fn, args, kwargs, self.side_input_maps, window_fn,
+          tagged_receivers=self.tagged_receivers,
+          step_name=self.step_name,
+          logging_context=logger.PerThreadLoggingContext(
+              step_name=self.step_name),
+          state=state,
           scoped_metrics_container=self.scoped_metrics_container)
       self.dofn_receiver = (self.dofn_runner
                             if isinstance(self.dofn_runner, Receiver)

-- 
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.