You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2023/12/13 22:07:09 UTC

(beam) branch master updated: [yaml] Normalize drop on MapToFields

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a04a4ad4cd2 [yaml] Normalize drop on MapToFields
     new c124e473fa4 Merge pull request #29755 from Polber/jkinard/mapping-drop
a04a4ad4cd2 is described below

commit a04a4ad4cd2cd5ee5e22f131176e5e4867120d82
Author: Jeffrey Kinard <je...@thekinards.com>
AuthorDate: Wed Dec 13 13:36:16 2023 -0500

    [yaml] Normalize drop on MapToFields
    
    Signed-off-by: Jeffrey Kinard <je...@thekinards.com>
---
 sdks/python/apache_beam/yaml/yaml_mapping.py   | 13 +++++++++++--
 sdks/python/apache_beam/yaml/yaml_transform.py |  2 ++
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py
index 0ce706bbea5..13b4d900791 100644
--- a/sdks/python/apache_beam/yaml/yaml_mapping.py
+++ b/sdks/python/apache_beam/yaml/yaml_mapping.py
@@ -48,6 +48,17 @@ from apache_beam.yaml import yaml_provider
 from apache_beam.yaml.yaml_provider import dicts_to_rows
 
 
+def normalize_mapping(spec):
+  """
+  Normalizes various fields for mapping transforms.
+  """
+  if spec['type'] == 'MapToFields':
+    config = spec.get('config')
+    if isinstance(config.get('drop'), str):
+      config['drop'] = [config['drop']]
+  return spec
+
+
 def _check_mapping_arguments(
     transform_name, expression=None, callable=None, name=None, path=None):
   # Argument checking
@@ -453,8 +464,6 @@ def normalize_fields(pcoll, fields, drop=(), append=False, language='generic'):
       raise ValueError("Can only use expressions on a schema'd input.") from exn
     input_schema = {}
 
-  if isinstance(drop, str):
-    drop = [drop]
   if drop and not append:
     raise ValueError("Can only drop fields if append is true.")
   for name in drop:
diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py
index 8a5ccb3bb8b..5d71583e193 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform.py
@@ -36,6 +36,7 @@ from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
 from apache_beam.yaml import yaml_provider
 from apache_beam.yaml.yaml_combine import normalize_combine
+from apache_beam.yaml.yaml_mapping import normalize_mapping
 
 __all__ = ["YamlTransform"]
 
@@ -904,6 +905,7 @@ def preprocess(spec, verbose=False, known_transforms=None):
 
   for phase in [
       ensure_transforms_have_types,
+      normalize_mapping,
       normalize_combine,
       preprocess_langauges,
       ensure_transforms_have_providers,