You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/08/12 00:02:49 UTC
[1/3] beam git commit: Makes Python API reference generation more
strict
Repository: beam
Updated Branches:
refs/heads/master d035a345f -> 84a23793c
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/transforms/display.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
index 152f16e..88a1fee 100644
--- a/sdks/python/apache_beam/transforms/display.py
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -16,21 +16,24 @@
#
"""
-DisplayData, its classes, interfaces and methods.
+:class:`DisplayData`, its classes, interfaces and methods.
The classes in this module allow users and transform developers to define
-static display data to be displayed when a pipeline runs. PTransforms, DoFns
-and other pipeline components are subclasses of the HasDisplayData mixin. To
-add static display data to a component, you can override the display_data
-method of the HasDisplayData class.
+static display data to be displayed when a pipeline runs.
+:class:`~apache_beam.transforms.ptransform.PTransform` s,
+:class:`~apache_beam.transforms.core.DoFn` s
+and other pipeline components are subclasses of the :class:`HasDisplayData`
+mixin. To add static display data to a component, you can override the
+:meth:`HasDisplayData.display_data()` method.
Available classes:
-- HasDisplayData - Components that inherit from this class can have static
- display data shown in the UI.
-- DisplayDataItem - This class represents static display data elements.
-- DisplayData - Internal class that is used to create display data and
- communicate it to the API.
+* :class:`HasDisplayData` - Components that inherit from this class can have
+ static display data shown in the UI.
+* :class:`DisplayDataItem` - This class represents static display data
+ elements.
+* :class:`DisplayData` - Internal class that is used to create display data
+ and communicate it to the API.
"""
from __future__ import absolute_import
@@ -57,17 +60,19 @@ class HasDisplayData(object):
static display data.
Returns:
- A dictionary containing key:value pairs. The value might be an
- integer, float or string value; a DisplayDataItem for values that
- have more data (e.g. short value, label, url); or a HasDisplayData
- instance that has more display data that should be picked up. For
- example:
-
- { 'key1': 'string_value',
- 'key2': 1234,
- 'key3': 3.14159265,
- 'key4': DisplayDataItem('apache.org', url='http://apache.org'),
- 'key5': subComponent }
+ Dict[str, Any]: A dictionary containing ``key:value`` pairs.
+ The value might be an integer, float or string value; a
+ :class:`DisplayDataItem` for values that have more data
+ (e.g. short value, label, url); or a :class:`HasDisplayData` instance
+ that has more display data that should be picked up. For example::
+
+ {
+ 'key1': 'string_value',
+ 'key2': 1234,
+ 'key3': 3.14159265,
+ 'key4': DisplayDataItem('apache.org', url='http://apache.org'),
+ 'key5': subComponent
+ }
"""
return {}
@@ -111,18 +116,19 @@ class DisplayData(object):
@classmethod
def create_from_options(cls, pipeline_options):
- """ Creates DisplayData from a PipelineOptions instance.
+ """ Creates :class:`DisplayData` from a
+ :class:`~apache_beam.options.pipeline_options.PipelineOptions` instance.
- When creating DisplayData, this method will convert the value of any
- item of a non-supported type to its string representation.
- The normal DisplayData.create_from method rejects those items.
+ When creating :class:`DisplayData`, this method will convert the value of
+ any item of a non-supported type to its string representation.
+ The normal :meth:`.create_from()` method rejects those items.
Returns:
- A DisplayData instance with populated items.
+ DisplayData: A :class:`DisplayData` instance with populated items.
Raises:
- ValueError: If the has_display_data argument is not an instance of
- HasDisplayData.
+ ~exceptions.ValueError: If the **pipeline_options** argument is
+ not an instance of
+ :class:`~apache_beam.options.pipeline_options.PipelineOptions`.
"""
from apache_beam.options.pipeline_options import PipelineOptions
if not isinstance(pipeline_options, PipelineOptions):
@@ -138,14 +144,14 @@ class DisplayData(object):
@classmethod
def create_from(cls, has_display_data):
- """ Creates DisplayData from a HasDisplayData instance.
+ """ Creates :class:`DisplayData` from a :class:`HasDisplayData` instance.
Returns:
- A DisplayData instance with populated items.
+ DisplayData: A :class:`DisplayData` instance with populated items.
Raises:
- ValueError: If the has_display_data argument is not an instance of
- HasDisplayData.
+ ~exceptions.ValueError: If the **has_display_data** argument is
+ not an instance of :class:`HasDisplayData`.
"""
if not isinstance(has_display_data, HasDisplayData):
raise ValueError('Element of class {}.{} does not subclass HasDisplayData'
@@ -214,11 +220,13 @@ class DisplayDataItem(object):
return False
def is_valid(self):
- """ Checks that all the necessary fields of the DisplayDataItem are
- filled in. It checks that neither key, namespace, value or type are None.
+ """ Checks that all the necessary fields of the :class:`DisplayDataItem`
+ are filled in. It checks that none of key, namespace, value or type is
+ :data:`None`.
Raises:
- ValueError: If the item does not have a key, namespace, value or type.
+ ~exceptions.ValueError: If the item does not have a key, namespace,
+ value or type.
"""
if self.key is None:
raise ValueError('Invalid DisplayDataItem. Key must not be None')
@@ -247,14 +255,15 @@ class DisplayDataItem(object):
return res
def get_dict(self):
- """ Returns the internal-API dictionary representing the DisplayDataItem.
+ """ Returns the internal-API dictionary representing the
+ :class:`DisplayDataItem`.
Returns:
- A dictionary. The internal-API dictionary representing the
- DisplayDataItem
+ Dict[str, Any]: A dictionary. The internal-API dictionary representing
+ the :class:`DisplayDataItem`.
Raises:
- ValueError: if the item is not valid.
+ ~exceptions.ValueError: if the item is not valid.
"""
self.is_valid()
return self._get_dict()
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index a798fa1..f6e08ca 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -214,38 +214,44 @@ class PTransform(WithTypeHints, HasDisplayData):
return self.__class__.__name__
def with_input_types(self, input_type_hint):
- """Annotates the input type of a PTransform with a type-hint.
+ """Annotates the input type of a :class:`PTransform` with a type-hint.
Args:
- input_type_hint: An instance of an allowed built-in type, a custom class,
- or an instance of a typehints.TypeConstraint.
+ input_type_hint (type): An instance of an allowed built-in type, a custom
+ class, or an instance of a
+ :class:`~apache_beam.typehints.typehints.TypeConstraint`.
Raises:
- TypeError: If 'type_hint' is not a valid type-hint. See
- typehints.validate_composite_type_param for further details.
+ ~exceptions.TypeError: If **input_type_hint** is not a valid type-hint.
+ See
+ :obj:`apache_beam.typehints.typehints.validate_composite_type_param()`
+ for further details.
Returns:
- A reference to the instance of this particular PTransform object. This
- allows chaining type-hinting related methods.
+ PTransform: A reference to the instance of this particular
+ :class:`PTransform` object. This allows chaining type-hinting related
+ methods.
"""
validate_composite_type_param(input_type_hint,
'Type hints for a PTransform')
return super(PTransform, self).with_input_types(input_type_hint)
def with_output_types(self, type_hint):
- """Annotates the output type of a PTransform with a type-hint.
+ """Annotates the output type of a :class:`PTransform` with a type-hint.
Args:
- type_hint: An instance of an allowed built-in type, a custom class, or a
- typehints.TypeConstraint.
+ type_hint (type): An instance of an allowed built-in type, a custom class,
+ or a :class:`~apache_beam.typehints.typehints.TypeConstraint`.
Raises:
- TypeError: If 'type_hint' is not a valid type-hint. See
- typehints.validate_composite_type_param for further details.
+ ~exceptions.TypeError: If **type_hint** is not a valid type-hint. See
+ :obj:`~apache_beam.typehints.typehints.validate_composite_type_param()`
+ for further details.
Returns:
- A reference to the instance of this particular PTransform object. This
- allows chaining type-hinting related methods.
+ PTransform: A reference to the instance of this particular
+ :class:`PTransform` object. This allows chaining type-hinting related
+ methods.
"""
validate_composite_type_param(type_hint, 'Type hints for a PTransform')
return super(PTransform, self).with_output_types(type_hint)
@@ -491,13 +497,16 @@ class _ChainedPTransform(PTransform):
class PTransformWithSideInputs(PTransform):
- """A superclass for any PTransform (e.g. FlatMap or Combine)
+ """A superclass for any :class:`PTransform` (e.g.
+ :func:`~apache_beam.transforms.core.FlatMap` or
+ :class:`~apache_beam.transforms.core.CombinePerKey`)
invoking user code.
- PTransforms like FlatMap invoke user-supplied code in some kind of
- package (e.g. a DoFn) and optionally provide arguments and side inputs
- to that code. This internal-use-only class contains common functionality
- for PTransforms that fit this model.
+ :class:`PTransform` s like :func:`~apache_beam.transforms.core.FlatMap`
+ invoke user-supplied code in some kind of package (e.g. a
+ :class:`~apache_beam.transforms.core.DoFn`) and optionally provide arguments
+ and side inputs to that code. This internal-use-only class contains common
+ functionality for :class:`PTransform` s that fit this model.
"""
def __init__(self, fn, *args, **kwargs):
@@ -543,16 +552,20 @@ class PTransformWithSideInputs(PTransform):
of an allowed built-in type, a custom class, or a
typehints.TypeConstraint.
- Example of annotating the types of side-inputs:
+ Example of annotating the types of side-inputs::
+
FlatMap().with_input_types(int, int, bool)
Raises:
- TypeError: If 'type_hint' is not a valid type-hint. See
- typehints.validate_composite_type_param for further details.
+ :class:`~exceptions.TypeError`: If **type_hint** is not a valid type-hint.
+ See
+ :func:`~apache_beam.typehints.typehints.validate_composite_type_param`
+ for further details.
Returns:
- A reference to the instance of this particular PTransform object. This
- allows chaining type-hinting related methods.
+ :class:`PTransform`: A reference to the instance of this particular
+ :class:`PTransform` object. This allows chaining type-hinting related
+ methods.
"""
super(PTransformWithSideInputs, self).with_input_types(input_type_hint)
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/typehints/decorators.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/decorators.py b/sdks/python/apache_beam/typehints/decorators.py
index d5954e2..3f5b4c9 100644
--- a/sdks/python/apache_beam/typehints/decorators.py
+++ b/sdks/python/apache_beam/typehints/decorators.py
@@ -310,38 +310,54 @@ def with_input_types(*positional_hints, **keyword_hints):
be type-hinted in totality if even one parameter is type-hinted.
Once fully decorated, if the arguments passed to the resulting function
- violate the type-hint constraints defined, a TypeCheckError detailing the
- error will be raised.
+ violate the type-hint constraints defined, a :class:`TypeCheckError`
+ detailing the error will be raised.
- To be used as::
+ To be used as:
- * @with_input_types(s=str) # just @with_input_types(str) will work too.
- def upper(s):
- return s.upper()
+ .. testcode::
- Or::
+ from apache_beam.typehints import with_input_types
- * @with_input_types(ls=List[Tuple[int, int])
- def increment(ls):
- [(i + 1, j + 1) for (i,j) in ls]
+ @with_input_types(str)
+ def upper(s):
+ return s.upper()
+
+ Or:
+
+ .. testcode::
+
+ from apache_beam.typehints import with_input_types
+ from apache_beam.typehints import List
+ from apache_beam.typehints import Tuple
+
+ @with_input_types(ls=List[Tuple[int, int]])
+ def increment(ls):
+ return [(i + 1, j + 1) for (i, j) in ls]
Args:
*positional_hints: Positional type-hints having identical order as the
function's formal arguments. Values for this argument must either be a
- built-in Python type or an instance of a TypeContraint created by
- 'indexing' a CompositeTypeHint instance with a type parameter.
+ built-in Python type or an instance of a
+ :class:`~apache_beam.typehints.typehints.TypeConstraint` created by
+ 'indexing' a
+ :class:`~apache_beam.typehints.typehints.CompositeTypeHint` instance
+ with a type parameter.
**keyword_hints: Keyword arguments mirroring the names of the parameters to
the decorated functions. The value of each keyword argument must either
be one of the allowed built-in Python types, a custom class, or an
- instance of a TypeContraint created by 'indexing' a CompositeTypeHint
- instance with a type parameter.
+ instance of a :class:`~apache_beam.typehints.typehints.TypeConstraint`
+ created by 'indexing' a
+ :class:`~apache_beam.typehints.typehints.CompositeTypeHint` instance
+ with a type parameter.
Raises:
- ValueError: If not all function arguments have corresponding type-hints
- specified. Or if the inner wrapper function isn't passed a function
- object.
- TypeCheckError: If the any of the passed type-hint constraints are not a
- type or TypeContraint instance.
+ :class:`~exceptions.ValueError`: If not all function arguments have
+ corresponding type-hints specified. Or if the inner wrapper function isn't
+ passed a function object.
+ :class:`TypeCheckError`: If any of the passed type-hint
+ constraints are not a type or
+ :class:`~apache_beam.typehints.typehints.TypeConstraint` instance.
Returns:
The original function decorated such that it enforces type-hint constraints
@@ -375,37 +391,53 @@ def with_output_types(*return_type_hint, **kwargs):
Only a single type-hint is accepted to specify the return type of the return
value. If the function to be decorated has multiple return values, then one
- should use: 'Tuple[type_1, type_2]' to annotate the types of the return
+ should use: ``Tuple[type_1, type_2]`` to annotate the types of the return
values.
If the ultimate return value for the function violates the specified type-hint
- a TypeCheckError will be raised detailing the type-constraint violation.
+ a :class:`TypeCheckError` will be raised detailing the type-constraint
+ violation.
+
+ This decorator is intended to be used like:
+
+ .. testcode::
+
+ from apache_beam.typehints import with_output_types
+ from apache_beam.typehints import Set
+
+ class Coordinate:
+ def __init__(self, x, y):
+ self.x = x
+ self.y = y
+
+ @with_output_types(Set[Coordinate])
+ def parse_ints(ints):
+ return {Coordinate(i, i) for i in ints}
- This decorator is intended to be used like::
+ Or with a simple type-hint:
- * @with_output_types(Set[Coordinate])
- def parse_ints(ints):
- ....
- return [Coordinate.from_int(i) for i in ints]
+ .. testcode::
- Or with a simple type-hint::
+ from apache_beam.typehints import with_output_types
- * @with_output_types(bool)
- def negate(p):
- return not p if p else p
+ @with_output_types(bool)
+ def negate(p):
+ return not p if p else p
Args:
*return_type_hint: A type-hint specifying the proper return type of the
function. This argument should either be a built-in Python type or an
- instance of a 'TypeConstraint' created by 'indexing' a
- 'CompositeTypeHint'.
+ instance of a :class:`~apache_beam.typehints.typehints.TypeConstraint`
+ created by 'indexing' a
+ :class:`~apache_beam.typehints.typehints.CompositeTypeHint`.
**kwargs: Not used.
Raises:
- ValueError: If any kwarg parameters are passed in, or the length of
- 'return_type_hint' is greater than 1. Or if the inner wrapper function
- isn't passed a function object.
- TypeCheckError: If the 'return_type_hint' object is in invalid type-hint.
+ :class:`~exceptions.ValueError`: If any kwarg parameters are passed in,
+ or the length of **return_type_hint** is greater than ``1``. Or if the
+ inner wrapper function isn't passed a function object.
+ :class:`TypeCheckError`: If the **return_type_hint** object is
+ an invalid type-hint.
Returns:
The original function decorated such that it enforces type-hint constraints
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/typehints/native_type_compatibility.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py
index d88f933..26c584e 100644
--- a/sdks/python/apache_beam/typehints/native_type_compatibility.py
+++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py
@@ -82,13 +82,14 @@ def convert_to_beam_type(typ):
"""Convert a given typing type to a Beam type.
Args:
- typ: typing type.
+ typ (type): typing type.
Returns:
- The given type converted to a Beam type as far as we can do the conversion.
+ type: The given type converted to a Beam type as far as we can do the
+ conversion.
Raises:
- ValueError: The type was malformed.
+ ~exceptions.ValueError: The type was malformed.
"""
type_map = [
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/typehints/typehints.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index 6039e0e..98d399b 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -73,7 +73,6 @@ __all__ = [
'Union',
'Optional',
'Tuple',
- 'Tuple',
'List',
'KV',
'Dict',
@@ -109,9 +108,10 @@ class TypeConstraint(object):
"""The base-class for all created type-constraints defined below.
- A TypeConstraint is the result of parameterizing a CompositeTypeHint with
- with one of the allowed Python types or another CompositeTypeHint. It
- binds and enforces a specific version of a generalized TypeHint.
+ A :class:`TypeConstraint` is the result of parameterizing a
+ :class:`CompositeTypeHint` with with one of the allowed Python types or
+ another :class:`CompositeTypeHint`. It binds and enforces a specific
+ version of a generalized TypeHint.
"""
def _consistent_with_check_(self, sub):
@@ -135,12 +135,14 @@ class TypeConstraint(object):
instance: An instance of a Python object.
Raises:
- TypeError: The passed 'instance' doesn't satisfy this TypeConstraint.
- Subclasses of TypeConstraint are free to raise any of the subclasses of
- TypeError defined above, depending on the manner of the type hint error.
-
- All TypeConstraint sub-classes must define this method in other for the
- class object to be created.
+ :class:`~exceptions.TypeError`: The passed **instance** doesn't satisfy
+ this :class:`TypeConstraint`. Subclasses of
+ :class:`TypeConstraint` are free to raise any of the subclasses of
+ :class:`~exceptions.TypeError` defined above, depending on
+ the manner of the type hint error.
+
+ All :class:`TypeConstraint` sub-classes must define this method in order
+ for the class object to be created.
"""
raise NotImplementedError
@@ -296,19 +298,21 @@ class CompositeTypeHint(object):
def validate_composite_type_param(type_param, error_msg_prefix):
- """Determines if an object is a valid type parameter to a CompositeTypeHint.
+ """Determines if an object is a valid type parameter to a
+ :class:`CompositeTypeHint`.
+
+ Implements sanity checking to disallow things like::
- Implements sanity checking to disallow things like:
- * List[1, 2, 3] or Dict[5].
+ List[1, 2, 3] or Dict[5].
Args:
type_param: An object instance.
- error_msg_prefix: A string prefix used to format an error message in the
- case of an exception.
+ error_msg_prefix (:class:`str`): A string prefix used to format an error
+ message in the case of an exception.
Raises:
- TypeError: If the passed 'type_param' is not a valid type parameter for a
- CompositeTypeHint.
+ ~exceptions.TypeError: If the passed **type_param** is not a valid type
+ parameter for a :class:`CompositeTypeHint`.
"""
# Must either be a TypeConstraint instance or a basic Python type.
is_not_type_constraint = (
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/generate_pydoc.sh
----------------------------------------------------------------------
diff --git a/sdks/python/generate_pydoc.sh b/sdks/python/generate_pydoc.sh
index 1fea6f1..662bd09 100755
--- a/sdks/python/generate_pydoc.sh
+++ b/sdks/python/generate_pydoc.sh
@@ -31,43 +31,132 @@ rm -rf target/docs/*
mkdir -p target/docs/source
-# Exclude internal/experimental files from the documentation.
-excluded_internal_code=(
+# Sphinx apidoc autodoc options
+export SPHINX_APIDOC_OPTIONS=\
+members,\
+undoc-members,\
+show-inheritance
+
+# Exclude internal, test, and Cython paths/patterns from the documentation.
+excluded_patterns=(
+ apache_beam/coders/stream.*
+ apache_beam/coders/coder_impl.*
apache_beam/examples/
apache_beam/internal/clients/
- apache_beam/io/gcp/internal/clients/
+ apache_beam/io/gcp/internal/
+ apache_beam/io/gcp/tests/
+ apache_beam/metrics/execution.*
+ apache_beam/runners/common.*
apache_beam/runners/api/
apache_beam/runners/test/
+ apache_beam/runners/dataflow/internal/
apache_beam/runners/portability/
apache_beam/runners/worker/
- apache_beam/runners/dataflow/internal/clients/
- apache_beam/testing/data/)
-
-python $(type -p sphinx-apidoc) -f -o target/docs/source apache_beam \
- "${excluded_internal_code[@]}" "*_test.py"
+ apache_beam/transforms/cy_combiners.*
+ apache_beam/utils/counters.*
+ apache_beam/utils/windowed_value.*
+ *_pb2.py
+ *_test.py
+ *_test_common.py
+)
-# Remove Cython modules from doc template; they won't load
-sed -i -e '/.. automodule:: apache_beam.coders.stream/d' \
- target/docs/source/apache_beam.coders.rst
+python $(type -p sphinx-apidoc) -fMeT -o target/docs/source apache_beam \
+ "${excluded_patterns[@]}"
# Create the configuration and index files
+#=== conf.py ===#
cat > target/docs/source/conf.py <<'EOF'
import os
import sys
+import sphinx_rtd_theme
+
sys.path.insert(0, os.path.abspath('../../..'))
+exclude_patterns = [
+ '_build',
+ 'apache_beam.rst',
+]
+
extensions = [
'sphinx.ext.autodoc',
+ 'sphinx.ext.doctest',
+ 'sphinx.ext.intersphinx',
'sphinx.ext.napoleon',
'sphinx.ext.viewcode',
]
master_doc = 'index'
-html_theme = 'sphinxdoc'
+html_theme = 'sphinx_rtd_theme'
+html_theme_path = [sphinx_rtd_theme.get_html_theme_path()]
project = 'Apache Beam'
+
+autoclass_content = 'both'
+autodoc_member_order = 'bysource'
+
+doctest_global_setup = '''
+import apache_beam as beam
+'''
+
+intersphinx_mapping = {
+ 'python': ('https://docs.python.org/2', None),
+ 'hamcrest': ('https://pyhamcrest.readthedocs.io/en/latest/', None),
+}
+
+# Since private classes are skipped by sphinx, if there is any cross reference
+# to them, it will be broken. This can happen if a class inherits from a
+# private class.
+ignore_identifiers = [
+ # Ignore "custom" builtin types
+ '',
+ 'Any',
+ 'Dict',
+ 'Iterable',
+ 'List',
+ 'Set',
+ 'Tuple',
+
+ # Ignore private classes
+ 'apache_beam.coders.coders._PickleCoderBase',
+ 'apache_beam.coders.coders.FastCoder',
+ 'apache_beam.io._AvroSource',
+ 'apache_beam.io.gcp.bigquery.RowAsDictJsonCoder',
+ 'apache_beam.io.gcp.datastore.v1.datastoreio._Mutate',
+ 'apache_beam.io.gcp.internal.clients.bigquery.'
+ 'bigquery_v2_messages.TableSchema',
+ 'apache_beam.io.iobase.SourceBase',
+ 'apache_beam.io.source_test_utils.ExpectedSplitOutcome',
+ 'apache_beam.metrics.metric.MetricResults',
+ 'apache_beam.pipeline.PipelineVisitor',
+ 'apache_beam.pipeline.PTransformOverride',
+ 'apache_beam.pvalue.AsSideInput',
+ 'apache_beam.pvalue.DoOutputsTuple',
+ 'apache_beam.pvalue.PValue',
+ 'apache_beam.runners.direct.executor.CallableTask',
+ 'apache_beam.transforms.core.CallableWrapperCombineFn',
+ 'apache_beam.transforms.ptransform.PTransformWithSideInputs',
+ 'apache_beam.transforms.trigger._ParallelTriggerFn',
+ 'apache_beam.transforms.trigger.InMemoryUnmergedState',
+ 'apache_beam.typehints.typehints.AnyTypeConstraint',
+ 'apache_beam.typehints.typehints.CompositeTypeHint',
+ 'apache_beam.typehints.typehints.TypeConstraint',
+ 'apache_beam.typehints.typehints.validate_composite_type_param()',
+
+ # Private classes which are used within the same module
+ 'WindowedTypeConstraint', # apache_beam.typehints.typehints
+]
+
+# When inferring a base class it will use ':py:class'; if inferring a function
+# argument type or return type, it will use ':py:obj'. We'll generate both.
+nitpicky = True
+nitpick_ignore = []
+nitpick_ignore += [('py:class', iden) for iden in ignore_identifiers]
+nitpick_ignore += [('py:obj', iden) for iden in ignore_identifiers]
EOF
+
+#=== index.rst ===#
cat > target/docs/source/index.rst <<'EOF'
-.. include:: ./modules.rst
+.. include:: ./apache_beam.rst
+ :start-line: 2
EOF
# Build the documentation using sphinx
@@ -76,10 +165,21 @@ python $(type -p sphinx-build) -v -a -E -q target/docs/source \
target/docs/_build -c target/docs/source \
-w "target/docs/sphinx-build.warnings.log"
+# Fail if there are errors or warnings in docs
+! grep -q "ERROR:" target/docs/sphinx-build.warnings.log || exit 1
+! grep -q "WARNING:" target/docs/sphinx-build.warnings.log || exit 1
+
+# Run tests for code samples, these can be:
+# - Code blocks using '.. testsetup::', '.. testcode::' and '.. testoutput::'
+# - Interactive code starting with '>>>'
+python -msphinx -M doctest target/docs/source \
+ target/docs/_build -c target/docs/source \
+ -w "target/docs/sphinx-doctest.warnings.log"
+
+# Fail if there are errors or warnings in docs
+! grep -q "ERROR:" target/docs/sphinx-doctest.warnings.log || exit 1
+! grep -q "WARNING:" target/docs/sphinx-doctest.warnings.log || exit 1
+
# Message is useful only when this script is run locally. In a remote
# test environment, this path will be removed when the test completes.
echo "Browse to file://$PWD/target/docs/_build/index.html"
-
-# Fail if there are errors or warnings in docs
-! grep -q "ERROR:" target/docs/sphinx-build.warnings.log
-! grep -q "WARNING:" target/docs/sphinx-build.warnings.log
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/tox.ini
----------------------------------------------------------------------
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index eff91fe..fea3854 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -99,6 +99,7 @@ deps=
nose==1.3.7
grpcio-tools==1.3.5
Sphinx==1.5.5
+ sphinx_rtd_theme==0.2.4
commands =
pip install -e .[test,gcp,docs]
{toxinidir}/generate_pydoc.sh
[3/3] beam git commit: This closes #3613
Posted by al...@apache.org.
This closes #3613
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/84a23793
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/84a23793
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/84a23793
Branch: refs/heads/master
Commit: 84a23793c5009afcaf40b068757d079fccbc6a76
Parents: d035a34 e1baf55
Author: Ahmet Altay <al...@google.com>
Authored: Fri Aug 11 17:02:38 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Aug 11 17:02:38 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/__init__.py | 47 ++--
.../apache_beam/internal/gcp/json_value.py | 45 ++--
sdks/python/apache_beam/io/avroio.py | 79 ++++--
sdks/python/apache_beam/io/filebasedsink.py | 18 +-
sdks/python/apache_beam/io/filebasedsource.py | 67 +++--
sdks/python/apache_beam/io/filesystem.py | 27 +-
sdks/python/apache_beam/io/gcp/bigquery.py | 257 +++++++++++--------
sdks/python/apache_beam/io/gcp/gcsio.py | 12 +-
sdks/python/apache_beam/io/range_trackers.py | 12 +-
sdks/python/apache_beam/io/source_test_utils.py | 88 ++++---
sdks/python/apache_beam/io/textio.py | 121 +++++----
sdks/python/apache_beam/pipeline.py | 89 ++++---
sdks/python/apache_beam/runners/runner.py | 31 ++-
.../python/apache_beam/testing/test_pipeline.py | 48 ++--
sdks/python/apache_beam/transforms/core.py | 165 +++++++-----
sdks/python/apache_beam/transforms/display.py | 87 ++++---
.../python/apache_beam/transforms/ptransform.py | 61 +++--
sdks/python/apache_beam/typehints/decorators.py | 104 +++++---
.../typehints/native_type_compatibility.py | 7 +-
sdks/python/apache_beam/typehints/typehints.py | 38 +--
sdks/python/generate_pydoc.sh | 134 ++++++++--
sdks/python/tox.ini | 1 +
22 files changed, 953 insertions(+), 585 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: Makes Python API reference generation more
strict
Posted by al...@apache.org.
Makes Python API reference generation more strict
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e1baf55d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e1baf55d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e1baf55d
Branch: refs/heads/master
Commit: e1baf55d82fcc4a3951057b2321f77319d88b6c3
Parents: d035a34
Author: David Cavazos <dc...@google.com>
Authored: Fri Jul 21 09:58:11 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Aug 11 17:01:34 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/__init__.py | 47 ++--
.../apache_beam/internal/gcp/json_value.py | 45 ++--
sdks/python/apache_beam/io/avroio.py | 79 ++++--
sdks/python/apache_beam/io/filebasedsink.py | 18 +-
sdks/python/apache_beam/io/filebasedsource.py | 67 +++--
sdks/python/apache_beam/io/filesystem.py | 27 +-
sdks/python/apache_beam/io/gcp/bigquery.py | 257 +++++++++++--------
sdks/python/apache_beam/io/gcp/gcsio.py | 12 +-
sdks/python/apache_beam/io/range_trackers.py | 12 +-
sdks/python/apache_beam/io/source_test_utils.py | 88 ++++---
sdks/python/apache_beam/io/textio.py | 121 +++++----
sdks/python/apache_beam/pipeline.py | 89 ++++---
sdks/python/apache_beam/runners/runner.py | 31 ++-
.../python/apache_beam/testing/test_pipeline.py | 48 ++--
sdks/python/apache_beam/transforms/core.py | 165 +++++++-----
sdks/python/apache_beam/transforms/display.py | 87 ++++---
.../python/apache_beam/transforms/ptransform.py | 61 +++--
sdks/python/apache_beam/typehints/decorators.py | 104 +++++---
.../typehints/native_type_compatibility.py | 7 +-
sdks/python/apache_beam/typehints/typehints.py | 38 +--
sdks/python/generate_pydoc.sh | 134 ++++++++--
sdks/python/tox.ini | 1 +
22 files changed, 953 insertions(+), 585 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py
index 8b772c9..791ebb7 100644
--- a/sdks/python/apache_beam/__init__.py
+++ b/sdks/python/apache_beam/__init__.py
@@ -15,11 +15,12 @@
# limitations under the License.
#
-"""Apache Beam SDK for Python.
+"""
+Apache Beam SDK for Python
+==========================
-Apache Beam <https://beam.apache.org/>
-provides a simple, powerful programming model for building both batch
-and streaming parallel data processing pipelines.
+`Apache Beam <https://beam.apache.org>`_ provides a simple, powerful programming
+model for building both batch and streaming parallel data processing pipelines.
The Apache Beam SDK for Python provides access to Apache Beam capabilities
from the Python programming language.
@@ -33,32 +34,40 @@ Overview
--------
The key concepts in this programming model are
-* PCollection: represents a collection of data, which could be
- bounded or unbounded in size.
-* PTransform: represents a computation that transforms input
- PCollections into output PCollections.
-* Pipeline: manages a directed acyclic graph of PTransforms and
- PCollections that is ready for execution.
-* Runner: specifies where and how the Pipeline should execute.
-* Reading and Writing Data: your pipeline can read from an external
- source and write to an external data sink.
+* :class:`~apache_beam.pvalue.PCollection`: represents a collection of data,
+ which could be bounded or unbounded in size.
+* :class:`~apache_beam.transforms.ptransform.PTransform`: represents a
+ computation that transforms input PCollections into output PCollections.
+* :class:`~apache_beam.pipeline.Pipeline`: manages a directed acyclic graph of
+ :class:`~apache_beam.transforms.ptransform.PTransform` s and
+ :class:`~apache_beam.pvalue.PCollection` s that is ready for execution.
+* :class:`~apache_beam.runners.runner.PipelineRunner`: specifies where and how
+ the pipeline should execute.
+* :class:`~apache_beam.io.iobase.Read`: read from an external source.
+* :class:`~apache_beam.io.iobase.Write`: write to an external data sink.
Typical usage
-------------
At the top of your source file::
- import apache_beam as beam
+ import apache_beam as beam
After this import statement
-* transform classes are available as beam.FlatMap, beam.GroupByKey, etc.
-* Pipeline class is available as beam.Pipeline
-* text read/write transforms are available as beam.io.ReadfromText,
- beam.io.WriteToText
+* Transform classes are available as
+ :class:`beam.FlatMap <apache_beam.transforms.core.FlatMap>`,
+ :class:`beam.GroupByKey <apache_beam.transforms.core.GroupByKey>`, etc.
+* Pipeline class is available as
+ :class:`beam.Pipeline <apache_beam.pipeline.Pipeline>`
+* Text read/write transforms are available as
+ :class:`beam.io.ReadFromText <apache_beam.io.textio.ReadFromText>`,
+ :class:`beam.io.WriteToText <apache_beam.io.textio.WriteToText>`.
Examples
--------
-The examples subdirectory has some examples.
+The `examples subdirectory
+<https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples>`_
+has some examples.
"""
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/internal/gcp/json_value.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/gcp/json_value.py b/sdks/python/apache_beam/internal/gcp/json_value.py
index 59f8b60..167b173 100644
--- a/sdks/python/apache_beam/internal/gcp/json_value.py
+++ b/sdks/python/apache_beam/internal/gcp/json_value.py
@@ -41,11 +41,12 @@ def get_typed_value_descriptor(obj):
obj: A basestring, bool, int, or float to be converted.
Returns:
- A dictionary containing the keys '@type' and 'value' with the value for
- the @type of appropriate type.
+ A dictionary containing the keys ``@type`` and ``value`` with the value for
+ the ``@type`` of appropriate type.
Raises:
- TypeError: if the Python object has a type that is not supported.
+ ~exceptions.TypeError: if the Python object has a type that is not
+ supported.
"""
if isinstance(obj, basestring):
type_name = 'Text'
@@ -66,21 +67,23 @@ def to_json_value(obj, with_type=False):
Converts Python objects into extra_types.JsonValue objects.
Args:
- obj: Python object to be converted. Can be 'None'.
- with_type: If true then the basic types (string, int, float, bool) will
- be wrapped in @type/value dictionaries. Otherwise the straight value is
- encoded into a JsonValue.
+ obj: Python object to be converted. Can be :data:`None`.
+ with_type: If true then the basic types (``string``, ``int``, ``float``,
+ ``bool``) will be wrapped in ``@type:value`` dictionaries. Otherwise the
+ straight value is encoded into a ``JsonValue``.
Returns:
- A JsonValue object using JsonValue, JsonArray and JsonObject types for the
- corresponding values, lists, or dictionaries.
+ A ``JsonValue`` object using ``JsonValue``, ``JsonArray`` and ``JsonObject``
+ types for the corresponding values, lists, or dictionaries.
Raises:
- TypeError: if the Python object contains a type that is not supported.
+ ~exceptions.TypeError: if the Python object contains a type that is not
+ supported.
- The types supported are str, bool, list, tuple, dict, and None. The Dataflow
- API requires JsonValue(s) in many places, and it is quite convenient to be
- able to specify these hierarchical objects using Python syntax.
+ The types supported are ``str``, ``bool``, ``list``, ``tuple``, ``dict``, and
+ ``None``. The Dataflow API requires JsonValue(s) in many places, and it is
+ quite convenient to be able to specify these hierarchical objects using
+ Python syntax.
"""
if obj is None:
return extra_types.JsonValue(is_null=True)
@@ -121,21 +124,23 @@ def to_json_value(obj, with_type=False):
def from_json_value(v):
"""For internal use only; no backwards-compatibility guarantees.
- Converts extra_types.JsonValue objects into Python objects.
+ Converts ``extra_types.JsonValue`` objects into Python objects.
Args:
- v: JsonValue object to be converted.
+ v: ``JsonValue`` object to be converted.
Returns:
A Python object structured as values, lists, and dictionaries corresponding
- to JsonValue, JsonArray and JsonObject types.
+ to ``JsonValue``, ``JsonArray`` and ``JsonObject`` types.
Raises:
- TypeError: if the JsonValue object contains a type that is not supported.
+ ~exceptions.TypeError: if the ``JsonValue`` object contains a type that is
+ not supported.
- The types supported are str, bool, list, dict, and None. The Dataflow API
- returns JsonValue(s) in many places and it is quite convenient to be able to
- convert these hierarchical objects to much simpler Python objects.
+ The types supported are ``str``, ``bool``, ``list``, ``dict``, and ``None``.
+ The Dataflow API returns JsonValue(s) in many places and it is quite
+ convenient to be able to convert these hierarchical objects to much simpler
+ Python objects.
"""
if isinstance(v, extra_types.JsonValue):
if v.string_value is not None:
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 47ea282..cb14c65 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -64,27 +64,74 @@ __all__ = ['ReadFromAvro', 'ReadAllFromAvro', 'WriteToAvro']
class ReadFromAvro(PTransform):
- """A ``PTransform`` for reading Avro files.
+ """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading avro
+ files."""
- Uses source '_AvroSource' to read a set of Avro files defined by a given
- file pattern.
- If '/mypath/myavrofiles*' is a file-pattern that points to a set of Avro
- files, a ``PCollection`` for the records in these Avro files can be created
- in the following manner.
+ def __init__(self, file_pattern=None, min_bundle_size=0, validate=True):
+ """Initializes :class:`ReadFromAvro`.
- p = df.Pipeline(argv=pipeline_args)
- records = p | 'Read' >> df.io.ReadFromAvro('/mypath/myavrofiles*')
- """
+ Uses source :class:`~apache_beam.io._AvroSource` to read a set of Avro
+ files defined by a given file pattern.
- def __init__(self, file_pattern=None, min_bundle_size=0, validate=True):
- """Initializes ``ReadFromAvro``.
+ If ``/mypath/myavrofiles*`` is a file-pattern that points to a set of Avro
+ files, a :class:`~apache_beam.pvalue.PCollection` for the records in
+ these Avro files can be created in the following manner.
+
+ .. testcode::
+
+ with beam.Pipeline() as p:
+ records = p | 'Read' >> beam.io.ReadFromAvro('/mypath/myavrofiles*')
+
+ .. NOTE: We're not actually interested in this error; but if we get here,
+ it means that the way of calling this transform hasn't changed.
+
+ .. testoutput::
+ :hide:
+
+ Traceback (most recent call last):
+ ...
+ IOError: No files found based on the file pattern
+
+ Each record of this :class:`~apache_beam.pvalue.PCollection` will contain
+ a single record read from a source. Records that are of simple types will be
+ mapped into corresponding Python types. Records that are of Avro type
+ ``RECORD`` will be mapped to Python dictionaries that comply with the schema
+ contained in the Avro file that contains those records. In this case, keys
+ of each dictionary will contain the corresponding field names and will be of
+ type :class:`str` while the values of the dictionary will be of the type
+ defined in the corresponding Avro schema.
+
+ For example, if the schema of the Avro file is the following::
+
+ {
+ "namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+
+ {"name": "name",
+ "type": "string"},
+
+ {"name": "favorite_number",
+ "type": ["int", "null"]},
+
+ {"name": "favorite_color",
+ "type": ["string", "null"]}
+
+ ]
+ }
+
+ Then records generated by :class:`~apache_beam.io._AvroSource` will be
+ dictionaries of the following form::
+
+ {u'name': u'Alyssa', u'favorite_number': 256, u'favorite_color': None}
Args:
- file_pattern: the set of files to be read.
- min_bundle_size: the minimum size in bytes, to be considered when
- splitting the input into bundles.
- validate: flag to verify that the files exist during the pipeline
- creation time.
+ file_pattern (str): the file glob to read
+ min_bundle_size (int): the minimum size in bytes, to be considered when
+ splitting the input into bundles.
+ validate (bool): flag to verify that the files exist during the pipeline
+ creation time.
"""
super(ReadFromAvro, self).__init__()
self._source = _AvroSource(file_pattern, min_bundle_size, validate=validate)
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/filebasedsink.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py
index 76c09fc..eb99d08 100644
--- a/sdks/python/apache_beam/io/filebasedsink.py
+++ b/sdks/python/apache_beam/io/filebasedsink.py
@@ -44,12 +44,13 @@ class FileBasedSink(iobase.Sink):
"""A sink to a GCS or local files.
To implement a file-based sink, extend this class and override
- either ``write_record()`` or ``write_encoded_record()``.
+ either :meth:`.write_record()` or :meth:`.write_encoded_record()`.
- If needed, also overwrite ``open()`` and/or ``close()`` to customize the
- file handling or write headers and footers.
+ If needed, also override :meth:`.open()` and/or :meth:`.close()` to customize
+ the file handling or write headers and footers.
- The output of this write is a PCollection of all written shards.
+ The output of this write is a :class:`~apache_beam.pvalue.PCollection` of
+ all written shards.
"""
# Max number of threads to be used for renaming.
@@ -65,9 +66,12 @@ class FileBasedSink(iobase.Sink):
compression_type=CompressionTypes.AUTO):
"""
Raises:
- TypeError: if file path parameters are not a string or ValueProvider,
- or if compression_type is not member of CompressionTypes.
- ValueError: if shard_name_template is not of expected format.
+ ~exceptions.TypeError: if file path parameters are not a :class:`str` or
+ :class:`~apache_beam.options.value_provider.ValueProvider`, or if
+ **compression_type** is not member of
+ :class:`~apache_beam.io.filesystem.CompressionTypes`.
+ ~exceptions.ValueError: if **shard_name_template** is not of expected
+ format.
"""
if not isinstance(file_path_prefix, (basestring, ValueProvider)):
raise TypeError('file_path_prefix must be a string or ValueProvider;'
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index f78bf3f..6496930 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -17,12 +17,13 @@
"""A framework for developing sources for new file types.
-To create a source for a new file type a sub-class of ``FileBasedSource`` should
-be created. Sub-classes of ``FileBasedSource`` must implement the method
-``FileBasedSource.read_records()``. Please read the documentation of that method
-for more details.
+To create a source for a new file type, a sub-class of :class:`FileBasedSource`
+should be created. Sub-classes of :class:`FileBasedSource` must implement the
+method :meth:`FileBasedSource.read_records()`. Please read the documentation of
+that method for more details.
-For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
+For an example implementation of :class:`FileBasedSource` see
+:class:`~apache_beam.io._AvroSource`.
"""
import uuid
@@ -51,7 +52,8 @@ __all__ = ['FileBasedSource']
class FileBasedSource(iobase.BoundedSource):
- """A ``BoundedSource`` for reading a file glob of a given type."""
+ """A :class:`~apache_beam.io.iobase.BoundedSource` for reading a file glob of
+ a given type."""
MIN_NUMBER_OF_FILES_TO_STAT = 100
MIN_FRACTION_OF_FILES_TO_STAT = 0.01
@@ -62,31 +64,40 @@ class FileBasedSource(iobase.BoundedSource):
compression_type=CompressionTypes.AUTO,
splittable=True,
validate=True):
- """Initializes ``FileBasedSource``.
+ """Initializes :class:`FileBasedSource`.
Args:
- file_pattern: the file glob to read a string or a ValueProvider
- (placeholder to inject a runtime value).
- min_bundle_size: minimum size of bundles that should be generated when
- performing initial splitting on this source.
- compression_type: compression type to use
- splittable: whether FileBasedSource should try to logically split a single
- file into data ranges so that different parts of the same file
- can be read in parallel. If set to False, FileBasedSource will
- prevent both initial and dynamic splitting of sources for
- single files. File patterns that represent multiple files may
- still get split into sources for individual files. Even if set
- to True by the user, FileBasedSource may choose to not split
- the file, for example, for compressed files where currently
- it is not possible to efficiently read a data range without
- decompressing the whole file.
- validate: Boolean flag to verify that the files exist during the pipeline
- creation time.
+ file_pattern (str): the file glob to read, given as a string or a
+ :class:`~apache_beam.options.value_provider.ValueProvider`
+ (placeholder to inject a runtime value).
+ min_bundle_size (int): minimum size of bundles that should be generated
+ when performing initial splitting on this source.
+ compression_type (str): Used to handle compressed input files.
+ Typical value is :attr:`CompressionTypes.AUTO
+ <apache_beam.io.filesystem.CompressionTypes.AUTO>`,
+ in which case each matched file's extension will be used to detect
+ the compression.
+ splittable (bool): whether :class:`FileBasedSource` should try to
+ logically split a single file into data ranges so that different parts
+ of the same file can be read in parallel. If set to :data:`False`,
+ :class:`FileBasedSource` will prevent both initial and dynamic splitting
+ of sources for single files. File patterns that represent multiple files
+ may still get split into sources for individual files. Even if set to
+ :data:`True` by the user, :class:`FileBasedSource` may choose to not
+ split the file, for example, for compressed files where currently it is
+ not possible to efficiently read a data range without decompressing the
+ whole file.
+ validate (bool): Boolean flag to verify that the files exist during the
+ pipeline creation time.
+
Raises:
- TypeError: when compression_type is not valid or if file_pattern is not a
- string or a ValueProvider.
- ValueError: when compression and splittable files are specified.
- IOError: when the file pattern specified yields an empty result.
+ ~exceptions.TypeError: when **compression_type** is not valid or if
+ **file_pattern** is not a :class:`str` or a
+ :class:`~apache_beam.options.value_provider.ValueProvider`.
+ ~exceptions.ValueError: when compression and splittable files are
+ specified.
+ ~exceptions.IOError: when the file pattern specified yields an empty
+ result.
"""
if not isinstance(file_pattern, (basestring, ValueProvider)):
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index ef3040c..5804d00 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -299,23 +299,28 @@ class CompressedFile(object):
"""Set the file's current offset.
Seeking behavior:
- * seeking from the end (SEEK_END) the whole file is decompressed once to
- determine it's size. Therefore it is preferred to use
- SEEK_SET or SEEK_CUR to avoid the processing overhead
- * seeking backwards from the current position rewinds the file to 0
+
+ * when seeking from the end (:data:`os.SEEK_END`), the whole file is
+ decompressed once to determine its size. Therefore it is preferred to use
+ :data:`os.SEEK_SET` or :data:`os.SEEK_CUR` to avoid the processing
+ overhead
+ * seeking backwards from the current position rewinds the file to ``0``
and decompresses the chunks to the requested offset
* seeking is only supported in files opened for reading
- * if the new offset is out of bound, it is adjusted to either 0 or EOF.
+ * if the new offset is out of bound, it is adjusted to either ``0`` or
+ ``EOF``.
Args:
- offset: seek offset in the uncompressed content represented as number
- whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
- os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
- (seek relative to the end, offset should be negative).
+ offset (int): seek offset in the uncompressed content represented as
+ number
+ whence (int): seek mode. Supported modes are :data:`os.SEEK_SET`
+ (absolute seek), :data:`os.SEEK_CUR` (seek relative to the current
+ position), and :data:`os.SEEK_END` (seek relative to the end, offset
+ should be negative).
Raises:
- IOError: When this buffer is closed.
- ValueError: When whence is invalid or the file is not seekable
+ ~exceptions.IOError: When this buffer is closed.
+ ~exceptions.ValueError: When whence is invalid or the file is not seekable
"""
if whence == os.SEEK_SET:
absolute_offset = offset
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index db6715a..33d67bf 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -330,45 +330,49 @@ class BigQuerySource(dataflow_io.NativeSource):
def __init__(self, table=None, dataset=None, project=None, query=None,
validate=False, coder=None, use_standard_sql=False,
flatten_results=True):
- """Initialize a BigQuerySource.
+ """Initialize a :class:`BigQuerySource`.
Args:
- table: The ID of a BigQuery table. If specified all data of the table
- will be used as input of the current source. The ID must contain only
- letters (a-z, A-Z), numbers (0-9), or underscores (_). If dataset
- and query arguments are None then the table argument must contain the
- entire table reference specified as: 'DATASET.TABLE' or
- 'PROJECT:DATASET.TABLE'.
- dataset: The ID of the dataset containing this table or null if the table
- reference is specified entirely by the table argument or a query is
- specified.
- project: The ID of the project containing this table or null if the table
- reference is specified entirely by the table argument or a query is
- specified.
- query: A query to be used instead of arguments table, dataset, and
+ table (str): The ID of a BigQuery table. If specified all data of the
+ table will be used as input of the current source. The ID must contain
+ only letters ``a-z``, ``A-Z``, numbers ``0-9``, or underscores
+ ``_``. If dataset and query arguments are :data:`None` then the table
+ argument must contain the entire table reference specified as:
+ ``'DATASET.TABLE'`` or ``'PROJECT:DATASET.TABLE'``.
+ dataset (str): The ID of the dataset containing this table or
+ :data:`None` if the table reference is specified entirely by the table
+ argument or a query is specified.
+ project (str): The ID of the project containing this table or
+ :data:`None` if the table reference is specified entirely by the table
+ argument or a query is specified.
+ query (str): A query to be used instead of arguments table, dataset, and
project.
- validate: If true, various checks will be done when source gets
- initialized (e.g., is table present?). This should be True for most
- scenarios in order to catch errors as early as possible (pipeline
- construction instead of pipeline execution). It should be False if the
- table is created during pipeline execution by a previous step.
- coder: The coder for the table rows if serialized to disk. If None, then
- the default coder is RowAsDictJsonCoder, which will interpret every line
- in a file as a JSON serialized dictionary. This argument needs a value
- only in special cases when returning table rows as dictionaries is not
- desirable.
- use_standard_sql: Specifies whether to use BigQuery's standard
- SQL dialect for this query. The default value is False. If set to True,
- the query will use BigQuery's updated SQL dialect with improved
- standards compliance. This parameter is ignored for table inputs.
- flatten_results: Flattens all nested and repeated fields in the
- query results. The default value is true.
+ validate (bool): If :data:`True`, various checks will be done when source
+ gets initialized (e.g., is table present?). This should be
+ :data:`True` for most scenarios in order to catch errors as early as
+ possible (pipeline construction instead of pipeline execution). It
+ should be :data:`False` if the table is created during pipeline
+ execution by a previous step.
+ coder (~apache_beam.coders.coders.Coder): The coder for the table
+ rows if serialized to disk. If :data:`None`, then the default coder is
+ :class:`~apache_beam.io.gcp.bigquery.RowAsDictJsonCoder`,
+ which will interpret every line in a file as a JSON serialized
+ dictionary. This argument needs a value only in special cases when
+ returning table rows as dictionaries is not desirable.
+ use_standard_sql (bool): Specifies whether to use BigQuery's standard SQL
+ dialect for this query. The default value is :data:`False`.
+ If set to :data:`True`, the query will use BigQuery's updated SQL
+ dialect with improved standards compliance.
+ This parameter is ignored for table inputs.
+ flatten_results (bool): Flattens all nested and repeated fields in the
+ query results. The default value is :data:`True`.
Raises:
- ValueError: if any of the following is true
- (1) the table reference as a string does not match the expected format
- (2) neither a table nor a query is specified
- (3) both a table and a query is specified.
+ ~exceptions.ValueError: if any of the following is true:
+
+ 1) the table reference as a string does not match the expected format
+ 2) neither a table nor a query is specified
+ 3) both a table and a query are specified.
"""
# Import here to avoid adding the dependency for local running scenarios.
@@ -439,46 +443,62 @@ class BigQuerySink(dataflow_io.NativeSink):
"""Initialize a BigQuerySink.
Args:
- table: The ID of the table. The ID must contain only letters
- (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is
- None then the table argument must contain the entire table reference
- specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
- dataset: The ID of the dataset containing this table or null if the table
- reference is specified entirely by the table argument.
- project: The ID of the project containing this table or null if the table
- reference is specified entirely by the table argument.
- schema: The schema to be used if the BigQuery table to write has to be
- created. This can be either specified as a 'bigquery.TableSchema' object
- or a single string of the form 'field1:type1,field2:type2,field3:type3'
- that defines a comma separated list of fields. Here 'type' should
- specify the BigQuery type of the field. Single string based schemas do
- not support nested fields, repeated fields, or specifying a BigQuery
- mode for fields (mode will always be set to 'NULLABLE').
- create_disposition: A string describing what happens if the table does not
- exist. Possible values are:
- - BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist.
- - BigQueryDisposition.CREATE_NEVER: fail the write if does not exist.
- write_disposition: A string describing what happens if the table has
- already some data. Possible values are:
- - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
- - BigQueryDisposition.WRITE_APPEND: add to existing rows.
- - BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.
- validate: If true, various checks will be done when sink gets
- initialized (e.g., is table present given the disposition arguments?).
- This should be True for most scenarios in order to catch errors as early
- as possible (pipeline construction instead of pipeline execution). It
- should be False if the table is created during pipeline execution by a
- previous step.
- coder: The coder for the table rows if serialized to disk. If None, then
- the default coder is RowAsDictJsonCoder, which will interpret every
- element written to the sink as a dictionary that will be JSON serialized
- as a line in a file. This argument needs a value only in special cases
- when writing table rows as dictionaries is not desirable.
+ table (str): The ID of the table. The ID must contain only letters
+ ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If
+ **dataset** argument is :data:`None` then the table argument must
+ contain the entire table reference specified as: ``'DATASET.TABLE'`` or
+ ``'PROJECT:DATASET.TABLE'``.
+ dataset (str): The ID of the dataset containing this table or
+ :data:`None` if the table reference is specified entirely by the table
+ argument.
+ project (str): The ID of the project containing this table or
+ :data:`None` if the table reference is specified entirely by the table
+ argument.
+ schema (str): The schema to be used if the BigQuery table to write has
+ to be created. This can be either specified as a
+ :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema` object or a single string of the form
+ ``'field1:type1,field2:type2,field3:type3'`` that defines a comma
+ separated list of fields. Here ``'type'`` should specify the BigQuery
+ type of the field. Single string based schemas do not support nested
+ fields, repeated fields, or specifying a BigQuery mode for fields (mode
+ will always be set to ``'NULLABLE'``).
+ create_disposition (BigQueryDisposition): A string describing what
+ happens if the table does not exist. Possible values are:
+
+ * :attr:`BigQueryDisposition.CREATE_IF_NEEDED`: create if does not
+ exist.
+ * :attr:`BigQueryDisposition.CREATE_NEVER`: fail the write if does not
+ exist.
+
+ write_disposition (BigQueryDisposition): A string describing what
+ happens if the table has already some data. Possible values are:
+
+ * :attr:`BigQueryDisposition.WRITE_TRUNCATE`: delete existing rows.
+ * :attr:`BigQueryDisposition.WRITE_APPEND`: add to existing rows.
+ * :attr:`BigQueryDisposition.WRITE_EMPTY`: fail the write if table not
+ empty.
+
+ validate (bool): If :data:`True`, various checks will be done when sink
+ gets initialized (e.g., is table present given the disposition
+ arguments?). This should be :data:`True` for most scenarios in order to
+ catch errors as early as possible (pipeline construction instead of
+ pipeline execution). It should be :data:`False` if the table is created
+ during pipeline execution by a previous step.
+ coder (~apache_beam.coders.coders.Coder): The coder for the
+ table rows if serialized to disk. If :data:`None`, then the default
+ coder is :class:`~apache_beam.io.gcp.bigquery.RowAsDictJsonCoder`,
+ which will interpret every element written to the sink as a dictionary
+ that will be JSON serialized as a line in a file. This argument needs a
+ value only in special cases when writing table rows as dictionaries is
+ not desirable.
Raises:
- TypeError: if the schema argument is not a string or a TableSchema object.
- ValueError: if the table reference as a string does not match the expected
- format.
+ ~exceptions.TypeError: if the schema argument is not a :class:`str` or a
+ :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema` object.
+ ~exceptions.ValueError: if the table reference as a string does not
+ match the expected format.
"""
# Import here to avoid adding the dependency for local running scenarios.
@@ -1261,32 +1281,47 @@ class WriteToBigQuery(PTransform):
"""Initialize a WriteToBigQuery transform.
Args:
- table: The ID of the table. The ID must contain only letters
- (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is
- None then the table argument must contain the entire table reference
- specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
- dataset: The ID of the dataset containing this table or null if the table
- reference is specified entirely by the table argument.
- project: The ID of the project containing this table or null if the table
- reference is specified entirely by the table argument.
- schema: The schema to be used if the BigQuery table to write has to be
- created. This can be either specified as a 'bigquery.TableSchema' object
- or a single string of the form 'field1:type1,field2:type2,field3:type3'
- that defines a comma separated list of fields. Here 'type' should
- specify the BigQuery type of the field. Single string based schemas do
- not support nested fields, repeated fields, or specifying a BigQuery
- mode for fields (mode will always be set to 'NULLABLE').
- create_disposition: A string describing what happens if the table does not
- exist. Possible values are:
- - BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist.
- - BigQueryDisposition.CREATE_NEVER: fail the write if does not exist.
- write_disposition: A string describing what happens if the table has
- already some data. Possible values are:
- - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
- - BigQueryDisposition.WRITE_APPEND: add to existing rows.
- - BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.
+ table (str): The ID of the table. The ID must contain only letters
+ ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If dataset
+ argument is :data:`None` then the table argument must contain the
+ entire table reference specified as: ``'DATASET.TABLE'`` or
+ ``'PROJECT:DATASET.TABLE'``.
+ dataset (str): The ID of the dataset containing this table or
+ :data:`None` if the table reference is specified entirely by the table
+ argument.
+ project (str): The ID of the project containing this table or
+ :data:`None` if the table reference is specified entirely by the table
+ argument.
+ schema (str): The schema to be used if the BigQuery table to write has to
+ be created. This can be either specified as a
+ :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema`
+ object or a single string of the form
+ ``'field1:type1,field2:type2,field3:type3'`` that defines a comma
+ separated list of fields. Here ``'type'`` should specify the BigQuery
+ type of the field. Single string based schemas do not support nested
+ fields, repeated fields, or specifying a BigQuery mode for fields
+ (mode will always be set to ``'NULLABLE'``).
+ create_disposition (BigQueryDisposition): A string describing what
+ happens if the table does not exist. Possible values are:
+
+ * :attr:`BigQueryDisposition.CREATE_IF_NEEDED`: create if does not
+ exist.
+ * :attr:`BigQueryDisposition.CREATE_NEVER`: fail the write if does not
+ exist.
+
+ write_disposition (BigQueryDisposition): A string describing what happens
+ if the table has already some data. Possible values are:
+
+ * :attr:`BigQueryDisposition.WRITE_TRUNCATE`: delete existing rows.
+ * :attr:`BigQueryDisposition.WRITE_APPEND`: add to existing rows.
+ * :attr:`BigQueryDisposition.WRITE_EMPTY`: fail the write if table not
+ empty.
+
For streaming pipelines WriteTruncate can not be used.
- batch_size: Number of rows to be written to BQ per streaming API insert.
+
+ batch_size (int): Number of rows to be written to BQ per streaming API
+ insert.
test_client: Override the default bigquery client used for testing.
"""
self.table_reference = _parse_table_reference(table, dataset, project)
@@ -1300,14 +1335,20 @@ class WriteToBigQuery(PTransform):
@staticmethod
def get_table_schema_from_string(schema):
- """Transform the string table schema into a bigquery.TableSchema instance.
+ """Transform the string table schema into a
+ :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema` instance.
Args:
- schema: The sting schema to be used if the BigQuery table to write has
- to be created.
+ schema (str): The string schema to be used if the BigQuery table to write
+ has to be created.
+
Returns:
- table_schema: The schema to be used if the BigQuery table to write has
- to be created but in the bigquery.TableSchema format.
+ ~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema:
+ The schema to be used if the BigQuery table to write has to be created
+ but in the :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema` format.
"""
table_schema = bigquery.TableSchema()
schema_list = [s.strip() for s in schema.split(',')]
@@ -1349,12 +1390,14 @@ class WriteToBigQuery(PTransform):
"""Transform the table schema into a dictionary instance.
Args:
- schema: The schema to be used if the BigQuery table to write has to be
- created. This can either be a dict or string or in the TableSchema
- format.
+ schema (~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema):
+ The schema to be used if the BigQuery table to write has to be created.
+ This can either be a dict or string or in the TableSchema format.
+
Returns:
- table_schema: The schema to be used if the BigQuery table to write has
- to be created but in the dictionary format.
+ Dict[str, Any]: The schema to be used if the BigQuery table to write has
+ to be created but in the dictionary format.
"""
if isinstance(schema, dict):
return schema
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 643fbc7..ae71a5f 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -137,16 +137,16 @@ class GcsIO(object):
"""Open a GCS file path for reading or writing.
Args:
- filename: GCS file path in the form gs://<bucket>/<object>.
- mode: 'r' for reading or 'w' for writing.
- read_buffer_size: Buffer size to use during read operations.
- mime_type: Mime type to set for write operations.
+ filename (str): GCS file path in the form ``gs://<bucket>/<object>``.
+ mode (str): ``'r'`` for reading or ``'w'`` for writing.
+ read_buffer_size (int): Buffer size to use during read operations.
+ mime_type (str): Mime type to set for write operations.
Returns:
- file object.
+ GCS file object.
Raises:
- ValueError: Invalid open file mode.
+ ~exceptions.ValueError: Invalid open file mode.
"""
if mode == 'r' or mode == 'rb':
return GcsBufferedReader(self.client, filename, mode=mode,
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index 4bd19f8..1339b91 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -317,17 +317,19 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
class UnsplittableRangeTracker(iobase.RangeTracker):
"""A RangeTracker that always ignores split requests.
- This can be used to make a given ``RangeTracker`` object unsplittable by
- ignoring all calls to ``try_split()``. All other calls will be delegated to
- the given ``RangeTracker``.
+ This can be used to make a given
+ :class:`~apache_beam.io.iobase.RangeTracker` object unsplittable by
+ ignoring all calls to :meth:`.try_split()`. All other calls will be delegated
+ to the given :class:`~apache_beam.io.iobase.RangeTracker`.
"""
def __init__(self, range_tracker):
"""Initializes UnsplittableRangeTracker.
Args:
- range_tracker: a ``RangeTracker`` to which all method calls expect calls
- to ``try_split()`` will be delegated.
+ range_tracker (~apache_beam.io.iobase.RangeTracker): a
+ :class:`~apache_beam.io.iobase.RangeTracker` to which all method
+ calls except calls to :meth:`.try_split()` will be delegated.
"""
assert isinstance(range_tracker, iobase.RangeTracker)
self._range_tracker = range_tracker
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index a144a8a..bea9708 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -80,12 +80,13 @@ def read_from_source(source, start_position=None, stop_position=None):
Only reads elements within the given position range.
Args:
- source: ``iobase.BoundedSource`` implementation.
- start_position: start position for reading.
- stop_position: stop position for reading.
+ source (~apache_beam.io.iobase.BoundedSource):
+ :class:`~apache_beam.io.iobase.BoundedSource` implementation.
+ start_position (int): start position for reading.
+ stop_position (int): stop position for reading.
Returns:
- the set of values read from the sources.
+ List[str]: the set of values read from the sources.
"""
values = []
range_tracker = source.get_range_tracker(start_position, stop_position)
@@ -108,21 +109,25 @@ def _ThreadPool(threads):
def assert_sources_equal_reference_source(reference_source_info, sources_info):
"""Tests if a reference source is equal to a given set of sources.
- Given a reference source (a ``BoundedSource`` and a position range) and a
- list of sources, assert that the union of the records
- read from the list of sources is equal to the records read from the
+ Given a reference source (a :class:`~apache_beam.io.iobase.BoundedSource`
+ and a position range) and a list of sources, assert that the union of the
+ records read from the list of sources is equal to the records read from the
reference source.
Args:
- reference_source_info: a three-tuple that gives the reference
- ``iobase.BoundedSource``, position to start reading
- at, and position to stop reading at.
- sources_info: a set of sources. Each source is a three-tuple that is of
- the same format described above.
+ reference_source_info\
+ (Tuple[~apache_beam.io.iobase.BoundedSource, int, int]):
+ a three-tuple that gives the reference
+ :class:`~apache_beam.io.iobase.BoundedSource`, position to start
+ reading at, and position to stop reading at.
+ sources_info\
+ (Iterable[Tuple[~apache_beam.io.iobase.BoundedSource, int, int]]):
+ a set of sources. Each source is a three-tuple that is of the same
+ format described above.
Raises:
- ValueError: if the set of data produced by the reference source and the
- given set of sources are not equivalent.
+ ~exceptions.ValueError: if the set of data produced by the reference source
+ and the given set of sources are not equivalent.
"""
@@ -172,18 +177,20 @@ def assert_sources_equal_reference_source(reference_source_info, sources_info):
def assert_reentrant_reads_succeed(source_info):
"""Tests if a given source can be read in a reentrant manner.
- Assume that given source produces the set of values {v1, v2, v3, ... vn}. For
- i in range [1, n-1] this method performs a reentrant read after reading i
- elements and verifies that both the original and reentrant read produce the
- expected set of values.
+ Assume that given source produces the set of values ``{v1, v2, v3, ... vn}``.
+ For ``i`` in range ``[1, n-1]`` this method performs a reentrant read after
+ reading ``i`` elements and verifies that both the original and reentrant read
+ produce the expected set of values.
Args:
- source_info: a three-tuple that gives the reference
- ``iobase.BoundedSource``, position to start reading at, and a
- position to stop reading at.
+ source_info (Tuple[~apache_beam.io.iobase.BoundedSource, int, int]):
+ a three-tuple that gives the reference
+ :class:`~apache_beam.io.iobase.BoundedSource`, position to start reading
+ at, and a position to stop reading at.
+
Raises:
- ValueError: if source is too trivial or reentrant read result in an
- incorrect read.
+ ~exceptions.ValueError: if source is too trivial or reentrant read results
+ in an incorrect read.
"""
source, start_position, stop_position = source_info
@@ -228,21 +235,25 @@ def assert_split_at_fraction_behavior(source, num_items_to_read_before_split,
split_fraction, expected_outcome):
"""Verifies the behaviour of splitting a source at a given fraction.
- Asserts that splitting a ``BoundedSource`` either fails after reading
- ``num_items_to_read_before_split`` items, or succeeds in a way that is
- consistent according to ``assertSplitAtFractionSucceedsAndConsistent()``.
+ Asserts that splitting a :class:`~apache_beam.io.iobase.BoundedSource` either
+ fails after reading **num_items_to_read_before_split** items, or succeeds in
+ a way that is consistent according to
+ :func:`assert_split_at_fraction_succeeds_and_consistent()`.
Args:
- source: the source to perform dynamic splitting on.
- num_items_to_read_before_split: number of items to read before splitting.
- split_fraction: fraction to split at.
- expected_outcome: a value from 'ExpectedSplitOutcome'.
+ source (~apache_beam.io.iobase.BoundedSource): the source to perform
+ dynamic splitting on.
+ num_items_to_read_before_split (int): number of items to read before
+ splitting.
+ split_fraction (float): fraction to split at.
+ expected_outcome (int): a value from
+ :class:`~apache_beam.io.source_test_utils.ExpectedSplitOutcome`.
Returns:
- a tuple that gives the number of items produced by reading the two ranges
- produced after dynamic splitting. If splitting did not occur, the first
- value of the tuple will represent the full set of records read by the
- source while the second value of the tuple will be '-1'.
+ Tuple[int, int]: a tuple that gives the number of items produced by reading
+ the two ranges produced after dynamic splitting. If splitting did not
+ occur, the first value of the tuple will represent the full set of records
+ read by the source while the second value of the tuple will be ``-1``.
"""
assert isinstance(source, iobase.BoundedSource)
expected_items = read_from_source(source, None, None)
@@ -503,12 +514,13 @@ def assert_split_at_fraction_exhaustive(
Verifies multi threaded splitting as well.
Args:
- source: the source to perform dynamic splitting on.
- perform_multi_threaded_test: if true performs a multi-threaded test
- otherwise this test is skipped.
+ source (~apache_beam.io.iobase.BoundedSource): the source to perform
+ dynamic splitting on.
+ perform_multi_threaded_test (bool): if :data:`True` performs a
+ multi-threaded test, otherwise this test is skipped.
Raises:
- ValueError: if the exhaustive splitting test fails.
+ ~exceptions.ValueError: if the exhaustive splitting test fails.
"""
expected_items = read_from_source(source, start_position, stop_position)
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 9c6532e..9708df7 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -417,13 +417,15 @@ class ReadAllFromText(PTransform):
class ReadFromText(PTransform):
- """A ``PTransform`` for reading text files.
+ r"""A :class:`~apache_beam.transforms.ptransform.PTransform` for reading text
+ files.
Parses a text file as newline-delimited elements, by default assuming
- UTF-8 encoding. Supports newline delimiters '\\n' and '\\r\\n'.
+ ``UTF-8`` encoding. Supports newline delimiters ``\n`` and ``\r\n``.
- This implementation only supports reading text encoded using UTF-8 or ASCII.
- This does not support other encodings such as UTF-16 or UTF-32.
+ This implementation only supports reading text encoded using ``UTF-8`` or
+ ``ASCII``.
+ This does not support other encodings such as ``UTF-16`` or ``UTF-32``.
"""
def __init__(
self,
@@ -435,26 +437,28 @@ class ReadFromText(PTransform):
validate=True,
skip_header_lines=0,
**kwargs):
- """Initialize the ``ReadFromText`` transform.
+ """Initialize the :class:`ReadFromText` transform.
Args:
- file_pattern: The file path to read from as a local file path or a GCS
- ``gs://`` path. The path can contain glob characters
- ``(*, ?, and [...] sets)``.
- min_bundle_size: Minimum size of bundles that should be generated when
- splitting this source into bundles. See ``FileBasedSource`` for more
+ file_pattern (str): The file path to read from as a local file path or a
+ GCS ``gs://`` path. The path can contain glob characters
+ (``*``, ``?``, and ``[...]`` sets).
+ min_bundle_size (int): Minimum size of bundles that should be generated
+ when splitting this source into bundles. See
+ :class:`~apache_beam.io.filebasedsource.FileBasedSource` for more
details.
- compression_type: Used to handle compressed input files. Typical value
- is ``CompressionTypes.AUTO``, in which case the underlying file_path's
- extension will be used to detect the compression.
- strip_trailing_newlines: Indicates whether this source should remove
- the newline char in each line it reads before decoding that line.
- validate: flag to verify that the files exist during the pipeline
+ compression_type (str): Used to handle compressed input files.
+ Typical value is :attr:`CompressionTypes.AUTO
+ <apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case the
+ underlying file_path's extension will be used to detect the compression.
+ strip_trailing_newlines (bool): Indicates whether this source should
+ remove the newline char in each line it reads before decoding that line.
+ validate (bool): flag to verify that the files exist during the pipeline
creation time.
- skip_header_lines: Number of header lines to skip. Same number is skipped
- from each source file. Must be 0 or higher. Large number of skipped
- lines might impact performance.
- coder: Coder used to decode each line.
+ skip_header_lines (int): Number of header lines to skip. Same number is
+ skipped from each source file. Must be 0 or higher. Large number of
+ skipped lines might impact performance.
+ coder (~apache_beam.coders.coders.Coder): Coder used to decode each line.
"""
super(ReadFromText, self).__init__(**kwargs)
@@ -468,49 +472,54 @@ class ReadFromText(PTransform):
class WriteToText(PTransform):
- """A PTransform for writing to text files."""
+ """A :class:`~apache_beam.transforms.ptransform.PTransform` for writing to
+ text files."""
- def __init__(self,
- file_path_prefix,
- file_name_suffix='',
- append_trailing_newlines=True,
- num_shards=0,
- shard_name_template=None,
- coder=coders.ToStringCoder(),
- compression_type=CompressionTypes.AUTO,
- header=None):
- """Initialize a WriteToText PTransform.
+ def __init__(
+ self,
+ file_path_prefix,
+ file_name_suffix='',
+ append_trailing_newlines=True,
+ num_shards=0,
+ shard_name_template=None,
+ coder=coders.ToStringCoder(),
+ compression_type=CompressionTypes.AUTO,
+ header=None):
+ r"""Initialize a :class:`WriteToText` transform.
Args:
- file_path_prefix: The file path to write to. The files written will begin
- with this prefix, followed by a shard identifier (see num_shards), and
- end in a common extension, if given by file_name_suffix. In most cases,
- only this argument is specified and num_shards, shard_name_template, and
- file_name_suffix use default values.
- file_name_suffix: Suffix for the files written.
- append_trailing_newlines: indicate whether this sink should write an
- additional newline char after writing each element.
- num_shards: The number of files (shards) used for output. If not set, the
- service will decide on the optimal number of shards.
+ file_path_prefix (str): The file path to write to. The files written will
+ begin with this prefix, followed by a shard identifier (see
+ **num_shards**), and end in a common extension, if given by
+ **file_name_suffix**. In most cases, only this argument is specified and
+ **num_shards**, **shard_name_template**, and **file_name_suffix** use
+ default values.
+ file_name_suffix (str): Suffix for the files written.
+ append_trailing_newlines (bool): indicate whether this sink should write
+ an additional newline char after writing each element.
+ num_shards (int): The number of files (shards) used for output.
+ If not set, the service will decide on the optimal number of shards.
Constraining the number of shards is likely to reduce
the performance of a pipeline. Setting this value is not recommended
unless you require a specific number of output files.
- shard_name_template: A template string containing placeholders for
- the shard number and shard count. Currently only '' and
- '-SSSSS-of-NNNNN' are patterns accepted by the service.
+ shard_name_template (str): A template string containing placeholders for
+ the shard number and shard count. Currently only ``''`` and
+ ``'-SSSSS-of-NNNNN'`` are patterns accepted by the service.
When constructing a filename for a particular shard number, the
- upper-case letters 'S' and 'N' are replaced with the 0-padded shard
- number and shard count respectively. This argument can be '' in which
- case it behaves as if num_shards was set to 1 and only one file will be
- generated. The default pattern used is '-SSSSS-of-NNNNN'.
- coder: Coder used to encode each line.
- compression_type: Used to handle compressed output files. Typical value
- is CompressionTypes.AUTO, in which case the final file path's
- extension (as determined by file_path_prefix, file_name_suffix,
- num_shards and shard_name_template) will be used to detect the
- compression.
- header: String to write at beginning of file as a header. If not None and
- append_trailing_newlines is set, '\n' will be added.
+ upper-case letters ``S`` and ``N`` are replaced with the ``0``-padded
+ shard number and shard count respectively. This argument can be ``''``
+ in which case it behaves as if num_shards was set to 1 and only one file
+ will be generated. The default pattern used is ``'-SSSSS-of-NNNNN'``.
+ coder (~apache_beam.coders.coders.Coder): Coder used to encode each line.
+ compression_type (str): Used to handle compressed output files.
+ Typical value is :attr:`CompressionTypes.AUTO
+ <apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case the
+ final file path's extension (as determined by **file_path_prefix**,
+ **file_name_suffix**, **num_shards** and **shard_name_template**) will
+ be used to detect the compression.
+ header (str): String to write at beginning of file as a header.
+ If not :data:`None` and **append_trailing_newlines** is set, ``\n`` will
+ be added.
"""
self._sink = _TextSink(file_path_prefix, file_name_suffix,
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index e7c2322..1ade6c0 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -15,17 +15,18 @@
# limitations under the License.
#
-"""Pipeline, the top-level Dataflow object.
+"""Pipeline, the top-level Beam object.
A pipeline holds a DAG of data transforms. Conceptually the nodes of the DAG
-are transforms (PTransform objects) and the edges are values (mostly PCollection
+are transforms (:class:`~apache_beam.transforms.ptransform.PTransform` objects)
+and the edges are values (mostly :class:`~apache_beam.pvalue.PCollection`
objects). The transforms take as inputs one or more PValues and output one or
-more PValues.
+more :class:`~apache_beam.pvalue.PValue` s.
The pipeline offers functionality to traverse the graph. The actual operation
to be executed for each node visited is specified through a runner object.
-Typical usage:
+Typical usage::
# Create a pipeline object using a local runner for execution.
with beam.Pipeline('DirectRunner') as p:
@@ -73,32 +74,40 @@ __all__ = ['Pipeline']
class Pipeline(object):
- """A pipeline object that manages a DAG of PValues and their PTransforms.
+ """A pipeline object that manages a DAG of
+ :class:`~apache_beam.pvalue.PValue` s and their
+ :class:`~apache_beam.transforms.ptransform.PTransform` s.
- Conceptually the PValues are the DAG's nodes and the PTransforms computing
- the PValues are the edges.
+ Conceptually the :class:`~apache_beam.pvalue.PValue` s are the DAG's nodes and
+ the :class:`~apache_beam.transforms.ptransform.PTransform` s computing
+ the :class:`~apache_beam.pvalue.PValue` s are the edges.
All the transforms applied to the pipeline must have distinct full labels.
If same transform instance needs to be applied then the right shift operator
- should be used to designate new names (e.g. `input | "label" >> my_tranform`).
+ should be used to designate new names
+ (e.g. ``input | "label" >> my_transform``).
"""
def __init__(self, runner=None, options=None, argv=None):
"""Initialize a pipeline object.
Args:
- runner: An object of type 'PipelineRunner' that will be used to execute
- the pipeline. For registered runners, the runner name can be specified,
- otherwise a runner object must be supplied.
- options: A configured 'PipelineOptions' object containing arguments
- that should be used for running the Dataflow job.
- argv: a list of arguments (such as sys.argv) to be used for building a
- 'PipelineOptions' object. This will only be used if argument 'options'
- is None.
+ runner (~apache_beam.runners.runner.PipelineRunner): An object of
+ type :class:`~apache_beam.runners.runner.PipelineRunner` that will be
+ used to execute the pipeline. For registered runners, the runner name
+ can be specified, otherwise a runner object must be supplied.
+ options (~apache_beam.options.pipeline_options.PipelineOptions):
+ A configured
+ :class:`~apache_beam.options.pipeline_options.PipelineOptions` object
+ containing arguments that should be used for running the Beam job.
+ argv (List[str]): a list of arguments (such as :data:`sys.argv`)
+ to be used for building a
+ :class:`~apache_beam.options.pipeline_options.PipelineOptions` object.
+ This will only be used if argument **options** is :data:`None`.
Raises:
- ValueError: if either the runner or options argument is not of the
- expected type.
+ ~exceptions.ValueError: if either the runner or options argument is not
+ of the expected type.
"""
if options is not None:
if isinstance(options, PipelineOptions):
@@ -292,13 +301,15 @@ class Pipeline(object):
def replace_all(self, replacements):
""" Dynamically replaces PTransforms in the currently populated hierarchy.
- Currently this only works for replacements where input and output types
- are exactly the same.
- TODO: Update this to also work for transform overrides where input and
- output types are different.
+ Currently this only works for replacements where input and output types
+ are exactly the same.
+
+ TODO: Update this to also work for transform overrides where input and
+ output types are different.
Args:
- replacements a list of PTransformOverride objects.
+ replacements (List[~apache_beam.pipeline.PTransformOverride]): a list of
+ :class:`~apache_beam.pipeline.PTransformOverride` objects.
"""
for override in replacements:
assert isinstance(override, PTransformOverride)
@@ -341,13 +352,16 @@ class Pipeline(object):
Runner-internal implementation detail; no backwards-compatibility guarantees
Args:
- visitor: PipelineVisitor object whose callbacks will be called for each
- node visited. See PipelineVisitor comments.
+ visitor (~apache_beam.pipeline.PipelineVisitor):
+ :class:`~apache_beam.pipeline.PipelineVisitor` object whose callbacks
+ will be called for each node visited. See
+ :class:`~apache_beam.pipeline.PipelineVisitor` comments.
Raises:
- TypeError: if node is specified and is not a PValue.
- pipeline.PipelineError: if node is specified and does not belong to this
- pipeline instance.
+ ~exceptions.TypeError: if node is specified and is not a
+ :class:`~apache_beam.pvalue.PValue`.
+ ~apache_beam.error.PipelineError: if node is specified and does not
+ belong to this pipeline instance.
"""
visited = set()
@@ -357,15 +371,20 @@ class Pipeline(object):
"""Applies a custom transform using the pvalueish specified.
Args:
- transform: the PTranform to apply.
- pvalueish: the input for the PTransform (typically a PCollection).
- label: label of the PTransform.
+ transform (~apache_beam.transforms.ptransform.PTransform): the
+ :class:`~apache_beam.transforms.ptransform.PTransform` to apply.
+ pvalueish (~apache_beam.pvalue.PCollection): the input for the
+ :class:`~apache_beam.transforms.ptransform.PTransform` (typically a
+ :class:`~apache_beam.pvalue.PCollection`).
+ label (str): label of the
+ :class:`~apache_beam.transforms.ptransform.PTransform`.
Raises:
- TypeError: if the transform object extracted from the argument list is
- not a PTransform.
- RuntimeError: if the transform object was already applied to this pipeline
- and needs to be cloned in order to apply again.
+ ~exceptions.TypeError: if the transform object extracted from the
+ argument list is not a
+ :class:`~apache_beam.transforms.ptransform.PTransform`.
+ ~exceptions.RuntimeError: if the transform object was already applied to
+ this pipeline and needs to be cloned in order to apply again.
"""
if isinstance(transform, ptransform._NamedPTransform):
return self.apply(transform.transform, pvalueish,
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 7ce9a03..a3c6b34 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -283,10 +283,10 @@ class PValueCache(object):
class PipelineState(object):
- """State of the Pipeline, as returned by PipelineResult.state.
+ """State of the Pipeline, as returned by :attr:`PipelineResult.state`.
This is meant to be the union of all the states any runner can put a
- pipeline in. Currently, it represents the values of the dataflow
+ pipeline in. Currently, it represents the values of the dataflow
API JobState enum.
"""
UNKNOWN = 'UNKNOWN' # not specified
@@ -301,7 +301,7 @@ class PipelineState(object):
class PipelineResult(object):
- """A PipelineResult provides access to info about a pipeline."""
+ """A :class:`PipelineResult` provides access to info about a pipeline."""
def __init__(self, state):
self._state = state
@@ -315,15 +315,18 @@ class PipelineResult(object):
"""Waits until the pipeline finishes and returns the final status.
Args:
- duration: The time to wait (in milliseconds) for job to finish. If it is
- set to None, it will wait indefinitely until the job is finished.
+ duration (int): The time to wait (in milliseconds) for job to finish.
+ If it is set to :data:`None`, it will wait indefinitely until the job
+ is finished.
Raises:
- IOError: If there is a persistent problem getting job information.
- NotImplementedError: If the runner does not support this operation.
+ ~exceptions.IOError: If there is a persistent problem getting job
+ information.
+ ~exceptions.NotImplementedError: If the runner does not support this
+ operation.
Returns:
- The final state of the pipeline, or None on timeout.
+ The final state of the pipeline, or :data:`None` on timeout.
"""
raise NotImplementedError
@@ -331,8 +334,10 @@ class PipelineResult(object):
"""Cancels the pipeline execution.
Raises:
- IOError: If there is a persistent problem getting job information.
- NotImplementedError: If the runner does not support this operation.
+ ~exceptions.IOError: If there is a persistent problem getting job
+ information.
+ ~exceptions.NotImplementedError: If the runner does not support this
+ operation.
Returns:
The final state of the pipeline.
@@ -340,10 +345,12 @@ class PipelineResult(object):
raise NotImplementedError
def metrics(self):
- """Returns MetricsResult object to query metrics from the runner.
+ """Returns :class:`~apache_beam.metrics.metric.MetricResults` object to
+ query metrics from the runner.
Raises:
- NotImplementedError: If the runner does not support this operation.
+ ~exceptions.NotImplementedError: If the runner does not support this
+ operation.
"""
raise NotImplementedError
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/testing/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py
index 13b1639..8380242 100644
--- a/sdks/python/apache_beam/testing/test_pipeline.py
+++ b/sdks/python/apache_beam/testing/test_pipeline.py
@@ -33,23 +33,23 @@ __all__ = [
class TestPipeline(Pipeline):
- """TestPipeline class is used inside of Beam tests that can be configured to
- run against pipeline runner.
+ """:class:`TestPipeline` class is used inside of Beam tests that can be
+ configured to run against pipeline runner.
It has a functionality to parse arguments from command line and build pipeline
options for tests who runs against a pipeline runner and utilizes resources
of the pipeline runner. Those test functions are recommended to be tagged by
- @attr("ValidatesRunner") annotation.
+ ``@attr("ValidatesRunner")`` annotation.
In order to configure the test with customized pipeline options from command
- line, system argument 'test-pipeline-options' can be used to obtains a list
- of pipeline options. If no options specified, default value will be used.
+ line, system argument ``--test-pipeline-options`` can be used to obtain a
+ list of pipeline options. If no options are specified, default values are used.
For example, use following command line to execute all ValidatesRunner tests::
- python setup.py nosetests -a ValidatesRunner \
- --test-pipeline-options="--runner=DirectRunner \
- --job_name=myJobName \
+ python setup.py nosetests -a ValidatesRunner \\
+ --test-pipeline-options="--runner=DirectRunner \\
+ --job_name=myJobName \\
--num_workers=1"
For example, use assert_that for test validation::
@@ -69,21 +69,27 @@ class TestPipeline(Pipeline):
"""Initialize a pipeline object for test.
Args:
- runner: An object of type 'PipelineRunner' that will be used to execute
- the pipeline. For registered runners, the runner name can be specified,
- otherwise a runner object must be supplied.
- options: A configured 'PipelineOptions' object containing arguments
- that should be used for running the pipeline job.
- argv: A list of arguments (such as sys.argv) to be used for building a
- 'PipelineOptions' object. This will only be used if argument 'options'
- is None.
- is_integration_test: True if the test is an integration test, False
- otherwise.
- blocking: Run method will wait until pipeline execution is completed.
+ runner (~apache_beam.runners.runner.PipelineRunner): An object of type
+ :class:`~apache_beam.runners.runner.PipelineRunner` that will be used
+ to execute the pipeline. For registered runners, the runner name can be
+ specified, otherwise a runner object must be supplied.
+ options (~apache_beam.options.pipeline_options.PipelineOptions):
+ A configured
+ :class:`~apache_beam.options.pipeline_options.PipelineOptions`
+ object containing arguments that should be used for running the
+ pipeline job.
+ argv (List[str]): A list of arguments (such as :data:`sys.argv`) to be
+ used for building a
+ :class:`~apache_beam.options.pipeline_options.PipelineOptions` object.
+ This will only be used if argument **options** is :data:`None`.
+ is_integration_test (bool): :data:`True` if the test is an integration
+ test, :data:`False` otherwise.
+ blocking (bool): Run method will wait until pipeline execution is
+ completed.
Raises:
- ValueError: if either the runner or options argument is not of the
- expected type.
+ ~exceptions.ValueError: if either the runner or options argument is not
+ of the expected type.
"""
self.is_integration_test = is_integration_test
self.options_list = self._parse_test_option_args(argv)
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 9018a49..d6f56d2 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -601,31 +601,35 @@ class CallableWrapperPartitionFn(PartitionFn):
class ParDo(PTransformWithSideInputs):
- """A ParDo transform.
+ """A :class:`ParDo` transform.
- Processes an input PCollection by applying a DoFn to each element and
- returning the accumulated results into an output PCollection. The type of the
- elements is not fixed as long as the DoFn can deal with it. In reality
- the type is restrained to some extent because the elements sometimes must be
- persisted to external storage. See the expand() method comments for a detailed
- description of all possible arguments.
+ Processes an input :class:`~apache_beam.pvalue.PCollection` by applying a
+ :class:`DoFn` to each element and returning the accumulated results into an
+ output :class:`~apache_beam.pvalue.PCollection`. The type of the elements is
+ not fixed as long as the :class:`DoFn` can deal with it. In reality the type
+ is constrained to some extent because the elements sometimes must be persisted
+ to external storage. See the :meth:`.expand()` method comments for a
+ detailed description of all possible arguments.
- Note that the DoFn must return an iterable for each element of the input
- PCollection. An easy way to do this is to use the yield keyword in the
- process method.
+ Note that the :class:`DoFn` must return an iterable for each element of the
+ input :class:`~apache_beam.pvalue.PCollection`. An easy way to do this is to
+ use the ``yield`` keyword in the process method.
Args:
- pcoll: a PCollection to be processed.
- fn: a DoFn object to be applied to each element of pcoll argument.
- *args: positional arguments passed to the dofn object.
- **kwargs: keyword arguments passed to the dofn object.
+ pcoll (~apache_beam.pvalue.PCollection):
+ a :class:`~apache_beam.pvalue.PCollection` to be processed.
+ fn (DoFn): a :class:`DoFn` object to be applied to each element
+ of **pcoll** argument.
+ *args: positional arguments passed to the :class:`DoFn` object.
+ **kwargs: keyword arguments passed to the :class:`DoFn` object.
Note that the positional and keyword arguments will be processed in order
- to detect PCollections that will be computed as side inputs to the
- transform. During pipeline execution whenever the DoFn object gets executed
- (its apply() method gets called) the PCollection arguments will be replaced
- by values from the PCollection in the exact positions where they appear in
- the argument lists.
+ to detect :class:`~apache_beam.pvalue.PCollection` s that will be computed as
+ side inputs to the transform. During pipeline execution whenever the
+ :class:`DoFn` object gets executed (its :meth:`DoFn.process()` method gets
+ called) the :class:`~apache_beam.pvalue.PCollection` arguments will be
+ replaced by values from the :class:`~apache_beam.pvalue.PCollection` in the
+ exact positions where they appear in the argument lists.
"""
def __init__(self, fn, *args, **kwargs):
@@ -665,27 +669,34 @@ class ParDo(PTransformWithSideInputs):
return pvalue.PCollection(pcoll.pipeline)
def with_outputs(self, *tags, **main_kw):
- """Returns a tagged tuple allowing access to the outputs of a ParDo.
+ """Returns a tagged tuple allowing access to the outputs of a
+ :class:`ParDo`.
The resulting object supports access to the
- PCollection associated with a tag (e.g., o.tag, o[tag]) and iterating over
- the available tags (e.g., for tag in o: ...).
+ :class:`~apache_beam.pvalue.PCollection` associated with a tag
+ (e.g. ``o.tag``, ``o[tag]``) and iterating over the available tags
+ (e.g. ``for tag in o: ...``).
Args:
*tags: if non-empty, list of valid tags. If a list of valid tags is given,
it will be an error to use an undeclared tag later in the pipeline.
- **main_kw: dictionary empty or with one key 'main' defining the tag to be
- used for the main output (which will not have a tag associated with it).
+ **main_kw: a dictionary that is either empty or has the single key
+ ``'main'``, defining the tag to be used for the main output (which will
+ not have a tag associated with it).
Returns:
- An object of type DoOutputsTuple that bundles together all the outputs
- of a ParDo transform and allows accessing the individual
- PCollections for each output using an object.tag syntax.
+ ~apache_beam.pvalue.DoOutputsTuple: An object of type
+ :class:`~apache_beam.pvalue.DoOutputsTuple` that bundles together all
+ the outputs of a :class:`ParDo` transform and allows accessing the
+ individual :class:`~apache_beam.pvalue.PCollection` s for each output
+ using an ``object.tag`` syntax.
Raises:
- TypeError: if the self object is not a PCollection that is the result of
- a ParDo transform.
- ValueError: if main_kw contains any key other than 'main'.
+ ~exceptions.TypeError: if the **self** object is not a
+ :class:`~apache_beam.pvalue.PCollection` that is the result of a
+ :class:`ParDo` transform.
+ ~exceptions.ValueError: if **main_kw** contains any key other than
+ ``'main'``.
"""
main_tag = main_kw.pop('main', None)
if main_kw:
@@ -739,24 +750,27 @@ class _MultiParDo(PTransform):
def FlatMap(fn, *args, **kwargs): # pylint: disable=invalid-name
- """FlatMap is like ParDo except it takes a callable to specify the
- transformation.
+ """:func:`FlatMap` is like :class:`ParDo` except it takes a callable to
+ specify the transformation.
The callable must return an iterable for each element of the input
- PCollection. The elements of these iterables will be flattened into
- the output PCollection.
+ :class:`~apache_beam.pvalue.PCollection`. The elements of these iterables will
+ be flattened into the output :class:`~apache_beam.pvalue.PCollection`.
Args:
- fn: a callable object.
+ fn (callable): a callable object.
*args: positional arguments passed to the transform callable.
**kwargs: keyword arguments passed to the transform callable.
Returns:
- A PCollection containing the Map outputs.
+ ~apache_beam.pvalue.PCollection:
+ A :class:`~apache_beam.pvalue.PCollection` containing the
+ :func:`FlatMap` outputs.
Raises:
- TypeError: If the fn passed as argument is not a callable. Typical error
- is to pass a DoFn instance which is supported only for ParDo.
+ ~exceptions.TypeError: If the **fn** passed as argument is not a callable.
+ Typical error is to pass a :class:`DoFn` instance which is supported only
+ for :class:`ParDo`.
"""
label = 'FlatMap(%s)' % ptransform.label_from_callable(fn)
if not callable(fn):
@@ -770,19 +784,23 @@ def FlatMap(fn, *args, **kwargs): # pylint: disable=invalid-name
def Map(fn, *args, **kwargs): # pylint: disable=invalid-name
- """Map is like FlatMap except its callable returns only a single element.
+ """:func:`Map` is like :func:`FlatMap` except its callable returns only a
+ single element.
Args:
- fn: a callable object.
+ fn (callable): a callable object.
*args: positional arguments passed to the transform callable.
**kwargs: keyword arguments passed to the transform callable.
Returns:
- A PCollection containing the Map outputs.
+ ~apache_beam.pvalue.PCollection:
+ A :class:`~apache_beam.pvalue.PCollection` containing the
+ :func:`Map` outputs.
Raises:
- TypeError: If the fn passed as argument is not a callable. Typical error
- is to pass a DoFn instance which is supported only for ParDo.
+ ~exceptions.TypeError: If the **fn** passed as argument is not a callable.
+ Typical error is to pass a :class:`DoFn` instance which is supported only
+ for :class:`ParDo`.
"""
if not callable(fn):
raise TypeError(
@@ -815,19 +833,23 @@ def Map(fn, *args, **kwargs): # pylint: disable=invalid-name
def Filter(fn, *args, **kwargs): # pylint: disable=invalid-name
- """Filter is a FlatMap with its callable filtering out elements.
+ """:func:`Filter` is a :func:`FlatMap` with its callable filtering out
+ elements.
Args:
- fn: a callable object.
+ fn (callable): a callable object.
*args: positional arguments passed to the transform callable.
**kwargs: keyword arguments passed to the transform callable.
Returns:
- A PCollection containing the Filter outputs.
+ ~apache_beam.pvalue.PCollection:
+ A :class:`~apache_beam.pvalue.PCollection` containing the
+ :func:`Filter` outputs.
Raises:
- TypeError: If the fn passed as argument is not a callable. Typical error
- is to pass a DoFn instance which is supported only for FlatMap.
+ ~exceptions.TypeError: If the **fn** passed as argument is not a callable.
+ Typical error is to pass a :class:`DoFn` instance which is supported only
+ for :class:`ParDo`.
"""
if not callable(fn):
raise TypeError(
@@ -867,35 +889,42 @@ def _combine_payload(combine_fn, context):
class CombineGlobally(PTransform):
- """A CombineGlobally transform.
+ """A :class:`CombineGlobally` transform.
- Reduces a PCollection to a single value by progressively applying a CombineFn
- to portions of the PCollection (and to intermediate values created thereby).
- See documentation in CombineFn for details on the specifics on how CombineFns
- are applied.
+ Reduces a :class:`~apache_beam.pvalue.PCollection` to a single value by
+ progressively applying a :class:`CombineFn` to portions of the
+ :class:`~apache_beam.pvalue.PCollection` (and to intermediate values created
+ thereby). See the documentation of :class:`CombineFn` for details on how
+ :class:`CombineFn` s are applied.
Args:
- pcoll: a PCollection to be reduced into a single value.
- fn: a CombineFn object that will be called to progressively reduce the
- PCollection into single values, or a callable suitable for wrapping
- by CallableWrapperCombineFn.
- *args: positional arguments passed to the CombineFn object.
- **kwargs: keyword arguments passed to the CombineFn object.
+ pcoll (~apache_beam.pvalue.PCollection):
+ a :class:`~apache_beam.pvalue.PCollection` to be reduced into a single
+ value.
+ fn (callable): a :class:`CombineFn` object that will be called to
+ progressively reduce the :class:`~apache_beam.pvalue.PCollection` into
+ single values, or a callable suitable for wrapping by
+ :class:`~apache_beam.transforms.core.CallableWrapperCombineFn`.
+ *args: positional arguments passed to the :class:`CombineFn` object.
+ **kwargs: keyword arguments passed to the :class:`CombineFn` object.
Raises:
- TypeError: If the output type of the input PCollection is not compatible
- with Iterable[A].
+ ~exceptions.TypeError: If the output type of the input
+ :class:`~apache_beam.pvalue.PCollection` is not compatible
+ with ``Iterable[A]``.
Returns:
- A single-element PCollection containing the main output of the Combine
- transform.
+ ~apache_beam.pvalue.PCollection: A single-element
+ :class:`~apache_beam.pvalue.PCollection` containing the main output of
+ the :class:`CombineGlobally` transform.
Note that the positional and keyword arguments will be processed in order
- to detect PObjects that will be computed as side inputs to the transform.
- During pipeline execution whenever the CombineFn object gets executed (i.e.,
- any of the CombineFn methods get called), the PObject arguments will be
- replaced by their actual value in the exact position where they appear in
- the argument lists.
+ to detect :class:`~apache_beam.pvalue.PValue` s that will be computed as side
+ inputs to the transform.
+ During pipeline execution whenever the :class:`CombineFn` object gets executed
+ (i.e. any of the :class:`CombineFn` methods get called), the
+ :class:`~apache_beam.pvalue.PValue` arguments will be replaced by their
+ actual value in the exact position where they appear in the argument lists.
"""
has_defaults = True
as_view = False