You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:13:06 UTC
[31/50] [abbrv] incubator-beam git commit: Move all files to
apache_beam folder
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
new file mode 100644
index 0000000..09f8015
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -0,0 +1,703 @@
+# Copyright 2016 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""PTransform and descendants.
+
+A PTransform is an object describing (not executing) a computation. The actual
+execution semantics for a transform is captured by a runner object. A transform
+object always belongs to a pipeline object.
+
+A PTransform derived class needs to define the apply() method that describes
+how one or more PValues are created by the transform.
+
+The module defines a few standard transforms: FlatMap (parallel do),
+GroupByKey (group by key), etc. Note that the apply() methods for these
+classes contain code that will add nodes to the processing graph associated
+with a pipeline.
+
+As support for the FlatMap transform, the module also defines a DoFn
+class and wrapper class that allows lambda functions to be used as
+FlatMap processing functions.
+"""
+
+from __future__ import absolute_import
+
+import copy
+import inspect
+import operator
+import os
+import sys
+
+from google.cloud.dataflow import coders
+from google.cloud.dataflow import error
+from google.cloud.dataflow import pvalue
+from google.cloud.dataflow import typehints
+from google.cloud.dataflow.internal import pickler
+from google.cloud.dataflow.internal import util
+from google.cloud.dataflow.typehints import getcallargs_forhints
+from google.cloud.dataflow.typehints import TypeCheckError
+from google.cloud.dataflow.typehints import validate_composite_type_param
+from google.cloud.dataflow.typehints import WithTypeHints
+from google.cloud.dataflow.typehints.trivial_inference import instance_to_type
+
+
class _PValueishTransform(object):
  """Visitor for PValueish objects.

  A PValueish is a PValue, or a list, tuple or dict of PValueish objects.

  Visiting a PValueish constructs a (possibly mutated) copy of it. Dispatch is
  done on the node's class name: if this visitor has a method called
  visit_<ClassName> it handles the node, otherwise the node is returned as is.
  """

  def visit(self, node, *args):
    handler = getattr(self, 'visit_' + node.__class__.__name__, None)
    if handler is None:
      # No dedicated visitor for this type: the node is a leaf.
      return node
    return handler(node, *args)

  def visit_list(self, node, *args):
    visited = []
    for element in node:
      visited.append(self.visit(element, *args))
    return visited

  def visit_tuple(self, node, *args):
    return tuple([self.visit(element, *args) for element in node])

  def visit_dict(self, node, *args):
    return dict(
        (key, self.visit(value, *args)) for key, value in node.items())
+
class _SetInputPValues(_PValueishTransform):
  """Visitor that swaps nodes for replacements keyed by the node's id()."""

  def visit(self, node, replacements):
    key = id(node)
    if key not in replacements:
      # Not a replaced node: recurse into it (or return it if it is a leaf).
      return super(_SetInputPValues, self).visit(node, replacements)
    return replacements[key]
+
+
class _MaterializedDoOutputsTuple(pvalue.DoOutputsTuple):
  """A DoOutputsTuple whose items are resolved through a pvalue cache.

  Wraps a deferred DoOutputsTuple; indexing by tag looks up the deferred
  output for that tag and passes it to pvalue_cache.get_unwindowed_pvalue().
  """

  def __init__(self, deferred, pvalue_cache):
    tags, main_tag = deferred._tags, deferred._main_tag
    super(_MaterializedDoOutputsTuple, self).__init__(
        None, None, tags, main_tag)
    self._pvalue_cache = pvalue_cache
    self._deferred = deferred

  def __getitem__(self, tag):
    deferred_output = self._deferred[tag]
    return self._pvalue_cache.get_unwindowed_pvalue(deferred_output)
+
+
class _MaterializePValues(_PValueishTransform):
  """Visitor that replaces PValues with values read from a pvalue cache."""

  def __init__(self, pvalue_cache):
    self._pvalue_cache = pvalue_cache

  def visit(self, node):
    if isinstance(node, pvalue.PValue):
      return self._pvalue_cache.get_unwindowed_pvalue(node)
    if isinstance(node, pvalue.DoOutputsTuple):
      # Tagged outputs are resolved lazily, one tag at a time.
      return _MaterializedDoOutputsTuple(node, self._pvalue_cache)
    return super(_MaterializePValues, self).visit(node)
