You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/12/01 01:36:06 UTC
[beam] branch master updated: Various worker cleanups. (#4195)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8f7517d Various worker cleanups. (#4195)
8f7517d is described below
commit 8f7517d462b1c7a1c2a3a6e3e563898fd3f669b4
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Thu Nov 30 18:36:02 2017 -0700
Various worker cleanups. (#4195)
Various worker cleanups.
---
sdks/python/apache_beam/runners/common.py | 79 +++-------------------
.../apache_beam/runners/direct/bundle_factory.py | 6 +-
.../runners/direct/transform_evaluator.py | 5 +-
.../runners/portability/fn_api_runner.py | 60 +++++++---------
.../apache_beam/runners/worker/bundle_processor.py | 42 +-----------
.../apache_beam/runners/worker/operations.py | 15 ++--
6 files changed, 50 insertions(+), 157 deletions(-)
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 64abe41..08ddf65 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -319,15 +319,9 @@ class DoFnRunner(Receiver):
kwargs,
side_inputs,
windowing,
- context=None,
tagged_receivers=None,
- logger=None,
step_name=None,
- # Preferred alternative to logger
- # TODO(robertwb): Remove once all runners are updated.
logging_context=None,
- # Preferred alternative to context
- # TODO(robertwb): Remove once all runners are updated.
state=None,
scoped_metrics_container=None):
"""Initializes a DoFnRunner.
@@ -338,45 +332,31 @@ class DoFnRunner(Receiver):
kwargs: keyword side input arguments (static and placeholder), if any
side_inputs: list of sideinput.SideInputMaps for deferred side inputs
windowing: windowing properties of the output PCollection(s)
- context: a DoFnContext to use (deprecated)
tagged_receivers: a dict of tag name to Receiver objects
- logger: a logging module (deprecated)
step_name: the name of this step
logging_context: a LoggingContext object
state: handle for accessing DoFn state
scoped_metrics_container: Context switcher for metrics container
"""
- self.scoped_metrics_container = (scoped_metrics_container
- or ScopedMetricsContainer())
- self.step_name = step_name
-
# Need to support multiple iterations.
side_inputs = list(side_inputs)
- if logging_context:
- self.logging_context = logging_context
- else:
- self.logging_context = get_logging_context(logger, step_name=step_name)
-
- # TODO(sourabh): Deprecate the use of context
- if state:
- assert context is None
- context = DoFnContext(step_name, state=state)
- else:
- assert context is not None
- context = context
-
- self.context = context
+ self.scoped_metrics_container = (
+ scoped_metrics_container or ScopedMetricsContainer())
+ self.step_name = step_name
+ self.logging_context = logging_context or LoggingContext()
+ self.context = DoFnContext(step_name, state=state)
do_fn_signature = DoFnSignature(fn)
# Optimize for the common case.
- main_receivers = as_receiver(tagged_receivers[None])
+ main_receivers = tagged_receivers[None]
output_processor = _OutputProcessor(
windowing.windowfn, main_receivers, tagged_receivers)
self.do_fn_invoker = DoFnInvoker.create_invoker(
- output_processor, do_fn_signature, context, side_inputs, args, kwargs)
+ output_processor, do_fn_signature, self.context,
+ side_inputs, args, kwargs)
def receive(self, windowed_value):
self.process(windowed_value)
@@ -479,7 +459,7 @@ class _OutputProcessor(object):
if tag is None:
self.main_receivers.receive(windowed_value)
else:
- self.tagged_receivers[tag].output(windowed_value)
+ self.tagged_receivers[tag].receive(windowed_value)
def start_bundle_outputs(self, results):
"""Validate that start_bundle does not output any elements"""
@@ -514,7 +494,7 @@ class _OutputProcessor(object):
if tag is None:
self.main_receivers.receive(windowed_value)
else:
- self.tagged_receivers[tag].output(windowed_value)
+ self.tagged_receivers[tag].receive(windowed_value)
class _NoContext(WindowFn.AssignContext):
@@ -586,42 +566,3 @@ class DoFnContext(object):
raise AttributeError('windows not accessible in this context')
else:
return self.windowed_value.windows
-
-
-# TODO(robertwb): Remove all these adapters once service is updated out.
-class _LoggingContextAdapter(LoggingContext):
-
- def __init__(self, underlying):
- self.underlying = underlying
-
- def enter(self):
- self.underlying.enter()
-
- def exit(self):
- self.underlying.exit()
-
-
-def get_logging_context(maybe_logger, **kwargs):
- if maybe_logger:
- maybe_context = maybe_logger.PerThreadLoggingContext(**kwargs)
- if isinstance(maybe_context, LoggingContext):
- return maybe_context
- return _LoggingContextAdapter(maybe_context)
- return LoggingContext()
-
-
-class _ReceiverAdapter(Receiver):
-
- def __init__(self, underlying):
- self.underlying = underlying
-
- def receive(self, windowed_value):
- self.underlying.output(windowed_value)
-
-
-def as_receiver(maybe_receiver):
- """For internal use only; no backwards-compatibility guarantees."""
-
- if isinstance(maybe_receiver, Receiver):
- return maybe_receiver
- return _ReceiverAdapter(maybe_receiver)
diff --git a/sdks/python/apache_beam/runners/direct/bundle_factory.py b/sdks/python/apache_beam/runners/direct/bundle_factory.py
index 0182b4c..942d282 100644
--- a/sdks/python/apache_beam/runners/direct/bundle_factory.py
+++ b/sdks/python/apache_beam/runners/direct/bundle_factory.py
@@ -20,6 +20,7 @@
from __future__ import absolute_import
from apache_beam import pvalue
+from apache_beam.runners import common
from apache_beam.utils.windowed_value import WindowedValue
@@ -47,7 +48,7 @@ class BundleFactory(object):
# a bundle represents a unit of work that will be processed by a transform.
-class _Bundle(object):
+class _Bundle(common.Receiver):
"""Part of a PCollection with output elements.
Part of a PCollection. Elements are output to a bundle, which will cause them
@@ -185,6 +186,9 @@ class _Bundle(object):
def output(self, element):
self.add(element)
+ def receive(self, element):
+ self.add(element)
+
def commit(self, synchronized_processing_time):
"""Commits this bundle.
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 2f3ac4f..4c3e7f1 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -28,6 +28,7 @@ from apache_beam import coders
from apache_beam import pvalue
from apache_beam.internal import pickler
from apache_beam.options.pipeline_options import TypeOptions
+from apache_beam.runners import common
from apache_beam.runners.common import DoFnRunner
from apache_beam.runners.common import DoFnState
from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite # pylint: disable=protected-access
@@ -490,14 +491,14 @@ class _TaggedReceivers(dict):
def output(self, element):
pass
- class _InMemoryReceiver(object):
+ class _InMemoryReceiver(common.Receiver):
"""Buffers undeclared outputs to the given dictionary."""
def __init__(self, target, tag):
self._target = target
self._tag = tag
- def output(self, element):
+ def receive(self, element):
self._target[self._tag].append(element)
def __missing__(self, key):
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 674d523..e40faa5 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -33,7 +33,6 @@ from apache_beam.coders import registry
from apache_beam.coders.coder_impl import create_InputStream
from apache_beam.coders.coder_impl import create_OutputStream
from apache_beam.internal import pickler
-from apache_beam.io import iobase
from apache_beam.metrics.execution import MetricsEnvironment
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_fn_api_pb2_grpc
@@ -100,29 +99,6 @@ def streaming_rpc_handler(cls, method_name):
return StreamingRpcHandler()
-class OldeSourceSplittableDoFn(beam.DoFn):
- """A DoFn that reads and emits an entire source.
- """
-
- # TODO(robertwb): Make this a full SDF with progress splitting, etc.
- def process(self, source):
- if isinstance(source, iobase.SourceBundle):
- for value in source.source.read(source.source.get_range_tracker(
- source.start_position, source.stop_position)):
- yield value
- else:
- # Dataflow native source
- with source.reader() as reader:
- for value in reader:
- yield value
-
-
-# See DataflowRunner._pardo_fn_data
-OLDE_SOURCE_SPLITTABLE_DOFN_DATA = pickler.dumps(
- (OldeSourceSplittableDoFn(), (), {}, [],
- beam.transforms.core.Windowing(GlobalWindows())))
-
-
class _GroupingBuffer(object):
"""Used to accumulate grouped (shuffled) results."""
def __init__(self, pre_grouped_coder, post_grouped_coder, windowing):
@@ -134,22 +110,32 @@ class _GroupingBuffer(object):
def append(self, elements_data):
input_stream = create_InputStream(elements_data)
+ coder_impl = self._pre_grouped_coder.get_impl()
+ key_coder_impl = self._key_coder.get_impl()
+ # TODO(robertwb): We could optimize this even more by using a
+ # window-dropping coder for the data plane.
+ is_trivial_windowing = self._windowing.is_default()
while input_stream.size() > 0:
- windowed_key_value = self._pre_grouped_coder.get_impl(
- ).decode_from_stream(input_stream, True)
- key = windowed_key_value.value[0]
- windowed_value = windowed_key_value.with_value(
- windowed_key_value.value[1])
- self._table[self._key_coder.encode(key)].append(windowed_value)
+ windowed_key_value = coder_impl.decode_from_stream(input_stream, True)
+ key, value = windowed_key_value.value
+ self._table[key_coder_impl.encode(key)].append(
+ value if is_trivial_windowing
+ else windowed_key_value.with_value(value))
def __iter__(self):
output_stream = create_OutputStream()
- trigger_driver = trigger.create_trigger_driver(self._windowing, True)
+ if self._windowing.is_default():
+ globally_window = GlobalWindows.windowed_value(None).with_value
+ windowed_key_values = lambda key, values: [globally_window((key, values))]
+ else:
+ trigger_driver = trigger.create_trigger_driver(self._windowing, True)
+ windowed_key_values = trigger_driver.process_entire_key
+ coder_impl = self._post_grouped_coder.get_impl()
+ key_coder_impl = self._key_coder.get_impl()
for encoded_key, windowed_values in self._table.items():
- key = self._key_coder.decode(encoded_key)
- for wkvs in trigger_driver.process_entire_key(key, windowed_values):
- self._post_grouped_coder.get_impl().encode_to_stream(
- wkvs, output_stream, True)
+ key = key_coder_impl.decode(encoded_key)
+ for wkvs in windowed_key_values(key, windowed_values):
+ coder_impl.encode_to_stream(wkvs, output_stream, True)
return iter([output_stream.get()])
@@ -867,9 +853,9 @@ class FnApiRunner(runner.PipelineRunner):
self.data_plane_handler.inverse()))
def push(self, request):
- logging.info('CONTROL REQUEST %s', request)
+ logging.debug('CONTROL REQUEST %s', request)
response = self.worker.do_instruction(request)
- logging.info('CONTROL RESPONSE %s', response)
+ logging.debug('CONTROL RESPONSE %s', response)
self._responses.append(response)
def pull(self):
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 689eab7..0c46b81 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -36,7 +36,6 @@ from apache_beam.io import iobase
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners import pipeline_context
-from apache_beam.runners.dataflow.native_io import iobase as native_iobase
from apache_beam.runners.worker import operation_specs
from apache_beam.runners.worker import operations
from apache_beam.transforms import sideinputs
@@ -71,6 +70,7 @@ class RunnerIOOperation(operations.Operation):
super(RunnerIOOperation, self).__init__(
operation_name, None, counter_factory, state_sampler)
self.windowed_coder = windowed_coder
+ self.windowed_coder_impl = windowed_coder.get_impl()
self.step_name = step_name
# target represents the consumer for the bytes in the data plane for a
# DataInputOperation or a producer of these bytes for a DataOutputOperation.
@@ -89,7 +89,7 @@ class DataOutputOperation(RunnerIOOperation):
self.output_stream = output_stream
def process(self, windowed_value):
- self.windowed_coder.get_impl().encode_to_stream(
+ self.windowed_coder_impl.encode_to_stream(
windowed_value, self.output_stream, True)
def finish(self):
@@ -118,47 +118,11 @@ class DataInputOperation(RunnerIOOperation):
def process_encoded(self, encoded_windowed_values):
input_stream = coder_impl.create_InputStream(encoded_windowed_values)
while input_stream.size() > 0:
- decoded_value = self.windowed_coder.get_impl().decode_from_stream(
+ decoded_value = self.windowed_coder_impl.decode_from_stream(
input_stream, True)
self.output(decoded_value)
-# TODO(robertwb): Revise side input API to not be in terms of native sources.
-# This will enable lookups, but there's an open question as to how to handle
-# custom sources without forcing intermediate materialization. This seems very
-# related to the desire to inject key and window preserving [Splittable]DoFns
-# into the view computation.
-class SideInputSource(native_iobase.NativeSource,
- native_iobase.NativeSourceReader):
- """A 'source' for reading side inputs via state API calls.
- """
-
- def __init__(self, state_handler, state_key, coder):
- self._state_handler = state_handler
- self._state_key = state_key
- self._coder = coder
-
- def reader(self):
- return self
-
- @property
- def returns_windowed_values(self):
- return True
-
- def __enter__(self):
- return self
-
- def __exit__(self, *exn_info):
- pass
-
- def __iter__(self):
- # TODO(robertwb): Support pagination.
- input_stream = coder_impl.create_InputStream(
- self._state_handler.Get(self._state_key).data)
- while input_stream.size() > 0:
- yield self._coder.get_impl().decode_from_stream(input_stream, True)
-
-
class StateBackedSideInputMap(object):
def __init__(self, state_handler, transform_id, tag, side_input_data):
self._state_handler = state_handler
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 6b5f024..c245655 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -78,9 +78,6 @@ class ConsumerSet(Receiver):
self.output_index = output_index
self.coder = coder
- def output(self, windowed_value): # For old SDKs.
- self.receive(windowed_value)
-
def receive(self, windowed_value):
self.update_counters_start(windowed_value)
for consumer in self.consumers:
@@ -326,9 +323,6 @@ class DoOperation(Operation):
state = common.DoFnState(self.counter_factory)
state.step_name = self.step_name
- # TODO(silviuc): What is the proper label here? PCollection being
- # processed?
- context = common.DoFnContext('label', state=state)
# Tag to output index map used to dispatch the side output values emitted
# by the DoFn function to the appropriate receivers. The main output is
# tagged with None and is associated with its corresponding index.
@@ -352,9 +346,12 @@ class DoOperation(Operation):
self.side_input_maps = []
self.dofn_runner = common.DoFnRunner(
- fn, args, kwargs, self.side_input_maps,
- window_fn, context, self.tagged_receivers,
- logger, self.step_name,
+ fn, args, kwargs, self.side_input_maps, window_fn,
+ tagged_receivers=self.tagged_receivers,
+ step_name=self.step_name,
+ logging_context=logger.PerThreadLoggingContext(
+ step_name=self.step_name),
+ state=state,
scoped_metrics_container=self.scoped_metrics_container)
self.dofn_receiver = (self.dofn_runner
if isinstance(self.dofn_runner, Receiver)
--
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <co...@beam.apache.org>'].