Posted to commits@beam.apache.org by al...@apache.org on 2017/08/12 00:02:50 UTC
[2/3] beam git commit: Makes Python API reference generation more strict
Makes Python API reference generation more strict
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e1baf55d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e1baf55d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e1baf55d
Branch: refs/heads/master
Commit: e1baf55d82fcc4a3951057b2321f77319d88b6c3
Parents: d035a34
Author: David Cavazos <dc...@google.com>
Authored: Fri Jul 21 09:58:11 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Aug 11 17:01:34 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/__init__.py | 47 ++--
.../apache_beam/internal/gcp/json_value.py | 45 ++--
sdks/python/apache_beam/io/avroio.py | 79 ++++--
sdks/python/apache_beam/io/filebasedsink.py | 18 +-
sdks/python/apache_beam/io/filebasedsource.py | 67 +++--
sdks/python/apache_beam/io/filesystem.py | 27 +-
sdks/python/apache_beam/io/gcp/bigquery.py | 257 +++++++++++--------
sdks/python/apache_beam/io/gcp/gcsio.py | 12 +-
sdks/python/apache_beam/io/range_trackers.py | 12 +-
sdks/python/apache_beam/io/source_test_utils.py | 88 ++++---
sdks/python/apache_beam/io/textio.py | 121 +++++----
sdks/python/apache_beam/pipeline.py | 89 ++++---
sdks/python/apache_beam/runners/runner.py | 31 ++-
.../python/apache_beam/testing/test_pipeline.py | 48 ++--
sdks/python/apache_beam/transforms/core.py | 165 +++++++-----
sdks/python/apache_beam/transforms/display.py | 87 ++++---
.../python/apache_beam/transforms/ptransform.py | 61 +++--
sdks/python/apache_beam/typehints/decorators.py | 104 +++++---
.../typehints/native_type_compatibility.py | 7 +-
sdks/python/apache_beam/typehints/typehints.py | 38 +--
sdks/python/generate_pydoc.sh | 134 ++++++++--
sdks/python/tox.ini | 1 +
22 files changed, 953 insertions(+), 585 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py
index 8b772c9..791ebb7 100644
--- a/sdks/python/apache_beam/__init__.py
+++ b/sdks/python/apache_beam/__init__.py
@@ -15,11 +15,12 @@
# limitations under the License.
#
-"""Apache Beam SDK for Python.
+"""
+Apache Beam SDK for Python
+==========================
-Apache Beam <https://beam.apache.org/>
-provides a simple, powerful programming model for building both batch
-and streaming parallel data processing pipelines.
+`Apache Beam <https://beam.apache.org>`_ provides a simple, powerful programming
+model for building both batch and streaming parallel data processing pipelines.
The Apache Beam SDK for Python provides access to Apache Beam capabilities
from the Python programming language.
@@ -33,32 +34,40 @@ Overview
--------
The key concepts in this programming model are
-* PCollection: represents a collection of data, which could be
- bounded or unbounded in size.
-* PTransform: represents a computation that transforms input
- PCollections into output PCollections.
-* Pipeline: manages a directed acyclic graph of PTransforms and
- PCollections that is ready for execution.
-* Runner: specifies where and how the Pipeline should execute.
-* Reading and Writing Data: your pipeline can read from an external
- source and write to an external data sink.
+* :class:`~apache_beam.pvalue.PCollection`: represents a collection of data,
+ which could be bounded or unbounded in size.
+* :class:`~apache_beam.transforms.ptransform.PTransform`: represents a
+ computation that transforms input PCollections into output PCollections.
+* :class:`~apache_beam.pipeline.Pipeline`: manages a directed acyclic graph of
+ :class:`~apache_beam.transforms.ptransform.PTransform` s and
+ :class:`~apache_beam.pvalue.PCollection` s that is ready for execution.
+* :class:`~apache_beam.runners.runner.PipelineRunner`: specifies where and how
+ the pipeline should execute.
+* :class:`~apache_beam.io.iobase.Read`: read from an external source.
+* :class:`~apache_beam.io.iobase.Write`: write to an external data sink.
Typical usage
-------------
At the top of your source file::
- import apache_beam as beam
+ import apache_beam as beam
After this import statement
-* transform classes are available as beam.FlatMap, beam.GroupByKey, etc.
-* Pipeline class is available as beam.Pipeline
-* text read/write transforms are available as beam.io.ReadfromText,
- beam.io.WriteToText
+* Transform classes are available as
+ :class:`beam.FlatMap <apache_beam.transforms.core.FlatMap>`,
+ :class:`beam.GroupByKey <apache_beam.transforms.core.GroupByKey>`, etc.
+* Pipeline class is available as
+ :class:`beam.Pipeline <apache_beam.pipeline.Pipeline>`
+* Text read/write transforms are available as
+ :class:`beam.io.ReadFromText <apache_beam.io.textio.ReadFromText>`,
+ :class:`beam.io.WriteToText <apache_beam.io.textio.WriteToText>`.
Examples
--------
-The examples subdirectory has some examples.
+The `examples subdirectory
+<https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples>`_
+has some examples.
"""
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/internal/gcp/json_value.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/gcp/json_value.py b/sdks/python/apache_beam/internal/gcp/json_value.py
index 59f8b60..167b173 100644
--- a/sdks/python/apache_beam/internal/gcp/json_value.py
+++ b/sdks/python/apache_beam/internal/gcp/json_value.py
@@ -41,11 +41,12 @@ def get_typed_value_descriptor(obj):
obj: A basestring, bool, int, or float to be converted.
Returns:
- A dictionary containing the keys '@type' and 'value' with the value for
- the @type of appropriate type.
+ A dictionary containing the keys ``@type`` and ``value`` with the value for
+ the ``@type`` of appropriate type.
Raises:
- TypeError: if the Python object has a type that is not supported.
+ ~exceptions.TypeError: if the Python object has a type that is not
+ supported.
"""
if isinstance(obj, basestring):
type_name = 'Text'
@@ -66,21 +67,23 @@ def to_json_value(obj, with_type=False):
Converts Python objects into extra_types.JsonValue objects.
Args:
- obj: Python object to be converted. Can be 'None'.
- with_type: If true then the basic types (string, int, float, bool) will
- be wrapped in @type/value dictionaries. Otherwise the straight value is
- encoded into a JsonValue.
+ obj: Python object to be converted. Can be :data:`None`.
+ with_type: If true then the basic types (``string``, ``int``, ``float``,
+ ``bool``) will be wrapped in ``@type:value`` dictionaries. Otherwise the
+ straight value is encoded into a ``JsonValue``.
Returns:
- A JsonValue object using JsonValue, JsonArray and JsonObject types for the
- corresponding values, lists, or dictionaries.
+ A ``JsonValue`` object using ``JsonValue``, ``JsonArray`` and ``JsonObject``
+ types for the corresponding values, lists, or dictionaries.
Raises:
- TypeError: if the Python object contains a type that is not supported.
+ ~exceptions.TypeError: if the Python object contains a type that is not
+ supported.
- The types supported are str, bool, list, tuple, dict, and None. The Dataflow
- API requires JsonValue(s) in many places, and it is quite convenient to be
- able to specify these hierarchical objects using Python syntax.
+ The types supported are ``str``, ``bool``, ``list``, ``tuple``, ``dict``, and
+ ``None``. The Dataflow API requires JsonValue(s) in many places, and it is
+ quite convenient to be able to specify these hierarchical objects using
+ Python syntax.
"""
if obj is None:
return extra_types.JsonValue(is_null=True)
@@ -121,21 +124,23 @@ def to_json_value(obj, with_type=False):
def from_json_value(v):
"""For internal use only; no backwards-compatibility guarantees.
- Converts extra_types.JsonValue objects into Python objects.
+ Converts ``extra_types.JsonValue`` objects into Python objects.
Args:
- v: JsonValue object to be converted.
+ v: ``JsonValue`` object to be converted.
Returns:
A Python object structured as values, lists, and dictionaries corresponding
- to JsonValue, JsonArray and JsonObject types.
+ to ``JsonValue``, ``JsonArray`` and ``JsonObject`` types.
Raises:
- TypeError: if the JsonValue object contains a type that is not supported.
+ ~exceptions.TypeError: if the ``JsonValue`` object contains a type that is
+ not supported.
- The types supported are str, bool, list, dict, and None. The Dataflow API
- returns JsonValue(s) in many places and it is quite convenient to be able to
- convert these hierarchical objects to much simpler Python objects.
+ The types supported are ``str``, ``bool``, ``list``, ``dict``, and ``None``.
+ The Dataflow API returns JsonValue(s) in many places and it is quite
+ convenient to be able to convert these hierarchical objects to much simpler
+ Python objects.
"""
if isinstance(v, extra_types.JsonValue):
if v.string_value is not None:
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 47ea282..cb14c65 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -64,27 +64,74 @@ __all__ = ['ReadFromAvro', 'ReadAllFromAvro', 'WriteToAvro']
class ReadFromAvro(PTransform):
- """A ``PTransform`` for reading Avro files.
+ """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading avro
+ files."""
- Uses source '_AvroSource' to read a set of Avro files defined by a given
- file pattern.
- If '/mypath/myavrofiles*' is a file-pattern that points to a set of Avro
- files, a ``PCollection`` for the records in these Avro files can be created
- in the following manner.
+ def __init__(self, file_pattern=None, min_bundle_size=0, validate=True):
+ """Initializes :class:`ReadFromAvro`.
- p = df.Pipeline(argv=pipeline_args)
- records = p | 'Read' >> df.io.ReadFromAvro('/mypath/myavrofiles*')
- """
+ Uses source :class:`~apache_beam.io._AvroSource` to read a set of Avro
+ files defined by a given file pattern.
- def __init__(self, file_pattern=None, min_bundle_size=0, validate=True):
- """Initializes ``ReadFromAvro``.
+ If ``/mypath/myavrofiles*`` is a file-pattern that points to a set of Avro
+ files, a :class:`~apache_beam.pvalue.PCollection` for the records in
+ these Avro files can be created in the following manner.
+
+ .. testcode::
+
+ with beam.Pipeline() as p:
+ records = p | 'Read' >> beam.io.ReadFromAvro('/mypath/myavrofiles*')
+
+ .. NOTE: We're not actually interested in this error; but if we get here,
+ it means that the way of calling this transform hasn't changed.
+
+ .. testoutput::
+ :hide:
+
+ Traceback (most recent call last):
+ ...
+ IOError: No files found based on the file pattern
+
+ Each record of this :class:`~apache_beam.pvalue.PCollection` will contain
+ a single record read from a source. Records that are of simple types will be
+ mapped into corresponding Python types. Records that are of Avro type
+ ``RECORD`` will be mapped to Python dictionaries that comply with the schema
+ contained in the Avro file that contains those records. In this case, keys
+ of each dictionary will contain the corresponding field names and will be of
+ type :class:`str` while the values of the dictionary will be of the type
+ defined in the corresponding Avro schema.
+
+ For example, if the schema of the Avro file is the following. ::
+
+ {
+ "namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+
+ {"name": "name",
+ "type": "string"},
+
+ {"name": "favorite_number",
+ "type": ["int", "null"]},
+
+ {"name": "favorite_color",
+ "type": ["string", "null"]}
+
+ ]
+ }
+
+ Then records generated by :class:`~apache_beam.io._AvroSource` will be
+ dictionaries of the following form. ::
+
+ {u'name': u'Alyssa', u'favorite_number': 256, u'favorite_color': None}.
Args:
- file_pattern: the set of files to be read.
- min_bundle_size: the minimum size in bytes, to be considered when
- splitting the input into bundles.
- validate: flag to verify that the files exist during the pipeline
- creation time.
+ file_pattern (str): the file glob to read
+ min_bundle_size (int): the minimum size in bytes, to be considered when
+ splitting the input into bundles.
+ validate (bool): flag to verify that the files exist during the pipeline
+ creation time.
"""
super(ReadFromAvro, self).__init__()
self._source = _AvroSource(file_pattern, min_bundle_size, validate=validate)
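Building on the example schema in the docstring above, a sketch of consuming the resulting record dictionaries (the file pattern and output prefix are placeholders):

    import apache_beam as beam

    with beam.Pipeline() as p:
        records = p | 'Read' >> beam.io.ReadFromAvro('/mypath/myavrofiles*')
        names = records | 'ExtractName' >> beam.Map(lambda record: record['name'])
        names | 'Write' >> beam.io.WriteToText('/mypath/names')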
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/filebasedsink.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py
index 76c09fc..eb99d08 100644
--- a/sdks/python/apache_beam/io/filebasedsink.py
+++ b/sdks/python/apache_beam/io/filebasedsink.py
@@ -44,12 +44,13 @@ class FileBasedSink(iobase.Sink):
"""A sink to a GCS or local files.
To implement a file-based sink, extend this class and override
- either ``write_record()`` or ``write_encoded_record()``.
+ either :meth:`.write_record()` or :meth:`.write_encoded_record()`.
- If needed, also overwrite ``open()`` and/or ``close()`` to customize the
- file handling or write headers and footers.
+ If needed, also overwrite :meth:`.open()` and/or :meth:`.close()` to customize
+ the file handling or write headers and footers.
- The output of this write is a PCollection of all written shards.
+ The output of this write is a :class:`~apache_beam.pvalue.PCollection` of
+ all written shards.
"""
# Max number of threads to be used for renaming.
@@ -65,9 +66,12 @@ class FileBasedSink(iobase.Sink):
compression_type=CompressionTypes.AUTO):
"""
Raises:
- TypeError: if file path parameters are not a string or ValueProvider,
- or if compression_type is not member of CompressionTypes.
- ValueError: if shard_name_template is not of expected format.
+ ~exceptions.TypeError: if file path parameters are not a :class:`str` or
+ :class:`~apache_beam.options.value_provider.ValueProvider`, or if
+ **compression_type** is not member of
+ :class:`~apache_beam.io.filesystem.CompressionTypes`.
+ ~exceptions.ValueError: if **shard_name_template** is not of expected
+ format.
"""
if not isinstance(file_path_prefix, (basestring, ValueProvider)):
raise TypeError('file_path_prefix must be a string or ValueProvider;'
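A hedged sketch of the extension pattern the FileBasedSink docstring describes: subclass it, override write_encoded_record(), and optionally open() to emit a header. The sink class below is illustrative and not part of Beam:

    from apache_beam.coders import coders
    from apache_beam.io import filebasedsink

    class SimpleTextSink(filebasedsink.FileBasedSink):  # hypothetical subclass
        def __init__(self, file_path_prefix):
            super(SimpleTextSink, self).__init__(
                file_path_prefix,
                coder=coders.ToStringCoder(),
                file_name_suffix='.txt',
                mime_type='text/plain')

        def open(self, temp_path):
            file_handle = super(SimpleTextSink, self).open(temp_path)
            file_handle.write('# header\n')  # optional per-shard header
            return file_handle

        def write_encoded_record(self, file_handle, encoded_value):
            file_handle.write(encoded_value)
            file_handle.write('\n')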
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index f78bf3f..6496930 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -17,12 +17,13 @@
"""A framework for developing sources for new file types.
-To create a source for a new file type a sub-class of ``FileBasedSource`` should
-be created. Sub-classes of ``FileBasedSource`` must implement the method
-``FileBasedSource.read_records()``. Please read the documentation of that method
-for more details.
+To create a source for a new file type a sub-class of :class:`FileBasedSource`
+should be created. Sub-classes of :class:`FileBasedSource` must implement the
+method :meth:`FileBasedSource.read_records()`. Please read the documentation of
+that method for more details.
-For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
+For an example implementation of :class:`FileBasedSource` see
+:class:`~apache_beam.io._AvroSource`.
"""
import uuid
@@ -51,7 +52,8 @@ __all__ = ['FileBasedSource']
class FileBasedSource(iobase.BoundedSource):
- """A ``BoundedSource`` for reading a file glob of a given type."""
+ """A :class:`~apache_beam.io.iobase.BoundedSource` for reading a file glob of
+ a given type."""
MIN_NUMBER_OF_FILES_TO_STAT = 100
MIN_FRACTION_OF_FILES_TO_STAT = 0.01
@@ -62,31 +64,40 @@ class FileBasedSource(iobase.BoundedSource):
compression_type=CompressionTypes.AUTO,
splittable=True,
validate=True):
- """Initializes ``FileBasedSource``.
+ """Initializes :class:`FileBasedSource`.
Args:
- file_pattern: the file glob to read a string or a ValueProvider
- (placeholder to inject a runtime value).
- min_bundle_size: minimum size of bundles that should be generated when
- performing initial splitting on this source.
- compression_type: compression type to use
- splittable: whether FileBasedSource should try to logically split a single
- file into data ranges so that different parts of the same file
- can be read in parallel. If set to False, FileBasedSource will
- prevent both initial and dynamic splitting of sources for
- single files. File patterns that represent multiple files may
- still get split into sources for individual files. Even if set
- to True by the user, FileBasedSource may choose to not split
- the file, for example, for compressed files where currently
- it is not possible to efficiently read a data range without
- decompressing the whole file.
- validate: Boolean flag to verify that the files exist during the pipeline
- creation time.
+ file_pattern (str): the file glob to read; a string or a
+ :class:`~apache_beam.options.value_provider.ValueProvider`
+ (placeholder to inject a runtime value).
+ min_bundle_size (int): minimum size of bundles that should be generated
+ when performing initial splitting on this source.
+ compression_type (str): Used to handle compressed output files.
+ Typical value is :attr:`CompressionTypes.AUTO
+ <apache_beam.io.filesystem.CompressionTypes.AUTO>`,
+ in which case the final file path's extension will be used to detect
+ the compression.
+ splittable (bool): whether :class:`FileBasedSource` should try to
+ logically split a single file into data ranges so that different parts
+ of the same file can be read in parallel. If set to :data:`False`,
+ :class:`FileBasedSource` will prevent both initial and dynamic splitting
+ of sources for single files. File patterns that represent multiple files
+ may still get split into sources for individual files. Even if set to
+ :data:`True` by the user, :class:`FileBasedSource` may choose to not
+ split the file, for example, for compressed files where currently it is
+ not possible to efficiently read a data range without decompressing the
+ whole file.
+ validate (bool): Boolean flag to verify that the files exist during the
+ pipeline creation time.
+
Raises:
- TypeError: when compression_type is not valid or if file_pattern is not a
- string or a ValueProvider.
- ValueError: when compression and splittable files are specified.
- IOError: when the file pattern specified yields an empty result.
+ ~exceptions.TypeError: when **compression_type** is not valid or if
+ **file_pattern** is not a :class:`str` or a
+ :class:`~apache_beam.options.value_provider.ValueProvider`.
+ ~exceptions.ValueError: when compression and splittable files are
+ specified.
+ ~exceptions.IOError: when the file pattern specified yields an empty
+ result.
"""
if not isinstance(file_pattern, (basestring, ValueProvider)):
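In the same spirit, a simplified sketch of the FileBasedSource extension point described above. The subclass is hypothetical; it sets splittable=False so that the toy read_records(), which ignores the offset range tracker, still yields each record exactly once:

    from apache_beam.io import filebasedsource

    class LineSource(filebasedsource.FileBasedSource):  # hypothetical subclass
        def __init__(self, file_pattern):
            super(LineSource, self).__init__(file_pattern, splittable=False)

        def read_records(self, file_name, offset_range_tracker):
            # Toy implementation: read the whole file, one record per line.
            file_handle = self.open_file(file_name)
            for line in file_handle.read().splitlines():
                yield line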
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index ef3040c..5804d00 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -299,23 +299,28 @@ class CompressedFile(object):
"""Set the file's current offset.
Seeking behavior:
- * seeking from the end (SEEK_END) the whole file is decompressed once to
- determine it's size. Therefore it is preferred to use
- SEEK_SET or SEEK_CUR to avoid the processing overhead
- * seeking backwards from the current position rewinds the file to 0
+
+ * seeking from the end :data:`os.SEEK_END` the whole file is decompressed
+ once to determine its size. Therefore it is preferred to use
+ :data:`os.SEEK_SET` or :data:`os.SEEK_CUR` to avoid the processing
+ overhead
+ * seeking backwards from the current position rewinds the file to ``0``
and decompresses the chunks to the requested offset
* seeking is only supported in files opened for reading
- * if the new offset is out of bound, it is adjusted to either 0 or EOF.
+ * if the new offset is out of bound, it is adjusted to either ``0`` or
+ ``EOF``.
Args:
- offset: seek offset in the uncompressed content represented as number
- whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
- os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
- (seek relative to the end, offset should be negative).
+ offset (int): seek offset in the uncompressed content represented as
+ number
+ whence (int): seek mode. Supported modes are :data:`os.SEEK_SET`
+ (absolute seek), :data:`os.SEEK_CUR` (seek relative to the current
+ position), and :data:`os.SEEK_END` (seek relative to the end, offset
+ should be negative).
Raises:
- IOError: When this buffer is closed.
- ValueError: When whence is invalid or the file is not seekable
+ ~exceptions.IOError: When this buffer is closed.
+ ~exceptions.ValueError: When whence is invalid or the file is not seekable
"""
if whence == os.SEEK_SET:
absolute_offset = offset
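A short sketch of the seek() behaviour documented above, assuming a gzip file at a placeholder path; SEEK_SET and SEEK_CUR avoid the full decompression that SEEK_END triggers:

    import os

    from apache_beam.io.filesystems import FileSystems

    compressed = FileSystems.open('/tmp/data.txt.gz')  # placeholder, gzip-compressed
    compressed.seek(100, os.SEEK_SET)  # absolute offset in the uncompressed stream
    chunk = compressed.read(50)
    compressed.seek(-25, os.SEEK_CUR)  # rewinds to 0 and decompresses up to the new offset
    compressed.close()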
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index db6715a..33d67bf 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -330,45 +330,49 @@ class BigQuerySource(dataflow_io.NativeSource):
def __init__(self, table=None, dataset=None, project=None, query=None,
validate=False, coder=None, use_standard_sql=False,
flatten_results=True):
- """Initialize a BigQuerySource.
+ """Initialize a :class:`BigQuerySource`.
Args:
- table: The ID of a BigQuery table. If specified all data of the table
- will be used as input of the current source. The ID must contain only
- letters (a-z, A-Z), numbers (0-9), or underscores (_). If dataset
- and query arguments are None then the table argument must contain the
- entire table reference specified as: 'DATASET.TABLE' or
- 'PROJECT:DATASET.TABLE'.
- dataset: The ID of the dataset containing this table or null if the table
- reference is specified entirely by the table argument or a query is
- specified.
- project: The ID of the project containing this table or null if the table
- reference is specified entirely by the table argument or a query is
- specified.
- query: A query to be used instead of arguments table, dataset, and
+ table (str): The ID of a BigQuery table. If specified all data of the
+ table will be used as input of the current source. The ID must contain
+ only letters ``a-z``, ``A-Z``, numbers ``0-9``, or underscores
+ ``_``. If dataset and query arguments are :data:`None` then the table
+ argument must contain the entire table reference specified as:
+ ``'DATASET.TABLE'`` or ``'PROJECT:DATASET.TABLE'``.
+ dataset (str): The ID of the dataset containing this table or
+ :data:`None` if the table reference is specified entirely by the table
+ argument or a query is specified.
+ project (str): The ID of the project containing this table or
+ :data:`None` if the table reference is specified entirely by the table
+ argument or a query is specified.
+ query (str): A query to be used instead of arguments table, dataset, and
project.
- validate: If true, various checks will be done when source gets
- initialized (e.g., is table present?). This should be True for most
- scenarios in order to catch errors as early as possible (pipeline
- construction instead of pipeline execution). It should be False if the
- table is created during pipeline execution by a previous step.
- coder: The coder for the table rows if serialized to disk. If None, then
- the default coder is RowAsDictJsonCoder, which will interpret every line
- in a file as a JSON serialized dictionary. This argument needs a value
- only in special cases when returning table rows as dictionaries is not
- desirable.
- use_standard_sql: Specifies whether to use BigQuery's standard
- SQL dialect for this query. The default value is False. If set to True,
- the query will use BigQuery's updated SQL dialect with improved
- standards compliance. This parameter is ignored for table inputs.
- flatten_results: Flattens all nested and repeated fields in the
- query results. The default value is true.
+ validate (bool): If :data:`True`, various checks will be done when source
+ gets initialized (e.g., is table present?). This should be
+ :data:`True` for most scenarios in order to catch errors as early as
+ possible (pipeline construction instead of pipeline execution). It
+ should be :data:`False` if the table is created during pipeline
+ execution by a previous step.
+ coder (~apache_beam.coders.coders.Coder): The coder for the table
+ rows if serialized to disk. If :data:`None`, then the default coder is
+ :class:`~apache_beam.io.gcp.bigquery.RowAsDictJsonCoder`,
+ which will interpret every line in a file as a JSON serialized
+ dictionary. This argument needs a value only in special cases when
+ returning table rows as dictionaries is not desirable.
+ use_standard_sql (bool): Specifies whether to use BigQuery's standard SQL
+ dialect for this query. The default value is :data:`False`.
+ If set to :data:`True`, the query will use BigQuery's updated SQL
+ dialect with improved standards compliance.
+ This parameter is ignored for table inputs.
+ flatten_results (bool): Flattens all nested and repeated fields in the
+ query results. The default value is :data:`True`.
Raises:
- ValueError: if any of the following is true
- (1) the table reference as a string does not match the expected format
- (2) neither a table nor a query is specified
- (3) both a table and a query is specified.
+ ~exceptions.ValueError: if any of the following is true:
+
+ 1) the table reference as a string does not match the expected format
+ 2) neither a table nor a query is specified
+ 3) both a table and a query is specified.
"""
# Import here to avoid adding the dependency for local running scenarios.
@@ -439,46 +443,62 @@ class BigQuerySink(dataflow_io.NativeSink):
"""Initialize a BigQuerySink.
Args:
- table: The ID of the table. The ID must contain only letters
- (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is
- None then the table argument must contain the entire table reference
- specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
- dataset: The ID of the dataset containing this table or null if the table
- reference is specified entirely by the table argument.
- project: The ID of the project containing this table or null if the table
- reference is specified entirely by the table argument.
- schema: The schema to be used if the BigQuery table to write has to be
- created. This can be either specified as a 'bigquery.TableSchema' object
- or a single string of the form 'field1:type1,field2:type2,field3:type3'
- that defines a comma separated list of fields. Here 'type' should
- specify the BigQuery type of the field. Single string based schemas do
- not support nested fields, repeated fields, or specifying a BigQuery
- mode for fields (mode will always be set to 'NULLABLE').
- create_disposition: A string describing what happens if the table does not
- exist. Possible values are:
- - BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist.
- - BigQueryDisposition.CREATE_NEVER: fail the write if does not exist.
- write_disposition: A string describing what happens if the table has
- already some data. Possible values are:
- - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
- - BigQueryDisposition.WRITE_APPEND: add to existing rows.
- - BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.
- validate: If true, various checks will be done when sink gets
- initialized (e.g., is table present given the disposition arguments?).
- This should be True for most scenarios in order to catch errors as early
- as possible (pipeline construction instead of pipeline execution). It
- should be False if the table is created during pipeline execution by a
- previous step.
- coder: The coder for the table rows if serialized to disk. If None, then
- the default coder is RowAsDictJsonCoder, which will interpret every
- element written to the sink as a dictionary that will be JSON serialized
- as a line in a file. This argument needs a value only in special cases
- when writing table rows as dictionaries is not desirable.
+ table (str): The ID of the table. The ID must contain only letters
+ ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If
+ **dataset** argument is :data:`None` then the table argument must
+ contain the entire table reference specified as: ``'DATASET.TABLE'`` or
+ ``'PROJECT:DATASET.TABLE'``.
+ dataset (str): The ID of the dataset containing this table or
+ :data:`None` if the table reference is specified entirely by the table
+ argument.
+ project (str): The ID of the project containing this table or
+ :data:`None` if the table reference is specified entirely by the table
+ argument.
+ schema (str): The schema to be used if the BigQuery table to write has
+ to be created. This can be either specified as a
+ :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema` object or a single string of the form
+ ``'field1:type1,field2:type2,field3:type3'`` that defines a comma
+ separated list of fields. Here ``'type'`` should specify the BigQuery
+ type of the field. Single string based schemas do not support nested
+ fields, repeated fields, or specifying a BigQuery mode for fields (mode
+ will always be set to ``'NULLABLE'``).
+ create_disposition (BigQueryDisposition): A string describing what
+ happens if the table does not exist. Possible values are:
+
+ * :attr:`BigQueryDisposition.CREATE_IF_NEEDED`: create if does not
+ exist.
+ * :attr:`BigQueryDisposition.CREATE_NEVER`: fail the write if does not
+ exist.
+
+ write_disposition (BigQueryDisposition): A string describing what
+ happens if the table has already some data. Possible values are:
+
+ * :attr:`BigQueryDisposition.WRITE_TRUNCATE`: delete existing rows.
+ * :attr:`BigQueryDisposition.WRITE_APPEND`: add to existing rows.
+ * :attr:`BigQueryDisposition.WRITE_EMPTY`: fail the write if table not
+ empty.
+
+ validate (bool): If :data:`True`, various checks will be done when sink
+ gets initialized (e.g., is table present given the disposition
+ arguments?). This should be :data:`True` for most scenarios in order to
+ catch errors as early as possible (pipeline construction instead of
+ pipeline execution). It should be :data:`False` if the table is created
+ during pipeline execution by a previous step.
+ coder (~apache_beam.coders.coders.Coder): The coder for the
+ table rows if serialized to disk. If :data:`None`, then the default
+ coder is :class:`~apache_beam.io.gcp.bigquery.RowAsDictJsonCoder`,
+ which will interpret every element written to the sink as a dictionary
+ that will be JSON serialized as a line in a file. This argument needs a
+ value only in special cases when writing table rows as dictionaries is
+ not desirable.
Raises:
- TypeError: if the schema argument is not a string or a TableSchema object.
- ValueError: if the table reference as a string does not match the expected
- format.
+ ~exceptions.TypeError: if the schema argument is not a :class:`str` or a
+ :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema` object.
+ ~exceptions.ValueError: if the table reference as a string does not
+ match the expected format.
"""
# Import here to avoid adding the dependency for local running scenarios.
@@ -1261,32 +1281,47 @@ class WriteToBigQuery(PTransform):
"""Initialize a WriteToBigQuery transform.
Args:
- table: The ID of the table. The ID must contain only letters
- (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is
- None then the table argument must contain the entire table reference
- specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
- dataset: The ID of the dataset containing this table or null if the table
- reference is specified entirely by the table argument.
- project: The ID of the project containing this table or null if the table
- reference is specified entirely by the table argument.
- schema: The schema to be used if the BigQuery table to write has to be
- created. This can be either specified as a 'bigquery.TableSchema' object
- or a single string of the form 'field1:type1,field2:type2,field3:type3'
- that defines a comma separated list of fields. Here 'type' should
- specify the BigQuery type of the field. Single string based schemas do
- not support nested fields, repeated fields, or specifying a BigQuery
- mode for fields (mode will always be set to 'NULLABLE').
- create_disposition: A string describing what happens if the table does not
- exist. Possible values are:
- - BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist.
- - BigQueryDisposition.CREATE_NEVER: fail the write if does not exist.
- write_disposition: A string describing what happens if the table has
- already some data. Possible values are:
- - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
- - BigQueryDisposition.WRITE_APPEND: add to existing rows.
- - BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.
+ table (str): The ID of the table. The ID must contain only letters
+ ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If dataset
+ argument is :data:`None` then the table argument must contain the
+ entire table reference specified as: ``'DATASET.TABLE'`` or
+ ``'PROJECT:DATASET.TABLE'``.
+ dataset (str): The ID of the dataset containing this table or
+ :data:`None` if the table reference is specified entirely by the table
+ argument.
+ project (str): The ID of the project containing this table or
+ :data:`None` if the table reference is specified entirely by the table
+ argument.
+ schema (str): The schema to be used if the BigQuery table to write has to
+ be created. This can be either specified as a
+ :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema`
+ object or a single string of the form
+ ``'field1:type1,field2:type2,field3:type3'`` that defines a comma
+ separated list of fields. Here ``'type'`` should specify the BigQuery
+ type of the field. Single string based schemas do not support nested
+ fields, repeated fields, or specifying a BigQuery mode for fields
+ (mode will always be set to ``'NULLABLE'``).
+ create_disposition (BigQueryDisposition): A string describing what
+ happens if the table does not exist. Possible values are:
+
+ * :attr:`BigQueryDisposition.CREATE_IF_NEEDED`: create if does not
+ exist.
+ * :attr:`BigQueryDisposition.CREATE_NEVER`: fail the write if does not
+ exist.
+
+ write_disposition (BigQueryDisposition): A string describing what happens
+ if the table has already some data. Possible values are:
+
+ * :attr:`BigQueryDisposition.WRITE_TRUNCATE`: delete existing rows.
+ * :attr:`BigQueryDisposition.WRITE_APPEND`: add to existing rows.
+ * :attr:`BigQueryDisposition.WRITE_EMPTY`: fail the write if table not
+ empty.
+
For streaming pipelines WriteTruncate can not be used.
- batch_size: Number of rows to be written to BQ per streaming API insert.
+
+ batch_size (int): Number of rows to be written to BQ per streaming API
+ insert.
test_client: Override the default bigquery client used for testing.
"""
self.table_reference = _parse_table_reference(table, dataset, project)
@@ -1300,14 +1335,20 @@ class WriteToBigQuery(PTransform):
@staticmethod
def get_table_schema_from_string(schema):
- """Transform the string table schema into a bigquery.TableSchema instance.
+ """Transform the string table schema into a
+ :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema` instance.
Args:
- schema: The sting schema to be used if the BigQuery table to write has
- to be created.
+ schema (str): The string schema to be used if the BigQuery table to write
+ has to be created.
+
Returns:
- table_schema: The schema to be used if the BigQuery table to write has
- to be created but in the bigquery.TableSchema format.
+ ~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema:
+ The schema to be used if the BigQuery table to write has to be created
+ but in the :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema` format.
"""
table_schema = bigquery.TableSchema()
schema_list = [s.strip() for s in schema.split(',')]
@@ -1349,12 +1390,14 @@ class WriteToBigQuery(PTransform):
"""Transform the table schema into a dictionary instance.
Args:
- schema: The schema to be used if the BigQuery table to write has to be
- created. This can either be a dict or string or in the TableSchema
- format.
+ schema (~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema):
+ The schema to be used if the BigQuery table to write has to be created.
+ This can either be a dict or string or in the TableSchema format.
+
Returns:
- table_schema: The schema to be used if the BigQuery table to write has
- to be created but in the dictionary format.
+ Dict[str, Any]: The schema to be used if the BigQuery table to write has
+ to be created but in the dictionary format.
"""
if isinstance(schema, dict):
return schema
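Taken together, a hedged sketch of how the source and sink documented above are typically wired up; the project, dataset, and table names are placeholders:

    import apache_beam as beam

    with beam.Pipeline() as p:
        rows = p | 'ReadBQ' >> beam.io.Read(beam.io.BigQuerySource(
            query='SELECT word, word_count FROM [bigquery-public-data:samples.shakespeare]'))
        rows | 'WriteBQ' >> beam.io.Write(beam.io.BigQuerySink(
            'my-project:my_dataset.word_counts',  # placeholder table reference
            schema='word:STRING,word_count:INTEGER',
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))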
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 643fbc7..ae71a5f 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -137,16 +137,16 @@ class GcsIO(object):
"""Open a GCS file path for reading or writing.
Args:
- filename: GCS file path in the form gs://<bucket>/<object>.
- mode: 'r' for reading or 'w' for writing.
- read_buffer_size: Buffer size to use during read operations.
- mime_type: Mime type to set for write operations.
+ filename (str): GCS file path in the form ``gs://<bucket>/<object>``.
+ mode (str): ``'r'`` for reading or ``'w'`` for writing.
+ read_buffer_size (int): Buffer size to use during read operations.
+ mime_type (str): Mime type to set for write operations.
Returns:
- file object.
+ GCS file object.
Raises:
- ValueError: Invalid open file mode.
+ ~exceptions.ValueError: Invalid open file mode.
"""
if mode == 'r' or mode == 'rb':
return GcsBufferedReader(self.client, filename, mode=mode,
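A minimal sketch of the open() call documented above (gcsio is a GCP-internal helper; the bucket and object are placeholders):

    from apache_beam.io.gcp import gcsio

    gcs = gcsio.GcsIO()
    reader = gcs.open('gs://my-bucket/some/object.txt', mode='r')  # placeholder path
    header_bytes = reader.read(1024)
    reader.close()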
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index 4bd19f8..1339b91 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -317,17 +317,19 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
class UnsplittableRangeTracker(iobase.RangeTracker):
"""A RangeTracker that always ignores split requests.
- This can be used to make a given ``RangeTracker`` object unsplittable by
- ignoring all calls to ``try_split()``. All other calls will be delegated to
- the given ``RangeTracker``.
+ This can be used to make a given
+ :class:`~apache_beam.io.iobase.RangeTracker` object unsplittable by
+ ignoring all calls to :meth:`.try_split()`. All other calls will be delegated
+ to the given :class:`~apache_beam.io.iobase.RangeTracker`.
"""
def __init__(self, range_tracker):
"""Initializes UnsplittableRangeTracker.
Args:
- range_tracker: a ``RangeTracker`` to which all method calls expect calls
- to ``try_split()`` will be delegated.
+ range_tracker (~apache_beam.io.iobase.RangeTracker): a
+ :class:`~apache_beam.io.iobase.RangeTracker` to which all method
+ calls except calls to :meth:`.try_split()` will be delegated.
"""
assert isinstance(range_tracker, iobase.RangeTracker)
self._range_tracker = range_tracker
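For illustration, wrapping an OffsetRangeTracker as described above; split requests are swallowed while all other calls are delegated:

    from apache_beam.io import range_trackers

    inner = range_trackers.OffsetRangeTracker(0, 100)
    tracker = range_trackers.UnsplittableRangeTracker(inner)

    tracker.try_claim(0)   # delegated to the wrapped OffsetRangeTracker
    tracker.try_split(50)  # ignored: no split is performed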
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index a144a8a..bea9708 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -80,12 +80,13 @@ def read_from_source(source, start_position=None, stop_position=None):
Only reads elements within the given position range.
Args:
- source: ``iobase.BoundedSource`` implementation.
- start_position: start position for reading.
- stop_position: stop position for reading.
+ source (~apache_beam.io.iobase.BoundedSource):
+ :class:`~apache_beam.io.iobase.BoundedSource` implementation.
+ start_position (int): start position for reading.
+ stop_position (int): stop position for reading.
Returns:
- the set of values read from the sources.
+ List[str]: the set of values read from the sources.
"""
values = []
range_tracker = source.get_range_tracker(start_position, stop_position)
@@ -108,21 +109,25 @@ def _ThreadPool(threads):
def assert_sources_equal_reference_source(reference_source_info, sources_info):
"""Tests if a reference source is equal to a given set of sources.
- Given a reference source (a ``BoundedSource`` and a position range) and a
- list of sources, assert that the union of the records
- read from the list of sources is equal to the records read from the
+ Given a reference source (a :class:`~apache_beam.io.iobase.BoundedSource`
+ and a position range) and a list of sources, assert that the union of the
+ records read from the list of sources is equal to the records read from the
reference source.
Args:
- reference_source_info: a three-tuple that gives the reference
- ``iobase.BoundedSource``, position to start reading
- at, and position to stop reading at.
- sources_info: a set of sources. Each source is a three-tuple that is of
- the same format described above.
+ reference_source_info\
+ (Tuple[~apache_beam.io.iobase.BoundedSource, int, int]):
+ a three-tuple that gives the reference
+ :class:`~apache_beam.io.iobase.BoundedSource`, position to start
+ reading at, and position to stop reading at.
+ sources_info\
+ (Iterable[Tuple[~apache_beam.io.iobase.BoundedSource, int, int]]):
+ a set of sources. Each source is a three-tuple that is of the same
+ format described above.
Raises:
- ValueError: if the set of data produced by the reference source and the
- given set of sources are not equivalent.
+ ~exceptions.ValueError: if the set of data produced by the reference source
+ and the given set of sources are not equivalent.
"""
@@ -172,18 +177,20 @@ def assert_sources_equal_reference_source(reference_source_info, sources_info):
def assert_reentrant_reads_succeed(source_info):
"""Tests if a given source can be read in a reentrant manner.
- Assume that given source produces the set of values {v1, v2, v3, ... vn}. For
- i in range [1, n-1] this method performs a reentrant read after reading i
- elements and verifies that both the original and reentrant read produce the
- expected set of values.
+ Assume that a given source produces the set of values ``{v1, v2, v3, ... vn}``.
+ For ``i`` in range ``[1, n-1]`` this method performs a reentrant read after
+ reading ``i`` elements and verifies that both the original and reentrant read
+ produce the expected set of values.
Args:
- source_info: a three-tuple that gives the reference
- ``iobase.BoundedSource``, position to start reading at, and a
- position to stop reading at.
+ source_info (Tuple[~apache_beam.io.iobase.BoundedSource, int, int]):
+ a three-tuple that gives the reference
+ :class:`~apache_beam.io.iobase.BoundedSource`, position to start reading
+ at, and a position to stop reading at.
+
Raises:
- ValueError: if source is too trivial or reentrant read result in an
- incorrect read.
+ ~exceptions.ValueError: if source is too trivial or reentrant read result
+ in an incorrect read.
"""
source, start_position, stop_position = source_info
@@ -228,21 +235,25 @@ def assert_split_at_fraction_behavior(source, num_items_to_read_before_split,
split_fraction, expected_outcome):
"""Verifies the behaviour of splitting a source at a given fraction.
- Asserts that splitting a ``BoundedSource`` either fails after reading
- ``num_items_to_read_before_split`` items, or succeeds in a way that is
- consistent according to ``assertSplitAtFractionSucceedsAndConsistent()``.
+ Asserts that splitting a :class:`~apache_beam.io.iobase.BoundedSource` either
+ fails after reading **num_items_to_read_before_split** items, or succeeds in
+ a way that is consistent according to
+ :func:`assert_split_at_fraction_succeeds_and_consistent()`.
Args:
- source: the source to perform dynamic splitting on.
- num_items_to_read_before_split: number of items to read before splitting.
- split_fraction: fraction to split at.
- expected_outcome: a value from 'ExpectedSplitOutcome'.
+ source (~apache_beam.io.iobase.BoundedSource): the source to perform
+ dynamic splitting on.
+ num_items_to_read_before_split (int): number of items to read before
+ splitting.
+ split_fraction (float): fraction to split at.
+ expected_outcome (int): a value from
+ :class:`~apache_beam.io.source_test_utils.ExpectedSplitOutcome`.
Returns:
- a tuple that gives the number of items produced by reading the two ranges
- produced after dynamic splitting. If splitting did not occur, the first
- value of the tuple will represent the full set of records read by the
- source while the second value of the tuple will be '-1'.
+ Tuple[int, int]: a tuple that gives the number of items produced by reading
+ the two ranges produced after dynamic splitting. If splitting did not
+ occur, the first value of the tuple will represent the full set of records
+ read by the source while the second value of the tuple will be ``-1``.
"""
assert isinstance(source, iobase.BoundedSource)
expected_items = read_from_source(source, None, None)
@@ -503,12 +514,13 @@ def assert_split_at_fraction_exhaustive(
Verifies multi threaded splitting as well.
Args:
- source: the source to perform dynamic splitting on.
- perform_multi_threaded_test: if true performs a multi-threaded test
- otherwise this test is skipped.
+ source (~apache_beam.io.iobase.BoundedSource): the source to perform
+ dynamic splitting on.
+ perform_multi_threaded_test (bool): if :data:`True` performs a
+ multi-threaded test, otherwise this test is skipped.
Raises:
- ValueError: if the exhaustive splitting test fails.
+ ~exceptions.ValueError: if the exhaustive splitting test fails.
"""
expected_items = read_from_source(source, start_position, stop_position)
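To ground the helpers documented above, a toy in-memory BoundedSource (illustrative only, modelled on the custom-source pattern in the Beam docs) exercised with source_test_utils:

    from apache_beam.io import iobase, range_trackers, source_test_utils

    class CountingSource(iobase.BoundedSource):  # toy source for demonstration
        def __init__(self, count):
            self._count = count

        def estimate_size(self):
            return self._count

        def get_range_tracker(self, start_position, stop_position):
            if start_position is None:
                start_position = 0
            if stop_position is None:
                stop_position = self._count
            return range_trackers.OffsetRangeTracker(start_position, stop_position)

        def read(self, range_tracker):
            current = range_tracker.start_position()
            while range_tracker.try_claim(current):
                yield current
                current += 1

    source = CountingSource(10)
    values = source_test_utils.read_from_source(source, None, None)  # [0, 1, ..., 9]
    source_test_utils.assert_reentrant_reads_succeed((source, None, None))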
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 9c6532e..9708df7 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -417,13 +417,15 @@ class ReadAllFromText(PTransform):
class ReadFromText(PTransform):
- """A ``PTransform`` for reading text files.
+ r"""A :class:`~apache_beam.transforms.ptransform.PTransform` for reading text
+ files.
Parses a text file as newline-delimited elements, by default assuming
- UTF-8 encoding. Supports newline delimiters '\\n' and '\\r\\n'.
+ ``UTF-8`` encoding. Supports newline delimiters ``\n`` and ``\r\n``.
- This implementation only supports reading text encoded using UTF-8 or ASCII.
- This does not support other encodings such as UTF-16 or UTF-32.
+ This implementation only supports reading text encoded using ``UTF-8`` or
+ ``ASCII``.
+ This does not support other encodings such as ``UTF-16`` or ``UTF-32``.
"""
def __init__(
self,
@@ -435,26 +437,28 @@ class ReadFromText(PTransform):
validate=True,
skip_header_lines=0,
**kwargs):
- """Initialize the ``ReadFromText`` transform.
+ """Initialize the :class:`ReadFromText` transform.
Args:
- file_pattern: The file path to read from as a local file path or a GCS
- ``gs://`` path. The path can contain glob characters
- ``(*, ?, and [...] sets)``.
- min_bundle_size: Minimum size of bundles that should be generated when
- splitting this source into bundles. See ``FileBasedSource`` for more
+ file_pattern (str): The file path to read from as a local file path or a
+ GCS ``gs://`` path. The path can contain glob characters
+ (``*``, ``?``, and ``[...]`` sets).
+ min_bundle_size (int): Minimum size of bundles that should be generated
+ when splitting this source into bundles. See
+ :class:`~apache_beam.io.filebasedsource.FileBasedSource` for more
details.
- compression_type: Used to handle compressed input files. Typical value
- is ``CompressionTypes.AUTO``, in which case the underlying file_path's
- extension will be used to detect the compression.
- strip_trailing_newlines: Indicates whether this source should remove
- the newline char in each line it reads before decoding that line.
- validate: flag to verify that the files exist during the pipeline
+ compression_type (str): Used to handle compressed input files.
+ Typical value is :attr:`CompressionTypes.AUTO
+ <apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case the
+ underlying file_path's extension will be used to detect the compression.
+ strip_trailing_newlines (bool): Indicates whether this source should
+ remove the newline char in each line it reads before decoding that line.
+ validate (bool): flag to verify that the files exist during the pipeline
creation time.
- skip_header_lines: Number of header lines to skip. Same number is skipped
- from each source file. Must be 0 or higher. Large number of skipped
- lines might impact performance.
- coder: Coder used to decode each line.
+ skip_header_lines (int): Number of header lines to skip. Same number is
+ skipped from each source file. Must be 0 or higher. Large number of
+ skipped lines might impact performance.
+ coder (~apache_beam.coders.coders.Coder): Coder used to decode each line.
"""
super(ReadFromText, self).__init__(**kwargs)
@@ -468,49 +472,54 @@ class ReadFromText(PTransform):
class WriteToText(PTransform):
- """A PTransform for writing to text files."""
+ """A :class:`~apache_beam.transforms.ptransform.PTransform` for writing to
+ text files."""
- def __init__(self,
- file_path_prefix,
- file_name_suffix='',
- append_trailing_newlines=True,
- num_shards=0,
- shard_name_template=None,
- coder=coders.ToStringCoder(),
- compression_type=CompressionTypes.AUTO,
- header=None):
- """Initialize a WriteToText PTransform.
+ def __init__(
+ self,
+ file_path_prefix,
+ file_name_suffix='',
+ append_trailing_newlines=True,
+ num_shards=0,
+ shard_name_template=None,
+ coder=coders.ToStringCoder(),
+ compression_type=CompressionTypes.AUTO,
+ header=None):
+ r"""Initialize a :class:`WriteToText` transform.
Args:
- file_path_prefix: The file path to write to. The files written will begin
- with this prefix, followed by a shard identifier (see num_shards), and
- end in a common extension, if given by file_name_suffix. In most cases,
- only this argument is specified and num_shards, shard_name_template, and
- file_name_suffix use default values.
- file_name_suffix: Suffix for the files written.
- append_trailing_newlines: indicate whether this sink should write an
- additional newline char after writing each element.
- num_shards: The number of files (shards) used for output. If not set, the
- service will decide on the optimal number of shards.
+ file_path_prefix (str): The file path to write to. The files written will
+ begin with this prefix, followed by a shard identifier (see
+ **num_shards**), and end in a common extension, if given by
+ **file_name_suffix**. In most cases, only this argument is specified and
+ **num_shards**, **shard_name_template**, and **file_name_suffix** use
+ default values.
+ file_name_suffix (str): Suffix for the files written.
+ append_trailing_newlines (bool): indicate whether this sink should write
+ an additional newline char after writing each element.
+ num_shards (int): The number of files (shards) used for output.
+ If not set, the service will decide on the optimal number of shards.
Constraining the number of shards is likely to reduce
the performance of a pipeline. Setting this value is not recommended
unless you require a specific number of output files.
- shard_name_template: A template string containing placeholders for
- the shard number and shard count. Currently only '' and
- '-SSSSS-of-NNNNN' are patterns accepted by the service.
+ shard_name_template (str): A template string containing placeholders for
+ the shard number and shard count. Currently only ``''`` and
+ ``'-SSSSS-of-NNNNN'`` are patterns accepted by the service.
When constructing a filename for a particular shard number, the
- upper-case letters 'S' and 'N' are replaced with the 0-padded shard
- number and shard count respectively. This argument can be '' in which
- case it behaves as if num_shards was set to 1 and only one file will be
- generated. The default pattern used is '-SSSSS-of-NNNNN'.
- coder: Coder used to encode each line.
- compression_type: Used to handle compressed output files. Typical value
- is CompressionTypes.AUTO, in which case the final file path's
- extension (as determined by file_path_prefix, file_name_suffix,
- num_shards and shard_name_template) will be used to detect the
- compression.
- header: String to write at beginning of file as a header. If not None and
- append_trailing_newlines is set, '\n' will be added.
+ upper-case letters ``S`` and ``N`` are replaced with the ``0``-padded
+ shard number and shard count respectively. This argument can be ``''``
+ in which case it behaves as if num_shards was set to 1 and only one file
+ will be generated. The default pattern used is ``'-SSSSS-of-NNNNN'``.
+ coder (~apache_beam.coders.coders.Coder): Coder used to encode each line.
+ compression_type (str): Used to handle compressed output files.
+ Typical value is :class:`CompressionTypes.AUTO
+ <apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case the
+ final file path's extension (as determined by **file_path_prefix**,
+ **file_name_suffix**, **num_shards** and **shard_name_template**) will
+ be used to detect the compression.
+ header (str): String to write at beginning of file as a header.
+ If not :data:`None` and **append_trailing_newlines** is set, ``\n`` will
+ be added.
"""
self._sink = _TextSink(file_path_prefix, file_name_suffix,
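Pulling the options above together, a sketch of a read/write pair; the paths are placeholders, and the .gz suffix lets CompressionTypes.AUTO infer gzip on the input:

    import apache_beam as beam

    with beam.Pipeline() as p:
        lines = p | beam.io.ReadFromText(
            '/tmp/input*.txt.gz',  # placeholder glob
            skip_header_lines=1,
            strip_trailing_newlines=True)
        lines | beam.io.WriteToText(
            '/tmp/output',  # placeholder prefix
            file_name_suffix='.txt',
            shard_name_template='-SSSSS-of-NNNNN',
            header='word')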
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index e7c2322..1ade6c0 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -15,17 +15,18 @@
# limitations under the License.
#
-"""Pipeline, the top-level Dataflow object.
+"""Pipeline, the top-level Beam object.
A pipeline holds a DAG of data transforms. Conceptually the nodes of the DAG
-are transforms (PTransform objects) and the edges are values (mostly PCollection
+are transforms (:class:`~apache_beam.transforms.ptransform.PTransform` objects)
+and the edges are values (mostly :class:`~apache_beam.pvalue.PCollection`
objects). The transforms take as inputs one or more PValues and output one or
-more PValues.
+more :class:`~apache_beam.pvalue.PValue` s.
The pipeline offers functionality to traverse the graph. The actual operation
to be executed for each node visited is specified through a runner object.
-Typical usage:
+Typical usage::
# Create a pipeline object using a local runner for execution.
with beam.Pipeline('DirectRunner') as p:
@@ -73,32 +74,40 @@ __all__ = ['Pipeline']
class Pipeline(object):
- """A pipeline object that manages a DAG of PValues and their PTransforms.
+ """A pipeline object that manages a DAG of
+ :class:`~apache_beam.pvalue.PValue` s and their
+ :class:`~apache_beam.transforms.ptransform.PTransform` s.
- Conceptually the PValues are the DAG's nodes and the PTransforms computing
- the PValues are the edges.
+ Conceptually the :class:`~apache_beam.pvalue.PValue` s are the DAG's nodes and
+ the :class:`~apache_beam.transforms.ptransform.PTransform` s computing
+ the :class:`~apache_beam.pvalue.PValue` s are the edges.
All the transforms applied to the pipeline must have distinct full labels.
If the same transform instance needs to be applied, then the right shift operator
- should be used to designate new names (e.g. `input | "label" >> my_tranform`).
+ should be used to designate new names
+ (e.g. ``input | "label" >> my_transform``).
"""
def __init__(self, runner=None, options=None, argv=None):
"""Initialize a pipeline object.
Args:
- runner: An object of type 'PipelineRunner' that will be used to execute
- the pipeline. For registered runners, the runner name can be specified,
- otherwise a runner object must be supplied.
- options: A configured 'PipelineOptions' object containing arguments
- that should be used for running the Dataflow job.
- argv: a list of arguments (such as sys.argv) to be used for building a
- 'PipelineOptions' object. This will only be used if argument 'options'
- is None.
+ runner (~apache_beam.runners.runner.PipelineRunner): An object of
+ type :class:`~apache_beam.runners.runner.PipelineRunner` that will be
+ used to execute the pipeline. For registered runners, the runner name
+ can be specified, otherwise a runner object must be supplied.
+ options (~apache_beam.options.pipeline_options.PipelineOptions):
+ A configured
+ :class:`~apache_beam.options.pipeline_options.PipelineOptions` object
+ containing arguments that should be used for running the Beam job.
+ argv (List[str]): a list of arguments (such as :data:`sys.argv`)
+ to be used for building a
+ :class:`~apache_beam.options.pipeline_options.PipelineOptions` object.
+ This will only be used if argument **options** is :data:`None`.
Raises:
- ValueError: if either the runner or options argument is not of the
- expected type.
+ ~exceptions.ValueError: if either the runner or options argument is not
+ of the expected type.
"""
if options is not None:
if isinstance(options, PipelineOptions):
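
A short sketch of the constructor arguments described above (the runner name and
job name are placeholders):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # 1. A registered runner name plus an explicit PipelineOptions object.
    options = PipelineOptions(['--job_name=myJobName'])
    p1 = beam.Pipeline(runner='DirectRunner', options=options)

    # 2. Let the Pipeline build PipelineOptions from an argv-style list,
    #    used only because 'options' is None here.
    p2 = beam.Pipeline(argv=['--runner=DirectRunner', '--job_name=myJobName'])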
@@ -292,13 +301,15 @@ class Pipeline(object):
def replace_all(self, replacements):
""" Dynamically replaces PTransforms in the currently populated hierarchy.
- Currently this only works for replacements where input and output types
- are exactly the same.
- TODO: Update this to also work for transform overrides where input and
- output types are different.
+ Currently this only works for replacements where input and output types
+ are exactly the same.
+
+ TODO: Update this to also work for transform overrides where input and
+ output types are different.
Args:
- replacements a list of PTransformOverride objects.
+ replacements (List[~apache_beam.pipeline.PTransformOverride]): a list of
+ :class:`~apache_beam.pipeline.PTransformOverride` objects.
"""
for override in replacements:
assert isinstance(override, PTransformOverride)
@@ -341,13 +352,16 @@ class Pipeline(object):
Runner-internal implementation detail; no backwards-compatibility guarantees
Args:
- visitor: PipelineVisitor object whose callbacks will be called for each
- node visited. See PipelineVisitor comments.
+ visitor (~apache_beam.pipeline.PipelineVisitor):
+ :class:`~apache_beam.pipeline.PipelineVisitor` object whose callbacks
+ will be called for each node visited. See
+ :class:`~apache_beam.pipeline.PipelineVisitor` comments.
Raises:
- TypeError: if node is specified and is not a PValue.
- pipeline.PipelineError: if node is specified and does not belong to this
- pipeline instance.
+ ~exceptions.TypeError: if node is specified and is not a
+ :class:`~apache_beam.pvalue.PValue`.
+ ~apache_beam.error.PipelineError: if node is specified and does not
+ belong to this pipeline instance.
"""
visited = set()
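
Since visit() is a runner-internal detail, the following is only a hedged sketch
of a visitor; visit_transform is assumed to be the callback of interest here:

    from apache_beam.pipeline import PipelineVisitor

    class PrintLabelsVisitor(PipelineVisitor):
      def visit_transform(self, transform_node):
        # Called once for every transform node in the DAG.
        print(transform_node.full_label)

    # p.visit(PrintLabelsVisitor())   # p is an already-constructed Pipeline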
@@ -357,15 +371,20 @@ class Pipeline(object):
"""Applies a custom transform using the pvalueish specified.
Args:
- transform: the PTranform to apply.
- pvalueish: the input for the PTransform (typically a PCollection).
- label: label of the PTransform.
+ transform (~apache_beam.transforms.ptransform.PTransform): the
+ :class:`~apache_beam.transforms.ptransform.PTransform` to apply.
+ pvalueish (~apache_beam.pvalue.PCollection): the input for the
+ :class:`~apache_beam.transforms.ptransform.PTransform` (typically a
+ :class:`~apache_beam.pvalue.PCollection`).
+ label (str): label of the
+ :class:`~apache_beam.transforms.ptransform.PTransform`.
Raises:
- TypeError: if the transform object extracted from the argument list is
- not a PTransform.
- RuntimeError: if the transform object was already applied to this pipeline
- and needs to be cloned in order to apply again.
+ ~exceptions.TypeError: if the transform object extracted from the
+ argument list is not a
+ :class:`~apache_beam.transforms.ptransform.PTransform`.
+ ~exceptions.RuntimeError: if the transform object was already applied to
+ this pipeline and needs to be cloned in order to apply again.
"""
if isinstance(transform, ptransform._NamedPTransform):
return self.apply(transform.transform, pvalueish,
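
In user code the same apply() machinery is normally reached through the pipe
operator rather than called directly; a small sketch of labelled applications:

    import apache_beam as beam

    with beam.Pipeline('DirectRunner') as p:
      numbers = p | 'Create' >> beam.Create([1, 2, 3])
      # Reapplying the same transform type needs a distinct label, otherwise
      # the apply step fails with a label collision.
      doubled = numbers | 'Double' >> beam.Map(lambda x: x * 2)
      tripled = numbers | 'Triple' >> beam.Map(lambda x: x * 3)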
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 7ce9a03..a3c6b34 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -283,10 +283,10 @@ class PValueCache(object):
class PipelineState(object):
- """State of the Pipeline, as returned by PipelineResult.state.
+ """State of the Pipeline, as returned by :attr:`PipelineResult.state`.
This is meant to be the union of all the states any runner can put a
- pipeline in. Currently, it represents the values of the dataflow
+  pipeline in. Currently, it represents the values of the Dataflow
API JobState enum.
"""
UNKNOWN = 'UNKNOWN' # not specified
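
For illustration, a sketch of checking the terminal state of a run against this
enum (a trivial DirectRunner pipeline is assumed):

    import apache_beam as beam
    from apache_beam.runners.runner import PipelineState

    p = beam.Pipeline('DirectRunner')
    _ = p | beam.Create([1, 2, 3])
    result = p.run()
    result.wait_until_finish()
    assert result.state == PipelineState.DONE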
@@ -301,7 +301,7 @@ class PipelineState(object):
class PipelineResult(object):
- """A PipelineResult provides access to info about a pipeline."""
+ """A :class:`PipelineResult` provides access to info about a pipeline."""
def __init__(self, state):
self._state = state
@@ -315,15 +315,18 @@ class PipelineResult(object):
"""Waits until the pipeline finishes and returns the final status.
Args:
- duration: The time to wait (in milliseconds) for job to finish. If it is
- set to None, it will wait indefinitely until the job is finished.
+ duration (int): The time to wait (in milliseconds) for the job to finish.
+ If it is set to :data:`None`, it will wait indefinitely until the job
+ is finished.
Raises:
- IOError: If there is a persistent problem getting job information.
- NotImplementedError: If the runner does not support this operation.
+ ~exceptions.IOError: If there is a persistent problem getting job
+ information.
+ ~exceptions.NotImplementedError: If the runner does not support this
+ operation.
Returns:
- The final state of the pipeline, or None on timeout.
+ The final state of the pipeline, or :data:`None` on timeout.
"""
raise NotImplementedError
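
A sketch of the waiting behaviour described above; note that the timeout form is
runner-dependent, so only the indefinite wait is shown as live code:

    import apache_beam as beam

    p = beam.Pipeline('DirectRunner')
    _ = p | beam.Create([1, 2, 3])
    result = p.run()

    # Portable form: block until the job reaches a terminal state.
    final_state = result.wait_until_finish()

    # With a timeout in milliseconds (not supported by every runner);
    # returns None if the job is still running when the timeout expires.
    # final_state = result.wait_until_finish(duration=60 * 1000)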
@@ -331,8 +334,10 @@ class PipelineResult(object):
"""Cancels the pipeline execution.
Raises:
- IOError: If there is a persistent problem getting job information.
- NotImplementedError: If the runner does not support this operation.
+ ~exceptions.IOError: If there is a persistent problem getting job
+ information.
+ ~exceptions.NotImplementedError: If the runner does not support this
+ operation.
Returns:
The final state of the pipeline.
@@ -340,10 +345,12 @@ class PipelineResult(object):
raise NotImplementedError
def metrics(self):
- """Returns MetricsResult object to query metrics from the runner.
+ """Returns :class:`~apache_beam.metrics.metric.MetricResults` object to
+ query metrics from the runner.
Raises:
- NotImplementedError: If the runner does not support this operation.
+ ~exceptions.NotImplementedError: If the runner does not support this
+ operation.
"""
raise NotImplementedError
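
Querying the returned MetricResults might look like the following sketch; the
counter name 'processed_elements' is hypothetical:

    import apache_beam as beam
    from apache_beam.metrics import Metrics
    from apache_beam.metrics.metric import MetricsFilter

    class CountFn(beam.DoFn):
      def __init__(self):
        # 'processed_elements' is an illustrative counter name.
        self.counter = Metrics.counter(self.__class__, 'processed_elements')

      def process(self, element):
        self.counter.inc()
        yield element

    p = beam.Pipeline('DirectRunner')
    _ = p | beam.Create([1, 2, 3]) | beam.ParDo(CountFn())
    result = p.run()
    result.wait_until_finish()

    name_filter = MetricsFilter().with_name('processed_elements')
    for counter in result.metrics().query(name_filter)['counters']:
      print(counter.key, counter.committed)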
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/testing/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py
index 13b1639..8380242 100644
--- a/sdks/python/apache_beam/testing/test_pipeline.py
+++ b/sdks/python/apache_beam/testing/test_pipeline.py
@@ -33,23 +33,23 @@ __all__ = [
class TestPipeline(Pipeline):
- """TestPipeline class is used inside of Beam tests that can be configured to
- run against pipeline runner.
+ """:class:`TestPipeline` class is used inside of Beam tests that can be
+ configured to run against a pipeline runner.
It has functionality to parse arguments from the command line and build pipeline
options for tests that run against a pipeline runner and utilize resources
of the pipeline runner. Those test functions are recommended to be tagged by
- @attr("ValidatesRunner") annotation.
+ ``@attr("ValidatesRunner")`` annotation.
In order to configure the test with customized pipeline options from command
- line, system argument 'test-pipeline-options' can be used to obtains a list
- of pipeline options. If no options specified, default value will be used.
+ line, system argument ``--test-pipeline-options`` can be used to obtain a
+ list of pipeline options. If no options are specified, the default value will be used.
For example, use following command line to execute all ValidatesRunner tests::
- python setup.py nosetests -a ValidatesRunner \
- --test-pipeline-options="--runner=DirectRunner \
- --job_name=myJobName \
+ python setup.py nosetests -a ValidatesRunner \\
+ --test-pipeline-options="--runner=DirectRunner \\
+ --job_name=myJobName \\
--num_workers=1"
For example, use assert_that for test validation::
@@ -69,21 +69,27 @@ class TestPipeline(Pipeline):
"""Initialize a pipeline object for test.
Args:
- runner: An object of type 'PipelineRunner' that will be used to execute
- the pipeline. For registered runners, the runner name can be specified,
- otherwise a runner object must be supplied.
- options: A configured 'PipelineOptions' object containing arguments
- that should be used for running the pipeline job.
- argv: A list of arguments (such as sys.argv) to be used for building a
- 'PipelineOptions' object. This will only be used if argument 'options'
- is None.
- is_integration_test: True if the test is an integration test, False
- otherwise.
- blocking: Run method will wait until pipeline execution is completed.
+ runner (~apache_beam.runners.runner.PipelineRunner): An object of type
+ :class:`~apache_beam.runners.runner.PipelineRunner` that will be used
+ to execute the pipeline. For registered runners, the runner name can be
+ specified, otherwise a runner object must be supplied.
+ options (~apache_beam.options.pipeline_options.PipelineOptions):
+ A configured
+ :class:`~apache_beam.options.pipeline_options.PipelineOptions`
+ object containing arguments that should be used for running the
+ pipeline job.
+ argv (List[str]): A list of arguments (such as :data:`sys.argv`) to be
+ used for building a
+ :class:`~apache_beam.options.pipeline_options.PipelineOptions` object.
+ This will only be used if argument **options** is :data:`None`.
+ is_integration_test (bool): :data:`True` if the test is an integration
+ test, :data:`False` otherwise.
+ blocking (bool): Run method will wait until pipeline execution is
+ completed.
Raises:
- ValueError: if either the runner or options argument is not of the
- expected type.
+ ~exceptions.ValueError: if either the runner or options argument is not
+ of the expected type.
"""
self.is_integration_test = is_integration_test
self.options_list = self._parse_test_option_args(argv)
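
A sketch of the assert_that pattern referenced in the class docstring:

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that, equal_to

    with TestPipeline() as p:
      pcoll = p | beam.Create([1, 2, 3])
      # Verified when the pipeline runs at the end of the with block.
      assert_that(pcoll, equal_to([1, 2, 3]))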
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 9018a49..d6f56d2 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -601,31 +601,35 @@ class CallableWrapperPartitionFn(PartitionFn):
class ParDo(PTransformWithSideInputs):
- """A ParDo transform.
+ """A :class:`ParDo` transform.
- Processes an input PCollection by applying a DoFn to each element and
- returning the accumulated results into an output PCollection. The type of the
- elements is not fixed as long as the DoFn can deal with it. In reality
- the type is restrained to some extent because the elements sometimes must be
- persisted to external storage. See the expand() method comments for a detailed
- description of all possible arguments.
+ Processes an input :class:`~apache_beam.pvalue.PCollection` by applying a
+ :class:`DoFn` to each element and returning the accumulated results into an
+ output :class:`~apache_beam.pvalue.PCollection`. The type of the elements is
+ not fixed as long as the :class:`DoFn` can deal with it. In reality the type
+ is restrained to some extent because the elements sometimes must be persisted
+ to external storage. See the :meth:`.expand()` method comments for a
+ detailed description of all possible arguments.
- Note that the DoFn must return an iterable for each element of the input
- PCollection. An easy way to do this is to use the yield keyword in the
- process method.
+ Note that the :class:`DoFn` must return an iterable for each element of the
+ input :class:`~apache_beam.pvalue.PCollection`. An easy way to do this is to
+ use the ``yield`` keyword in the process method.
Args:
- pcoll: a PCollection to be processed.
- fn: a DoFn object to be applied to each element of pcoll argument.
- *args: positional arguments passed to the dofn object.
- **kwargs: keyword arguments passed to the dofn object.
+ pcoll (~apache_beam.pvalue.PCollection):
+ a :class:`~apache_beam.pvalue.PCollection` to be processed.
+ fn (DoFn): a :class:`DoFn` object to be applied to each element
+ of **pcoll** argument.
+ *args: positional arguments passed to the :class:`DoFn` object.
+ **kwargs: keyword arguments passed to the :class:`DoFn` object.
Note that the positional and keyword arguments will be processed in order
- to detect PCollections that will be computed as side inputs to the
- transform. During pipeline execution whenever the DoFn object gets executed
- (its apply() method gets called) the PCollection arguments will be replaced
- by values from the PCollection in the exact positions where they appear in
- the argument lists.
+ to detect :class:`~apache_beam.pvalue.PCollection` s that will be computed as
+ side inputs to the transform. During pipeline execution whenever the
+ :class:`DoFn` object gets executed (its :meth:`DoFn.process()` method gets
+ called) the :class:`~apache_beam.pvalue.PCollection` arguments will be
+ replaced by values from the :class:`~apache_beam.pvalue.PCollection` in the
+ exact positions where they appear in the argument lists.
"""
def __init__(self, fn, *args, **kwargs):
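
A minimal sketch of the DoFn/ParDo contract described above, where process()
yields and therefore returns an iterable per input element:

    import apache_beam as beam

    class SplitWordsFn(beam.DoFn):
      def process(self, element):
        # Yielding satisfies the "return an iterable" requirement.
        for word in element.split():
          yield word

    with beam.Pipeline('DirectRunner') as p:
      words = (p
               | beam.Create(['hello world'])
               | beam.ParDo(SplitWordsFn()))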
@@ -665,27 +669,34 @@ class ParDo(PTransformWithSideInputs):
return pvalue.PCollection(pcoll.pipeline)
def with_outputs(self, *tags, **main_kw):
- """Returns a tagged tuple allowing access to the outputs of a ParDo.
+ """Returns a tagged tuple allowing access to the outputs of a
+ :class:`ParDo`.
The resulting object supports access to the
- PCollection associated with a tag (e.g., o.tag, o[tag]) and iterating over
- the available tags (e.g., for tag in o: ...).
+ :class:`~apache_beam.pvalue.PCollection` associated with a tag
+ (e.g. ``o.tag``, ``o[tag]``) and iterating over the available tags
+ (e.g. ``for tag in o: ...``).
Args:
*tags: if non-empty, list of valid tags. If a list of valid tags is given,
it will be an error to use an undeclared tag later in the pipeline.
- **main_kw: dictionary empty or with one key 'main' defining the tag to be
- used for the main output (which will not have a tag associated with it).
+ **main_kw: dictionary empty or with one key ``'main'`` defining the tag to
+ be used for the main output (which will not have a tag associated with
+ it).
Returns:
- An object of type DoOutputsTuple that bundles together all the outputs
- of a ParDo transform and allows accessing the individual
- PCollections for each output using an object.tag syntax.
+ ~apache_beam.pvalue.DoOutputsTuple: An object of type
+ :class:`~apache_beam.pvalue.DoOutputsTuple` that bundles together all
+ the outputs of a :class:`ParDo` transform and allows accessing the
+ individual :class:`~apache_beam.pvalue.PCollection` s for each output
+ using an ``object.tag`` syntax.
Raises:
- TypeError: if the self object is not a PCollection that is the result of
- a ParDo transform.
- ValueError: if main_kw contains any key other than 'main'.
+ ~exceptions.TypeError: if the **self** object is not a
+ :class:`~apache_beam.pvalue.PCollection` that is the result of a
+ :class:`ParDo` transform.
+ ~exceptions.ValueError: if **main_kw** contains any key other than
+ ``'main'``.
"""
main_tag = main_kw.pop('main', None)
if main_kw:
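
A sketch of the tagged-output access patterns mentioned above; the tag names are
hypothetical and pvalue.TaggedOutput is assumed to be the wrapper for routing an
element to a tag:

    import apache_beam as beam

    class ClassifyFn(beam.DoFn):
      def process(self, element):
        if element < 0:
          # Route the element to the 'negative' tag.
          yield beam.pvalue.TaggedOutput('negative', element)
        else:
          yield element  # main output

    with beam.Pipeline('DirectRunner') as p:
      results = (p
                 | beam.Create([-1, 2, -3])
                 | beam.ParDo(ClassifyFn()).with_outputs(
                     'negative', main='non_negative'))
      negatives = results.negative              # attribute-style access
      non_negatives = results['non_negative']   # index-style access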
@@ -739,24 +750,27 @@ class _MultiParDo(PTransform):
def FlatMap(fn, *args, **kwargs): # pylint: disable=invalid-name
- """FlatMap is like ParDo except it takes a callable to specify the
- transformation.
+ """:func:`FlatMap` is like :class:`ParDo` except it takes a callable to
+ specify the transformation.
The callable must return an iterable for each element of the input
- PCollection. The elements of these iterables will be flattened into
- the output PCollection.
+ :class:`~apache_beam.pvalue.PCollection`. The elements of these iterables will
+ be flattened into the output :class:`~apache_beam.pvalue.PCollection`.
Args:
- fn: a callable object.
+ fn (callable): a callable object.
*args: positional arguments passed to the transform callable.
**kwargs: keyword arguments passed to the transform callable.
Returns:
- A PCollection containing the Map outputs.
+ ~apache_beam.pvalue.PCollection:
+ A :class:`~apache_beam.pvalue.PCollection` containing the
+ :func:`FlatMap` outputs.
Raises:
- TypeError: If the fn passed as argument is not a callable. Typical error
- is to pass a DoFn instance which is supported only for ParDo.
+ ~exceptions.TypeError: If the **fn** passed as argument is not a callable.
+ Typical error is to pass a :class:`DoFn` instance which is supported only
+ for :class:`ParDo`.
"""
label = 'FlatMap(%s)' % ptransform.label_from_callable(fn)
if not callable(fn):
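
A minimal FlatMap sketch, where each returned iterable is flattened into the
output PCollection:

    import apache_beam as beam

    with beam.Pipeline('DirectRunner') as p:
      words = (p
               | beam.Create(['a b', 'c'])
               | beam.FlatMap(lambda line: line.split()))  # -> 'a', 'b', 'c'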
@@ -770,19 +784,23 @@ def FlatMap(fn, *args, **kwargs): # pylint: disable=invalid-name
def Map(fn, *args, **kwargs): # pylint: disable=invalid-name
- """Map is like FlatMap except its callable returns only a single element.
+ """:func:`Map` is like :func:`FlatMap` except its callable returns only a
+ single element.
Args:
- fn: a callable object.
+ fn (callable): a callable object.
*args: positional arguments passed to the transform callable.
**kwargs: keyword arguments passed to the transform callable.
Returns:
- A PCollection containing the Map outputs.
+ ~apache_beam.pvalue.PCollection:
+ A :class:`~apache_beam.pvalue.PCollection` containing the
+ :func:`Map` outputs.
Raises:
- TypeError: If the fn passed as argument is not a callable. Typical error
- is to pass a DoFn instance which is supported only for ParDo.
+ ~exceptions.TypeError: If the **fn** passed as argument is not a callable.
+ Typical error is to pass a :class:`DoFn` instance which is supported only
+ for :class:`ParDo`.
"""
if not callable(fn):
raise TypeError(
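
A minimal Map sketch, producing exactly one output element per input:

    import apache_beam as beam

    with beam.Pipeline('DirectRunner') as p:
      lengths = (p
                 | beam.Create(['apple', 'fig'])
                 | beam.Map(len))  # -> 5, 3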
@@ -815,19 +833,23 @@ def Map(fn, *args, **kwargs): # pylint: disable=invalid-name
def Filter(fn, *args, **kwargs): # pylint: disable=invalid-name
- """Filter is a FlatMap with its callable filtering out elements.
+ """:func:`Filter` is a :func:`FlatMap` with its callable filtering out
+ elements.
Args:
- fn: a callable object.
+ fn (callable): a callable object.
*args: positional arguments passed to the transform callable.
**kwargs: keyword arguments passed to the transform callable.
Returns:
- A PCollection containing the Filter outputs.
+ ~apache_beam.pvalue.PCollection:
+ A :class:`~apache_beam.pvalue.PCollection` containing the
+ :func:`Filter` outputs.
Raises:
- TypeError: If the fn passed as argument is not a callable. Typical error
- is to pass a DoFn instance which is supported only for FlatMap.
+ ~exceptions.TypeError: If the **fn** passed as argument is not a callable.
+ Typical error is to pass a :class:`DoFn` instance which is supported only
+ for :class:`ParDo`.
"""
if not callable(fn):
raise TypeError(
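
A minimal Filter sketch, keeping only the elements for which the callable
returns true:

    import apache_beam as beam

    with beam.Pipeline('DirectRunner') as p:
      evens = (p
               | beam.Create([1, 2, 3, 4])
               | beam.Filter(lambda x: x % 2 == 0))  # keeps 2 and 4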
@@ -867,35 +889,42 @@ def _combine_payload(combine_fn, context):
class CombineGlobally(PTransform):
- """A CombineGlobally transform.
+ """A :class:`CombineGlobally` transform.
- Reduces a PCollection to a single value by progressively applying a CombineFn
- to portions of the PCollection (and to intermediate values created thereby).
- See documentation in CombineFn for details on the specifics on how CombineFns
- are applied.
+ Reduces a :class:`~apache_beam.pvalue.PCollection` to a single value by
+ progressively applying a :class:`CombineFn` to portions of the
+ :class:`~apache_beam.pvalue.PCollection` (and to intermediate values created
+ thereby). See documentation in :class:`CombineFn` for details on the specifics
+ on how :class:`CombineFn` s are applied.
Args:
- pcoll: a PCollection to be reduced into a single value.
- fn: a CombineFn object that will be called to progressively reduce the
- PCollection into single values, or a callable suitable for wrapping
- by CallableWrapperCombineFn.
- *args: positional arguments passed to the CombineFn object.
- **kwargs: keyword arguments passed to the CombineFn object.
+ pcoll (~apache_beam.pvalue.PCollection):
+ a :class:`~apache_beam.pvalue.PCollection` to be reduced into a single
+ value.
+ fn (callable): a :class:`CombineFn` object that will be called to
+ progressively reduce the :class:`~apache_beam.pvalue.PCollection` into
+ single values, or a callable suitable for wrapping by
+ :class:`~apache_beam.transforms.core.CallableWrapperCombineFn`.
+ *args: positional arguments passed to the :class:`CombineFn` object.
+ **kwargs: keyword arguments passed to the :class:`CombineFn` object.
Raises:
- TypeError: If the output type of the input PCollection is not compatible
- with Iterable[A].
+ ~exceptions.TypeError: If the output type of the input
+ :class:`~apache_beam.pvalue.PCollection` is not compatible
+ with ``Iterable[A]``.
Returns:
- A single-element PCollection containing the main output of the Combine
- transform.
+ ~apache_beam.pvalue.PCollection: A single-element
+ :class:`~apache_beam.pvalue.PCollection` containing the main output of
+ the :class:`CombineGlobally` transform.
Note that the positional and keyword arguments will be processed in order
- to detect PObjects that will be computed as side inputs to the transform.
- During pipeline execution whenever the CombineFn object gets executed (i.e.,
- any of the CombineFn methods get called), the PObject arguments will be
- replaced by their actual value in the exact position where they appear in
- the argument lists.
+ to detect :class:`~apache_beam.pvalue.PValue` s that will be computed as side
+ inputs to the transform.
+ During pipeline execution whenever the :class:`CombineFn` object gets executed
+ (i.e. any of the :class:`CombineFn` methods get called), the
+ :class:`~apache_beam.pvalue.PValue` arguments will be replaced by their
+ actual value in the exact position where they appear in the argument lists.
"""
has_defaults = True
as_view = False
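
A minimal CombineGlobally sketch, using a plain callable that is wrapped into a
CombineFn for progressive reduction:

    import apache_beam as beam

    with beam.Pipeline('DirectRunner') as p:
      total = (p
               | beam.Create([1, 2, 3, 4])
               | beam.CombineGlobally(sum))  # single-element PCollection: 10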