+
+
class GetPValues(_PValueishTransform):
  """Visitor that collects the PValues and DoOutputsTuples in a pvalueish.

  Calling visit(node) without a pvalues list returns a new list of every
  PValue / DoOutputsTuple found. Recursive calls pass the accumulator along
  and return None.
  """

  def visit(self, node, pvalues=None):
    if pvalues is None:
      # Top-level call: create the accumulator and hand it back when done.
      collected = []
      self.visit(node, collected)
      return collected
    if isinstance(node, (pvalue.PValue, pvalue.DoOutputsTuple)):
      pvalues.append(node)
      return None
    super(GetPValues, self).visit(node, pvalues)
    return None
+
+
class ZipPValues(_PValueishTransform):
  """Pairs each PValue in a pvalueish with a value in a parallel sibling.

  The sibling should have the same nested structure as the pvalueish. A
  sibling that is a leaf is expanded across nested pvalueish lists, tuples
  and dicts. For example

      ZipPValues().visit({'a': pc1, 'b': (pc2, pc3)},
                         {'a': 'A', 'b': 'B'})

  will return

      [('a', pc1, 'A'), ('b', pc2, 'B'), ('b', pc3, 'B')]

  Each result is a (context, pvalue, sibling_value) tuple, where context is
  the dict key or 'position N' string under which the pvalue was reached.
  """

  def visit(self, pvalueish, sibling, pairs=None, context=None):
    if pairs is None:
      # Top-level call: create the accumulator and hand it back when done.
      collected = []
      self.visit(pvalueish, sibling, collected, context)
      return collected
    if isinstance(pvalueish, (pvalue.PValue, pvalue.DoOutputsTuple)):
      pairs.append((context, pvalueish, sibling))
      return None
    super(ZipPValues, self).visit(pvalueish, sibling, pairs, context)
    return None

  def visit_list(self, pvalueish, sibling, pairs, context):
    if not isinstance(sibling, (list, tuple)):
      # A leaf sibling is shared by every element, keeping the outer context.
      for element in pvalueish:
        self.visit(element, sibling, pairs, context)
      return
    siblings = list(sibling)
    for ix, element in enumerate(pvalueish):
      # A sibling shorter than the pvalueish is padded with None.
      matched = siblings[ix] if ix < len(siblings) else None
      self.visit(element, matched, pairs, 'position %s' % ix)

  def visit_tuple(self, pvalueish, sibling, pairs, context):
    self.visit_list(pvalueish, sibling, pairs, context)

  def visit_dict(self, pvalueish, sibling, pairs, context):
    if not isinstance(sibling, dict):
      # A leaf sibling is shared by every value, keeping the outer context.
      for element in pvalueish.values():
        self.visit(element, sibling, pairs, context)
      return
    for key, element in pvalueish.items():
      # Keys missing from the sibling pair with None.
      self.visit(element, sibling.get(key), pairs, key)
