You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2023/12/13 22:07:09 UTC
(beam) branch master updated: [yaml] Normalize drop on MapToFields
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a04a4ad4cd2 [yaml] Normalize drop on MapToFields
new c124e473fa4 Merge pull request #29755 from Polber/jkinard/mapping-drop
a04a4ad4cd2 is described below
commit a04a4ad4cd2cd5ee5e22f131176e5e4867120d82
Author: Jeffrey Kinard <je...@thekinards.com>
AuthorDate: Wed Dec 13 13:36:16 2023 -0500
[yaml] Normalize drop on MapToFields
Signed-off-by: Jeffrey Kinard <je...@thekinards.com>
---
sdks/python/apache_beam/yaml/yaml_mapping.py | 13 +++++++++++--
sdks/python/apache_beam/yaml/yaml_transform.py | 2 ++
2 files changed, 13 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py b/sdks/python/apache_beam/yaml/yaml_mapping.py
index 0ce706bbea5..13b4d900791 100644
--- a/sdks/python/apache_beam/yaml/yaml_mapping.py
+++ b/sdks/python/apache_beam/yaml/yaml_mapping.py
@@ -48,6 +48,17 @@ from apache_beam.yaml import yaml_provider
from apache_beam.yaml.yaml_provider import dicts_to_rows
def normalize_mapping(spec):
  """Normalizes shorthand fields in the config of mapping transforms.

  Currently this only affects ``MapToFields``: a ``drop`` value given as a
  single string is wrapped into a one-element list, so downstream code can
  always iterate over ``drop`` as a sequence of field names.

  Args:
    spec: A transform spec dict, expected to carry a ``type`` key and,
      optionally, a ``config`` dict.

  Returns:
    The original ``spec`` object when nothing needs normalizing; otherwise a
    new spec dict holding a new config dict. The input spec and its config
    are never mutated.
  """
  if spec.get('type') != 'MapToFields':
    return spec
  config = spec.get('config')
  # A MapToFields without a config (None) or with a non-dict config has
  # nothing to normalize here; return it as-is rather than failing with an
  # opaque AttributeError on config.get().
  if not isinstance(config, dict):
    return spec
  drop = config.get('drop')
  if isinstance(drop, str):
    # Build new dicts instead of assigning into config so the caller's spec
    # (e.g. the parsed user YAML) is left untouched.
    return dict(spec, config=dict(config, drop=[drop]))
  return spec
+
+
def _check_mapping_arguments(
transform_name, expression=None, callable=None, name=None, path=None):
# Argument checking
@@ -453,8 +464,6 @@ def normalize_fields(pcoll, fields, drop=(), append=False, language='generic'):
raise ValueError("Can only use expressions on a schema'd input.") from exn
input_schema = {}
- if isinstance(drop, str):
- drop = [drop]
if drop and not append:
raise ValueError("Can only drop fields if append is true.")
for name in drop:
diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py
index 8a5ccb3bb8b..5d71583e193 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform.py
@@ -36,6 +36,7 @@ from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.transforms.fully_qualified_named_transform import FullyQualifiedNamedTransform
from apache_beam.yaml import yaml_provider
from apache_beam.yaml.yaml_combine import normalize_combine
+from apache_beam.yaml.yaml_mapping import normalize_mapping
__all__ = ["YamlTransform"]
@@ -904,6 +905,7 @@ def preprocess(spec, verbose=False, known_transforms=None):
for phase in [
ensure_transforms_have_types,
+ normalize_mapping,
normalize_combine,
preprocess_langauges,
ensure_transforms_have_providers,