Posted to github@beam.apache.org by "robertwb (via GitHub)" <gi...@apache.org> on 2023/01/20 19:31:34 UTC

[GitHub] [beam] robertwb commented on a diff in pull request #24667: Initial start of a yaml-based declarative way of building pipelines.

robertwb commented on code in PR #24667:
URL: https://github.com/apache/beam/pull/24667#discussion_r1082027712


##########
sdks/python/apache_beam/yaml/main.py:
##########
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse
+import collections
+import yaml
+
+import apache_beam as beam
+from apache_beam.yaml import yaml_transform
+
+
+def run(argv=None):
+  # Do imports here to avoid main session issues.
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--pipeline_spec')

Review Comment:
   Done.



##########
sdks/python/apache_beam/yaml/yaml_provider.py:
##########
@@ -0,0 +1,420 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""This module defines Providers usable from yaml, which is a specification
+for where to find and how to invoke services that vend implementations of
+various PTransforms."""
+
+import collections
+import hashlib
+import json
+import os
+import subprocess
+import sys
+import uuid
+import yaml
+from yaml.loader import SafeLoader
+
+import apache_beam as beam
+import apache_beam.dataframe.io
+import apache_beam.io
+import apache_beam.transforms.util
+from apache_beam.portability.api import schema_pb2
+from apache_beam.transforms import external
+from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
+from apache_beam.typehints import schemas
+from apache_beam.typehints import trivial_inference
+from apache_beam.utils import python_callable
+from apache_beam.utils import subprocess_server
+from apache_beam.version import __version__ as beam_version
+
+
+class Provider:
+  """Maps transform types to concrete PTransform instances."""
+  def available(self):
+    raise NotImplementedError(type(self))
+
+  def provided_transforms(self):

Review Comment:
   Done.



##########
sdks/python/apache_beam/yaml/main.py:
##########
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import argparse
+import collections
+import yaml
+
+import apache_beam as beam
+from apache_beam.yaml import yaml_transform
+
+
+def run(argv=None):
+  # Do imports here to avoid main session issues.
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--pipeline_spec')
+  parser.add_argument('--pipeline_spec_file')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  if known_args.pipeline_spec_file:
+    with open(known_args.pipeline_spec_file) as fin:
+      known_args.pipeline_spec = fin.read()
+
+  if known_args.pipeline_spec:
+    pipeline_spec = yaml.load(
+        known_args.pipeline_spec, Loader=yaml_transform.SafeLineLoader)
+  else:
+    raise ValueError(

Review Comment:
   `known_args.pipeline_spec` was populated from the file above in that case. But this didn't catch the case where both were set. Spelling it out more explicitly now.
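
   For the record, a minimal sketch of what that more explicit handling could look like inside `run()` (illustrative only; `pipeline_yaml` is a made-up local name and the actual commit may differ):

       if known_args.pipeline_spec_file and known_args.pipeline_spec:
         raise ValueError(
             'Only one of --pipeline_spec or --pipeline_spec_file may be set.')
       elif known_args.pipeline_spec_file:
         with open(known_args.pipeline_spec_file) as fin:
           pipeline_yaml = fin.read()
       elif known_args.pipeline_spec:
         pipeline_yaml = known_args.pipeline_spec
       else:
         raise ValueError(
             'One of --pipeline_spec or --pipeline_spec_file must be set.')
       pipeline_spec = yaml.load(
           pipeline_yaml, Loader=yaml_transform.SafeLineLoader)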



##########
sdks/python/apache_beam/yaml/yaml_provider.py:
##########
@@ -0,0 +1,420 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""This module defines Providers usable from yaml, which is a specification
+for where to find and how to invoke services that vend implementations of
+various PTransforms."""
+
+import collections
+import hashlib
+import json
+import os
+import subprocess
+import sys
+import uuid
+import yaml
+from yaml.loader import SafeLoader
+
+import apache_beam as beam
+import apache_beam.dataframe.io
+import apache_beam.io
+import apache_beam.transforms.util
+from apache_beam.portability.api import schema_pb2
+from apache_beam.transforms import external
+from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
+from apache_beam.typehints import schemas
+from apache_beam.typehints import trivial_inference
+from apache_beam.utils import python_callable
+from apache_beam.utils import subprocess_server
+from apache_beam.version import __version__ as beam_version
+
+
+class Provider:
+  """Maps transform types to concrete PTransform instances."""

Review Comment:
   Done.



##########
sdks/python/apache_beam/yaml/yaml_provider.py:
##########
@@ -0,0 +1,420 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""This module defines Providers usable from yaml, which is a specification
+for where to find and how to invoke services that vend implementations of
+various PTransforms."""
+
+import collections
+import hashlib
+import json
+import os
+import subprocess
+import sys
+import uuid
+import yaml
+from yaml.loader import SafeLoader
+
+import apache_beam as beam
+import apache_beam.dataframe.io
+import apache_beam.io
+import apache_beam.transforms.util
+from apache_beam.portability.api import schema_pb2
+from apache_beam.transforms import external
+from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
+from apache_beam.typehints import schemas
+from apache_beam.typehints import trivial_inference
+from apache_beam.utils import python_callable
+from apache_beam.utils import subprocess_server
+from apache_beam.version import __version__ as beam_version
+
+
+class Provider:
+  """Maps transform types to concrete PTransform instances."""
+  def available(self):
+    raise NotImplementedError(type(self))
+
+  def provided_transforms(self):
+    raise NotImplementedError(type(self))
+
+  def create_transform(self, typ, args):
+    raise NotImplementedError(type(self))
+
+
+class ExternalProvider(Provider):
+  """A Provider implemented via the cross language transform service."""
+  def __init__(self, urns, service):
+    self._urns = urns
+    self._service = service
+    self._schema_transforms = None
+
+  def provided_transforms(self):
+    return self._urns.keys()
+
+  def create_transform(self, type, args):
+    if callable(self._service):
+      self._service = self._service()
+    if self._schema_transforms is None:
+      try:
+        self._schema_transforms = [
+            config.identifier
+            for config in external.SchemaAwareExternalTransform.discover(
+                self._service)
+        ]
+      except Exception:
+        self._schema_transforms = []
+    urn = self._urns[type]
+    if urn in self._schema_transforms:
+      return external.SchemaAwareExternalTransform(urn, self._service, **args)
+    else:
+      return type >> self.create_external_transform(urn, args)
+
+  def create_external_transform(self, urn, args):
+    return external.ExternalTransform(
+        urn,
+        external.ImplicitSchemaPayloadBuilder(args).payload(),
+        self._service)
+
+  @staticmethod
+  def provider_from_spec(spec):
+    urns = spec['transforms']
+    type = spec['type']
+    if spec.get('version', None) == 'BEAM_VERSION':

Review Comment:
   This is a function of the provider type. The proper thing to do would likely be to define the set of providers, along with their arguments, in a more language-independent schema; I added a TODO to this effect in the schema.
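
   For reference, a provider entry of roughly this shape is what the code above consumes (a hedged example; the transform name, URN, and gradle target here are made up):

       providers:
         - type: beamJar
           transforms:
             MyTransform: 'beam:external:java:my_transform:v1'
           gradle_target: 'sdks:java:extensions:my:expansion-service:shadowJar'
           version: BEAM_VERSION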



##########
sdks/python/apache_beam/yaml/yaml_provider.py:
##########
@@ -0,0 +1,420 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""This module defines Providers usable from yaml, which is a specification
+for where to find and how to invoke services that vend implementations of
+various PTransforms."""
+
+import collections
+import hashlib
+import json
+import os
+import subprocess
+import sys
+import uuid
+import yaml
+from yaml.loader import SafeLoader
+
+import apache_beam as beam
+import apache_beam.dataframe.io
+import apache_beam.io
+import apache_beam.transforms.util
+from apache_beam.portability.api import schema_pb2
+from apache_beam.transforms import external
+from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
+from apache_beam.typehints import schemas
+from apache_beam.typehints import trivial_inference
+from apache_beam.utils import python_callable
+from apache_beam.utils import subprocess_server
+from apache_beam.version import __version__ as beam_version
+
+
+class Provider:
+  """Maps transform types to concrete PTransform instances."""
+  def available(self):
+    raise NotImplementedError(type(self))
+
+  def provided_transforms(self):
+    raise NotImplementedError(type(self))
+
+  def create_transform(self, typ, args):
+    raise NotImplementedError(type(self))
+
+
+class ExternalProvider(Provider):
+  """A Provider implemented via the cross language transform service."""
+  def __init__(self, urns, service):
+    self._urns = urns
+    self._service = service
+    self._schema_transforms = None
+
+  def provided_transforms(self):
+    return self._urns.keys()
+
+  def create_transform(self, type, args):
+    if callable(self._service):
+      self._service = self._service()
+    if self._schema_transforms is None:
+      try:
+        self._schema_transforms = [
+            config.identifier
+            for config in external.SchemaAwareExternalTransform.discover(
+                self._service)
+        ]
+      except Exception:
+        self._schema_transforms = []
+    urn = self._urns[type]
+    if urn in self._schema_transforms:
+      return external.SchemaAwareExternalTransform(urn, self._service, **args)
+    else:
+      return type >> self.create_external_transform(urn, args)
+
+  def create_external_transform(self, urn, args):
+    return external.ExternalTransform(
+        urn,
+        external.ImplicitSchemaPayloadBuilder(args).payload(),
+        self._service)
+
+  @staticmethod
+  def provider_from_spec(spec):
+    urns = spec['transforms']
+    type = spec['type']
+    if spec.get('version', None) == 'BEAM_VERSION':
+      spec['version'] = beam_version
+    if type == 'jar':
+      return ExternalJavaProvider(urns, lambda: spec['jar'])
+    elif type == 'mavenJar':
+      return ExternalJavaProvider(
+          urns,
+          lambda: subprocess_server.JavaJarServer.path_to_maven_jar(
+              **{
+                  key: value
+                  for (key, value) in spec.items() if key in [
+                      'artifact_id',
+                      'group_id',
+                      'version',
+                      'repository',
+                      'classifier',
+                      'appendix'
+                  ]
+              }))
+    elif type == 'beamJar':
+      return ExternalJavaProvider(
+          urns,
+          lambda: subprocess_server.JavaJarServer.path_to_beam_jar(
+              **{
+                  key: value
+                  for (key, value) in spec.items() if key in
+                  ['gradle_target', 'version', 'appendix', 'artifact_id']
+              }))
+    elif type == 'pypi':
+      return ExternalPythonProvider(urns, spec['packages'])
+    elif type == 'remote':
+      return RemoteProvider(spec['address'])
+    elif type == 'docker':
+      raise NotImplementedError()
+    else:
+      raise NotImplementedError(f'Unknown provider type: {type}')
+
+
+class RemoteProvider(ExternalProvider):
+  _is_available = None
+
+  def available(self):
+    if self._is_available is None:
+      try:
+        with external.ExternalTransform.service(self._service) as service:
+          service.ready(1)
+          self._is_available = True
+      except Exception:
+        self._is_available = False
+    return self._is_available
+
+
+class ExternalJavaProvider(ExternalProvider):
+  def __init__(self, urns, jar_provider):
+    super().__init__(
+        urns, lambda: external.JavaJarExpansionService(jar_provider()))
+
+  def available(self):
+    # pylint: disable=subprocess-run-check
+    return subprocess.run(['which', 'java'],
+                          capture_output=True).returncode == 0
+
+
+class ExternalPythonProvider(ExternalProvider):
+  def __init__(self, urns, packages):
+    super().__init__(urns, PypiExpansionService(packages))
+
+  def available(self):
+    return True  # If we're running this script, we have Python installed.
+
+  def create_external_transform(self, urn, args):
+    # Python transforms are "registered" by fully qualified name.
+    return external.ExternalTransform(
+        "beam:transforms:python:fully_qualified_named",
+        external.ImplicitSchemaPayloadBuilder({
+            'constructor': urn,
+            'kwargs': args,
+        }).payload(),
+        self._service)
+
+
+# This is needed because type inference can't handle *args, **kwargs forwarding.
+# TODO: Fix Beam itself.
+def fix_pycallable():
+  from apache_beam.transforms.ptransform import label_from_callable
+
+  def default_label(self):
+    src = self._source.strip()
+    last_line = src.split('\n')[-1]
+    if last_line[0] != ' ' and len(last_line) < 72:
+      return last_line
+    return label_from_callable(self._callable)
+
+  def _argspec_fn(self):
+    return self._callable
+
+  python_callable.PythonCallableWithSource.default_label = default_label
+  python_callable.PythonCallableWithSource._argspec_fn = property(_argspec_fn)
+
+  original_infer_return_type = trivial_inference.infer_return_type
+
+  def infer_return_type(fn, *args, **kwargs):
+    if isinstance(fn, python_callable.PythonCallableWithSource):
+      fn = fn._callable
+    return original_infer_return_type(fn, *args, **kwargs)
+
+  trivial_inference.infer_return_type = infer_return_type
+
+  original_fn_takes_side_inputs = (
+      apache_beam.transforms.util.fn_takes_side_inputs)
+
+  def fn_takes_side_inputs(fn):
+    if isinstance(fn, python_callable.PythonCallableWithSource):
+      fn = fn._callable
+    return original_fn_takes_side_inputs(fn)
+
+  apache_beam.transforms.util.fn_takes_side_inputs = fn_takes_side_inputs
+
+
+class InlineProvider(Provider):
+  def __init__(self, transform_factories):
+    self._transform_factories = transform_factories
+
+  def available(self):
+    return True
+
+  def provided_transforms(self):
+    return self._transform_factories.keys()
+
+  def create_transform(self, type, args):
+    return self._transform_factories[type](**args)
+
+
+PRIMITIVE_NAMES_TO_ATOMIC_TYPE = {
+    py_type.__name__: schema_type
+    for (py_type, schema_type) in schemas.PRIMITIVE_TO_ATOMIC_TYPE.items()
+    if py_type.__module__ != 'typing'
+}
+
+
+def create_builtin_provider():
+  def with_schema(**args):
+    # TODO: This is preliminary.
+    def parse_type(spec):
+      if spec in PRIMITIVE_NAMES_TO_ATOMIC_TYPE:
+        return schema_pb2.FieldType(
+            atomic_type=PRIMITIVE_NAMES_TO_ATOMIC_TYPE[spec])
+      elif isinstance(spec, list):
+        if len(spec) != 1:
+          raise ValueError("Use single-element lists to denote list types.")
+        else:
+          return schema_pb2.FieldType(
+              iterable_type=schema_pb2.IterableType(
+                  element_type=parse_type(spec[0])))
+      elif isinstance(spec, dict):
+        return schema_pb2.FieldType(
+            row_type=schema_pb2.RowType(schema=parse_schema(spec)))
+      else:
+        raise ValueError("Unknown schema type: {spec}")
+
+    def parse_schema(spec):
+      return schema_pb2.Schema(
+          fields=[
+              schema_pb2.Field(name=key, type=parse_type(value), id=ix)
+              for (ix, (key, value)) in enumerate(spec.items())
+          ],
+          id=str(uuid.uuid4()))
+
+    named_tuple = schemas.named_tuple_from_schema(parse_schema(args))
+    names = list(args.keys())
+
+    def extract_field(x, name):
+      if isinstance(x, dict):
+        return x[name]
+      else:
+        return getattr(x, name)
+
+    return 'WithSchema(%s)' % ', '.join(names) >> beam.Map(
+        lambda x: named_tuple(*[extract_field(x, name) for name in names])
+    ).with_output_types(named_tuple)
+
+  # Or should this be posargs, args?
+  # pylint: disable=dangerous-default-value
+  def fully_qualified_named_transform(constructor, args=(), kwargs={}):
+    with FullyQualifiedNamedTransform.with_filter('*'):
+      return constructor >> FullyQualifiedNamedTransform(
+          constructor, args, kwargs)
+
+  class Flatten(beam.PTransform):

Review Comment:
   One could make the built-in `Flatten` accept dictionaries and ignore the keys (which would mean `x | beam.Flatten()` would have different semantics than `tuple(x) | beam.Flatten()`, which is a bit odd). I don't think we want to go so far as letting it take a single PCollection or PBegin, both of which are needed here, as we have no way to represent a list of one or zero PCollections differently.
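
   To make the constraint concrete, here is a small sketch of the form the built-in `Flatten` does accept (a tuple of PCollections), versus the dict and single-PCollection inputs the yaml-level wrapper below normalizes first:

       import apache_beam as beam

       with beam.Pipeline() as p:
         pc1 = p | 'A' >> beam.Create([1, 2])
         pc2 = p | 'B' >> beam.Create([3])
         # Supported by the built-in transform: a tuple of PCollections.
         merged = (pc1, pc2) | beam.Flatten()
         # Not supported: pc1 | beam.Flatten() (a bare PCollection) or
         # {'a': pc1, 'b': pc2} | beam.Flatten() (a dict); the wrapper
         # converts both of these to a tuple before delegating.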



##########
sdks/python/apache_beam/yaml/yaml_provider.py:
##########
@@ -0,0 +1,420 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""This module defines Providers usable from yaml, which is a specification
+for where to find and how to invoke services that vend implementations of
+various PTransforms."""
+
+import collections
+import hashlib
+import json
+import os
+import subprocess
+import sys
+import uuid
+import yaml
+from yaml.loader import SafeLoader
+
+import apache_beam as beam
+import apache_beam.dataframe.io
+import apache_beam.io
+import apache_beam.transforms.util
+from apache_beam.portability.api import schema_pb2
+from apache_beam.transforms import external
+from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
+from apache_beam.typehints import schemas
+from apache_beam.typehints import trivial_inference
+from apache_beam.utils import python_callable
+from apache_beam.utils import subprocess_server
+from apache_beam.version import __version__ as beam_version
+
+
+class Provider:
+  """Maps transform types to concrete PTransform instances."""
+  def available(self):
+    raise NotImplementedError(type(self))
+
+  def provided_transforms(self):
+    raise NotImplementedError(type(self))
+
+  def create_transform(self, typ, args):
+    raise NotImplementedError(type(self))
+
+
+class ExternalProvider(Provider):
+  """A Provider implemented via the cross language transform service."""
+  def __init__(self, urns, service):
+    self._urns = urns
+    self._service = service
+    self._schema_transforms = None
+
+  def provided_transforms(self):
+    return self._urns.keys()
+
+  def create_transform(self, type, args):
+    if callable(self._service):
+      self._service = self._service()
+    if self._schema_transforms is None:
+      try:
+        self._schema_transforms = [
+            config.identifier
+            for config in external.SchemaAwareExternalTransform.discover(
+                self._service)
+        ]
+      except Exception:
+        self._schema_transforms = []
+    urn = self._urns[type]
+    if urn in self._schema_transforms:
+      return external.SchemaAwareExternalTransform(urn, self._service, **args)
+    else:
+      return type >> self.create_external_transform(urn, args)
+
+  def create_external_transform(self, urn, args):
+    return external.ExternalTransform(
+        urn,
+        external.ImplicitSchemaPayloadBuilder(args).payload(),
+        self._service)
+
+  @staticmethod
+  def provider_from_spec(spec):
+    urns = spec['transforms']
+    type = spec['type']
+    if spec.get('version', None) == 'BEAM_VERSION':
+      spec['version'] = beam_version
+    if type == 'jar':

Review Comment:
   We do validate in the sense that we throw an informative error for an unexpected type. (I don't know that an enum would help, as these are not referenced anywhere else.) See also the comment above about schemas.
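
   For example (illustrative spec and typo), the failure mode looks like:

       >>> ExternalProvider.provider_from_spec(
       ...     {'type': 'dokcer', 'transforms': {}})
       Traceback (most recent call last):
         ...
       NotImplementedError: Unknown provider type: dokcer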



##########
sdks/python/apache_beam/yaml/yaml_provider.py:
##########
@@ -0,0 +1,420 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""This module defines Providers usable from yaml, which is a specification
+for where to find and how to invoke services that vend implementations of
+various PTransforms."""
+
+import collections
+import hashlib
+import json
+import os
+import subprocess
+import sys
+import uuid
+import yaml
+from yaml.loader import SafeLoader
+
+import apache_beam as beam
+import apache_beam.dataframe.io
+import apache_beam.io
+import apache_beam.transforms.util
+from apache_beam.portability.api import schema_pb2
+from apache_beam.transforms import external
+from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
+from apache_beam.typehints import schemas
+from apache_beam.typehints import trivial_inference
+from apache_beam.utils import python_callable
+from apache_beam.utils import subprocess_server
+from apache_beam.version import __version__ as beam_version
+
+
+class Provider:
+  """Maps transform types to concrete PTransform instances."""
+  def available(self):
+    raise NotImplementedError(type(self))
+
+  def provided_transforms(self):
+    raise NotImplementedError(type(self))
+
+  def create_transform(self, typ, args):
+    raise NotImplementedError(type(self))
+
+
+class ExternalProvider(Provider):
+  """A Provider implemented via the cross language transform service."""
+  def __init__(self, urns, service):
+    self._urns = urns
+    self._service = service
+    self._schema_transforms = None
+
+  def provided_transforms(self):
+    return self._urns.keys()
+
+  def create_transform(self, type, args):
+    if callable(self._service):
+      self._service = self._service()
+    if self._schema_transforms is None:
+      try:
+        self._schema_transforms = [
+            config.identifier
+            for config in external.SchemaAwareExternalTransform.discover(
+                self._service)
+        ]
+      except Exception:
+        self._schema_transforms = []
+    urn = self._urns[type]
+    if urn in self._schema_transforms:
+      return external.SchemaAwareExternalTransform(urn, self._service, **args)
+    else:
+      return type >> self.create_external_transform(urn, args)
+
+  def create_external_transform(self, urn, args):
+    return external.ExternalTransform(
+        urn,
+        external.ImplicitSchemaPayloadBuilder(args).payload(),
+        self._service)
+
+  @staticmethod
+  def provider_from_spec(spec):
+    urns = spec['transforms']
+    type = spec['type']
+    if spec.get('version', None) == 'BEAM_VERSION':
+      spec['version'] = beam_version
+    if type == 'jar':
+      return ExternalJavaProvider(urns, lambda: spec['jar'])
+    elif type == 'mavenJar':
+      return ExternalJavaProvider(
+          urns,
+          lambda: subprocess_server.JavaJarServer.path_to_maven_jar(
+              **{
+                  key: value
+                  for (key, value) in spec.items() if key in [
+                      'artifact_id',
+                      'group_id',
+                      'version',
+                      'repository',
+                      'classifier',
+                      'appendix'
+                  ]
+              }))
+    elif type == 'beamJar':
+      return ExternalJavaProvider(
+          urns,
+          lambda: subprocess_server.JavaJarServer.path_to_beam_jar(
+              **{
+                  key: value
+                  for (key, value) in spec.items() if key in
+                  ['gradle_target', 'version', 'appendix', 'artifact_id']
+              }))
+    elif type == 'pypi':
+      return ExternalPythonProvider(urns, spec['packages'])
+    elif type == 'remote':
+      return RemoteProvider(spec['address'])
+    elif type == 'docker':
+      raise NotImplementedError()
+    else:
+      raise NotImplementedError(f'Unknown provider type: {type}')
+
+
+class RemoteProvider(ExternalProvider):
+  _is_available = None
+
+  def available(self):
+    if self._is_available is None:
+      try:
+        with external.ExternalTransform.service(self._service) as service:
+          service.ready(1)
+          self._is_available = True
+      except Exception:
+        self._is_available = False
+    return self._is_available
+
+
+class ExternalJavaProvider(ExternalProvider):
+  def __init__(self, urns, jar_provider):
+    super().__init__(
+        urns, lambda: external.JavaJarExpansionService(jar_provider()))
+
+  def available(self):
+    # pylint: disable=subprocess-run-check
+    return subprocess.run(['which', 'java'],
+                          capture_output=True).returncode == 0
+
+
+class ExternalPythonProvider(ExternalProvider):
+  def __init__(self, urns, packages):
+    super().__init__(urns, PypiExpansionService(packages))
+
+  def available(self):
+    return True  # If we're running this script, we have Python installed.
+
+  def create_external_transform(self, urn, args):
+    # Python transforms are "registered" by fully qualified name.
+    return external.ExternalTransform(
+        "beam:transforms:python:fully_qualified_named",
+        external.ImplicitSchemaPayloadBuilder({
+            'constructor': urn,
+            'kwargs': args,
+        }).payload(),
+        self._service)
+
+
+# This is needed because type inference can't handle *args, **kwargs forwarding.
+# TODO: Fix Beam itself.
+def fix_pycallable():
+  from apache_beam.transforms.ptransform import label_from_callable
+
+  def default_label(self):
+    src = self._source.strip()
+    last_line = src.split('\n')[-1]
+    if last_line[0] != ' ' and len(last_line) < 72:
+      return last_line
+    return label_from_callable(self._callable)
+
+  def _argspec_fn(self):
+    return self._callable
+
+  python_callable.PythonCallableWithSource.default_label = default_label
+  python_callable.PythonCallableWithSource._argspec_fn = property(_argspec_fn)
+
+  original_infer_return_type = trivial_inference.infer_return_type
+
+  def infer_return_type(fn, *args, **kwargs):
+    if isinstance(fn, python_callable.PythonCallableWithSource):
+      fn = fn._callable
+    return original_infer_return_type(fn, *args, **kwargs)
+
+  trivial_inference.infer_return_type = infer_return_type
+
+  original_fn_takes_side_inputs = (
+      apache_beam.transforms.util.fn_takes_side_inputs)
+
+  def fn_takes_side_inputs(fn):
+    if isinstance(fn, python_callable.PythonCallableWithSource):
+      fn = fn._callable
+    return original_fn_takes_side_inputs(fn)
+
+  apache_beam.transforms.util.fn_takes_side_inputs = fn_takes_side_inputs
+
+
+class InlineProvider(Provider):
+  def __init__(self, transform_factories):
+    self._transform_factories = transform_factories
+
+  def available(self):
+    return True
+
+  def provided_transforms(self):
+    return self._transform_factories.keys()
+
+  def create_transform(self, type, args):
+    return self._transform_factories[type](**args)
+
+
+PRIMITIVE_NAMES_TO_ATOMIC_TYPE = {
+    py_type.__name__: schema_type
+    for (py_type, schema_type) in schemas.PRIMITIVE_TO_ATOMIC_TYPE.items()
+    if py_type.__module__ != 'typing'
+}
+
+
+def create_builtin_provider():
+  def with_schema(**args):
+    # TODO: This is preliminary.
+    def parse_type(spec):
+      if spec in PRIMITIVE_NAMES_TO_ATOMIC_TYPE:
+        return schema_pb2.FieldType(
+            atomic_type=PRIMITIVE_NAMES_TO_ATOMIC_TYPE[spec])
+      elif isinstance(spec, list):
+        if len(spec) != 1:
+          raise ValueError("Use single-element lists to denote list types.")
+        else:
+          return schema_pb2.FieldType(
+              iterable_type=schema_pb2.IterableType(
+                  element_type=parse_type(spec[0])))
+      elif isinstance(spec, dict):
+        return schema_pb2.FieldType(
+            row_type=schema_pb2.RowType(schema=parse_schema(spec)))
+      else:
+        raise ValueError("Unknown schema type: {spec}")
+
+    def parse_schema(spec):
+      return schema_pb2.Schema(
+          fields=[
+              schema_pb2.Field(name=key, type=parse_type(value), id=ix)
+              for (ix, (key, value)) in enumerate(spec.items())
+          ],
+          id=str(uuid.uuid4()))
+
+    named_tuple = schemas.named_tuple_from_schema(parse_schema(args))
+    names = list(args.keys())
+
+    def extract_field(x, name):
+      if isinstance(x, dict):
+        return x[name]
+      else:
+        return getattr(x, name)
+
+    return 'WithSchema(%s)' % ', '.join(names) >> beam.Map(
+        lambda x: named_tuple(*[extract_field(x, name) for name in names])
+    ).with_output_types(named_tuple)
+
+  # Or should this be posargs, args?
+  # pylint: disable=dangerous-default-value
+  def fully_qualified_named_transform(constructor, args=(), kwargs={}):
+    with FullyQualifiedNamedTransform.with_filter('*'):
+      return constructor >> FullyQualifiedNamedTransform(
+          constructor, args, kwargs)
+
+  class Flatten(beam.PTransform):
+    def expand(self, pcolls):
+      if isinstance(pcolls, beam.PCollection):
+        pipeline_arg = {}
+        pcolls = (pcolls, )
+      elif isinstance(pcolls, dict):
+        pipeline_arg = {}
+        pcolls = tuple(pcolls.values())
+      else:
+        pipeline_arg = {'pipeline': pcolls.pipeline}
+        pcolls = ()
+      return pcolls | beam.Flatten(**pipeline_arg)
+
+  ios = {
+      key: getattr(apache_beam.io, key)
+      for key in dir(apache_beam.io)
+      if key.startswith('ReadFrom') or key.startswith('WriteTo')

Review Comment:
   We'll have to enumerate them individually. 
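
   A sketch of what that explicit enumeration might look like (the names shown are real `apache_beam.io` transforms, but the final list may differ):

       ios = {
           'ReadFromText': apache_beam.io.ReadFromText,
           'WriteToText': apache_beam.io.WriteToText,
           'ReadFromPubSub': apache_beam.io.ReadFromPubSub,
           'WriteToPubSub': apache_beam.io.WriteToPubSub,
           # ...one explicit entry per supported IO, rather than dir() scanning.
       }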



##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -0,0 +1,434 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import collections
+import logging
+import re
+import uuid
+import yaml
+from typing import Iterable
+from typing import Mapping
+from yaml.loader import SafeLoader
+
+import apache_beam as beam
+from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
+from apache_beam.yaml import yaml_provider
+
+__all__ = ["YamlTransform"]
+
+_LOGGER = logging.getLogger(__name__)
+yaml_provider.fix_pycallable()
+
+
+def memoize_method(func):
+  def wrapper(self, *args):
+    if not hasattr(self, '_cache'):
+      self._cache = {}
+    key = func.__name__, args
+    if key not in self._cache:
+      self._cache[key] = func(self, *args)
+    return self._cache[key]
+
+  return wrapper
+
+
+def only_element(xs):
+  x, = xs
+  return x
+
+
+class SafeLineLoader(SafeLoader):
+  """A yaml loader that attaches line information to mappings and strings."""
+  class TaggedString(str):
+    """A string class to which we can attach metadata.
+
+    This is primarily used to trace a string's origin back to its place in a
+    yaml file.
+    """
+    def __reduce__(self):
+      # Pickle as an ordinary string.
+      return str, (str(self), )
+
+  def construct_scalar(self, node):
+    value = super().construct_scalar(node)
+    if isinstance(value, str):
+      value = SafeLineLoader.TaggedString(value)
+      value._line_ = node.start_mark.line + 1
+    return value
+
+  def construct_mapping(self, node, deep=False):
+    mapping = super().construct_mapping(node, deep=deep)
+    mapping['__line__'] = node.start_mark.line + 1
+    mapping['__uuid__'] = str(uuid.uuid4())
+    return mapping
+
+  @classmethod
+  def strip_metadata(cls, spec, tagged_str=True):
+    if isinstance(spec, Mapping):
+      return {
+          key: cls.strip_metadata(value, tagged_str)
+          for key,
+          value in spec.items() if key not in ('__line__', '__uuid__')
+      }
+    elif isinstance(spec, Iterable) and not isinstance(spec, (str, bytes)):
+      return [cls.strip_metadata(value, tagged_str) for value in spec]
+    elif isinstance(spec, SafeLineLoader.TaggedString) and tagged_str:
+      return str(spec)
+    else:
+      return spec
+
+  @staticmethod
+  def get_line(obj):
+    if isinstance(obj, dict):
+      return obj.get('__line__', 'unknown')
+    else:
+      return getattr(obj, '_line_', 'unknown')
+
+
+class Scope(object):
+  """To look up PCollections (typically outputs of prior transforms) by name."""
+  def __init__(self, root, inputs, transforms, providers):
+    self.root = root
+    self.providers = providers
+    self._inputs = inputs
+    self._transforms = transforms
+    self._transforms_by_uuid = {t['__uuid__']: t for t in self._transforms}
+    self._uuid_by_name = collections.defaultdict(list)
+    for spec in self._transforms:
+      if 'name' in spec:
+        self._uuid_by_name[spec['name']].append(spec['__uuid__'])
+      if 'type' in spec:
+        self._uuid_by_name[spec['type']].append(spec['__uuid__'])
+    self._seen_names = set()
+
+  def compute_all(self):
+    for transform_id in self._transforms_by_uuid.keys():
+      self.compute_outputs(transform_id)
+
+  def get_pcollection(self, name):
+    if name in self._inputs:
+      return self._inputs[name]
+    elif '.' in name:
+      transform, output = name.rsplit('.', 1)
+      outputs = self.get_outputs(transform)
+      if output in outputs:
+        return outputs[output]
+      else:
+        raise ValueError(
+            f'Unknown output {repr(output)} '
+            f'at line {SafeLineLoader.get_line(name)}: '
+            f'{transform} only has outputs {list(outputs.keys())}')
+    else:
+      outputs = self.get_outputs(name)
+      if len(outputs) == 1:
+        return only_element(outputs.values())
+      else:
+        raise ValueError(
+            f'Ambiguous output at line {SafeLineLoader.get_line(name)}: '
+            f'{name} has outputs {list(outputs.keys())}')
+
+  def get_outputs(self, transform_name):
+    if transform_name in self._transforms_by_uuid:
+      transform_id = transform_name
+    else:
+      candidates = self._uuid_by_name[transform_name]
+      if not candidates:
+        raise ValueError(
+            f'Unknown transform at line '
+            f'{SafeLineLoader.get_line(transform_name)}: {transform_name}')
+      elif len(candidates) > 1:
+        raise ValueError(
+            f'Ambiguous transform at line '
+            f'{SafeLineLoader.get_line(transform_name)}: {transform_name}')
+      else:
+        transform_id = only_element(candidates)
+    return self.compute_outputs(transform_id)
+
+  @memoize_method
+  def compute_outputs(self, transform_id):
+    return expand_transform(self._transforms_by_uuid[transform_id], self)
+
+  # A method on scope as providers may be scoped...
+  def create_ptransform(self, spec):
+    if 'type' not in spec:
+      raise ValueError(f'Missing transform type: {identify_object(spec)}')
+
+    if spec['type'] not in self.providers:
+      raise ValueError(
+          'Unknown transform type %r at %s' %
+          (spec['type'], identify_object(spec)))
+
+    for provider in self.providers.get(spec['type']):
+      if provider.available():
+        break
+    else:
+      raise ValueError(
+          'No available provider for type %r at %s' %
+          (spec['type'], identify_object(spec)))
+
+    if 'args' in spec:
+      args = spec['args']
+      if not isinstance(args, dict):
+        raise ValueError(
+            'Arguments for transform at %s must be a mapping.' %
+            identify_object(spec))
+    else:
+      args = {
+          key: value
+          for (key, value) in spec.items()
+          if key not in ('type', 'name', 'input', 'output')
+      }
+    try:
+      # pylint: disable=undefined-loop-variable
+      return provider.create_transform(
+          spec['type'], SafeLineLoader.strip_metadata(args))
+    except Exception as exn:
+      if isinstance(exn, TypeError):
+        # Create a slightly more generic error message for argument errors.
+        msg = str(exn).replace('positional', '').replace('keyword', '')
+        msg = re.sub(r'\S+lambda\S+', '', msg)
+        msg = re.sub('  +', ' ', msg).strip()
+      else:
+        msg = str(exn)
+      raise ValueError(
+          f'Invalid transform specification at {identify_object(spec)}: {msg}'
+      ) from exn
+
+  def unique_name(self, spec, ptransform, strictness=0):
+    if 'name' in spec:
+      name = spec['name']
+      strictness += 1
+    else:
+      name = ptransform.label
+    if name in self._seen_names:
+      if strictness >= 2:
+        raise ValueError(f'Duplicate name at {identify_object(spec)}: {name}')
+      else:
+        name = f'{name}@{SafeLineLoader.get_line(spec)}'
+    self._seen_names.add(name)
+    return name
+
+
+def expand_transform(spec, scope):
+  if 'type' not in spec:
+    raise TypeError(
+        f'Missing type parameter for transform at {identify_object(spec)}')
+  type = spec['type']
+  if type == 'composite':
+    return expand_composite_transform(spec, scope)
+  elif type == 'chain':
+    # TODO: Consistency.

Review Comment:
   Resolved.



##########
sdks/python/apache_beam/yaml/yaml_provider.py:
##########
@@ -0,0 +1,420 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""This module defines Providers usable from yaml, which is a specification
+for where to find and how to invoke services that vend implementations of
+various PTransforms."""
+
+import collections
+import hashlib
+import json
+import os
+import subprocess
+import sys
+import uuid
+import yaml
+from yaml.loader import SafeLoader
+
+import apache_beam as beam
+import apache_beam.dataframe.io
+import apache_beam.io
+import apache_beam.transforms.util
+from apache_beam.portability.api import schema_pb2
+from apache_beam.transforms import external
+from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
+from apache_beam.typehints import schemas
+from apache_beam.typehints import trivial_inference
+from apache_beam.utils import python_callable
+from apache_beam.utils import subprocess_server
+from apache_beam.version import __version__ as beam_version
+
+
+class Provider:
+  """Maps transform types to concrete PTransform instances."""
+  def available(self):
+    raise NotImplementedError(type(self))
+
+  def provided_transforms(self):
+    raise NotImplementedError(type(self))
+
+  def create_transform(self, typ, args):
+    raise NotImplementedError(type(self))
+
+
+class ExternalProvider(Provider):
+  """A Provider implemented via the cross language transform service."""
+  def __init__(self, urns, service):
+    self._urns = urns
+    self._service = service
+    self._schema_transforms = None
+
+  def provided_transforms(self):
+    return self._urns.keys()
+
+  def create_transform(self, type, args):
+    if callable(self._service):
+      self._service = self._service()
+    if self._schema_transforms is None:
+      try:
+        self._schema_transforms = [
+            config.identifier
+            for config in external.SchemaAwareExternalTransform.discover(
+                self._service)
+        ]
+      except Exception:
+        self._schema_transforms = []
+    urn = self._urns[type]
+    if urn in self._schema_transforms:
+      return external.SchemaAwareExternalTransform(urn, self._service, **args)
+    else:
+      return type >> self.create_external_transform(urn, args)
+
+  def create_external_transform(self, urn, args):
+    return external.ExternalTransform(
+        urn,
+        external.ImplicitSchemaPayloadBuilder(args).payload(),
+        self._service)
+
+  @staticmethod
+  def provider_from_spec(spec):
+    urns = spec['transforms']
+    type = spec['type']
+    if spec.get('version', None) == 'BEAM_VERSION':
+      spec['version'] = beam_version
+    if type == 'jar':
+      return ExternalJavaProvider(urns, lambda: spec['jar'])
+    elif type == 'mavenJar':
+      return ExternalJavaProvider(
+          urns,
+          lambda: subprocess_server.JavaJarServer.path_to_maven_jar(
+              **{
+                  key: value
+                  for (key, value) in spec.items() if key in [
+                      'artifact_id',
+                      'group_id',
+                      'version',
+                      'repository',
+                      'classifier',
+                      'appendix'
+                  ]
+              }))
+    elif type == 'beamJar':
+      return ExternalJavaProvider(
+          urns,
+          lambda: subprocess_server.JavaJarServer.path_to_beam_jar(
+              **{
+                  key: value
+                  for (key, value) in spec.items() if key in
+                  ['gradle_target', 'version', 'appendix', 'artifact_id']
+              }))
+    elif type == 'pypi':
+      return ExternalPythonProvider(urns, spec['packages'])
+    elif type == 'remote':
+      return RemoteProvider(spec['address'])
+    elif type == 'docker':
+      raise NotImplementedError()
+    else:
+      raise NotImplementedError(f'Unknown provider type: {type}')
+
+
+class RemoteProvider(ExternalProvider):
+  _is_available = None
+
+  def available(self):
+    if self._is_available is None:
+      try:
+        with external.ExternalTransform.service(self._service) as service:
+          service.ready(1)
+          self._is_available = True
+      except Exception:
+        self._is_available = False
+    return self._is_available
+
+
+class ExternalJavaProvider(ExternalProvider):
+  def __init__(self, urns, jar_provider):
+    super().__init__(
+        urns, lambda: external.JavaJarExpansionService(jar_provider()))
+
+  def available(self):
+    # pylint: disable=subprocess-run-check
+    return subprocess.run(['which', 'java'],
+                          capture_output=True).returncode == 0
+
+
+class ExternalPythonProvider(ExternalProvider):
+  def __init__(self, urns, packages):
+    super().__init__(urns, PypiExpansionService(packages))
+
+  def available(self):
+    return True  # If we're running this script, we have Python installed.
+
+  def create_external_transform(self, urn, args):
+    # Python transforms are "registered" by fully qualified name.
+    return external.ExternalTransform(
+        "beam:transforms:python:fully_qualified_named",
+        external.ImplicitSchemaPayloadBuilder({
+            'constructor': urn,
+            'kwargs': args,
+        }).payload(),
+        self._service)
+
+
+# This is needed because type inference can't handle *args, **kwargs forwarding.
+# TODO: Fix Beam itself.

Review Comment:
   Done.



##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -0,0 +1,434 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import collections
+import logging
+import re
+import uuid
+import yaml
+from typing import Iterable
+from typing import Mapping
+from yaml.loader import SafeLoader
+
+import apache_beam as beam
+from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
+from apache_beam.yaml import yaml_provider
+
+__all__ = ["YamlTransform"]
+
+_LOGGER = logging.getLogger(__name__)
+yaml_provider.fix_pycallable()
+
+
+def memoize_method(func):
+  def wrapper(self, *args):
+    if not hasattr(self, '_cache'):
+      self._cache = {}
+    key = func.__name__, args
+    if key not in self._cache:
+      self._cache[key] = func(self, *args)
+    return self._cache[key]
+
+  return wrapper
+
+
+def only_element(xs):
+  x, = xs
+  return x
+
+
+class SafeLineLoader(SafeLoader):
+  """A yaml loader that attaches line information to mappings and strings."""
+  class TaggedString(str):
+    """A string class to which we can attach metadata.
+
+    This is primarily used to trace a string's origin back to its place in a
+    yaml file.
+    """
+    def __reduce__(self):
+      # Pickle as an ordinary string.
+      return str, (str(self), )
+
+  def construct_scalar(self, node):
+    value = super().construct_scalar(node)
+    if isinstance(value, str):
+      value = SafeLineLoader.TaggedString(value)
+      value._line_ = node.start_mark.line + 1
+    return value
+
+  def construct_mapping(self, node, deep=False):
+    mapping = super().construct_mapping(node, deep=deep)
+    mapping['__line__'] = node.start_mark.line + 1
+    mapping['__uuid__'] = str(uuid.uuid4())
+    return mapping
+
+  @classmethod
+  def strip_metadata(cls, spec, tagged_str=True):
+    if isinstance(spec, Mapping):
+      return {
+          key: cls.strip_metadata(value, tagged_str)
+          for key,
+          value in spec.items() if key not in ('__line__', '__uuid__')
+      }
+    elif isinstance(spec, Iterable) and not isinstance(spec, (str, bytes)):
+      return [cls.strip_metadata(value, tagged_str) for value in spec]
+    elif isinstance(spec, SafeLineLoader.TaggedString) and tagged_str:
+      return str(spec)
+    else:
+      return spec
+
+  @staticmethod
+  def get_line(obj):
+    if isinstance(obj, dict):
+      return obj.get('__line__', 'unknown')
+    else:
+      return getattr(obj, '_line_', 'unknown')
+
+
+class Scope(object):
+  """To look up PCollections (typically outputs of prior transforms) by name."""
+  def __init__(self, root, inputs, transforms, providers):
+    self.root = root
+    self.providers = providers
+    self._inputs = inputs
+    self._transforms = transforms
+    self._transforms_by_uuid = {t['__uuid__']: t for t in self._transforms}
+    self._uuid_by_name = collections.defaultdict(list)
+    for spec in self._transforms:
+      if 'name' in spec:
+        self._uuid_by_name[spec['name']].append(spec['__uuid__'])
+      if 'type' in spec:
+        self._uuid_by_name[spec['type']].append(spec['__uuid__'])
+    self._seen_names = set()
+
+  def compute_all(self):
+    for transform_id in self._transforms_by_uuid.keys():
+      self.compute_outputs(transform_id)
+
+  def get_pcollection(self, name):
+    if name in self._inputs:
+      return self._inputs[name]
+    elif '.' in name:
+      transform, output = name.rsplit('.', 1)
+      outputs = self.get_outputs(transform)
+      if output in outputs:
+        return outputs[output]
+      else:
+        raise ValueError(
+            f'Unknown output {repr(output)} '
+            f'at line {SafeLineLoader.get_line(name)}: '
+            f'{transform} only has outputs {list(outputs.keys())}')
+    else:
+      outputs = self.get_outputs(name)
+      if len(outputs) == 1:
+        return only_element(outputs.values())
+      else:
+        raise ValueError(
+            f'Ambiguous output at line {SafeLineLoader.get_line(name)}: '
+            f'{name} has outputs {list(outputs.keys())}')
+
+  def get_outputs(self, transform_name):
+    if transform_name in self._transforms_by_uuid:
+      transform_id = transform_name
+    else:
+      candidates = self._uuid_by_name[transform_name]
+      if not candidates:
+        raise ValueError(
+            f'Unknown transform at line '
+            f'{SafeLineLoader.get_line(transform_name)}: {transform_name}')
+      elif len(candidates) > 1:
+        raise ValueError(
+            f'Ambiguous transform at line '
+            f'{SafeLineLoader.get_line(transform_name)}: {transform_name}')
+      else:
+        transform_id = only_element(candidates)
+    return self.compute_outputs(transform_id)
+
+  @memoize_method
+  def compute_outputs(self, transform_id):
+    return expand_transform(self._transforms_by_uuid[transform_id], self)
+
+  # A method on scope as providers may be scoped...
+  def create_ptransform(self, spec):
+    if 'type' not in spec:
+      raise ValueError(f'Missing transform type: {identify_object(spec)}')
+
+    if spec['type'] not in self.providers:
+      raise ValueError(
+          'Unknown transform type %r at %s' %
+          (spec['type'], identify_object(spec)))
+
+    for provider in self.providers.get(spec['type']):
+      if provider.available():
+        break
+    else:
+      raise ValueError(
+          'No available provider for type %r at %s' %
+          (spec['type'], identify_object(spec)))
+
+    if 'args' in spec:
+      args = spec['args']
+      if not isinstance(args, dict):
+        raise ValueError(
+            'Arguments for transform at %s must be a mapping.' %
+            identify_object(spec))
+    else:
+      args = {
+          key: value
+          for (key, value) in spec.items()
+          if key not in ('type', 'name', 'input', 'output')
+      }
+    try:
+      # pylint: disable=undefined-loop-variable
+      return provider.create_transform(
+          spec['type'], SafeLineLoader.strip_metadata(args))
+    except Exception as exn:
+      if isinstance(exn, TypeError):
+        # Create a slightly more generic error message for argument errors.
+        msg = str(exn).replace('positional', '').replace('keyword', '')
+        msg = re.sub(r'\S+lambda\S+', '', msg)
+        msg = re.sub('  +', ' ', msg).strip()
+      else:
+        msg = str(exn)
+      raise ValueError(
+          f'Invalid transform specification at {identify_object(spec)}: {msg}'
+      ) from exn
+
+  def unique_name(self, spec, ptransform, strictness=0):
+    if 'name' in spec:
+      name = spec['name']
+      strictness += 1
+    else:
+      name = ptransform.label
+    if name in self._seen_names:
+      if strictness >= 2:
+        raise ValueError(f'Duplicate name at {identify_object(spec)}: {name}')
+      else:
+        name = f'{name}@{SafeLineLoader.get_line(spec)}'
+    self._seen_names.add(name)
+    return name
+
+
+def expand_transform(spec, scope):
+  if 'type' not in spec:
+    raise TypeError(
+        f'Missing type parameter for transform at {identify_object(spec)}')
+  type = spec['type']
+  if type == 'composite':
+    return expand_composite_transform(spec, scope)
+  elif type == 'chain':
+    # TODO: Consistency.
+    return expand_composite_transform(chain_as_composite(spec), scope)
+  else:
+    return expand_leaf_transform(spec, scope)
+
+
+def expand_leaf_transform(spec, scope):
+  spec = normalize_inputs_outputs(spec)
+  inputs_dict = {
+      key: scope.get_pcollection(value)
+      for (key, value) in spec['input'].items()
+  }
+  input_type = spec.get('input_type', 'default')
+  if input_type == 'list':
+    inputs = tuple(inputs_dict.values())
+  elif input_type == 'map':
+    inputs = inputs_dict
+  else:
+    if len(inputs_dict) == 0:
+      inputs = scope.root
+    elif len(inputs_dict) == 1:
+      inputs = next(iter(inputs_dict.values()))
+    else:
+      inputs = inputs_dict
+  _LOGGER.info("Expanding %s ", identify_object(spec))
+  ptransform = scope.create_ptransform(spec)
+  try:
+    # TODO: Move validation to construction?
+    with FullyQualifiedNamedTransform.with_filter('*'):
+      outputs = inputs | scope.unique_name(spec, ptransform) >> ptransform
+  except Exception as exn:
+    raise ValueError(
+        f"Errror apply transform {identify_object(spec)}: {exn}") from exn
+  if isinstance(outputs, dict):
+    # TODO: Handle (or at least reject) nested case.
+    return outputs
+  elif isinstance(outputs, (tuple, list)):
+    return {f'out{ix}': pcoll for (ix, pcoll) in enumerate(outputs)}
+  elif isinstance(outputs, beam.PCollection):
+    return {'out': outputs}
+  else:
+    raise ValueError(
+        f'Transform {identify_object(spec)} returned an unexpected type '
+        f'{type(outputs)}')
+
+
+def expand_composite_transform(spec, scope):
+  spec = normalize_inputs_outputs(spec)
+
+  inner_scope = Scope(
+      scope.root, {
+          key: scope.get_pcollection(value)
+          for key,
+          value in spec['input'].items()
+      },
+      spec['transforms'],
+      yaml_provider.merge_providers(
+          yaml_provider.parse_providers(spec.get('providers', [])),
+          scope.providers))
+
+  class CompositePTransform(beam.PTransform):
+    @staticmethod
+    def expand(inputs):
+      inner_scope.compute_all()
+      return {
+          key: inner_scope.get_pcollection(value)
+          for (key, value) in spec['output'].items()
+      }
+
+  if 'name' not in spec:
+    spec['name'] = 'Composite'
+  if spec['name'] is None:  # top-level pipeline, don't nest
+    return CompositePTransform.expand(None)
+  else:
+    _LOGGER.info("Expanding %s ", identify_object(spec))
+    return ({
+        key: scope.get_pcollection(value)
+        for key,
+        value in spec['input'].items()
+    } or scope.root) | scope.unique_name(spec, None) >> CompositePTransform()
+
+
+def chain_as_composite(spec):
+  # A chain is simply a composite transform where all inputs and outputs
+  # are implicit.
+  if 'transforms' not in spec:
+    raise TypeError(
+        f"Chain at {identify_object(spec)} missing transforms property.")
+  has_explicit_outputs = 'output' in spec
+  composite_spec = normalize_inputs_outputs(spec)
+  new_transforms = []
+  for ix, transform in enumerate(composite_spec['transforms']):
+    if any(io in transform for io in ('input', 'output')):
+      raise ValueError(
+          f'Transform {identify_object(transform)} is part of a chain '
+          'and must have implicit inputs and outputs.')
+    if ix == 0:
+      transform['input'] = {key: key for key in composite_spec['input'].keys()}
+    else:
+      transform['input'] = new_transforms[-1]['__uuid__']
+    new_transforms.append(transform)
+  composite_spec['transforms'] = new_transforms
+
+  last_transform = new_transforms[-1]['__uuid__']
+  if has_explicit_outputs:
+    composite_spec['output'] = {
+        key: f'{last_transform}.{value}'
+        for (key, value) in composite_spec['output'].items()
+    }
+  else:
+    composite_spec['output'] = last_transform
+  if 'name' not in composite_spec:
+    composite_spec['name'] = 'Chain'
+  composite_spec['type'] = 'composite'
+  return composite_spec
+
+
+def pipeline_as_composite(spec):
+  if isinstance(spec, list):
+    return {
+        'type': 'composite',
+        'name': None,
+        'transforms': spec,
+        '__line__': spec[0]['__line__'],
+        '__uuid__': str(uuid.uuid4()),
+    }
+  else:
+    return dict(spec, name=None, type='composite')
+
+
+def normalize_inputs_outputs(spec):
+  spec = dict(spec)
+
+  def normalize_io(tag):
+    io = spec.get(tag, {})
+    if isinstance(io, str):
+      return {tag: io}
+    elif isinstance(io, list):
+      return {f'{tag}{ix}': value for ix, value in enumerate(io)}
+    else:
+      return SafeLineLoader.strip_metadata(io, tagged_str=False)
+
+  return dict(spec, input=normalize_io('input'), output=normalize_io('output'))
+
+
+def identify_object(spec):
+  line = SafeLineLoader.get_line(spec)
+  name = extract_name(spec)
+  if name:
+    return f'"{name}" at line {line}'
+  else:
+    return f'at line {line}'
+
+
+def extract_name(spec):
+  if 'name' in spec:
+    return spec['name']
+  elif 'id' in spec:
+    return spec['id']
+  elif 'type' in spec:
+    return spec['type']
+  elif len(spec) == 1:
+    return extract_name(next(iter(spec.values())))
+  else:
+    return ''
+
+
+class YamlTransform(beam.PTransform):
+  def __init__(self, spec, providers={}):  # pylint: disable=dangerous-default-value
+    if isinstance(spec, str):
+      spec = yaml.load(spec, Loader=SafeLineLoader)
+    self._spec = spec

Review Comment:
   I added a schema that we can validate against.
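   
   For reference, a minimal sketch of what that validation could look like, assuming a jsonschema-style schema (the schema path and the spec below are placeholders, not the actual artifacts from this PR):
   
   ```
   import json
   
   import jsonschema
   import yaml
   
   # Hypothetical path; the actual schema file is not shown in this diff.
   with open('pipeline_schema.json') as fin:
     schema = json.load(fin)
   
   spec = yaml.safe_load('''
   type: composite
   transforms:
     - type: PyMap
       fn: "lambda x: x * x"
   ''')
   
   # Raises jsonschema.exceptions.ValidationError on a malformed spec.
   jsonschema.validate(instance=spec, schema=schema)
   ```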



##########
sdks/python/apache_beam/yaml/yaml_provider.py:
##########
@@ -0,0 +1,420 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""This module defines Providers usable from yaml, which is a specification
+for where to find and how to invoke services that vend implementations of
+various PTransforms."""
+
+import collections
+import hashlib
+import json
+import os
+import subprocess
+import sys
+import uuid
+import yaml
+from yaml.loader import SafeLoader
+
+import apache_beam as beam
+import apache_beam.dataframe.io
+import apache_beam.io
+import apache_beam.transforms.util
+from apache_beam.portability.api import schema_pb2
+from apache_beam.transforms import external
+from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
+from apache_beam.typehints import schemas
+from apache_beam.typehints import trivial_inference
+from apache_beam.utils import python_callable
+from apache_beam.utils import subprocess_server
+from apache_beam.version import __version__ as beam_version
+
+
+class Provider:
+  """Maps transform types to concrete PTransform instances."""
+  def available(self):
+    raise NotImplementedError(type(self))
+
+  def provided_transforms(self):
+    raise NotImplementedError(type(self))
+
+  def create_transform(self, typ, args):
+    raise NotImplementedError(type(self))
+
+
+class ExternalProvider(Provider):
+  """A Provider implemented via the cross language transform service."""
+  def __init__(self, urns, service):
+    self._urns = urns
+    self._service = service
+    self._schema_transforms = None
+
+  def provided_transforms(self):
+    return self._urns.keys()
+
+  def create_transform(self, type, args):
+    if callable(self._service):
+      self._service = self._service()
+    if self._schema_transforms is None:
+      try:
+        self._schema_transforms = [
+            config.identifier
+            for config in external.SchemaAwareExternalTransform.discover(
+                self._service)
+        ]
+      except Exception:
+        self._schema_transforms = []
+    urn = self._urns[type]
+    if urn in self._schema_transforms:
+      return external.SchemaAwareExternalTransform(urn, self._service, **args)

Review Comment:
   True, but all available runners in Python are portable runners (and I'd be tempted to say that the other runners aren't "real" Beam runners :-). Is there somewhere specific you'd want this mentioned?



##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -0,0 +1,434 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import collections
+import logging
+import re
+import uuid
+import yaml
+from typing import Iterable
+from typing import Mapping
+from yaml.loader import SafeLoader
+
+import apache_beam as beam
+from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
+from apache_beam.yaml import yaml_provider
+
+__all__ = ["YamlTransform"]
+
+_LOGGER = logging.getLogger(__name__)
+yaml_provider.fix_pycallable()
+
+
+def memoize_method(func):
+  def wrapper(self, *args):
+    if not hasattr(self, '_cache'):
+      self._cache = {}
+    key = func.__name__, args
+    if key not in self._cache:
+      self._cache[key] = func(self, *args)
+    return self._cache[key]
+
+  return wrapper
+
+
+def only_element(xs):
+  x, = xs
+  return x
+
+
+class SafeLineLoader(SafeLoader):
+  """A yaml loader that attaches line information to mappings and strings."""
+  class TaggedString(str):
+    """A string class to which we can attach metadata.
+
+    This is primarily used to trace a string's origin back to its place in a
+    yaml file.
+    """
+    def __reduce__(self):
+      # Pickle as an ordinary string.
+      return str, (str(self), )
+
+  def construct_scalar(self, node):
+    value = super().construct_scalar(node)
+    if isinstance(value, str):
+      value = SafeLineLoader.TaggedString(value)
+      value._line_ = node.start_mark.line + 1
+    return value
+
+  def construct_mapping(self, node, deep=False):
+    mapping = super().construct_mapping(node, deep=deep)
+    mapping['__line__'] = node.start_mark.line + 1
+    mapping['__uuid__'] = str(uuid.uuid4())
+    return mapping
+
+  @classmethod
+  def strip_metadata(cls, spec, tagged_str=True):
+    if isinstance(spec, Mapping):
+      return {
+          key: cls.strip_metadata(value, tagged_str)
+          for key,
+          value in spec.items() if key not in ('__line__', '__uuid__')
+      }
+    elif isinstance(spec, Iterable) and not isinstance(spec, (str, bytes)):
+      return [cls.strip_metadata(value, tagged_str) for value in spec]
+    elif isinstance(spec, SafeLineLoader.TaggedString) and tagged_str:
+      return str(spec)
+    else:
+      return spec
+
+  @staticmethod
+  def get_line(obj):
+    if isinstance(obj, dict):
+      return obj.get('__line__', 'unknown')
+    else:
+      return getattr(obj, '_line_', 'unknown')
+
+
+class Scope(object):
+  """To look up PCollections (typically outputs of prior transforms) by name."""
+  def __init__(self, root, inputs, transforms, providers):
+    self.root = root
+    self.providers = providers
+    self._inputs = inputs
+    self._transforms = transforms
+    self._transforms_by_uuid = {t['__uuid__']: t for t in self._transforms}
+    self._uuid_by_name = collections.defaultdict(list)
+    for spec in self._transforms:
+      if 'name' in spec:
+        self._uuid_by_name[spec['name']].append(spec['__uuid__'])
+      if 'type' in spec:
+        self._uuid_by_name[spec['type']].append(spec['__uuid__'])
+    self._seen_names = set()
+
+  def compute_all(self):
+    for transform_id in self._transforms_by_uuid.keys():
+      self.compute_outputs(transform_id)
+
+  def get_pcollection(self, name):
+    if name in self._inputs:
+      return self._inputs[name]
+    elif '.' in name:
+      transform, output = name.rsplit('.', 1)
+      outputs = self.get_outputs(transform)
+      if output in outputs:
+        return outputs[output]
+      else:
+        raise ValueError(
+            f'Unknown output {repr(output)} '
+            f'at line {SafeLineLoader.get_line(name)}: '
+            f'{transform} only has outputs {list(outputs.keys())}')
+    else:
+      outputs = self.get_outputs(name)
+      if len(outputs) == 1:
+        return only_element(outputs.values())
+      else:
+        raise ValueError(
+            f'Ambiguous output at line {SafeLineLoader.get_line(name)}: '
+            f'{name} has outputs {list(outputs.keys())}')
+
+  def get_outputs(self, transform_name):
+    if transform_name in self._transforms_by_uuid:
+      transform_id = transform_name
+    else:
+      candidates = self._uuid_by_name[transform_name]
+      if not candidates:
+        raise ValueError(
+            f'Unknown transform at line '
+            f'{SafeLineLoader.get_line(transform_name)}: {transform_name}')
+      elif len(candidates) > 1:
+        raise ValueError(
+            f'Ambiguous transform at line '
+            f'{SafeLineLoader.get_line(transform_name)}: {transform_name}')
+      else:
+        transform_id = only_element(candidates)
+    return self.compute_outputs(transform_id)
+
+  @memoize_method
+  def compute_outputs(self, transform_id):
+    return expand_transform(self._transforms_by_uuid[transform_id], self)
+
+  # A method on scope as providers may be scoped...
+  def create_ptransform(self, spec):
+    if 'type' not in spec:
+      raise ValueError(f'Missing transform type: {identify_object(spec)}')
+
+    if spec['type'] not in self.providers:
+      raise ValueError(
+          'Unknown transform type %r at %s' %
+          (spec['type'], identify_object(spec)))
+
+    for provider in self.providers.get(spec['type']):
+      if provider.available():
+        break
+    else:
+      raise ValueError(
+          'No available provider for type %r at %s' %
+          (spec['type'], identify_object(spec)))
+
+    if 'args' in spec:
+      args = spec['args']
+      if not isinstance(args, dict):
+        raise ValueError(
+            'Arguments for transform at %s must be a mapping.' %
+            identify_object(spec))
+    else:
+      args = {
+          key: value
+          for (key, value) in spec.items()
+          if key not in ('type', 'name', 'input', 'output')
+      }
+    try:
+      # pylint: disable=undefined-loop-variable
+      return provider.create_transform(
+          spec['type'], SafeLineLoader.strip_metadata(args))
+    except Exception as exn:
+      if isinstance(exn, TypeError):
+        # Create a slightly more generic error message for argument errors.
+        msg = str(exn).replace('positional', '').replace('keyword', '')
+        msg = re.sub(r'\S+lambda\S+', '', msg)
+        msg = re.sub('  +', ' ', msg).strip()
+      else:
+        msg = str(exn)
+      raise ValueError(
+          f'Invalid transform specification at {identify_object(spec)}: {msg}'
+      ) from exn
+
+  def unique_name(self, spec, ptransform, strictness=0):
+    if 'name' in spec:
+      name = spec['name']
+      strictness += 1
+    else:
+      name = ptransform.label
+    if name in self._seen_names:
+      if strictness >= 2:
+        raise ValueError(f'Duplicate name at {identify_object(spec)}: {name}')
+      else:
+        name = f'{name}@{SafeLineLoader.get_line(spec)}'
+    self._seen_names.add(name)
+    return name
+
+
+def expand_transform(spec, scope):
+  if 'type' not in spec:
+    raise TypeError(
+        f'Missing type parameter for transform at {identify_object(spec)}')
+  type = spec['type']
+  if type == 'composite':
+    return expand_composite_transform(spec, scope)
+  elif type == 'chain':
+    # TODO: Consistency.
+    return expand_composite_transform(chain_as_composite(spec), scope)
+  else:
+    return expand_leaf_transform(spec, scope)
+
+
+def expand_leaf_transform(spec, scope):
+  spec = normalize_inputs_outputs(spec)
+  inputs_dict = {
+      key: scope.get_pcollection(value)
+      for (key, value) in spec['input'].items()
+  }
+  input_type = spec.get('input_type', 'default')
+  if input_type == 'list':
+    inputs = tuple(inputs_dict.values())
+  elif input_type == 'map':
+    inputs = inputs_dict
+  else:
+    if len(inputs_dict) == 0:
+      inputs = scope.root
+    elif len(inputs_dict) == 1:
+      inputs = next(iter(inputs_dict.values()))
+    else:
+      inputs = inputs_dict
+  _LOGGER.info("Expanding %s ", identify_object(spec))
+  ptransform = scope.create_ptransform(spec)
+  try:
+    # TODO: Move validation to construction?
+    with FullyQualifiedNamedTransform.with_filter('*'):
+      outputs = inputs | scope.unique_name(spec, ptransform) >> ptransform
+  except Exception as exn:
+    raise ValueError(
+        f"Errror apply transform {identify_object(spec)}: {exn}") from exn
+  if isinstance(outputs, dict):
+    # TODO: Handle (or at least reject) nested case.
+    return outputs
+  elif isinstance(outputs, (tuple, list)):
+    return {f'out{ix}': pcoll for (ix, pcoll) in enumerate(outputs)}
+  elif isinstance(outputs, beam.PCollection):
+    return {'out': outputs}
+  else:
+    raise ValueError(
+        f'Transform {identify_object(spec)} returned an unexpected type '
+        f'{type(outputs)}')
+
+
+def expand_composite_transform(spec, scope):
+  spec = normalize_inputs_outputs(spec)
+
+  inner_scope = Scope(
+      scope.root, {
+          key: scope.get_pcollection(value)
+          for key,
+          value in spec['input'].items()
+      },
+      spec['transforms'],
+      yaml_provider.merge_providers(
+          yaml_provider.parse_providers(spec.get('providers', [])),
+          scope.providers))
+
+  class CompositePTransform(beam.PTransform):
+    @staticmethod
+    def expand(inputs):
+      inner_scope.compute_all()
+      return {
+          key: inner_scope.get_pcollection(value)
+          for (key, value) in spec['output'].items()
+      }
+
+  if 'name' not in spec:
+    spec['name'] = 'Composite'
+  if spec['name'] is None:  # top-level pipeline, don't nest
+    return CompositePTransform.expand(None)
+  else:
+    _LOGGER.info("Expanding %s ", identify_object(spec))
+    return ({
+        key: scope.get_pcollection(value)
+        for key,
+        value in spec['input'].items()
+    } or scope.root) | scope.unique_name(spec, None) >> CompositePTransform()
+
+
+def chain_as_composite(spec):
+  # A chain is simply a composite transform where all inputs and outputs
+  # are implicit.
+  if 'transforms' not in spec:
+    raise TypeError(
+        f"Chain at {identify_object(spec)} missing transforms property.")
+  has_explicit_outputs = 'output' in spec
+  composite_spec = normalize_inputs_outputs(spec)
+  new_transforms = []
+  for ix, transform in enumerate(composite_spec['transforms']):
+    if any(io in transform for io in ('input', 'output')):
+      raise ValueError(
+          f'Transform {identify_object(transform)} is part of a chain '
+          'and must have implicit inputs and outputs.')
+    if ix == 0:
+      transform['input'] = {key: key for key in composite_spec['input'].keys()}
+    else:
+      transform['input'] = new_transforms[-1]['__uuid__']
+    new_transforms.append(transform)
+  composite_spec['transforms'] = new_transforms
+
+  last_transform = new_transforms[-1]['__uuid__']
+  if has_explicit_outputs:
+    composite_spec['output'] = {
+        key: f'{last_transform}.{value}'
+        for (key, value) in composite_spec['output'].items()
+    }
+  else:
+    composite_spec['output'] = last_transform
+  if 'name' not in composite_spec:
+    composite_spec['name'] = 'Chain'
+  composite_spec['type'] = 'composite'
+  return composite_spec
+
+
+def pipeline_as_composite(spec):
+  if isinstance(spec, list):
+    return {
+        'type': 'composite',
+        'name': None,
+        'transforms': spec,
+        '__line__': spec[0]['__line__'],
+        '__uuid__': str(uuid.uuid4()),
+    }
+  else:
+    return dict(spec, name=None, type='composite')
+
+
+def normalize_inputs_outputs(spec):
+  spec = dict(spec)
+
+  def normalize_io(tag):
+    io = spec.get(tag, {})
+    if isinstance(io, str):
+      return {tag: io}
+    elif isinstance(io, list):
+      return {f'{tag}{ix}': value for ix, value in enumerate(io)}
+    else:
+      return SafeLineLoader.strip_metadata(io, tagged_str=False)
+
+  return dict(spec, input=normalize_io('input'), output=normalize_io('output'))
+
+
+def identify_object(spec):
+  line = SafeLineLoader.get_line(spec)
+  name = extract_name(spec)
+  if name:
+    return f'"{name}" at line {line}'
+  else:
+    return f'at line {line}'
+
+
+def extract_name(spec):
+  if 'name' in spec:
+    return spec['name']
+  elif 'id' in spec:
+    return spec['id']
+  elif 'type' in spec:
+    return spec['type']
+  elif len(spec) == 1:
+    return extract_name(next(iter(spec.values())))
+  else:
+    return ''
+
+
+class YamlTransform(beam.PTransform):
+  def __init__(self, spec, providers={}):  # pylint: disable=dangerous-default-value
+    if isinstance(spec, str):
+      spec = yaml.load(spec, Loader=SafeLineLoader)
+    self._spec = spec
+    self._providers = yaml_provider.merge_providers(
+        providers, yaml_provider.standard_providers())
+
+  def expand(self, pcolls):
+    if isinstance(pcolls, beam.pvalue.PBegin):
+      root = pcolls
+      pcolls = {}
+    elif isinstance(pcolls, beam.PCollection):
+      root = pcolls.pipeline
+      pcolls = {'input': pcolls}
+    else:
+      root = next(iter(pcolls.values())).pipeline
+    result = expand_transform(
+        self._spec,
+        Scope(root, pcolls, transforms=[], providers=self._providers))
+    if len(result) == 1:
+      return only_element(result.values())
+    else:
+      return result
+
+
+def expand_pipeline(pipeline, pipeline_spec):

Review Comment:
   It's used to construct the PBegin root object.
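   
   For illustration, a hedged sketch of the call shape (assuming, as with YamlTransform above, that a string spec is accepted; the file name below is a placeholder):
   
   ```
   import apache_beam as beam
   
   from apache_beam.yaml.yaml_transform import expand_pipeline
   
   # Hypothetical spec file; any YAML pipeline spec would do here.
   with open('pipeline.yaml') as fin:
     pipeline_spec = fin.read()
   
   with beam.Pipeline() as p:
     # The Pipeline supplies the PBegin root against which the top-level
     # composite (see pipeline_as_composite above) is expanded.
     expand_pipeline(p, pipeline_spec)
   ```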



##########
sdks/python/apache_beam/yaml/yaml_transform_test.py:
##########
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.yaml.yaml_transform import YamlTransform
+
+
+class YamlTransformTest(unittest.TestCase):
+  def test_composite(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      elements = p | beam.Create([1, 2, 3])
+      result = elements | YamlTransform(
+          '''
+          type: composite
+          input:
+              elements: input
+          transforms:
+            - type: PyMap

Review Comment:
   This creates a composite transform. One could also do 
   
   ```
   YamlTransform(
       '''
       type: PyMap
       name: Cube
       '''
   )
   ```
   
   to get a single transform. I agree the input here is a bit hacky; I dropped a TODO to think about this.



##########
sdks/python/apache_beam/yaml/yaml_transform_test.py:
##########
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import unittest
+
+import apache_beam as beam
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.yaml.yaml_transform import YamlTransform
+
+
+class YamlTransformTest(unittest.TestCase):
+  def test_composite(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      elements = p | beam.Create([1, 2, 3])
+      result = elements | YamlTransform(
+          '''
+          type: composite
+          input:
+              elements: input
+          transforms:
+            - type: PyMap
+              name: Square
+              input: elements
+              fn: "lambda x: x * x"
+            - type: PyMap
+              name: Cube
+              input: elements
+              fn: "lambda x: x * x * x"
+            - type: Flatten
+              input: [Square, Cube]
+          output:
+              Flatten
+          ''')
+      assert_that(result, equal_to([1, 4, 9, 1, 8, 27]))
+
+  def test_chain_with_input(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(

Review Comment:
   True, but that requires invoking the whole cross-language testing infrastructure, and I was hoping to run these as unit tests (mostly validating parsing and construction).



##########
sdks/python/apache_beam/yaml/yaml_transform.py:
##########
@@ -0,0 +1,434 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import collections
+import logging
+import re
+import uuid
+import yaml
+from typing import Iterable
+from typing import Mapping
+from yaml.loader import SafeLoader
+
+import apache_beam as beam
+from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
+from apache_beam.yaml import yaml_provider
+
+__all__ = ["YamlTransform"]
+
+_LOGGER = logging.getLogger(__name__)
+yaml_provider.fix_pycallable()
+
+
+def memoize_method(func):
+  def wrapper(self, *args):
+    if not hasattr(self, '_cache'):
+      self._cache = {}
+    key = func.__name__, args
+    if key not in self._cache:
+      self._cache[key] = func(self, *args)
+    return self._cache[key]
+
+  return wrapper
+
+
+def only_element(xs):
+  x, = xs
+  return x
+
+
+class SafeLineLoader(SafeLoader):
+  """A yaml loader that attaches line information to mappings and strings."""
+  class TaggedString(str):
+    """A string class to which we can attach metadata.
+
+    This is primarily used to trace a string's origin back to its place in a
+    yaml file.
+    """
+    def __reduce__(self):
+      # Pickle as an ordinary string.
+      return str, (str(self), )
+
+  def construct_scalar(self, node):
+    value = super().construct_scalar(node)
+    if isinstance(value, str):
+      value = SafeLineLoader.TaggedString(value)
+      value._line_ = node.start_mark.line + 1
+    return value
+
+  def construct_mapping(self, node, deep=False):
+    mapping = super().construct_mapping(node, deep=deep)
+    mapping['__line__'] = node.start_mark.line + 1
+    mapping['__uuid__'] = str(uuid.uuid4())
+    return mapping
+
+  @classmethod
+  def strip_metadata(cls, spec, tagged_str=True):
+    if isinstance(spec, Mapping):
+      return {
+          key: cls.strip_metadata(value, tagged_str)
+          for key,
+          value in spec.items() if key not in ('__line__', '__uuid__')
+      }
+    elif isinstance(spec, Iterable) and not isinstance(spec, (str, bytes)):
+      return [cls.strip_metadata(value, tagged_str) for value in spec]
+    elif isinstance(spec, SafeLineLoader.TaggedString) and tagged_str:
+      return str(spec)
+    else:
+      return spec
+
+  @staticmethod
+  def get_line(obj):
+    if isinstance(obj, dict):
+      return obj.get('__line__', 'unknown')
+    else:
+      return getattr(obj, '_line_', 'unknown')
+
+
+class Scope(object):
+  """To look up PCollections (typically outputs of prior transforms) by name."""
+  def __init__(self, root, inputs, transforms, providers):
+    self.root = root
+    self.providers = providers
+    self._inputs = inputs
+    self._transforms = transforms
+    self._transforms_by_uuid = {t['__uuid__']: t for t in self._transforms}
+    self._uuid_by_name = collections.defaultdict(list)
+    for spec in self._transforms:
+      if 'name' in spec:
+        self._uuid_by_name[spec['name']].append(spec['__uuid__'])
+      if 'type' in spec:
+        self._uuid_by_name[spec['type']].append(spec['__uuid__'])
+    self._seen_names = set()
+
+  def compute_all(self):
+    for transform_id in self._transforms_by_uuid.keys():
+      self.compute_outputs(transform_id)
+
+  def get_pcollection(self, name):
+    if name in self._inputs:
+      return self._inputs[name]
+    elif '.' in name:
+      transform, output = name.rsplit('.', 1)
+      outputs = self.get_outputs(transform)
+      if output in outputs:
+        return outputs[output]
+      else:
+        raise ValueError(
+            f'Unknown output {repr(output)} '
+            f'at line {SafeLineLoader.get_line(name)}: '
+            f'{transform} only has outputs {list(outputs.keys())}')
+    else:
+      outputs = self.get_outputs(name)
+      if len(outputs) == 1:
+        return only_element(outputs.values())
+      else:
+        raise ValueError(
+            f'Ambiguous output at line {SafeLineLoader.get_line(name)}: '
+            f'{name} has outputs {list(outputs.keys())}')
+
+  def get_outputs(self, transform_name):
+    if transform_name in self._transforms_by_uuid:
+      transform_id = transform_name
+    else:
+      candidates = self._uuid_by_name[transform_name]
+      if not candidates:
+        raise ValueError(
+            f'Unknown transform at line '
+            f'{SafeLineLoader.get_line(transform_name)}: {transform_name}')
+      elif len(candidates) > 1:
+        raise ValueError(
+            f'Ambiguous transform at line '
+            f'{SafeLineLoader.get_line(transform_name)}: {transform_name}')
+      else:
+        transform_id = only_element(candidates)
+    return self.compute_outputs(transform_id)
+
+  @memoize_method
+  def compute_outputs(self, transform_id):
+    return expand_transform(self._transforms_by_uuid[transform_id], self)
+
+  # A method on scope as providers may be scoped...
+  def create_ptransform(self, spec):
+    if 'type' not in spec:
+      raise ValueError(f'Missing transform type: {identify_object(spec)}')
+
+    if spec['type'] not in self.providers:
+      raise ValueError(
+          'Unknown transform type %r at %s' %
+          (spec['type'], identify_object(spec)))
+
+    for provider in self.providers.get(spec['type']):
+      if provider.available():
+        break
+    else:

Review Comment:
   This is the else of the for loop. https://docs.python.org/3/tutorial/controlflow.html#break-and-continue-statements-and-else-clauses-on-loops
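   
   A self-contained illustration (generic sketch, not code from this PR): the else body runs only if the loop finishes without hitting break.
   
   ```
   def first_available(providers):
     for provider in providers:
       if provider.available():
         break
     else:
       # Reached only if the loop never hit `break`,
       # i.e. no provider was available.
       raise ValueError('No available provider')
     return provider
   ```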



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org