+
+
class PTransform(WithTypeHints):
  """A transform object used to modify one or more PCollections.

  Subclasses must define an apply() method that will be used when the
  transform is applied to some arguments. The typical usage pattern is:

      input | CustomTransform(...)

  The apply() method of the CustomTransform object passed in will be called
  with input as an argument.
  """
  # By default, transforms don't have any side inputs.
  side_inputs = ()

  # Used for nullary transforms.
  pipeline = None

  # Default is unset.
  _user_label = None

  def __init__(self, label=None):
    super(PTransform, self).__init__()
    self.label = label

  @property
  def label(self):
    """The user-supplied label if one was set, else default_label()."""
    user_label = self._user_label
    if user_label:
      return user_label
    return self.default_label()

  @label.setter
  def label(self, value):
    self._user_label = value

  def default_label(self):
    """Returns the label used when none was supplied: the class name."""
    return self.__class__.__name__

  @classmethod
  def parse_label_and_arg(cls, args, kwargs, arg_name):
    """Parses positional and keyword arguments into a (label, value) pair.

    This is used by functions that take a (label, arg_name) parameter list in
    which the leading label is optional even when arg_name is not passed as a
    keyword. More specifically, the following calling patterns are allowed::

      (value)
      ('label', value)
      (arg_name=value)
      ('label', arg_name=value)
      (value, label='label')
      (label='label', arg_name=value)

    Args:
      args: A tuple of positional arguments.
      kwargs: A dictionary of keyword arguments.
      arg_name: The name of the second argument.

    Returns:
      A (label, value) tuple. The label will be the one passed in or one
      derived from the class name. The value will be the corresponding value
      for the arg_name argument.

    Raises:
      ValueError: If the label and value cannot be deduced from args and
        kwargs, and also if the label is not a string.
    """
    # TODO(robertwb): Fix to not silently drop extra arguments.
    kw_label = kwargs.get('label', None)
    kw_value = kwargs.get(arg_name, None)

    if kw_value is not None:
      value = kw_value
    elif len(args) > 1:
      value = args[1]
    elif args:
      value = args[0]
    else:
      value = None

    if kw_label is not None:
      label = kw_label
    else:
      # The label has to come from the positional arguments. When the value
      # was not given by keyword either, a single positional argument is the
      # value, so a positional label needs at least two arguments; otherwise
      # one is enough. Without a positional label, use the class name.
      min_args_for_label = 2 if kw_value is None else 1
      if len(args) >= min_args_for_label:
        label = args[0]
      else:
        label = cls.__name__

    if label is None or value is None or not isinstance(label, basestring):
      raise ValueError(
          '%s expects a (label, %s) or (%s) argument list '
          'instead of args=%s, kwargs=%s' % (
              cls.__name__, arg_name, arg_name, args, kwargs))
    return label, value

  def with_input_types(self, input_type_hint):
    """Annotates the input type of a PTransform with a type-hint.

    Args:
      input_type_hint: An instance of an allowed built-in type, a custom
        class, or an instance of a typehints.TypeConstraint.

    Raises:
      TypeError: If 'input_type_hint' is not a valid type-hint. See
        typehints.validate_composite_type_param for further details.

    Returns:
      A reference to the instance of this particular PTransform object. This
      allows chaining type-hinting related methods.
    """
    validate_composite_type_param(
        input_type_hint, 'Type hints for a PTransform')
    return super(PTransform, self).with_input_types(input_type_hint)

  def with_output_types(self, type_hint):
    """Annotates the output type of a PTransform with a type-hint.

    Args:
      type_hint: An instance of an allowed built-in type, a custom class, or
        a typehints.TypeConstraint.

    Raises:
      TypeError: If 'type_hint' is not a valid type-hint. See
        typehints.validate_composite_type_param for further details.

    Returns:
      A reference to the instance of this particular PTransform object. This
      allows chaining type-hinting related methods.
    """
    validate_composite_type_param(type_hint, 'Type hints for a PTransform')
    return super(PTransform, self).with_output_types(type_hint)

  def type_check_inputs(self, pvalueish):
    self.type_check_inputs_or_outputs(pvalueish, 'input')

  def infer_output_type(self, unused_input_type):
    declared = self.get_type_hints().simple_output_type(self.label)
    return declared or typehints.Any

  def type_check_outputs(self, pvalueish):
    self.type_check_inputs_or_outputs(pvalueish, 'output')

  def type_check_inputs_or_outputs(self, pvalueish, input_or_output):
    """Checks the element types in pvalueish against the declared hints.

    Args:
      pvalueish: The inputs or outputs of this transform.
      input_or_output: Either 'input' or 'output'; selects which hints of
        get_type_hints() are checked and is used in error messages.

    Raises:
      TypeCheckError: If both positional and keyword hints are present, or if
        an element type is not consistent with its hint.
    """
    hints = getattr(self.get_type_hints(), input_or_output + '_types')
    if not hints:
      return
    arg_hints, kwarg_hints = hints
    if arg_hints and kwarg_hints:
      raise TypeCheckError(
          'PTransform cannot have both positional and keyword type hints '
          'without overriding %s._type_check_%s()' % (
              self.__class__, input_or_output))
    if len(arg_hints) == 1:
      root_hint = arg_hints[0]
    else:
      root_hint = arg_hints or kwarg_hints
    for context, pvalue_, hint in ZipPValues().visit(pvalueish, root_hint):
      if pvalue_.element_type is None:
        # TODO(robertwb): It's a bug that we ever get here. (typecheck)
        continue
      if not hint or typehints.is_consistent_with(pvalue_.element_type, hint):
        continue
      at_context = ' %s %s' % (input_or_output, context) if context else ''
      raise TypeCheckError(
          '%s type hint violation at %s%s: expected %s, got %s' % (
              input_or_output.title(), self.label, at_context, hint,
              pvalue_.element_type))

  def clone(self, new_label):
    """Clones the current transform instance under a new label."""
    duplicate = copy.copy(self)
    duplicate.label = new_label
    return duplicate

  def apply(self, input_or_inputs):
    raise NotImplementedError

  def __str__(self):
    return '<%s>' % self._str_internal()

  def __repr__(self):
    return '<%s at %s>' % (self._str_internal(), hex(id(self)))

  def _str_internal(self):
    """Builds the description shared by __str__ and __repr__."""
    class_name = self.__class__.__name__
    if hasattr(self, 'label') and self.label:
      label_part = ' label=[%s]' % self.label
    else:
      label_part = ''
    if hasattr(self, 'inputs') and self.inputs:
      inputs_part = ' inputs=%s' % str(self.inputs)
    else:
      inputs_part = ''
    if self.side_inputs:
      side_inputs_part = ' side_inputs=%s' % str(self.side_inputs)
    else:
      side_inputs_part = ''
    return '%s(PTransform)%s%s%s' % (
        class_name, label_part, inputs_part, side_inputs_part)

  def _check_pcollection(self, pcoll):
    if not isinstance(pcoll, pvalue.PCollection):
      raise error.TransformError('Expecting a PCollection argument.')
    if not pcoll.pipeline:
      raise error.TransformError('PCollection not part of a pipeline.')

  def get_windowing(self, inputs):
    """Returns the window function to be associated with transform's output.

    By default most transforms just return the windowing function associated
    with the input PCollection (or the first input if several).
    """
    # TODO(robertwb): Assert all input WindowFns compatible.
    return inputs[0].windowing

  def __or__(self, right):
    """Used to compose PTransforms, e.g., ptransform1 | ptransform2."""
    if not isinstance(right, PTransform):
      return NotImplemented
    return ChainedPTransform(self, right)

  def __ror__(self, left):
    """Used to apply this PTransform to non-PValues, e.g., a tuple."""
    pvalueish, pvalues = self._extract_input_pvalues(left)
    pipelines = [v.pipeline for v in pvalues if isinstance(v, pvalue.PValue)]
    if pvalues and not pipelines:
      # There are inputs but none of them is a PValue: run the transform
      # right away on a fresh DirectPipelineRunner pipeline.
      # pylint: disable=g-import-not-at-top
      from google.cloud.dataflow import pipeline
      from google.cloud.dataflow.utils.options import PipelineOptions
      # pylint: enable=g-import-not-at-top
      p = pipeline.Pipeline(
          'DirectPipelineRunner', PipelineOptions(sys.argv))
      deferred = False
    else:
      if pipelines:
        p = self.pipeline or pipelines[0]
        for other in pipelines:
          if p != other:
            raise ValueError(
                'Mixing value from different pipelines not allowed.')
      elif self.pipeline is not None:
        p = self.pipeline
      else:
        raise ValueError('"%s" requires a pipeline to be specified '
                         'as there are no deferred inputs.'% self.label)
      deferred = not getattr(p.runner, 'is_eager', False)
    # pylint: disable=g-import-not-at-top
    from google.cloud.dataflow.transforms.core import Create
    # pylint: enable=g-import-not-at-top
    # Wrap every plain (non-PValue, non-None) input in a Create transform.
    replacements = {}
    for ix, v in enumerate(pvalues):
      if isinstance(v, pvalue.PValue) or v is None:
        continue
      replacements[id(v)] = p | Create('CreatePInput%s' % ix, v)
    pvalueish = _SetInputPValues().visit(pvalueish, replacements)
    self.pipeline = p
    result = p.apply(self, pvalueish)
    if deferred:
      return result
    # Get a reference to the runner's internal cache, otherwise the runner
    # may clean it after run.
    cache = p.runner.cache
    p.run()
    return _MaterializePValues(cache).visit(result)

  def _extract_input_pvalues(self, pvalueish):
    """Extract all the pvalues contained in the input pvalueish.

    Returns pvalueish as well as the flat inputs list as the input may have
    to be copied as inspection may be destructive.

    By default, recursively extracts tuple components and dict values.

    Generally only needs to be overridden for multi-input PTransforms.
    """
    # pylint: disable=g-import-not-at-top
    from google.cloud.dataflow import pipeline
    # pylint: enable=g-import-not-at-top
    if isinstance(pvalueish, pipeline.Pipeline):
      pvalueish = pvalue.PBegin(pvalueish)

    def _leaves(node):
      if isinstance(node, tuple):
        children = node
      elif isinstance(node, dict):
        children = node.values()
      else:
        yield node
        return
      for child in children:
        for leaf in _leaves(child):
          yield leaf

    return pvalueish, tuple(_leaves(pvalueish))
