Posted to commits@beam.apache.org by al...@apache.org on 2017/08/12 00:02:50 UTC

[2/3] beam git commit: Makes Python API reference generation more strict

Makes Python API reference generation more strict


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e1baf55d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e1baf55d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e1baf55d

Branch: refs/heads/master
Commit: e1baf55d82fcc4a3951057b2321f77319d88b6c3
Parents: d035a34
Author: David Cavazos <dc...@google.com>
Authored: Fri Jul 21 09:58:11 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Fri Aug 11 17:01:34 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/__init__.py             |  47 ++--
 .../apache_beam/internal/gcp/json_value.py      |  45 ++--
 sdks/python/apache_beam/io/avroio.py            |  79 ++++--
 sdks/python/apache_beam/io/filebasedsink.py     |  18 +-
 sdks/python/apache_beam/io/filebasedsource.py   |  67 +++--
 sdks/python/apache_beam/io/filesystem.py        |  27 +-
 sdks/python/apache_beam/io/gcp/bigquery.py      | 257 +++++++++++--------
 sdks/python/apache_beam/io/gcp/gcsio.py         |  12 +-
 sdks/python/apache_beam/io/range_trackers.py    |  12 +-
 sdks/python/apache_beam/io/source_test_utils.py |  88 ++++---
 sdks/python/apache_beam/io/textio.py            | 121 +++++----
 sdks/python/apache_beam/pipeline.py             |  89 ++++---
 sdks/python/apache_beam/runners/runner.py       |  31 ++-
 .../python/apache_beam/testing/test_pipeline.py |  48 ++--
 sdks/python/apache_beam/transforms/core.py      | 165 +++++++-----
 sdks/python/apache_beam/transforms/display.py   |  87 ++++---
 .../python/apache_beam/transforms/ptransform.py |  61 +++--
 sdks/python/apache_beam/typehints/decorators.py | 104 +++++---
 .../typehints/native_type_compatibility.py      |   7 +-
 sdks/python/apache_beam/typehints/typehints.py  |  38 +--
 sdks/python/generate_pydoc.sh                   | 134 ++++++++--
 sdks/python/tox.ini                             |   1 +
 22 files changed, 953 insertions(+), 585 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py
index 8b772c9..791ebb7 100644
--- a/sdks/python/apache_beam/__init__.py
+++ b/sdks/python/apache_beam/__init__.py
@@ -15,11 +15,12 @@
 # limitations under the License.
 #
 
-"""Apache Beam SDK for Python.
+"""
+Apache Beam SDK for Python
+==========================
 
-Apache Beam <https://beam.apache.org/>
-provides a simple, powerful programming model for building both batch
-and streaming parallel data processing pipelines.
+`Apache Beam <https://beam.apache.org>`_ provides a simple, powerful programming
+model for building both batch and streaming parallel data processing pipelines.
 
 The Apache Beam SDK for Python provides access to Apache Beam capabilities
 from the Python programming language.
@@ -33,32 +34,40 @@ Overview
 --------
 The key concepts in this programming model are
 
-* PCollection:  represents a collection of data, which could be
-  bounded or unbounded in size.
-* PTransform:  represents a computation that transforms input
-  PCollections into output PCollections.
-* Pipeline:  manages a directed acyclic graph of PTransforms and
-  PCollections that is ready for execution.
-* Runner:  specifies where and how the Pipeline should execute.
-* Reading and Writing Data:  your pipeline can read from an external
-  source and write to an external data sink.
+* :class:`~apache_beam.pvalue.PCollection`: represents a collection of data,
+  which could be bounded or unbounded in size.
+* :class:`~apache_beam.transforms.ptransform.PTransform`: represents a
+  computation that transforms input PCollections into output PCollections.
+* :class:`~apache_beam.pipeline.Pipeline`: manages a directed acyclic graph of
+  :class:`~apache_beam.transforms.ptransform.PTransform` s and
+  :class:`~apache_beam.pvalue.PCollection` s that is ready for execution.
+* :class:`~apache_beam.runners.runner.PipelineRunner`: specifies where and how
+  the pipeline should execute.
+* :class:`~apache_beam.io.iobase.Read`: read from an external source.
+* :class:`~apache_beam.io.iobase.Write`: write to an external data sink.
 
 Typical usage
 -------------
 At the top of your source file::
 
-    import apache_beam as beam
+  import apache_beam as beam
 
 After this import statement
 
-* transform classes are available as beam.FlatMap, beam.GroupByKey, etc.
-* Pipeline class is available as beam.Pipeline
-* text read/write transforms are available as beam.io.ReadfromText,
-  beam.io.WriteToText
+* Transform classes are available as
+  :class:`beam.FlatMap <apache_beam.transforms.core.FlatMap>`,
+  :class:`beam.GroupByKey <apache_beam.transforms.core.GroupByKey>`, etc.
+* Pipeline class is available as
+  :class:`beam.Pipeline <apache_beam.pipeline.Pipeline>`
+* Text read/write transforms are available as
+  :class:`beam.io.ReadFromText <apache_beam.io.textio.ReadFromText>`,
+  :class:`beam.io.WriteToText <apache_beam.io.textio.WriteToText>`.
 
 Examples
 --------
-The examples subdirectory has some examples.
+The `examples subdirectory
+<https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples>`_
+has some examples.
 
 """
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/internal/gcp/json_value.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/gcp/json_value.py b/sdks/python/apache_beam/internal/gcp/json_value.py
index 59f8b60..167b173 100644
--- a/sdks/python/apache_beam/internal/gcp/json_value.py
+++ b/sdks/python/apache_beam/internal/gcp/json_value.py
@@ -41,11 +41,12 @@ def get_typed_value_descriptor(obj):
     obj: A basestring, bool, int, or float to be converted.
 
   Returns:
-    A dictionary containing the keys '@type' and 'value' with the value for
-    the @type of appropriate type.
+    A dictionary containing the keys ``@type`` and ``value`` with the value for
+    the ``@type`` of appropriate type.
 
   Raises:
-    TypeError: if the Python object has a type that is not supported.
+    ~exceptions.TypeError: if the Python object has a type that is not
+      supported.
   """
   if isinstance(obj, basestring):
     type_name = 'Text'
@@ -66,21 +67,23 @@ def to_json_value(obj, with_type=False):
   Converts Python objects into extra_types.JsonValue objects.
 
   Args:
-    obj: Python object to be converted. Can be 'None'.
-    with_type: If true then the basic types (string, int, float, bool) will
-      be wrapped in @type/value dictionaries. Otherwise the straight value is
-      encoded into a JsonValue.
+    obj: Python object to be converted. Can be :data:`None`.
+    with_type: If true then the basic types (``string``, ``int``, ``float``,
+      ``bool``) will be wrapped in ``@type:value`` dictionaries. Otherwise the
+      straight value is encoded into a ``JsonValue``.
 
   Returns:
-    A JsonValue object using JsonValue, JsonArray and JsonObject types for the
-    corresponding values, lists, or dictionaries.
+    A ``JsonValue`` object using ``JsonValue``, ``JsonArray`` and ``JsonObject``
+    types for the corresponding values, lists, or dictionaries.
 
   Raises:
-    TypeError: if the Python object contains a type that is not supported.
+    ~exceptions.TypeError: if the Python object contains a type that is not
+      supported.
 
-  The types supported are str, bool, list, tuple, dict, and None. The Dataflow
-  API requires JsonValue(s) in many places, and it is quite convenient to be
-  able to specify these hierarchical objects using Python syntax.
+  The types supported are ``str``, ``bool``, ``list``, ``tuple``, ``dict``, and
+  ``None``. The Dataflow API requires JsonValue(s) in many places, and it is
+  quite convenient to be able to specify these hierarchical objects using
+  Python syntax.
   """
   if obj is None:
     return extra_types.JsonValue(is_null=True)
@@ -121,21 +124,23 @@ def to_json_value(obj, with_type=False):
 def from_json_value(v):
   """For internal use only; no backwards-compatibility guarantees.
 
-  Converts extra_types.JsonValue objects into Python objects.
+  Converts ``extra_types.JsonValue`` objects into Python objects.
 
   Args:
-    v: JsonValue object to be converted.
+    v: ``JsonValue`` object to be converted.
 
   Returns:
     A Python object structured as values, lists, and dictionaries corresponding
-    to JsonValue, JsonArray and JsonObject types.
+    to ``JsonValue``, ``JsonArray`` and ``JsonObject`` types.
 
   Raises:
-    TypeError: if the JsonValue object contains a type that is not supported.
+    ~exceptions.TypeError: if the ``JsonValue`` object contains a type that is
+      not supported.
 
-  The types supported are str, bool, list, dict, and None. The Dataflow API
-  returns JsonValue(s) in many places and it is quite convenient to be able to
-  convert these hierarchical objects to much simpler Python objects.
+  The types supported are ``str``, ``bool``, ``list``, ``dict``, and ``None``.
+  The Dataflow API returns JsonValue(s) in many places and it is quite
+  convenient to be able to convert these hierarchical objects to much simpler
+  Python objects.
   """
   if isinstance(v, extra_types.JsonValue):
     if v.string_value is not None:
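
A rough illustration of the round trip these internal helpers provide, assuming
the GCP extras (which supply ``extra_types``) are installed; the dictionary
contents are arbitrary:

  from apache_beam.internal.gcp.json_value import from_json_value, to_json_value

  # Nested Python values become a JsonObject/JsonArray tree.
  json_obj = to_json_value({'name': 'job-1', 'workers': [1, 2, 3], 'done': False})

  # ...and convert back to plain values, lists, and dictionaries.
  plain = from_json_value(json_obj)

  # With with_type=True, basic types are wrapped in @type/value dictionaries.
  typed = to_json_value(42, with_type=True)
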

http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/avroio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py
index 47ea282..cb14c65 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -64,27 +64,74 @@ __all__ = ['ReadFromAvro', 'ReadAllFromAvro', 'WriteToAvro']
 
 
 class ReadFromAvro(PTransform):
-  """A ``PTransform`` for reading Avro files.
+  """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading avro
+  files."""
 
-  Uses source '_AvroSource' to read a set of Avro files defined by a given
-  file pattern.
-  If '/mypath/myavrofiles*' is a file-pattern that points to a set of Avro
-  files, a ``PCollection`` for the records in these Avro files can be created
-  in the following manner.
+  def __init__(self, file_pattern=None, min_bundle_size=0, validate=True):
+    """Initializes :class:`ReadFromAvro`.
 
-  p = df.Pipeline(argv=pipeline_args)
-  records = p | 'Read' >> df.io.ReadFromAvro('/mypath/myavrofiles*')
-  """
+    Uses source :class:`~apache_beam.io._AvroSource` to read a set of Avro
+    files defined by a given file pattern.
 
-  def __init__(self, file_pattern=None, min_bundle_size=0, validate=True):
-    """Initializes ``ReadFromAvro``.
+    If ``/mypath/myavrofiles*`` is a file-pattern that points to a set of Avro
+    files, a :class:`~apache_beam.pvalue.PCollection` for the records in
+    these Avro files can be created in the following manner.
+
+    .. testcode::
+
+      with beam.Pipeline() as p:
+        records = p | 'Read' >> beam.io.ReadFromAvro('/mypath/myavrofiles*')
+
+    .. NOTE: We're not actually interested in this error; but if we get here,
+       it means that the way of calling this transform hasn't changed.
+
+    .. testoutput::
+      :hide:
+
+      Traceback (most recent call last):
+       ...
+      IOError: No files found based on the file pattern
+
+    Each record of this :class:`~apache_beam.pvalue.PCollection` will contain
+    a single record read from a source. Records that are of simple types will be
+    mapped into corresponding Python types. Records that are of Avro type
+    ``RECORD`` will be mapped to Python dictionaries that comply with the schema
+    contained in the Avro file that contains those records. In this case, keys
+    of each dictionary will contain the corresponding field names and will be of
+    type :class:`str` while the values of the dictionary will be of the type
+    defined in the corresponding Avro schema.
+
+    For example, if the schema of the Avro file is the following. ::
+
+      {
+        "namespace": "example.avro",
+        "type": "record",
+        "name": "User",
+        "fields": [
+
+          {"name": "name",
+           "type": "string"},
+
+          {"name": "favorite_number",
+           "type": ["int", "null"]},
+
+          {"name": "favorite_color",
+           "type": ["string", "null"]}
+
+        ]
+      }
+
+    Then records generated by :class:`~apache_beam.io._AvroSource` will be
+    dictionaries of the following form. ::
+
+      {u'name': u'Alyssa', u'favorite_number': 256, u'favorite_color': None}
 
     Args:
-      file_pattern: the set of files to be read.
-      min_bundle_size: the minimum size in bytes, to be considered when
-                       splitting the input into bundles.
-      validate: flag to verify that the files exist during the pipeline
-                creation time.
+      file_pattern (str): the file glob to read
+      min_bundle_size (int): the minimum size in bytes, to be considered when
+        splitting the input into bundles.
+      validate (bool): flag to verify that the files exist during the pipeline
+        creation time.
     """
     super(ReadFromAvro, self).__init__()
     self._source = _AvroSource(file_pattern, min_bundle_size, validate=validate)
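
A short usage sketch that follows the example schema in the docstring above;
the file pattern and field name are placeholders:

  import apache_beam as beam

  with beam.Pipeline() as p:
    names = (
        p
        | 'Read' >> beam.io.ReadFromAvro('/mypath/myavrofiles*')
        # Each element is a dict keyed by the Avro record's field names.
        | 'Names' >> beam.Map(lambda record: record['name']))
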

http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/filebasedsink.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsink.py b/sdks/python/apache_beam/io/filebasedsink.py
index 76c09fc..eb99d08 100644
--- a/sdks/python/apache_beam/io/filebasedsink.py
+++ b/sdks/python/apache_beam/io/filebasedsink.py
@@ -44,12 +44,13 @@ class FileBasedSink(iobase.Sink):
   """A sink to a GCS or local files.
 
   To implement a file-based sink, extend this class and override
-  either ``write_record()`` or ``write_encoded_record()``.
+  either :meth:`.write_record()` or :meth:`.write_encoded_record()`.
 
-  If needed, also overwrite ``open()`` and/or ``close()`` to customize the
-  file handling or write headers and footers.
+  If needed, also overwrite :meth:`.open()` and/or :meth:`.close()` to customize
+  the file handling or write headers and footers.
 
-  The output of this write is a PCollection of all written shards.
+  The output of this write is a :class:`~apache_beam.pvalue.PCollection` of
+  all written shards.
   """
 
   # Max number of threads to be used for renaming.
@@ -65,9 +66,12 @@ class FileBasedSink(iobase.Sink):
                compression_type=CompressionTypes.AUTO):
     """
      Raises:
-      TypeError: if file path parameters are not a string or ValueProvider,
-                 or if compression_type is not member of CompressionTypes.
-      ValueError: if shard_name_template is not of expected format.
+      ~exceptions.TypeError: if file path parameters are not a :class:`str` or
+        :class:`~apache_beam.options.value_provider.ValueProvider`, or if
+        **compression_type** is not a member of
+        :class:`~apache_beam.io.filesystem.CompressionTypes`.
+      ~exceptions.ValueError: if **shard_name_template** is not of expected
+        format.
     """
     if not isinstance(file_path_prefix, (basestring, ValueProvider)):
       raise TypeError('file_path_prefix must be a string or ValueProvider;'
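
A rough sketch of the extension point described above: a sink that writes one
JSON object per line by overriding write_record(). The parent constructor
arguments shown here (coder, file_name_suffix) are assumptions based on the
usual FileBasedSink signature:

  import json

  from apache_beam.coders import coders
  from apache_beam.io import filebasedsink

  class JsonLinesSink(filebasedsink.FileBasedSink):
    """Writes each element as a JSON object on its own line (sketch only)."""

    def __init__(self, file_path_prefix):
      super(JsonLinesSink, self).__init__(
          file_path_prefix,
          coder=coders.ToStringCoder(),  # unused once write_record is overridden
          file_name_suffix='.json')

    def write_record(self, file_handle, value):
      file_handle.write(json.dumps(value) + '\n')
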

http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index f78bf3f..6496930 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -17,12 +17,13 @@
 
 """A framework for developing sources for new file types.
 
-To create a source for a new file type a sub-class of ``FileBasedSource`` should
-be created. Sub-classes of ``FileBasedSource`` must implement the method
-``FileBasedSource.read_records()``. Please read the documentation of that method
-for more details.
+To create a source for a new file type a sub-class of :class:`FileBasedSource`
+should be created. Sub-classes of :class:`FileBasedSource` must implement the
+method :meth:`FileBasedSource.read_records()`. Please read the documentation of
+that method for more details.
 
-For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
+For an example implementation of :class:`FileBasedSource` see
+:class:`~apache_beam.io._AvroSource`.
 """
 import uuid
 
@@ -51,7 +52,8 @@ __all__ = ['FileBasedSource']
 
 
 class FileBasedSource(iobase.BoundedSource):
-  """A ``BoundedSource`` for reading a file glob of a given type."""
+  """A :class:`~apache_beam.io.iobase.BoundedSource` for reading a file glob of
+  a given type."""
 
   MIN_NUMBER_OF_FILES_TO_STAT = 100
   MIN_FRACTION_OF_FILES_TO_STAT = 0.01
@@ -62,31 +64,40 @@ class FileBasedSource(iobase.BoundedSource):
                compression_type=CompressionTypes.AUTO,
                splittable=True,
                validate=True):
-    """Initializes ``FileBasedSource``.
+    """Initializes :class:`FileBasedSource`.
 
     Args:
-      file_pattern: the file glob to read a string or a ValueProvider
-                    (placeholder to inject a runtime value).
-      min_bundle_size: minimum size of bundles that should be generated when
-                       performing initial splitting on this source.
-      compression_type: compression type to use
-      splittable: whether FileBasedSource should try to logically split a single
-                  file into data ranges so that different parts of the same file
-                  can be read in parallel. If set to False, FileBasedSource will
-                  prevent both initial and dynamic splitting of sources for
-                  single files. File patterns that represent multiple files may
-                  still get split into sources for individual files. Even if set
-                  to True by the user, FileBasedSource may choose to not split
-                  the file, for example, for compressed files where currently
-                  it is not possible to efficiently read a data range without
-                  decompressing the whole file.
-      validate: Boolean flag to verify that the files exist during the pipeline
-                creation time.
+      file_pattern (str): the file glob to read, as a string or a
+        :class:`~apache_beam.options.value_provider.ValueProvider`
+        (placeholder to inject a runtime value).
+      min_bundle_size (int): minimum size of bundles that should be generated
+        when performing initial splitting on this source.
+      compression_type (str): Used to handle compressed input files.
+        Typical value is :attr:`CompressionTypes.AUTO
+        <apache_beam.io.filesystem.CompressionTypes.AUTO>`,
+        in which case the underlying file path's extension will be used to
+        detect the compression.
+      splittable (bool): whether :class:`FileBasedSource` should try to
+        logically split a single file into data ranges so that different parts
+        of the same file can be read in parallel. If set to :data:`False`,
+        :class:`FileBasedSource` will prevent both initial and dynamic splitting
+        of sources for single files. File patterns that represent multiple files
+        may still get split into sources for individual files. Even if set to
+        :data:`True` by the user, :class:`FileBasedSource` may choose to not
+        split the file, for example, for compressed files where currently it is
+        not possible to efficiently read a data range without decompressing the
+        whole file.
+      validate (bool): Boolean flag to verify that the files exist during the
+        pipeline creation time.
+
     Raises:
-      TypeError: when compression_type is not valid or if file_pattern is not a
-                 string or a ValueProvider.
-      ValueError: when compression and splittable files are specified.
-      IOError: when the file pattern specified yields an empty result.
+      ~exceptions.TypeError: when **compression_type** is not valid or if
+        **file_pattern** is not a :class:`str` or a
+        :class:`~apache_beam.options.value_provider.ValueProvider`.
+      ~exceptions.ValueError: when compression and splittable files are
+        specified.
+      ~exceptions.IOError: when the file pattern specified yields an empty
+        result.
     """
 
     if not isinstance(file_pattern, (basestring, ValueProvider)):
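
A sketch of the pattern described in the module docstring, for a hypothetical
line-oriented format; it relies on the open_file() helper and marks itself
unsplittable because read_records() below ignores the supplied range tracker:

  from apache_beam.io import filebasedsource

  class SimpleLineSource(filebasedsource.FileBasedSource):
    """Yields every line of every matched file (sketch; not split-aware)."""

    def __init__(self, file_pattern):
      super(SimpleLineSource, self).__init__(file_pattern, splittable=False)

    def read_records(self, file_name, range_tracker):
      # Simplistic: reads the whole file rather than honoring the tracker.
      data = self.open_file(file_name).read()
      for line in data.splitlines():
        yield line
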

http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/filesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index ef3040c..5804d00 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -299,23 +299,28 @@ class CompressedFile(object):
     """Set the file's current offset.
 
     Seeking behavior:
-      * seeking from the end (SEEK_END) the whole file is decompressed once to
-        determine it's size. Therefore it is preferred to use
-        SEEK_SET or SEEK_CUR to avoid the processing overhead
-      * seeking backwards from the current position rewinds the file to 0
+
+      * seeking from the end (:data:`os.SEEK_END`) decompresses the whole file
+        once to determine its size, so it is preferred to use
+        :data:`os.SEEK_SET` or :data:`os.SEEK_CUR` to avoid the processing
+        overhead
+      * seeking backwards from the current position rewinds the file to ``0``
         and decompresses the chunks to the requested offset
       * seeking is only supported in files opened for reading
-      * if the new offset is out of bound, it is adjusted to either 0 or EOF.
+      * if the new offset is out of bound, it is adjusted to either ``0`` or
+        ``EOF``.
 
     Args:
-      offset: seek offset in the uncompressed content represented as number
-      whence: seek mode. Supported modes are os.SEEK_SET (absolute seek),
-        os.SEEK_CUR (seek relative to the current position), and os.SEEK_END
-        (seek relative to the end, offset should be negative).
+      offset (int): seek offset in the uncompressed content represented as
+        a number
+      whence (int): seek mode. Supported modes are :data:`os.SEEK_SET`
+        (absolute seek), :data:`os.SEEK_CUR` (seek relative to the current
+        position), and :data:`os.SEEK_END` (seek relative to the end, offset
+        should be negative).
 
     Raises:
-      IOError: When this buffer is closed.
-      ValueError: When whence is invalid or the file is not seekable
+      ~exceptions.IOError: When this buffer is closed.
+      ~exceptions.ValueError: When whence is invalid or the file is not seekable
     """
     if whence == os.SEEK_SET:
       absolute_offset = offset
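
A brief sketch of the seek modes described above, assuming a local gzip file
and a CompressedFile constructed from an already-open file object:

  import os

  from apache_beam.io.filesystem import CompressedFile, CompressionTypes

  raw = open('/tmp/data.txt.gz', 'rb')  # placeholder path
  compressed = CompressedFile(raw, compression_type=CompressionTypes.GZIP)

  compressed.seek(100, os.SEEK_SET)  # absolute offset in the uncompressed data
  compressed.seek(-10, os.SEEK_CUR)  # rewinds to 0, then re-reads up to offset 90
  compressed.seek(-1, os.SEEK_END)   # decompresses the whole file once to find EOF
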

http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index db6715a..33d67bf 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -330,45 +330,49 @@ class BigQuerySource(dataflow_io.NativeSource):
   def __init__(self, table=None, dataset=None, project=None, query=None,
                validate=False, coder=None, use_standard_sql=False,
                flatten_results=True):
-    """Initialize a BigQuerySource.
+    """Initialize a :class:`BigQuerySource`.
 
     Args:
-      table: The ID of a BigQuery table. If specified all data of the table
-        will be used as input of the current source. The ID must contain only
-        letters (a-z, A-Z), numbers (0-9), or underscores (_). If dataset
-        and query arguments are None then the table argument must contain the
-        entire table reference specified as: 'DATASET.TABLE' or
-        'PROJECT:DATASET.TABLE'.
-      dataset: The ID of the dataset containing this table or null if the table
-        reference is specified entirely by the table argument or a query is
-        specified.
-      project: The ID of the project containing this table or null if the table
-        reference is specified entirely by the table argument or a query is
-        specified.
-      query: A query to be used instead of arguments table, dataset, and
+      table (str): The ID of a BigQuery table. If specified all data of the
+        table will be used as input of the current source. The ID must contain
+        only letters ``a-z``, ``A-Z``, numbers ``0-9``, or underscores
+        ``_``. If dataset and query arguments are :data:`None` then the table
+        argument must contain the entire table reference specified as:
+        ``'DATASET.TABLE'`` or ``'PROJECT:DATASET.TABLE'``.
+      dataset (str): The ID of the dataset containing this table or
+        :data:`None` if the table reference is specified entirely by the table
+        argument or a query is specified.
+      project (str): The ID of the project containing this table or
+        :data:`None` if the table reference is specified entirely by the table
+        argument or a query is specified.
+      query (str): A query to be used instead of arguments table, dataset, and
         project.
-      validate: If true, various checks will be done when source gets
-        initialized (e.g., is table present?). This should be True for most
-        scenarios in order to catch errors as early as possible (pipeline
-        construction instead of pipeline execution). It should be False if the
-        table is created during pipeline execution by a previous step.
-      coder: The coder for the table rows if serialized to disk. If None, then
-        the default coder is RowAsDictJsonCoder, which will interpret every line
-        in a file as a JSON serialized dictionary. This argument needs a value
-        only in special cases when returning table rows as dictionaries is not
-        desirable.
-      use_standard_sql: Specifies whether to use BigQuery's standard
-        SQL dialect for this query. The default value is False. If set to True,
-        the query will use BigQuery's updated SQL dialect with improved
-        standards compliance. This parameter is ignored for table inputs.
-      flatten_results: Flattens all nested and repeated fields in the
-        query results. The default value is true.
+      validate (bool): If :data:`True`, various checks will be done when source
+        gets initialized (e.g., is table present?). This should be
+        :data:`True` for most scenarios in order to catch errors as early as
+        possible (pipeline construction instead of pipeline execution). It
+        should be :data:`False` if the table is created during pipeline
+        execution by a previous step.
+      coder (~apache_beam.coders.coders.Coder): The coder for the table
+        rows if serialized to disk. If :data:`None`, then the default coder is
+        :class:`~apache_beam.io.gcp.bigquery.RowAsDictJsonCoder`,
+        which will interpret every line in a file as a JSON serialized
+        dictionary. This argument needs a value only in special cases when
+        returning table rows as dictionaries is not desirable.
+      use_standard_sql (bool): Specifies whether to use BigQuery's standard SQL
+        dialect for this query. The default value is :data:`False`.
+        If set to :data:`True`, the query will use BigQuery's updated SQL
+        dialect with improved standards compliance.
+        This parameter is ignored for table inputs.
+      flatten_results (bool): Flattens all nested and repeated fields in the
+        query results. The default value is :data:`True`.
 
     Raises:
-      ValueError: if any of the following is true
-      (1) the table reference as a string does not match the expected format
-      (2) neither a table nor a query is specified
-      (3) both a table and a query is specified.
+      ~exceptions.ValueError: if any of the following is true:
+
+        1) the table reference as a string does not match the expected format
+        2) neither a table nor a query is specified
+        3) both a table and a query are specified.
     """
 
     # Import here to avoid adding the dependency for local running scenarios.
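
A usage sketch for the source documented above; the project, dataset, and table
names are placeholders, and reading requires GCP credentials:

  import apache_beam as beam

  with beam.Pipeline() as p:
    table_rows = p | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(
        table='my-project:my_dataset.my_table'))
    query_rows = p | 'ReadQuery' >> beam.io.Read(beam.io.BigQuerySource(
        query='SELECT name, year FROM [my-project:my_dataset.my_table]'))
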
@@ -439,46 +443,62 @@ class BigQuerySink(dataflow_io.NativeSink):
     """Initialize a BigQuerySink.
 
     Args:
-      table: The ID of the table. The ID must contain only letters
-        (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is
-        None then the table argument must contain the entire table reference
-        specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
-      dataset: The ID of the dataset containing this table or null if the table
-        reference is specified entirely by the table argument.
-      project: The ID of the project containing this table or null if the table
-        reference is specified entirely by the table argument.
-      schema: The schema to be used if the BigQuery table to write has to be
-        created. This can be either specified as a 'bigquery.TableSchema' object
-        or a single string  of the form 'field1:type1,field2:type2,field3:type3'
-        that defines a comma separated list of fields. Here 'type' should
-        specify the BigQuery type of the field. Single string based schemas do
-        not support nested fields, repeated fields, or specifying a BigQuery
-        mode for fields (mode will always be set to 'NULLABLE').
-      create_disposition: A string describing what happens if the table does not
-        exist. Possible values are:
-        - BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist.
-        - BigQueryDisposition.CREATE_NEVER: fail the write if does not exist.
-      write_disposition: A string describing what happens if the table has
-        already some data. Possible values are:
-        -  BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
-        -  BigQueryDisposition.WRITE_APPEND: add to existing rows.
-        -  BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.
-      validate: If true, various checks will be done when sink gets
-        initialized (e.g., is table present given the disposition arguments?).
-        This should be True for most scenarios in order to catch errors as early
-        as possible (pipeline construction instead of pipeline execution). It
-        should be False if the table is created during pipeline execution by a
-        previous step.
-      coder: The coder for the table rows if serialized to disk. If None, then
-        the default coder is RowAsDictJsonCoder, which will interpret every
-        element written to the sink as a dictionary that will be JSON serialized
-        as a line in a file. This argument needs a value only in special cases
-        when writing table rows as dictionaries is not desirable.
+      table (str): The ID of the table. The ID must contain only letters
+        ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If
+        **dataset** argument is :data:`None` then the table argument must
+        contain the entire table reference specified as: ``'DATASET.TABLE'`` or
+        ``'PROJECT:DATASET.TABLE'``.
+      dataset (str): The ID of the dataset containing this table or
+        :data:`None` if the table reference is specified entirely by the table
+        argument.
+      project (str): The ID of the project containing this table or
+        :data:`None` if the table reference is specified entirely by the table
+        argument.
+      schema (str): The schema to be used if the BigQuery table to write has
+        to be created. This can be either specified as a
+        :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema` object or a single string  of the form
+        ``'field1:type1,field2:type2,field3:type3'`` that defines a comma
+        separated list of fields. Here ``'type'`` should specify the BigQuery
+        type of the field. Single string based schemas do not support nested
+        fields, repeated fields, or specifying a BigQuery mode for fields (mode
+        will always be set to ``'NULLABLE'``).
+      create_disposition (BigQueryDisposition): A string describing what
+        happens if the table does not exist. Possible values are:
+
+          * :attr:`BigQueryDisposition.CREATE_IF_NEEDED`: create if does not
+            exist.
+          * :attr:`BigQueryDisposition.CREATE_NEVER`: fail the write if does not
+            exist.
+
+      write_disposition (BigQueryDisposition): A string describing what
+        happens if the table has already some data. Possible values are:
+
+          * :attr:`BigQueryDisposition.WRITE_TRUNCATE`: delete existing rows.
+          * :attr:`BigQueryDisposition.WRITE_APPEND`: add to existing rows.
+          * :attr:`BigQueryDisposition.WRITE_EMPTY`: fail the write if table not
+            empty.
+
+      validate (bool): If :data:`True`, various checks will be done when sink
+        gets initialized (e.g., is table present given the disposition
+        arguments?). This should be :data:`True` for most scenarios in order to
+        catch errors as early as possible (pipeline construction instead of
+        pipeline execution). It should be :data:`False` if the table is created
+        during pipeline execution by a previous step.
+      coder (~apache_beam.coders.coders.Coder): The coder for the
+        table rows if serialized to disk. If :data:`None`, then the default
+        coder is :class:`~apache_beam.io.gcp.bigquery.RowAsDictJsonCoder`,
+        which will interpret every element written to the sink as a dictionary
+        that will be JSON serialized as a line in a file. This argument needs a
+        value only in special cases when writing table rows as dictionaries is
+        not desirable.
 
     Raises:
-      TypeError: if the schema argument is not a string or a TableSchema object.
-      ValueError: if the table reference as a string does not match the expected
-      format.
+      ~exceptions.TypeError: if the schema argument is not a :class:`str` or a
+        :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema` object.
+      ~exceptions.ValueError: if the table reference as a string does not
+        match the expected format.
     """
 
     # Import here to avoid adding the dependency for local running scenarios.
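
A matching sketch for the sink documented above, using the string schema form;
the table reference is a placeholder:

  import apache_beam as beam

  with beam.Pipeline() as p:
    rows = p | beam.Create([{'name': 'Alyssa', 'year': 2017}])
    rows | 'Write' >> beam.io.Write(beam.io.BigQuerySink(
        'my-project:my_dataset.my_table',
        schema='name:STRING,year:INTEGER',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
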
@@ -1261,32 +1281,47 @@ class WriteToBigQuery(PTransform):
     """Initialize a WriteToBigQuery transform.
 
     Args:
-      table: The ID of the table. The ID must contain only letters
-        (a-z, A-Z), numbers (0-9), or underscores (_). If dataset argument is
-        None then the table argument must contain the entire table reference
-        specified as: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'.
-      dataset: The ID of the dataset containing this table or null if the table
-        reference is specified entirely by the table argument.
-      project: The ID of the project containing this table or null if the table
-        reference is specified entirely by the table argument.
-      schema: The schema to be used if the BigQuery table to write has to be
-        created. This can be either specified as a 'bigquery.TableSchema' object
-        or a single string  of the form 'field1:type1,field2:type2,field3:type3'
-        that defines a comma separated list of fields. Here 'type' should
-        specify the BigQuery type of the field. Single string based schemas do
-        not support nested fields, repeated fields, or specifying a BigQuery
-        mode for fields (mode will always be set to 'NULLABLE').
-      create_disposition: A string describing what happens if the table does not
-        exist. Possible values are:
-        - BigQueryDisposition.CREATE_IF_NEEDED: create if does not exist.
-        - BigQueryDisposition.CREATE_NEVER: fail the write if does not exist.
-      write_disposition: A string describing what happens if the table has
-        already some data. Possible values are:
-        -  BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
-        -  BigQueryDisposition.WRITE_APPEND: add to existing rows.
-        -  BigQueryDisposition.WRITE_EMPTY: fail the write if table not empty.
+      table (str): The ID of the table. The ID must contain only letters
+        ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If dataset
+        argument is :data:`None` then the table argument must contain the
+        entire table reference specified as: ``'DATASET.TABLE'`` or
+        ``'PROJECT:DATASET.TABLE'``.
+      dataset (str): The ID of the dataset containing this table or
+        :data:`None` if the table reference is specified entirely by the table
+        argument.
+      project (str): The ID of the project containing this table or
+        :data:`None` if the table reference is specified entirely by the table
+        argument.
+      schema (str): The schema to be used if the BigQuery table to write has to
+        be created. This can be either specified as a
+        :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema`
+        object or a single string  of the form
+        ``'field1:type1,field2:type2,field3:type3'`` that defines a comma
+        separated list of fields. Here ``'type'`` should specify the BigQuery
+        type of the field. Single string based schemas do not support nested
+        fields, repeated fields, or specifying a BigQuery mode for fields
+        (mode will always be set to ``'NULLABLE'``).
+      create_disposition (BigQueryDisposition): A string describing what
+        happens if the table does not exist. Possible values are:
+
+        * :attr:`BigQueryDisposition.CREATE_IF_NEEDED`: create if does not
+          exist.
+        * :attr:`BigQueryDisposition.CREATE_NEVER`: fail the write if does not
+          exist.
+
+      write_disposition (BigQueryDisposition): A string describing what happens
+        if the table has already some data. Possible values are:
+
+        * :attr:`BigQueryDisposition.WRITE_TRUNCATE`: delete existing rows.
+        * :attr:`BigQueryDisposition.WRITE_APPEND`: add to existing rows.
+        * :attr:`BigQueryDisposition.WRITE_EMPTY`: fail the write if table not
+          empty.
+
         For streaming pipelines WriteTruncate can not be used.
-      batch_size: Number of rows to be written to BQ per streaming API insert.
+
+      batch_size (int): Number of rows to be written to BQ per streaming API
+        insert.
       test_client: Override the default bigquery client used for testing.
     """
     self.table_reference = _parse_table_reference(table, dataset, project)
@@ -1300,14 +1335,20 @@ class WriteToBigQuery(PTransform):
 
   @staticmethod
   def get_table_schema_from_string(schema):
-    """Transform the string table schema into a bigquery.TableSchema instance.
+    """Transform the string table schema into a
+    :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema` instance.
 
     Args:
-      schema: The sting schema to be used if the BigQuery table to write has
-         to be created.
+      schema (str): The string schema to be used if the BigQuery table to write
+        has to be created.
+
     Returns:
-      table_schema: The schema to be used if the BigQuery table to write has
-         to be created but in the bigquery.TableSchema format.
+      ~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema:
+      The schema to be used if the BigQuery table to write has to be created
+      but in the :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema` format.
     """
     table_schema = bigquery.TableSchema()
     schema_list = [s.strip() for s in schema.split(',')]
@@ -1349,12 +1390,14 @@ class WriteToBigQuery(PTransform):
     """Transform the table schema into a dictionary instance.
 
     Args:
-      schema: The schema to be used if the BigQuery table to write has to be
-        created. This can either be a dict or string or in the TableSchema
-        format.
+      schema (~apache_beam.io.gcp.internal.clients.bigquery.\
+bigquery_v2_messages.TableSchema):
+        The schema to be used if the BigQuery table to write has to be created.
+        This can either be a dict or string or in the TableSchema format.
+
     Returns:
-      table_schema: The schema to be used if the BigQuery table to write has
-         to be created but in the dictionary format.
+      Dict[str, Any]: The schema to be used if the BigQuery table to write has
+      to be created but in the dictionary format.
     """
     if isinstance(schema, dict):
       return schema
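
A sketch of the WriteToBigQuery transform documented in this file, combining
the string schema form with the dispositions listed above; the table reference
is a placeholder and GCP credentials are assumed:

  import apache_beam as beam

  with beam.Pipeline() as p:
    rows = p | beam.Create([{'name': 'Alyssa', 'year': 2017}])
    rows | beam.io.WriteToBigQuery(
        table='my-project:my_dataset.my_table',
        schema='name:STRING,year:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
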

http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 643fbc7..ae71a5f 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -137,16 +137,16 @@ class GcsIO(object):
     """Open a GCS file path for reading or writing.
 
     Args:
-      filename: GCS file path in the form gs://<bucket>/<object>.
-      mode: 'r' for reading or 'w' for writing.
-      read_buffer_size: Buffer size to use during read operations.
-      mime_type: Mime type to set for write operations.
+      filename (str): GCS file path in the form ``gs://<bucket>/<object>``.
+      mode (str): ``'r'`` for reading or ``'w'`` for writing.
+      read_buffer_size (int): Buffer size to use during read operations.
+      mime_type (str): Mime type to set for write operations.
 
     Returns:
-      file object.
+      GCS file object.
 
     Raises:
-      ValueError: Invalid open file mode.
+      ~exceptions.ValueError: Invalid open file mode.
     """
     if mode == 'r' or mode == 'rb':
       return GcsBufferedReader(self.client, filename, mode=mode,
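
A short sketch of the open() call documented above, assuming GCP credentials
are configured and that the bucket and object exist:

  from apache_beam.io.gcp.gcsio import GcsIO

  gcs = GcsIO()
  reader = gcs.open('gs://my-bucket/my-object.txt', mode='r')  # placeholder path
  first_kb = reader.read(1024)
  reader.close()
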

http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index 4bd19f8..1339b91 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -317,17 +317,19 @@ class OrderedPositionRangeTracker(iobase.RangeTracker):
 class UnsplittableRangeTracker(iobase.RangeTracker):
   """A RangeTracker that always ignores split requests.
 
-  This can be used to make a given ``RangeTracker`` object unsplittable by
-  ignoring all calls to ``try_split()``. All other calls will be delegated to
-  the given ``RangeTracker``.
+  This can be used to make a given
+  :class:`~apache_beam.io.iobase.RangeTracker` object unsplittable by
+  ignoring all calls to :meth:`.try_split()`. All other calls will be delegated
+  to the given :class:`~apache_beam.io.iobase.RangeTracker`.
   """
 
   def __init__(self, range_tracker):
     """Initializes UnsplittableRangeTracker.
 
     Args:
-      range_tracker: a ``RangeTracker`` to which all method calls expect calls
-      to ``try_split()`` will be delegated.
+      range_tracker (~apache_beam.io.iobase.RangeTracker): a
+        :class:`~apache_beam.io.iobase.RangeTracker` to which all method
+        calls except calls to :meth:`.try_split()` will be delegated.
     """
     assert isinstance(range_tracker, iobase.RangeTracker)
     self._range_tracker = range_tracker
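
A tiny sketch of the wrapping behaviour described above, pairing it with an
OffsetRangeTracker from the same module:

  from apache_beam.io import range_trackers

  offset_tracker = range_trackers.OffsetRangeTracker(0, 100)
  unsplittable = range_trackers.UnsplittableRangeTracker(offset_tracker)

  unsplittable.try_claim(0)   # delegated to the wrapped OffsetRangeTracker
  unsplittable.try_split(50)  # ignored; the range will never be split
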

http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/source_test_utils.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/source_test_utils.py b/sdks/python/apache_beam/io/source_test_utils.py
index a144a8a..bea9708 100644
--- a/sdks/python/apache_beam/io/source_test_utils.py
+++ b/sdks/python/apache_beam/io/source_test_utils.py
@@ -80,12 +80,13 @@ def read_from_source(source, start_position=None, stop_position=None):
 
   Only reads elements within the given position range.
   Args:
-    source: ``iobase.BoundedSource`` implementation.
-    start_position: start position for reading.
-    stop_position: stop position for reading.
+    source (~apache_beam.io.iobase.BoundedSource):
+      :class:`~apache_beam.io.iobase.BoundedSource` implementation.
+    start_position (int): start position for reading.
+    stop_position (int): stop position for reading.
 
   Returns:
-    the set of values read from the sources.
+    List[str]: the set of values read from the sources.
   """
   values = []
   range_tracker = source.get_range_tracker(start_position, stop_position)
@@ -108,21 +109,25 @@ def _ThreadPool(threads):
 def assert_sources_equal_reference_source(reference_source_info, sources_info):
   """Tests if a reference source is equal to a given set of sources.
 
-  Given a reference source (a ``BoundedSource`` and a position range) and a
-  list of sources, assert that the union of the records
-  read from the list of sources is equal to the records read from the
+  Given a reference source (a :class:`~apache_beam.io.iobase.BoundedSource`
+  and a position range) and a list of sources, assert that the union of the
+  records read from the list of sources is equal to the records read from the
   reference source.
 
   Args:
-    reference_source_info: a three-tuple that gives the reference
-                           ``iobase.BoundedSource``, position to start reading
-                           at, and position to stop reading at.
-    sources_info: a set of sources. Each source is a three-tuple that is of
-                  the same format described above.
+    reference_source_info\
+        (Tuple[~apache_beam.io.iobase.BoundedSource, int, int]):
+      a three-tuple that gives the reference
+      :class:`~apache_beam.io.iobase.BoundedSource`, position to start
+      reading at, and position to stop reading at.
+    sources_info\
+        (Iterable[Tuple[~apache_beam.io.iobase.BoundedSource, int, int]]):
+      a set of sources. Each source is a three-tuple that is of the same
+      format described above.
 
   Raises:
-    ValueError: if the set of data produced by the reference source and the
-                given set of sources are not equivalent.
+    ~exceptions.ValueError: if the set of data produced by the reference source
+      and the given set of sources are not equivalent.
 
   """
 
@@ -172,18 +177,20 @@ def assert_sources_equal_reference_source(reference_source_info, sources_info):
 def assert_reentrant_reads_succeed(source_info):
   """Tests if a given source can be read in a reentrant manner.
 
-  Assume that given source produces the set of values {v1, v2, v3, ... vn}. For
-  i in range [1, n-1] this method performs a reentrant read after reading i
-  elements and verifies that both the original and reentrant read produce the
-  expected set of values.
+  Assume that the given source produces the set of values
+  ``{v1, v2, v3, ... vn}``. For ``i`` in range ``[1, n-1]`` this method
+  performs a reentrant read after reading ``i`` elements and verifies that
+  both the original and reentrant read produce the expected set of values.
 
   Args:
-    source_info: a three-tuple that gives the reference
-                 ``iobase.BoundedSource``, position to start reading at, and a
-                 position to stop reading at.
+    source_info (Tuple[~apache_beam.io.iobase.BoundedSource, int, int]):
+      a three-tuple that gives the reference
+      :class:`~apache_beam.io.iobase.BoundedSource`, position to start reading
+      at, and a position to stop reading at.
+
   Raises:
-    ValueError: if source is too trivial or reentrant read result in an
-                incorrect read.
+    ~exceptions.ValueError: if the source is too trivial or a reentrant read
+      results in an incorrect read.
   """
 
   source, start_position, stop_position = source_info
@@ -228,21 +235,25 @@ def assert_split_at_fraction_behavior(source, num_items_to_read_before_split,
                                       split_fraction, expected_outcome):
   """Verifies the behaviour of splitting a source at a given fraction.
 
-  Asserts that splitting a ``BoundedSource`` either fails after reading
-  ``num_items_to_read_before_split`` items, or succeeds in a way that is
-  consistent according to ``assertSplitAtFractionSucceedsAndConsistent()``.
+  Asserts that splitting a :class:`~apache_beam.io.iobase.BoundedSource` either
+  fails after reading **num_items_to_read_before_split** items, or succeeds in
+  a way that is consistent according to
+  :func:`assert_split_at_fraction_succeeds_and_consistent()`.
 
   Args:
-    source: the source to perform dynamic splitting on.
-    num_items_to_read_before_split: number of items to read before splitting.
-    split_fraction: fraction to split at.
-    expected_outcome: a value from 'ExpectedSplitOutcome'.
+    source (~apache_beam.io.iobase.BoundedSource): the source to perform
+      dynamic splitting on.
+    num_items_to_read_before_split (int): number of items to read before
+      splitting.
+    split_fraction (float): fraction to split at.
+    expected_outcome (int): a value from
+      :class:`~apache_beam.io.source_test_utils.ExpectedSplitOutcome`.
 
   Returns:
-    a tuple that gives the number of items produced by reading the two ranges
-    produced after dynamic splitting. If splitting did not occur, the first
-    value of the tuple will represent the full set of records read by the
-    source while the second value of the tuple will be '-1'.
+    Tuple[int, int]: a tuple that gives the number of items produced by reading
+    the two ranges produced after dynamic splitting. If splitting did not
+    occur, the first value of the tuple will represent the full set of records
+    read by the source while the second value of the tuple will be ``-1``.
   """
   assert isinstance(source, iobase.BoundedSource)
   expected_items = read_from_source(source, None, None)
@@ -503,12 +514,13 @@ def assert_split_at_fraction_exhaustive(
   Verifies multi threaded splitting as well.
 
   Args:
-    source: the source to perform dynamic splitting on.
-    perform_multi_threaded_test: if true performs a multi-threaded test
-                                 otherwise this test is skipped.
+    source (~apache_beam.io.iobase.BoundedSource): the source to perform
+      dynamic splitting on.
+    perform_multi_threaded_test (bool): if :data:`True` performs a
+      multi-threaded test, otherwise this test is skipped.
 
   Raises:
-    ValueError: if the exhaustive splitting test fails.
+    ~exceptions.ValueError: if the exhaustive splitting test fails.
   """
 
   expected_items = read_from_source(source, start_position, stop_position)
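
A sketch of how these helpers are typically combined. The internal _AvroSource
is used purely as a stand-in for any BoundedSource, and the file path is a
placeholder:

  from apache_beam.io import source_test_utils
  from apache_beam.io.avroio import _AvroSource

  source = _AvroSource('/tmp/users.avro', 0, validate=False)

  values = source_test_utils.read_from_source(source)
  source_test_utils.assert_reentrant_reads_succeed((source, None, None))
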

http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/io/textio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py
index 9c6532e..9708df7 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -417,13 +417,15 @@ class ReadAllFromText(PTransform):
 
 
 class ReadFromText(PTransform):
-  """A ``PTransform`` for reading text files.
+  r"""A :class:`~apache_beam.transforms.ptransform.PTransform` for reading text
+  files.
 
   Parses a text file as newline-delimited elements, by default assuming
-  UTF-8 encoding. Supports newline delimiters '\\n' and '\\r\\n'.
+  ``UTF-8`` encoding. Supports newline delimiters ``\n`` and ``\r\n``.
 
-  This implementation only supports reading text encoded using UTF-8 or ASCII.
-  This does not support other encodings such as UTF-16 or UTF-32.
+  This implementation only supports reading text encoded using ``UTF-8`` or
+  ``ASCII``.
+  This does not support other encodings such as ``UTF-16`` or ``UTF-32``.
   """
   def __init__(
       self,
@@ -435,26 +437,28 @@ class ReadFromText(PTransform):
       validate=True,
       skip_header_lines=0,
       **kwargs):
-    """Initialize the ``ReadFromText`` transform.
+    """Initialize the :class:`ReadFromText` transform.
 
     Args:
-      file_pattern: The file path to read from as a local file path or a GCS
-        ``gs://`` path. The path can contain glob characters
-        ``(*, ?, and [...] sets)``.
-      min_bundle_size: Minimum size of bundles that should be generated when
-        splitting this source into bundles. See ``FileBasedSource`` for more
+      file_pattern (str): The file path to read from as a local file path or a
+        GCS ``gs://`` path. The path can contain glob characters
+        (``*``, ``?``, and ``[...]`` sets).
+      min_bundle_size (int): Minimum size of bundles that should be generated
+        when splitting this source into bundles. See
+        :class:`~apache_beam.io.filebasedsource.FileBasedSource` for more
         details.
-      compression_type: Used to handle compressed input files. Typical value
-        is ``CompressionTypes.AUTO``, in which case the underlying file_path's
-        extension will be used to detect the compression.
-      strip_trailing_newlines: Indicates whether this source should remove
-        the newline char in each line it reads before decoding that line.
-      validate: flag to verify that the files exist during the pipeline
+      compression_type (str): Used to handle compressed input files.
+        Typical value is :attr:`CompressionTypes.AUTO
+        <apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case the
+        underlying file_path's extension will be used to detect the compression.
+      strip_trailing_newlines (bool): Indicates whether this source should
+        remove the newline char in each line it reads before decoding that line.
+      validate (bool): flag to verify that the files exist during the pipeline
         creation time.
-      skip_header_lines: Number of header lines to skip. Same number is skipped
-        from each source file. Must be 0 or higher. Large number of skipped
-        lines might impact performance.
-      coder: Coder used to decode each line.
+      skip_header_lines (int): Number of header lines to skip. Same number is
+        skipped from each source file. Must be 0 or higher. A large number of
+        skipped lines might impact performance.
+      coder (~apache_beam.coders.coders.Coder): Coder used to decode each line.
     """
 
     super(ReadFromText, self).__init__(**kwargs)
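
A small sketch tying the two transforms in this module together; the paths are
placeholders:

  import apache_beam as beam

  with beam.Pipeline() as p:
    lines = p | 'Read' >> beam.io.ReadFromText(
        '/tmp/input*.csv', skip_header_lines=1)
    lines | 'Write' >> beam.io.WriteToText(
        '/tmp/output', file_name_suffix='.csv')
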
@@ -468,49 +472,54 @@ class ReadFromText(PTransform):
 
 
 class WriteToText(PTransform):
-  """A PTransform for writing to text files."""
+  """A :class:`~apache_beam.transforms.ptransform.PTransform` for writing to
+  text files."""
 
-  def __init__(self,
-               file_path_prefix,
-               file_name_suffix='',
-               append_trailing_newlines=True,
-               num_shards=0,
-               shard_name_template=None,
-               coder=coders.ToStringCoder(),
-               compression_type=CompressionTypes.AUTO,
-               header=None):
-    """Initialize a WriteToText PTransform.
+  def __init__(
+      self,
+      file_path_prefix,
+      file_name_suffix='',
+      append_trailing_newlines=True,
+      num_shards=0,
+      shard_name_template=None,
+      coder=coders.ToStringCoder(),
+      compression_type=CompressionTypes.AUTO,
+      header=None):
+    r"""Initialize a :class:`WriteToText` transform.
 
     Args:
-      file_path_prefix: The file path to write to. The files written will begin
-        with this prefix, followed by a shard identifier (see num_shards), and
-        end in a common extension, if given by file_name_suffix. In most cases,
-        only this argument is specified and num_shards, shard_name_template, and
-        file_name_suffix use default values.
-      file_name_suffix: Suffix for the files written.
-      append_trailing_newlines: indicate whether this sink should write an
-        additional newline char after writing each element.
-      num_shards: The number of files (shards) used for output. If not set, the
-        service will decide on the optimal number of shards.
+      file_path_prefix (str): The file path to write to. The files written will
+        begin with this prefix, followed by a shard identifier (see
+        **num_shards**), and end in a common extension, if given by
+        **file_name_suffix**. In most cases, only this argument is specified and
+        **num_shards**, **shard_name_template**, and **file_name_suffix** use
+        default values.
+      file_name_suffix (str): Suffix for the files written.
+      append_trailing_newlines (bool): Indicates whether this sink should
+        write an additional newline char after writing each element.
+      num_shards (int): The number of files (shards) used for output.
+        If not set, the service will decide on the optimal number of shards.
         Constraining the number of shards is likely to reduce
         the performance of a pipeline.  Setting this value is not recommended
         unless you require a specific number of output files.
-      shard_name_template: A template string containing placeholders for
-        the shard number and shard count. Currently only '' and
-        '-SSSSS-of-NNNNN' are patterns accepted by the service.
+      shard_name_template (str): A template string containing placeholders for
+        the shard number and shard count. Currently only ``''`` and
+        ``'-SSSSS-of-NNNNN'`` are patterns accepted by the service.
         When constructing a filename for a particular shard number, the
-        upper-case letters 'S' and 'N' are replaced with the 0-padded shard
-        number and shard count respectively.  This argument can be '' in which
-        case it behaves as if num_shards was set to 1 and only one file will be
-        generated. The default pattern used is '-SSSSS-of-NNNNN'.
-      coder: Coder used to encode each line.
-      compression_type: Used to handle compressed output files. Typical value
-          is CompressionTypes.AUTO, in which case the final file path's
-          extension (as determined by file_path_prefix, file_name_suffix,
-          num_shards and shard_name_template) will be used to detect the
-          compression.
-      header: String to write at beginning of file as a header. If not None and
-          append_trailing_newlines is set, '\n' will be added.
+        upper-case letters ``S`` and ``N`` are replaced with the ``0``-padded
+        shard number and shard count respectively.  This argument can be ``''``
+        in which case it behaves as if num_shards was set to 1 and only one file
+        will be generated. The default pattern used is ``'-SSSSS-of-NNNNN'``.
+      coder (~apache_beam.coders.coders.Coder): Coder used to encode each line.
+      compression_type (str): Used to handle compressed output files.
+        Typical value is :class:`CompressionTypes.AUTO
+        <apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case the
+        final file path's extension (as determined by **file_path_prefix**,
+        **file_name_suffix**, **num_shards** and **shard_name_template**) will
+        be used to detect the compression.
+      header (str): String to write at beginning of file as a header.
+        If not :data:`None` and **append_trailing_newlines** is set, ``\n`` will
+        be added.
     """
 
     self._sink = _TextSink(file_path_prefix, file_name_suffix,

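A matching sketch for the write side; the output prefix and header below are
hypothetical, and the shard identifier plus suffix are appended to the prefix
as described in the docstring::

    import apache_beam as beam

    with beam.Pipeline() as p:
      (p
       | 'Create' >> beam.Create(['a,1', 'b,2'])
       | 'Write' >> beam.io.WriteToText(
           '/tmp/results',              # -> /tmp/results-00000-of-00001.csv
           file_name_suffix='.csv',
           num_shards=1,
           header='name,value'))        # written at the top of every shard
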
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index e7c2322..1ade6c0 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -15,17 +15,18 @@
 # limitations under the License.
 #
 
-"""Pipeline, the top-level Dataflow object.
+"""Pipeline, the top-level Beam object.
 
 A pipeline holds a DAG of data transforms. Conceptually the nodes of the DAG
-are transforms (PTransform objects) and the edges are values (mostly PCollection
+are transforms (:class:`~apache_beam.transforms.ptransform.PTransform` objects)
+and the edges are values (mostly :class:`~apache_beam.pvalue.PCollection`
 objects). The transforms take as inputs one or more PValues and output one or
-more PValues.
+more :class:`~apache_beam.pvalue.PValue` s.
 
 The pipeline offers functionality to traverse the graph.  The actual operation
 to be executed for each node visited is specified through a runner object.
 
-Typical usage:
+Typical usage::
 
   # Create a pipeline object using a local runner for execution.
   with beam.Pipeline('DirectRunner') as p:
@@ -73,32 +74,40 @@ __all__ = ['Pipeline']
 
 
 class Pipeline(object):
-  """A pipeline object that manages a DAG of PValues and their PTransforms.
+  """A pipeline object that manages a DAG of
+  :class:`~apache_beam.pvalue.PValue` s and their
+  :class:`~apache_beam.transforms.ptransform.PTransform` s.
 
-  Conceptually the PValues are the DAG's nodes and the PTransforms computing
-  the PValues are the edges.
+  Conceptually the :class:`~apache_beam.pvalue.PValue` s are the DAG's nodes and
+  the :class:`~apache_beam.transforms.ptransform.PTransform` s computing
+  the :class:`~apache_beam.pvalue.PValue` s are the edges.
 
   All the transforms applied to the pipeline must have distinct full labels.
   If same transform instance needs to be applied then the right shift operator
-  should be used to designate new names (e.g. `input | "label" >> my_tranform`).
+  should be used to designate new names
+  (e.g. ``input | "label" >> my_transform``).
   """
 
   def __init__(self, runner=None, options=None, argv=None):
     """Initialize a pipeline object.
 
     Args:
-      runner: An object of type 'PipelineRunner' that will be used to execute
-        the pipeline. For registered runners, the runner name can be specified,
-        otherwise a runner object must be supplied.
-      options: A configured 'PipelineOptions' object containing arguments
-        that should be used for running the Dataflow job.
-      argv: a list of arguments (such as sys.argv) to be used for building a
-        'PipelineOptions' object. This will only be used if argument 'options'
-        is None.
+      runner (~apache_beam.runners.runner.PipelineRunner): An object of
+        type :class:`~apache_beam.runners.runner.PipelineRunner` that will be
+        used to execute the pipeline. For registered runners, the runner name
+        can be specified, otherwise a runner object must be supplied.
+      options (~apache_beam.options.pipeline_options.PipelineOptions):
+        A configured
+        :class:`~apache_beam.options.pipeline_options.PipelineOptions` object
+        containing arguments that should be used for running the Beam job.
+      argv (List[str]): a list of arguments (such as :data:`sys.argv`)
+        to be used for building a
+        :class:`~apache_beam.options.pipeline_options.PipelineOptions` object.
+        This will only be used if argument **options** is :data:`None`.
 
     Raises:
-      ValueError: if either the runner or options argument is not of the
-      expected type.
+      ~exceptions.ValueError: if either the runner or options argument is not
+        of the expected type.
     """
     if options is not None:
       if isinstance(options, PipelineOptions):
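
A short sketch of the construction styles the docstring above describes
(runner referenced by name, an explicit options object, or an argv-style
list)::

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Registered runner referenced by name, plus an explicit options object.
    p1 = beam.Pipeline('DirectRunner', options=PipelineOptions())

    # Options parsed from a sys.argv-style list; only consulted because the
    # options argument is None.
    p2 = beam.Pipeline(argv=['--runner=DirectRunner'])
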
@@ -292,13 +301,15 @@ class Pipeline(object):
   def replace_all(self, replacements):
     """ Dynamically replaces PTransforms in the currently populated hierarchy.
 
-     Currently this only works for replacements where input and output types
-     are exactly the same.
-     TODO: Update this to also work for transform overrides where input and
-     output types are different.
+    Currently this only works for replacements where input and output types
+    are exactly the same.
+
+    TODO: Update this to also work for transform overrides where input and
+    output types are different.
 
     Args:
-      replacements a list of PTransformOverride objects.
+      replacements (List[~apache_beam.pipeline.PTransformOverride]): a list of
+        :class:`~apache_beam.pipeline.PTransformOverride` objects.
     """
     for override in replacements:
       assert isinstance(override, PTransformOverride)
@@ -341,13 +352,16 @@ class Pipeline(object):
     Runner-internal implementation detail; no backwards-compatibility guarantees
 
     Args:
-      visitor: PipelineVisitor object whose callbacks will be called for each
-        node visited. See PipelineVisitor comments.
+      visitor (~apache_beam.pipeline.PipelineVisitor):
+        :class:`~apache_beam.pipeline.PipelineVisitor` object whose callbacks
+        will be called for each node visited. See
+        :class:`~apache_beam.pipeline.PipelineVisitor` comments.
 
     Raises:
-      TypeError: if node is specified and is not a PValue.
-      pipeline.PipelineError: if node is specified and does not belong to this
-        pipeline instance.
+      ~exceptions.TypeError: if node is specified and is not a
+        :class:`~apache_beam.pvalue.PValue`.
+      ~apache_beam.error.PipelineError: if node is specified and does not
+        belong to this pipeline instance.
     """
 
     visited = set()
@@ -357,15 +371,20 @@ class Pipeline(object):
     """Applies a custom transform using the pvalueish specified.
 
     Args:
-      transform: the PTranform to apply.
-      pvalueish: the input for the PTransform (typically a PCollection).
-      label: label of the PTransform.
+      transform (~apache_beam.transforms.ptransform.PTransform): the
+        :class:`~apache_beam.transforms.ptransform.PTransform` to apply.
+      pvalueish (~apache_beam.pvalue.PCollection): the input for the
+        :class:`~apache_beam.transforms.ptransform.PTransform` (typically a
+        :class:`~apache_beam.pvalue.PCollection`).
+      label (str): label of the
+        :class:`~apache_beam.transforms.ptransform.PTransform`.
 
     Raises:
-      TypeError: if the transform object extracted from the argument list is
-        not a PTransform.
-      RuntimeError: if the transform object was already applied to this pipeline
-        and needs to be cloned in order to apply again.
+      ~exceptions.TypeError: if the transform object extracted from the
+        argument list is not a
+        :class:`~apache_beam.transforms.ptransform.PTransform`.
+      ~exceptions.RuntimeError: if the transform object was already applied to
+        this pipeline and needs to be cloned in order to apply again.
     """
     if isinstance(transform, ptransform._NamedPTransform):
       return self.apply(transform.transform, pvalueish,

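For reference, a sketch of how ``apply`` relates to the pipe-operator syntax
used elsewhere in the SDK; the explicit call below is assumed to be equivalent
to ``nums | 'Double' >> beam.Map(lambda x: x * 2)``::

    import apache_beam as beam

    p = beam.Pipeline('DirectRunner')
    nums = p | 'CreateNums' >> beam.Create([1, 2, 3])

    # Explicit form of what the pipe operator does under the hood; the label
    # keeps the full transform label unique within the pipeline.
    doubled = p.apply(beam.Map(lambda x: x * 2), nums, label='Double')

    p.run().wait_until_finish()
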
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/runners/runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 7ce9a03..a3c6b34 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -283,10 +283,10 @@ class PValueCache(object):
 
 
 class PipelineState(object):
-  """State of the Pipeline, as returned by PipelineResult.state.
+  """State of the Pipeline, as returned by :attr:`PipelineResult.state`.
 
   This is meant to be the union of all the states any runner can put a
-  pipeline in.  Currently, it represents the values of the dataflow
+  pipeline in. Currently, it represents the values of the Dataflow
   API JobState enum.
   """
   UNKNOWN = 'UNKNOWN'  # not specified
@@ -301,7 +301,7 @@ class PipelineState(object):
 
 
 class PipelineResult(object):
-  """A PipelineResult provides access to info about a pipeline."""
+  """A :class:`PipelineResult` provides access to info about a pipeline."""
 
   def __init__(self, state):
     self._state = state
@@ -315,15 +315,18 @@ class PipelineResult(object):
     """Waits until the pipeline finishes and returns the final status.
 
     Args:
-      duration: The time to wait (in milliseconds) for job to finish. If it is
-        set to None, it will wait indefinitely until the job is finished.
+      duration (int): The time to wait (in milliseconds) for the job to
+        finish. If it is set to :data:`None`, it will wait indefinitely
+        until the job is finished.
 
     Raises:
-      IOError: If there is a persistent problem getting job information.
-      NotImplementedError: If the runner does not support this operation.
+      ~exceptions.IOError: If there is a persistent problem getting job
+        information.
+      ~exceptions.NotImplementedError: If the runner does not support this
+        operation.
 
     Returns:
-      The final state of the pipeline, or None on timeout.
+      The final state of the pipeline, or :data:`None` on timeout.
     """
     raise NotImplementedError
 
@@ -331,8 +334,10 @@ class PipelineResult(object):
     """Cancels the pipeline execution.
 
     Raises:
-      IOError: If there is a persistent problem getting job information.
-      NotImplementedError: If the runner does not support this operation.
+      ~exceptions.IOError: If there is a persistent problem getting job
+        information.
+      ~exceptions.NotImplementedError: If the runner does not support this
+        operation.
 
     Returns:
       The final state of the pipeline.
@@ -340,10 +345,12 @@ class PipelineResult(object):
     raise NotImplementedError
 
   def metrics(self):
-    """Returns MetricsResult object to query metrics from the runner.
+    """Returns a :class:`~apache_beam.metrics.metric.MetricResults` object to
+    query metrics from the runner.
 
     Raises:
-      NotImplementedError: If the runner does not support this operation.
+      ~exceptions.NotImplementedError: If the runner does not support this
+        operation.
     """
     raise NotImplementedError
 

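A sketch of the result-handling flow these methods describe, run on the
DirectRunner with a user counter (the ``'demo'`` namespace, the counter name
and the ``'counters'`` result key are illustrative assumptions)::

    import apache_beam as beam
    from apache_beam.metrics import Metrics
    from apache_beam.metrics.metric import MetricsFilter

    class CountEvens(beam.DoFn):
      def __init__(self):
        super(CountEvens, self).__init__()
        self.evens = Metrics.counter('demo', 'evens')

      def process(self, element):
        if element % 2 == 0:
          self.evens.inc()
        yield element

    p = beam.Pipeline('DirectRunner')
    _ = p | beam.Create(range(10)) | beam.ParDo(CountEvens())

    result = p.run()
    state = result.wait_until_finish()   # blocks; returns the final state
    counters = result.metrics().query(
        MetricsFilter().with_name('evens'))['counters']
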
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/testing/test_pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/testing/test_pipeline.py b/sdks/python/apache_beam/testing/test_pipeline.py
index 13b1639..8380242 100644
--- a/sdks/python/apache_beam/testing/test_pipeline.py
+++ b/sdks/python/apache_beam/testing/test_pipeline.py
@@ -33,23 +33,23 @@ __all__ = [
 
 
 class TestPipeline(Pipeline):
-  """TestPipeline class is used inside of Beam tests that can be configured to
-  run against pipeline runner.
+  """:class:`TestPipeline` class is used inside of Beam tests that can be
+  configured to run against a pipeline runner.
 
   It has a functionality to parse arguments from command line and build pipeline
   options for tests who runs against a pipeline runner and utilizes resources
   of the pipeline runner. Those test functions are recommended to be tagged by
-  @attr("ValidatesRunner") annotation.
+  ``@attr("ValidatesRunner")`` annotation.
 
   In order to configure the test with customized pipeline options from command
-  line, system argument 'test-pipeline-options' can be used to obtains a list
-  of pipeline options. If no options specified, default value will be used.
+  line, the system argument ``--test-pipeline-options`` can be used to obtain
+  a list of pipeline options. If no options are specified, default values
+  will be used.
 
   For example, use following command line to execute all ValidatesRunner tests::
 
-    python setup.py nosetests -a ValidatesRunner \
-        --test-pipeline-options="--runner=DirectRunner \
-                                 --job_name=myJobName \
+    python setup.py nosetests -a ValidatesRunner \\
+        --test-pipeline-options="--runner=DirectRunner \\
+                                 --job_name=myJobName \\
                                  --num_workers=1"
 
   For example, use assert_that for test validation::
@@ -69,21 +69,27 @@ class TestPipeline(Pipeline):
     """Initialize a pipeline object for test.
 
     Args:
-      runner: An object of type 'PipelineRunner' that will be used to execute
-        the pipeline. For registered runners, the runner name can be specified,
-        otherwise a runner object must be supplied.
-      options: A configured 'PipelineOptions' object containing arguments
-        that should be used for running the pipeline job.
-      argv: A list of arguments (such as sys.argv) to be used for building a
-        'PipelineOptions' object. This will only be used if argument 'options'
-        is None.
-      is_integration_test: True if the test is an integration test, False
-        otherwise.
-      blocking: Run method will wait until pipeline execution is completed.
+      runner (~apache_beam.runners.runner.PipelineRunner): An object of type
+        :class:`~apache_beam.runners.runner.PipelineRunner` that will be used
+        to execute the pipeline. For registered runners, the runner name can be
+        specified, otherwise a runner object must be supplied.
+      options (~apache_beam.options.pipeline_options.PipelineOptions):
+        A configured
+        :class:`~apache_beam.options.pipeline_options.PipelineOptions`
+        object containing arguments that should be used for running the
+        pipeline job.
+      argv (List[str]): A list of arguments (such as :data:`sys.argv`) to be
+        used for building a
+        :class:`~apache_beam.options.pipeline_options.PipelineOptions` object.
+        This will only be used if argument **options** is :data:`None`.
+      is_integration_test (bool): :data:`True` if the test is an integration
+        test, :data:`False` otherwise.
+      blocking (bool): If :data:`True`, the run method will wait until
+        pipeline execution is completed.
 
     Raises:
-      ValueError: if either the runner or options argument is not of the
-      expected type.
+      ~exceptions.ValueError: if either the runner or options argument is not
+        of the expected type.
     """
     self.is_integration_test = is_integration_test
     self.options_list = self._parse_test_option_args(argv)

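A sketch of the recommended pattern: build the pipeline inside a
``with TestPipeline()`` block and validate its output with ``assert_that``::

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that, equal_to

    def test_doubles_elements():
      with TestPipeline() as p:
        doubled = (p
                   | beam.Create([1, 2, 3])
                   | beam.Map(lambda x: x * 2))
        assert_that(doubled, equal_to([2, 4, 6]))
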
http://git-wip-us.apache.org/repos/asf/beam/blob/e1baf55d/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 9018a49..d6f56d2 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -601,31 +601,35 @@ class CallableWrapperPartitionFn(PartitionFn):
 
 
 class ParDo(PTransformWithSideInputs):
-  """A ParDo transform.
+  """A :class:`ParDo` transform.
 
-  Processes an input PCollection by applying a DoFn to each element and
-  returning the accumulated results into an output PCollection. The type of the
-  elements is not fixed as long as the DoFn can deal with it. In reality
-  the type is restrained to some extent because the elements sometimes must be
-  persisted to external storage. See the expand() method comments for a detailed
-  description of all possible arguments.
+  Processes an input :class:`~apache_beam.pvalue.PCollection` by applying a
+  :class:`DoFn` to each element and returning the accumulated results into an
+  output :class:`~apache_beam.pvalue.PCollection`. The type of the elements is
+  not fixed as long as the :class:`DoFn` can deal with it. In reality the type
+  is restrained to some extent because the elements sometimes must be persisted
+  to external storage. See the :meth:`.expand()` method comments for a
+  detailed description of all possible arguments.
 
-  Note that the DoFn must return an iterable for each element of the input
-  PCollection.  An easy way to do this is to use the yield keyword in the
-  process method.
+  Note that the :class:`DoFn` must return an iterable for each element of the
+  input :class:`~apache_beam.pvalue.PCollection`. An easy way to do this is to
+  use the ``yield`` keyword in the process method.
 
   Args:
-      pcoll: a PCollection to be processed.
-      fn: a DoFn object to be applied to each element of pcoll argument.
-      *args: positional arguments passed to the dofn object.
-      **kwargs:  keyword arguments passed to the dofn object.
+    pcoll (~apache_beam.pvalue.PCollection):
+      a :class:`~apache_beam.pvalue.PCollection` to be processed.
+    fn (DoFn): a :class:`DoFn` object to be applied to each element
+      of **pcoll** argument.
+    *args: positional arguments passed to the :class:`DoFn` object.
+    **kwargs:  keyword arguments passed to the :class:`DoFn` object.
 
   Note that the positional and keyword arguments will be processed in order
-  to detect PCollections that will be computed as side inputs to the
-  transform. During pipeline execution whenever the DoFn object gets executed
-  (its apply() method gets called) the PCollection arguments will be replaced
-  by values from the PCollection in the exact positions where they appear in
-  the argument lists.
+  to detect :class:`~apache_beam.pvalue.PCollection` s that will be computed as
+  side inputs to the transform. During pipeline execution whenever the
+  :class:`DoFn` object gets executed (its :meth:`DoFn.process()` method gets
+  called) the :class:`~apache_beam.pvalue.PCollection` arguments will be
+  replaced by values from the :class:`~apache_beam.pvalue.PCollection` in the
+  exact positions where they appear in the argument lists.
   """
 
   def __init__(self, fn, *args, **kwargs):
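
A sketch of the pattern described above: a :class:`DoFn` whose ``process``
method yields zero or more results per element, with a side-input PCollection
passed positionally (wrapped in ``AsList`` here; the stop-word data is made
up)::

    import apache_beam as beam

    class FilterStopWords(beam.DoFn):
      def process(self, element, stop_words):
        # stop_words is materialized from the side input at execution time.
        for word in element.split():
          if word not in stop_words:
            yield word

    with beam.Pipeline() as p:
      stop = p | 'Stop' >> beam.Create(['the', 'a'])
      lines = p | 'Lines' >> beam.Create(['the quick fox', 'a lazy dog'])
      words = lines | beam.ParDo(FilterStopWords(), beam.pvalue.AsList(stop))
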
@@ -665,27 +669,34 @@ class ParDo(PTransformWithSideInputs):
     return pvalue.PCollection(pcoll.pipeline)
 
   def with_outputs(self, *tags, **main_kw):
-    """Returns a tagged tuple allowing access to the outputs of a ParDo.
+    """Returns a tagged tuple allowing access to the outputs of a
+    :class:`ParDo`.
 
     The resulting object supports access to the
-    PCollection associated with a tag (e.g., o.tag, o[tag]) and iterating over
-    the available tags (e.g., for tag in o: ...).
+    :class:`~apache_beam.pvalue.PCollection` associated with a tag
+    (e.g. ``o.tag``, ``o[tag]``) and iterating over the available tags
+    (e.g. ``for tag in o: ...``).
 
     Args:
       *tags: if non-empty, list of valid tags. If a list of valid tags is given,
         it will be an error to use an undeclared tag later in the pipeline.
-      **main_kw: dictionary empty or with one key 'main' defining the tag to be
-        used for the main output (which will not have a tag associated with it).
+      **main_kw: either an empty dictionary, or a dictionary with the single
+        key ``'main'`` defining the tag to be used for the main output (which
+        will not have a tag associated with it).
 
     Returns:
-      An object of type DoOutputsTuple that bundles together all the outputs
-      of a ParDo transform and allows accessing the individual
-      PCollections for each output using an object.tag syntax.
+      ~apache_beam.pvalue.DoOutputsTuple: An object of type
+      :class:`~apache_beam.pvalue.DoOutputsTuple` that bundles together all
+      the outputs of a :class:`ParDo` transform and allows accessing the
+      individual :class:`~apache_beam.pvalue.PCollection` s for each output
+      using an ``object.tag`` syntax.
 
     Raises:
-      TypeError: if the self object is not a PCollection that is the result of
-        a ParDo transform.
-      ValueError: if main_kw contains any key other than 'main'.
+      ~exceptions.TypeError: if the **self** object is not a
+        :class:`~apache_beam.pvalue.PCollection` that is the result of a
+        :class:`ParDo` transform.
+      ~exceptions.ValueError: if **main_kw** contains any key other than
+        ``'main'``.
     """
     main_tag = main_kw.pop('main', None)
     if main_kw:
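
A sketch of the tagged-output flow, assuming the ``TaggedOutput`` wrapper from
``apache_beam.pvalue`` for routing elements to a non-main output::

    import apache_beam as beam
    from apache_beam import pvalue

    class SplitParity(beam.DoFn):
      def process(self, element):
        if element % 2 == 0:
          yield element                              # goes to the main output
        else:
          yield pvalue.TaggedOutput('odd', element)

    with beam.Pipeline() as p:
      nums = p | beam.Create([1, 2, 3, 4])
      results = nums | beam.ParDo(SplitParity()).with_outputs(
          'odd', main='even')
      evens = results.even      # also accessible as results['even']
      odds = results.odd        # also accessible as results['odd']
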
@@ -739,24 +750,27 @@ class _MultiParDo(PTransform):
 
 
 def FlatMap(fn, *args, **kwargs):  # pylint: disable=invalid-name
-  """FlatMap is like ParDo except it takes a callable to specify the
-  transformation.
+  """:func:`FlatMap` is like :class:`ParDo` except it takes a callable to
+  specify the transformation.
 
   The callable must return an iterable for each element of the input
-  PCollection.  The elements of these iterables will be flattened into
-  the output PCollection.
+  :class:`~apache_beam.pvalue.PCollection`. The elements of these iterables will
+  be flattened into the output :class:`~apache_beam.pvalue.PCollection`.
 
   Args:
-    fn: a callable object.
+    fn (callable): a callable object.
     *args: positional arguments passed to the transform callable.
     **kwargs: keyword arguments passed to the transform callable.
 
   Returns:
-    A PCollection containing the Map outputs.
+    ~apache_beam.pvalue.PCollection:
+    A :class:`~apache_beam.pvalue.PCollection` containing the
+    :func:`FlatMap` outputs.
 
   Raises:
-    TypeError: If the fn passed as argument is not a callable. Typical error
-      is to pass a DoFn instance which is supported only for ParDo.
+    ~exceptions.TypeError: If the **fn** passed as argument is not a callable.
+      Typical error is to pass a :class:`DoFn` instance which is supported only
+      for :class:`ParDo`.
   """
   label = 'FlatMap(%s)' % ptransform.label_from_callable(fn)
   if not callable(fn):
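
A minimal sketch: the callable returns an iterable per input element, and the
iterables are flattened into the output PCollection::

    import apache_beam as beam

    with beam.Pipeline() as p:
      words = (p
               | beam.Create(['hello world', 'beam'])
               | 'Split' >> beam.FlatMap(lambda line: line.split()))
      # words contains the elements 'hello', 'world' and 'beam'.
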
@@ -770,19 +784,23 @@ def FlatMap(fn, *args, **kwargs):  # pylint: disable=invalid-name
 
 
 def Map(fn, *args, **kwargs):  # pylint: disable=invalid-name
-  """Map is like FlatMap except its callable returns only a single element.
+  """:func:`Map` is like :func:`FlatMap` except its callable returns only a
+  single element.
 
   Args:
-    fn: a callable object.
+    fn (callable): a callable object.
     *args: positional arguments passed to the transform callable.
     **kwargs: keyword arguments passed to the transform callable.
 
   Returns:
-    A PCollection containing the Map outputs.
+    ~apache_beam.pvalue.PCollection:
+    A :class:`~apache_beam.pvalue.PCollection` containing the
+    :func:`Map` outputs.
 
   Raises:
-    TypeError: If the fn passed as argument is not a callable. Typical error
-      is to pass a DoFn instance which is supported only for ParDo.
+    ~exceptions.TypeError: If the **fn** passed as argument is not a callable.
+      Typical error is to pass a :class:`DoFn` instance which is supported only
+      for :class:`ParDo`.
   """
   if not callable(fn):
     raise TypeError(
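
A minimal sketch of the one-to-one case::

    import apache_beam as beam

    with beam.Pipeline() as p:
      lengths = (p
                 | beam.Create(['a', 'bb', 'ccc'])
                 | 'Len' >> beam.Map(len))   # yields 1, 2, 3
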
@@ -815,19 +833,23 @@ def Map(fn, *args, **kwargs):  # pylint: disable=invalid-name
 
 
 def Filter(fn, *args, **kwargs):  # pylint: disable=invalid-name
-  """Filter is a FlatMap with its callable filtering out elements.
+  """:func:`Filter` is a :func:`FlatMap` with its callable filtering out
+  elements.
 
   Args:
-    fn: a callable object.
+    fn (callable): a callable object.
     *args: positional arguments passed to the transform callable.
     **kwargs: keyword arguments passed to the transform callable.
 
   Returns:
-    A PCollection containing the Filter outputs.
+    ~apache_beam.pvalue.PCollection:
+    A :class:`~apache_beam.pvalue.PCollection` containing the
+    :func:`Filter` outputs.
 
   Raises:
-    TypeError: If the fn passed as argument is not a callable. Typical error
-      is to pass a DoFn instance which is supported only for FlatMap.
+    ~exceptions.TypeError: If the **fn** passed as argument is not a callable.
+      Typical error is to pass a :class:`DoFn` instance which is supported only
+      for :class:`ParDo`.
   """
   if not callable(fn):
     raise TypeError(
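
A minimal sketch of the predicate form: elements for which the callable
returns ``True`` are kept::

    import apache_beam as beam

    with beam.Pipeline() as p:
      evens = (p
               | beam.Create([1, 2, 3, 4])
               | 'KeepEven' >> beam.Filter(lambda x: x % 2 == 0))  # yields 2, 4
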
@@ -867,35 +889,42 @@ def _combine_payload(combine_fn, context):
 
 
 class CombineGlobally(PTransform):
-  """A CombineGlobally transform.
+  """A :class:`CombineGlobally` transform.
 
-  Reduces a PCollection to a single value by progressively applying a CombineFn
-  to portions of the PCollection (and to intermediate values created thereby).
-  See documentation in CombineFn for details on the specifics on how CombineFns
-  are applied.
+  Reduces a :class:`~apache_beam.pvalue.PCollection` to a single value by
+  progressively applying a :class:`CombineFn` to portions of the
+  :class:`~apache_beam.pvalue.PCollection` (and to intermediate values created
+  thereby). See documentation in :class:`CombineFn` for details on the specifics
+  of how :class:`CombineFn` s are applied.
 
   Args:
-    pcoll: a PCollection to be reduced into a single value.
-    fn: a CombineFn object that will be called to progressively reduce the
-      PCollection into single values, or a callable suitable for wrapping
-      by CallableWrapperCombineFn.
-    *args: positional arguments passed to the CombineFn object.
-    **kwargs: keyword arguments passed to the CombineFn object.
+    pcoll (~apache_beam.pvalue.PCollection):
+      a :class:`~apache_beam.pvalue.PCollection` to be reduced into a single
+      value.
+    fn (callable): a :class:`CombineFn` object that will be called to
+      progressively reduce the :class:`~apache_beam.pvalue.PCollection` into
+      single values, or a callable suitable for wrapping by
+      :class:`~apache_beam.transforms.core.CallableWrapperCombineFn`.
+    *args: positional arguments passed to the :class:`CombineFn` object.
+    **kwargs: keyword arguments passed to the :class:`CombineFn` object.
 
   Raises:
-    TypeError: If the output type of the input PCollection is not compatible
-      with Iterable[A].
+    ~exceptions.TypeError: If the output type of the input
+      :class:`~apache_beam.pvalue.PCollection` is not compatible
+      with ``Iterable[A]``.
 
   Returns:
-    A single-element PCollection containing the main output of the Combine
-    transform.
+    ~apache_beam.pvalue.PCollection: A single-element
+    :class:`~apache_beam.pvalue.PCollection` containing the main output of
+    the :class:`CombineGlobally` transform.
 
   Note that the positional and keyword arguments will be processed in order
-  to detect PObjects that will be computed as side inputs to the transform.
-  During pipeline execution whenever the CombineFn object gets executed (i.e.,
-  any of the CombineFn methods get called), the PObject arguments will be
-  replaced by their actual value in the exact position where they appear in
-  the argument lists.
+  to detect :class:`~apache_beam.pvalue.PValue` s that will be computed as side
+  inputs to the transform.
+  During pipeline execution whenever the :class:`CombineFn` object gets executed
+  (i.e. any of the :class:`CombineFn` methods get called), the
+  :class:`~apache_beam.pvalue.PValue` arguments will be replaced by their
+  actual value in the exact position where they appear in the argument lists.
   """
   has_defaults = True
   as_view = False
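
A minimal sketch of the reduction described above, passing a plain callable
that is wrapped internally as a :class:`CombineFn`::

    import apache_beam as beam

    with beam.Pipeline() as p:
      total = (p
               | beam.Create([1, 2, 3, 4])
               | 'Sum' >> beam.CombineGlobally(sum))  # one element: 10
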