Posted to commits@beam.apache.org by al...@apache.org on 2018/04/13 16:29:30 UTC
[beam] branch master updated: [BEAM-4028] Adding NameContext to Python SDK. (#5043)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 70a6d18 [BEAM-4028] Adding NameContext to Python SDK. (#5043)
70a6d18 is described below
commit 70a6d18345e3794bd757206f5c3c2a42fa016ed4
Author: Pablo <pa...@users.noreply.github.com>
AuthorDate: Fri Apr 13 09:29:24 2018 -0700
[BEAM-4028] Adding NameContext to Python SDK. (#5043)
Adding NameContext to Python SDK.
---
sdks/python/apache_beam/runners/common.py | 70 +++++++++++++
.../apache_beam/runners/worker/operations.pxd | 1 +
.../apache_beam/runners/worker/operations.py | 113 ++++++++++++---------
3 files changed, 137 insertions(+), 47 deletions(-)
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index e3c768b..44f9083 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -39,6 +39,76 @@ from apache_beam.transforms.window import WindowFn
from apache_beam.utils.windowed_value import WindowedValue
+class NameContext(object):
+ """Holds the name information for a step."""
+
+ def __init__(self, step_name):
+ """Creates a new step NameContext.
+
+ Args:
+ step_name: The name of the step.
+ """
+ self.step_name = step_name
+
+ def __eq__(self, other):
+ return self.step_name == other.step_name
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __repr__(self):
+ return 'NameContext(%s)' % self.__dict__
+
+ def __hash__(self):
+ return hash(self.step_name)
+
+ def metrics_name(self):
+ """Returns the step name used for metrics reporting."""
+ return self.step_name
+
+ def logging_name(self):
+ """Returns the step name used for logging."""
+ return self.step_name
+
+
+# TODO(BEAM-4028): Move DataflowNameContext to Dataflow internal code.
+class DataflowNameContext(NameContext):
+ """Holds the name information for a step in Dataflow.
+
+ This includes a step_name (e.g. s2), a user_name (e.g. Foo/Bar/ParDo(Fab)),
+ and a system_name (e.g. s2-shuffle-read34)."""
+
+ def __init__(self, step_name, user_name, system_name):
+ """Creates a new step NameContext.
+
+ Args:
+ step_name: The internal name of the step (e.g. s2).
+ user_name: The full user-given name of the step (e.g. Foo/Bar/ParDo(Fab)).
+ system_name: The step name in the optimized graph (e.g. s2-1).
+ """
+ super(DataflowNameContext, self).__init__(step_name)
+ self.user_name = user_name
+ self.system_name = system_name
+
+ def __eq__(self, other):
+ return (self.step_name == other.step_name and
+ self.user_name == other.user_name and
+ self.system_name == other.system_name)
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __hash__(self):
+ return hash((self.step_name, self.user_name, self.system_name))
+
+ def __repr__(self):
+ return 'DataflowNameContext(%s)' % self.__dict__
+
+ def logging_name(self):
+ """Stackdriver logging relies on user-given step names (e.g. Foo/Bar)."""
+ return self.user_name
+
+
class LoggingContext(object):
"""For internal use only; no backwards-compatibility guarantees."""
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index cb05c90..0aee337 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -39,6 +39,7 @@ cdef class ConsumerSet(Receiver):
cdef class Operation(object):
+ cdef readonly name_context
cdef readonly operation_name
cdef readonly spec
cdef object consumers
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 0fa32e3..977d4bb 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -104,34 +104,45 @@ class Operation(object):
one or more receiver operations that will take that as input.
"""
- def __init__(self, operation_name, spec, counter_factory, state_sampler):
+ def __init__(self, name_context, spec, counter_factory, state_sampler):
"""Initializes a worker operation instance.
Args:
- operation_name: The system name assigned by the runner for this
- operation.
+ name_context: A NameContext instance or string (deprecated), with the
+ name information for this operation.
spec: An operation_specs.Worker* instance.
counter_factory: The CounterFactory to use for our counters.
state_sampler: The StateSampler for the current operation.
"""
- self.operation_name = operation_name
+ if isinstance(name_context, common.NameContext):
+ # TODO(BEAM-4028): Clean this up once it's completely migrated.
+ # We use the specific operation name that is used for metrics and state
+ # sampling.
+ self.name_context = name_context
+ else:
+ logging.info('Creating NameContext within operation')
+ self.name_context = common.NameContext(name_context)
+
+ # TODO(BEAM-4028): Remove following two lines. Rely on name context.
+ self.operation_name = self.name_context.step_name
+ self.step_name = self.name_context.logging_name()
+
self.spec = spec
self.counter_factory = counter_factory
self.consumers = collections.defaultdict(list)
# These are overwritten in the legacy harness.
- self.step_name = operation_name
- self.metrics_container = MetricsContainer(self.step_name)
+ self.metrics_container = MetricsContainer(self.name_context.metrics_name())
self.scoped_metrics_container = ScopedMetricsContainer(
self.metrics_container)
self.state_sampler = state_sampler
self.scoped_start_state = self.state_sampler.scoped_state(
- self.operation_name, 'start')
+ self.name_context.metrics_name(), 'start')
self.scoped_process_state = self.state_sampler.scoped_state(
- self.operation_name, 'process')
+ self.name_context.metrics_name(), 'process')
self.scoped_finish_state = self.state_sampler.scoped_state(
- self.operation_name, 'finish')
+ self.name_context.metrics_name(), 'finish')
# TODO(ccy): the '-abort' state can be added when the abort is supported in
# Operations.
self.receivers = []
@@ -142,9 +153,12 @@ class Operation(object):
logging.DEBUG)
# Everything except WorkerSideInputSource, which is not a
# top-level operation, should have output_coders
+ # TODO(pabloem): Decide which step name should be used here.
if getattr(self.spec, 'output_coders', None):
- self.receivers = [ConsumerSet(self.counter_factory, self.step_name,
- i, self.consumers[i], coder)
+ self.receivers = [ConsumerSet(self.counter_factory,
+ self.name_context.logging_name(),
+ i,
+ self.consumers[i], coder)
for i, coder in enumerate(self.spec.output_coders)]
def finish(self):
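The constructor change above keeps both call styles working during the
BEAM-4028 migration. A sketch (not part of the commit), assuming spec,
counter_factory and state_sampler are already in scope:

    # Both forms yield an equivalent name_context; the plain-string form
    # is deprecated and logs an informational message.
    op_a = Operation(common.NameContext('s2'), spec, counter_factory,
                     state_sampler)
    op_b = Operation('s2', spec, counter_factory, state_sampler)
    assert op_a.name_context == op_b.name_context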
@@ -205,7 +219,7 @@ class Operation(object):
"""
printable_name = self.__class__.__name__
if hasattr(self, 'step_name'):
- printable_name += ' %s' % self.step_name
+ printable_name += ' %s' % self.name_context.logging_name()
if is_recursive:
# If we have a step name, stop here, no more detail needed.
return '<%s>' % printable_name
@@ -317,7 +331,7 @@ class DoOperation(Operation):
si_counter = opcounters.SideInputReadCounter(
self.counter_factory,
self.state_sampler,
- declaring_step=self.operation_name,
+ declaring_step=self.name_context.step_name,
# Inputs are 1-indexed, so we add 1 to i in the side input id
input_index=i + 1)
else:
@@ -345,13 +359,13 @@ class DoOperation(Operation):
pickler.loads(self.spec.serialized_fn))
state = common.DoFnState(self.counter_factory)
- state.step_name = self.step_name
+ state.step_name = self.name_context.logging_name()
# Tag to output index map used to dispatch the side output values emitted
# by the DoFn function to the appropriate receivers. The main output is
# tagged with None and is associated with its corresponding index.
self.tagged_receivers = _TaggedReceivers(
- self.counter_factory, self.step_name)
+ self.counter_factory, self.name_context.logging_name())
output_tag_prefix = PropertyNames.OUT + '_'
for index, tag in enumerate(self.spec.output_tags):
@@ -372,9 +386,9 @@ class DoOperation(Operation):
self.dofn_runner = common.DoFnRunner(
fn, args, kwargs, self.side_input_maps, window_fn,
tagged_receivers=self.tagged_receivers,
- step_name=self.step_name,
+ step_name=self.name_context.logging_name(),
logging_context=logger.PerThreadLoggingContext(
- step_name=self.step_name),
+ step_name=self.name_context.logging_name()),
state=state,
scoped_metrics_container=self.scoped_metrics_container)
self.dofn_receiver = (self.dofn_runner
@@ -413,9 +427,9 @@ class DoFnRunnerReceiver(Receiver):
class CombineOperation(Operation):
"""A Combine operation executing a CombineFn for each input element."""
- def __init__(self, operation_name, spec, counter_factory, state_sampler):
+ def __init__(self, name_context, spec, counter_factory, state_sampler):
super(CombineOperation, self).__init__(
- operation_name, spec, counter_factory, state_sampler)
+ name_context, spec, counter_factory, state_sampler)
# Combiners do not accept deferred side-inputs (the ignored fourth argument)
# and therefore the code to handle the extra args/kwargs is simpler than for
# the DoFn's of ParDo.
@@ -450,9 +464,9 @@ class PGBKOperation(Operation):
values in this bundle, memory permitting.
"""
- def __init__(self, operation_name, spec, counter_factory, state_sampler):
+ def __init__(self, name_context, spec, counter_factory, state_sampler):
super(PGBKOperation, self).__init__(
- operation_name, spec, counter_factory, state_sampler)
+ name_context, spec, counter_factory, state_sampler)
assert not self.spec.combine_fn
self.table = collections.defaultdict(list)
self.size = 0
@@ -486,9 +500,9 @@ class PGBKOperation(Operation):
class PGBKCVOperation(Operation):
- def __init__(self, operation_name, spec, counter_factory, state_sampler):
+ def __init__(self, name_context, spec, counter_factory, state_sampler):
super(PGBKCVOperation, self).__init__(
- operation_name, spec, counter_factory, state_sampler)
+ name_context, spec, counter_factory, state_sampler)
# Combiners do not accept deferred side-inputs (the ignored fourth
# argument) and therefore the code to handle the extra args/kwargs is
# simpler than for the DoFn's of ParDo.
@@ -569,69 +583,72 @@ class FlattenOperation(Operation):
self.output(o)
-def create_operation(operation_name, spec, counter_factory, step_name,
+def create_operation(name_context, spec, counter_factory, step_name,
state_sampler, test_shuffle_source=None,
test_shuffle_sink=None, is_streaming=False):
"""Create Operation object for given operation specification."""
+ if not isinstance(name_context, common.NameContext):
+ # TODO(BEAM-4028): Remove ad-hoc NameContext once all has been migrated.
+ name_context = common.DataflowNameContext(step_name=name_context,
+ user_name=step_name,
+ system_name=None)
+
if isinstance(spec, operation_specs.WorkerRead):
if isinstance(spec.source, iobase.SourceBundle):
op = ReadOperation(
- operation_name, spec, counter_factory, state_sampler)
+ name_context, spec, counter_factory, state_sampler)
else:
from dataflow_worker.native_operations import NativeReadOperation
op = NativeReadOperation(
- operation_name, spec, counter_factory, state_sampler)
+ name_context, spec, counter_factory, state_sampler)
elif isinstance(spec, operation_specs.WorkerWrite):
from dataflow_worker.native_operations import NativeWriteOperation
op = NativeWriteOperation(
- operation_name, spec, counter_factory, state_sampler)
+ name_context, spec, counter_factory, state_sampler)
elif isinstance(spec, operation_specs.WorkerCombineFn):
op = CombineOperation(
- operation_name, spec, counter_factory, state_sampler)
+ name_context, spec, counter_factory, state_sampler)
elif isinstance(spec, operation_specs.WorkerPartialGroupByKey):
- op = create_pgbk_op(operation_name, spec, counter_factory, state_sampler)
+ op = create_pgbk_op(name_context, spec, counter_factory, state_sampler)
elif isinstance(spec, operation_specs.WorkerDoFn):
- op = DoOperation(operation_name, spec, counter_factory, state_sampler)
+ op = DoOperation(name_context, spec, counter_factory, state_sampler)
elif isinstance(spec, operation_specs.WorkerGroupingShuffleRead):
from dataflow_worker.shuffle_operations import GroupedShuffleReadOperation
op = GroupedShuffleReadOperation(
- operation_name, spec, counter_factory, state_sampler,
+ name_context, spec, counter_factory, state_sampler,
shuffle_source=test_shuffle_source)
elif isinstance(spec, operation_specs.WorkerUngroupedShuffleRead):
from dataflow_worker.shuffle_operations import UngroupedShuffleReadOperation
op = UngroupedShuffleReadOperation(
- operation_name, spec, counter_factory, state_sampler,
+ name_context, spec, counter_factory, state_sampler,
shuffle_source=test_shuffle_source)
elif isinstance(spec, operation_specs.WorkerInMemoryWrite):
op = InMemoryWriteOperation(
- operation_name, spec, counter_factory, state_sampler)
+ name_context, spec, counter_factory, state_sampler)
elif isinstance(spec, operation_specs.WorkerShuffleWrite):
from dataflow_worker.shuffle_operations import ShuffleWriteOperation
op = ShuffleWriteOperation(
- operation_name, spec, counter_factory, state_sampler,
+ name_context, spec, counter_factory, state_sampler,
shuffle_sink=test_shuffle_sink)
elif isinstance(spec, operation_specs.WorkerFlatten):
op = FlattenOperation(
- operation_name, spec, counter_factory, state_sampler)
+ name_context, spec, counter_factory, state_sampler)
elif isinstance(spec, operation_specs.WorkerMergeWindows):
from dataflow_worker.shuffle_operations import BatchGroupAlsoByWindowsOperation
from dataflow_worker.shuffle_operations import StreamingGroupAlsoByWindowsOperation
if is_streaming:
op = StreamingGroupAlsoByWindowsOperation(
- operation_name, spec, counter_factory, state_sampler)
+ name_context, spec, counter_factory, state_sampler)
else:
op = BatchGroupAlsoByWindowsOperation(
- operation_name, spec, counter_factory, state_sampler)
+ name_context, spec, counter_factory, state_sampler)
elif isinstance(spec, operation_specs.WorkerReifyTimestampAndWindows):
from dataflow_worker.shuffle_operations import ReifyTimestampAndWindowsOperation
op = ReifyTimestampAndWindowsOperation(
- operation_name, spec, counter_factory, state_sampler)
+ name_context, spec, counter_factory, state_sampler)
else:
raise TypeError('Expected an instance of operation_specs.Worker* class '
'instead of %s' % (spec,))
- op.step_name = step_name
- op.metrics_container = MetricsContainer(step_name)
- op.scoped_metrics_container = ScopedMetricsContainer(op.metrics_container)
return op
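For clarity, the fallback at the top of create_operation folds the legacy
pair of string arguments into a DataflowNameContext. A sketch (not part of
the commit; values are illustrative):

    # Legacy callers pass a string operation name plus a separate
    # user-level step_name; system_name is left unset in the ad-hoc
    # context.
    name_context = 'out'
    step_name = 'Foo/Bar/ParDo(Fab)'
    if not isinstance(name_context, common.NameContext):
        name_context = common.DataflowNameContext(
            step_name=name_context, user_name=step_name, system_name=None)
    assert name_context.logging_name() == 'Foo/Bar/ParDo(Fab)'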
@@ -648,7 +665,9 @@ class SimpleMapTaskExecutor(object):
"""Initializes SimpleMapTaskExecutor.
Args:
- map_task: The map task we are to run.
+ map_task: The map task we are to run. The map task contains a list of
+ operations, along with aligned lists of step_names, original_names,
+ and system_names for the pipeline steps.
counter_factory: The CounterFactory instance for the work item.
state_sampler: The StateSampler tracking the execution step.
test_shuffle_source: Used during tests for dependency injection into
@@ -682,14 +701,14 @@ class SimpleMapTaskExecutor(object):
# The order of the elements is important because the inputs use
# list indexes as references.
- step_names = (
- self._map_task.step_names or [None] * len(self._map_task.operations))
for ix, spec in enumerate(self._map_task.operations):
# This is used for logging and assigning names to counters.
- operation_name = self._map_task.system_names[ix]
- step_name = step_names[ix]
+ name_context = common.DataflowNameContext(
+ step_name=self._map_task.original_names[ix],
+ user_name=self._map_task.step_names[ix],
+ system_name=self._map_task.system_names[ix])
op = create_operation(
- operation_name, spec, self._counter_factory, step_name,
+ name_context, spec, self._counter_factory, None,
self._state_sampler,
test_shuffle_source=self._test_shuffle_source,
test_shuffle_sink=self._test_shuffle_sink)
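A small sketch of how the aligned map-task lists above combine into one
DataflowNameContext per operation (not part of the commit; list values are
invented):

    # original_names, step_names and system_names are index-aligned with
    # the operations list, so zip() pairs them up per step.
    original_names = ['s1', 's2']
    step_names = ['Read', 'Foo/Bar/ParDo(Fab)']
    system_names = ['s1', 's2-shuffle-read34']
    contexts = [
        common.DataflowNameContext(step_name=o, user_name=u, system_name=s)
        for o, u, s in zip(original_names, step_names, system_names)]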
--
To stop receiving notification emails like this one, please contact
altay@apache.org.