+
+
class ChainedPTransform(PTransform):
  """A sequence of PTransforms applied one after the other.

  Built by ptransform1 | ptransform2; its label is the parts' labels joined
  with '|'.
  """

  def __init__(self, *parts):
    super(ChainedPTransform, self).__init__(label=self._chain_label(parts))
    self._parts = parts

  def _chain_label(self, parts):
    return '|'.join([part.label for part in parts])

  def __or__(self, right):
    if not isinstance(right, PTransform):
      return NotImplemented
    # Create a flat list rather than a nested tree of composite
    # transforms for better monitoring, etc.
    return ChainedPTransform(*(self._parts + (right,)))

  def apply(self, pval):
    # Pipe the input through every part in order.
    result = pval
    for part in self._parts:
      result = result | part
    return result
+
+
class PTransformWithSideInputs(PTransform):
  """A superclass for any PTransform (e.g. FlatMap or Combine)
  invoking user code.

  PTransforms like FlatMap invoke user-supplied code in some kind of
  package (e.g. a DoFn) and optionally provide arguments and side inputs
  to that code. This internal-use-only class contains common functionality
  for PTransforms that fit this model.
  """

  def __init__(self, fn_or_label, *args, **kwargs):
    if fn_or_label is None or isinstance(fn_or_label, basestring):
      # Called as (label, fn, ...): the fn is the first remaining argument.
      label = fn_or_label
      fn, args = args[0], args[1:]
    else:
      label = None
      fn = fn_or_label
    if isinstance(fn, type) and issubclass(fn, typehints.WithTypeHints):
      # Don't treat Fn class objects as callables.
      raise ValueError('Use %s() not %s.' % (fn.__name__, fn.__name__))
    self.fn = self.make_fn(fn)
    # Now that we figure out the label, initialize the super-class.
    super(PTransformWithSideInputs, self).__init__(label=label)

    def _is_pcollection(candidate):
      return isinstance(candidate, pvalue.PCollection)

    if (any([_is_pcollection(v) for v in args]) or
        any([_is_pcollection(v) for v in kwargs.itervalues()])):
      raise error.SideInputError(
          'PCollection used directly as side input argument. Specify '
          'AsIter(pcollection) or AsSingleton(pcollection) to indicate how the '
          'PCollection is to be used.')
    self.args, self.kwargs, self.side_inputs = util.remove_objects_from_args(
        args, kwargs, pvalue.PCollectionView)
    self.raw_side_inputs = args, kwargs

    # Prevent name collisions with fns of the form '<function <lambda> at ...>'
    self._cached_fn = self.fn

    def _pickle_roundtrip(obj):
      return pickler.loads(pickler.dumps(obj))

    # Ensure fn and side inputs are picklable for remote execution.
    self.fn = _pickle_roundtrip(self.fn)
    self.args = _pickle_roundtrip(self.args)
    self.kwargs = _pickle_roundtrip(self.kwargs)

    # For type hints, because loads(dumps(class)) != class.
    self.fn = self._cached_fn

  def with_input_types(
      self, input_type_hint, *side_inputs_arg_hints, **side_input_kwarg_hints):
    """Annotates the types of main inputs and side inputs for the PTransform.

    Args:
      input_type_hint: An instance of an allowed built-in type, a custom
        class, or an instance of a typehints.TypeConstraint.
      *side_inputs_arg_hints: A variable length argument composed of
        allowed built-in types, custom classes, or typehints.TypeConstraint
        instances.
      **side_input_kwarg_hints: A dictionary argument whose values are
        allowed built-in types, custom classes, or typehints.TypeConstraint
        instances.

    Example of annotating the types of side-inputs:
      FlatMap().with_input_types(int, int, bool)

    Raises:
      TypeError: If any hint is not a valid type-hint. See
        typehints.validate_composite_type_param for further details.

    Returns:
      A reference to the instance of this particular PTransform object. This
      allows chaining type-hinting related methods.
    """
    super(PTransformWithSideInputs, self).with_input_types(input_type_hint)

    # Positional side-input hints are validated first, then keyword ones.
    all_side_hints = (
        list(side_inputs_arg_hints) + list(side_input_kwarg_hints.values()))
    for side_hint in all_side_hints:
      validate_composite_type_param(side_hint, 'Type hints for a PTransform')

    self.side_inputs_types = side_inputs_arg_hints
    return WithTypeHints.with_input_types(
        self, input_type_hint, *side_inputs_arg_hints, **side_input_kwarg_hints)

  def type_check_inputs(self, pvalueish):
    """Checks the main input and side inputs against the declared hints.

    Raises:
      typehints.TypeCheckError: If the type bound to an argument of the
        processing function is not consistent with its hint.
    """
    type_hints = self.get_type_hints().input_types
    if not type_hints:
      return
    args, kwargs = self.raw_side_inputs

    def element_type(side_input):
      if isinstance(side_input, pvalue.PCollectionView):
        return side_input.element_type
      return instance_to_type(side_input)

    arg_types = [pvalueish.element_type] + [element_type(v) for v in args]
    kwargs_types = dict((k, element_type(v)) for (k, v) in kwargs.items())
    argspec_fn = self.process_argspec_fn()
    bindings = getcallargs_forhints(argspec_fn, *arg_types, **kwargs_types)
    hints = getcallargs_forhints(argspec_fn, *type_hints[0], **type_hints[1])
    for arg, hint in hints.items():
      if arg.startswith('%unknown%') or hint is None:
        continue
      if not typehints.is_consistent_with(
          bindings.get(arg, typehints.Any), hint):
        raise typehints.TypeCheckError(
            'Type hint violation for \'%s\': requires %s but got %s for %s'
            % (self.label, hint, bindings[arg], arg))

  def process_argspec_fn(self):
    """Returns an argspec of the function actually consuming the data.
    """
    raise NotImplementedError

  def make_fn(self, fn):
    # TODO(silviuc): Add comment describing that this is meant to be
    # overridden by methods detecting callables and wrapping them in DoFns.
    return fn

  def default_label(self):
    return '%s(%s)' % (self.__class__.__name__, self.fn.default_label())
+
+
class CallablePTransform(PTransform):
  """A class wrapper for a function-based transform.

  The wrapped function is invoked by apply() as
  fn(label, pcoll, *args, **kwargs), where args and kwargs are the ones the
  wrapper was called with. If the function has a parameter named
  'type_hints', the transform's type hints are passed as the first of args.
  """

  def __init__(self, fn):
    # pylint: disable=super-init-not-called
    # This is a helper class for a function decorator. Only when the class
    # is called (and __call__ invoked) we will have all the information
    # needed to initialize the super class.
    self.fn = fn

  def __call__(self, *args, **kwargs):
    """Records the label and arguments for the wrapped function.

    A leading None or string argument is taken as the label; everything else
    is forwarded to the wrapped function by apply().

    Returns:
      This same object, now configured with the given label and arguments.
    """
    # NOTE(review): this mutates and returns self, so every call reconfigures
    # the one wrapper object created by the decorator.
    # basestring (not str) so that unicode labels are recognized, as they are
    # by the other label-parsing code in this module.
    if args and (args[0] is None or isinstance(args[0], basestring)):
      label, self._args = args[0], args[1:]
    else:
      label, self._args = None, args
    self._kwargs = kwargs
    # We know the label now, so initialize the super-class.
    super(CallablePTransform, self).__init__(label=label)
    return self

  def apply(self, pcoll):
    # Since the PTransform will be implemented entirely as a function
    # (once called), we need to pass through any type-hinting information that
    # may have been annotated via the .with_input_types() and
    # .with_output_types() methods.
    kwargs = dict(self._kwargs)
    args = tuple(self._args)
    # Only the argspec lookup is guarded, so that a TypeError raised anywhere
    # else is not silently swallowed.
    try:
      fn_arg_names = inspect.getargspec(self.fn).args
    except TypeError:
      # Might not be a function.
      fn_arg_names = ()
    if 'type_hints' in fn_arg_names:
      args = (self.get_type_hints(),) + args
    return self.fn(self.label, pcoll, *args, **kwargs)

  def default_label(self):
    # _args only exists once the wrapper has been called; fall back to no
    # arguments so that the label can be computed before that.
    args = getattr(self, '_args', ())
    if args:
      return '%s(%s)' % (
          label_from_callable(self.fn), label_from_callable(args[0]))
    else:
      return label_from_callable(self.fn)
+
+
def ptransform_fn(fn):
  """A decorator for a function-based PTransform.

  Args:
    fn: A function implementing a custom PTransform.

  Returns:
    A CallablePTransform instance wrapping the function-based PTransform.

  This wrapper provides an alternative, simpler way to define a PTransform.
  The standard method is to subclass from PTransform and override the apply()
  method. An equivalent effect can be obtained by defining a function that
  takes a label, an input PCollection and additional optional arguments and
  returns a resulting PCollection. For example::

    @ptransform_fn
    def CustomMapper(label, pcoll, mapfn):
      return pcoll | ParDo(mapfn)

  The equivalent approach using PTransform subclassing::

    class CustomMapper(PTransform):
      def apply(self, pcoll, mapfn):
        return pcoll | ParDo(mapfn)

  With either method the custom PTransform can be used in pipelines as if
  it were one of the "native" PTransforms::

    result_pcoll = input_pcoll | CustomMapper('label', somefn)

  Note that for both solutions the underlying implementation of the pipe
  operator (i.e., `|`) will inject the pcoll argument in its proper place
  (first argument if no label was specified and second argument otherwise).
  """
  wrapped = CallablePTransform(fn)
  return wrapped
+
+
def format_full_label(applied_transform, pending_transform):
  """Returns a fully formatted cumulative PTransform label.

  Args:
    applied_transform: An instance of an AppliedPTransform that has been fully
      applied prior to 'pending_transform'.
    pending_transform: An instance of PTransform that has yet to be applied to
      the Pipeline.

  Returns:
    The applied transform's full label and the pending transform's label
    joined by '/', without a leading '/'. Example: 'foo/bar/baz'.
  """
  parts = [applied_transform.full_label, pending_transform.label]
  label = '/'.join(parts)
  # Drop a single leading slash because the monitoring UI expects names that
  # do not start with such a character.
  if label.startswith('/'):
    return label[1:]
  return label
+
+
def label_from_callable(fn):
  """Returns a human-readable label for a callable (or any other object).

  Args:
    fn: The object to label.

  Returns:
    fn.default_label() if fn has such a method; otherwise fn.__name__ if it
    has one, except that lambdas are labelled
    '<lambda at FILE:LINE>' using the base name of the defining file and the
    first line of the lambda; otherwise str(fn).
  """
  if hasattr(fn, 'default_label'):
    return fn.default_label()
  elif hasattr(fn, '__name__'):
    if fn.__name__ == '<lambda>':
      # __code__ is the same object as the Python 2-only func_code attribute
      # and is available on both Python 2.6+ and Python 3.
      code = fn.__code__
      return '<lambda at %s:%s>' % (
          os.path.basename(code.co_filename), code.co_firstlineno)
    else:
      return fn.__name__
  else:
    return str(fn)