You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "Polber (via GitHub)" <gi...@apache.org> on 2023/09/20 20:55:18 UTC

[GitHub] [beam] Polber commented on a diff in pull request #28462: Refactor and cleanup yaml MapToFields.

Polber commented on code in PR #28462:
URL: https://github.com/apache/beam/pull/28462#discussion_r1332168917


##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -190,189 +253,169 @@ def with_exception_handling(self, **kwargs):
     return self
 
 
-# TODO(yaml): Should Filter and Explode be distinct operations from Project?
-# We'll want these per-language.
 @beam.ptransform.ptransform_fn
-def _PythonProjectionTransform(
-    pcoll,
-    *,
-    fields,
-    transform_name,
-    language,
-    keep=None,
-    explode=(),
-    cross_product=True,
-    error_handling=None):
-  original_fields = [
-      name for (name, _) in named_fields_from_element_type(pcoll.element_type)
-  ]
+@maybe_with_exception_handling_transform_fn
+def _PyJsFilter(
+    pcoll, keep: Union[str, Dict[str, str]], language: Optional[str] = None):
 
-  if error_handling is None:
-    error_handling_args = None
+  input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+  if isinstance(keep, str) and keep in input_schema:
+    keep_fn = lambda row: getattr(row, keep)
   else:
-    error_handling_args = {
-        'dead_letter_tag' if k == 'output' else k: v
-        for (k, v) in error_handling.items()
-    }
+    keep_fn = _as_callable(list(input_schema.keys()), keep, "keep", language)
+  return pcoll | beam.Filter(keep_fn)
 
-  pcoll = beam.core._MaybePValueWithErrors(pcoll, error_handling_args)
 
-  if keep:
-    if isinstance(keep, str) and keep in original_fields:
-      keep_fn = lambda row: getattr(row, keep)
-    else:
-      keep_fn = _as_callable(original_fields, keep, transform_name, language)
-    filtered = pcoll | beam.Filter(keep_fn)
-  else:
-    filtered = pcoll
+def is_expr(v):
+  return isinstance(v, str) or (isinstance(v, dict) and 'expression' in v)
 
-  projected = filtered | beam.Select(
-      **{
-          name: _as_callable(original_fields, expr, transform_name, language)
-          for (name, expr) in fields.items()
-      })
-
-  if explode:
-    result = projected | _Explode(explode, cross_product=cross_product)
-  else:
-    result = projected
-
-  return result.as_result(
-      # TODO(https://github.com/apache/beam/issues/24755): Switch to MapTuple.
-      beam.Map(
-          lambda x: beam.Row(
-              element=x[0], msg=str(x[1][1]), stack=str(x[1][2]))))
 
+def normalize_fields(pcoll, fields, drop=(), append=False, language='generic'):
+  try:
+    input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+  except ValueError as exn:
+    if drop:
+      raise ValueError("Can only drop fields on a schema'd input.") from exn
+    if append:
+      raise ValueError("Can only append fields on a schema'd input.") from exn
+    elif any(is_expr(x) for x in fields.values()):
+      raise ValueError("Can only use expressions on a schema'd input.") from exn
+    input_schema = {}
 
-@beam.ptransform.ptransform_fn
-def MapToFields(
-    pcoll,
-    yaml_create_transform,
-    *,
-    fields,
-    keep=None,
-    explode=(),
-    cross_product=None,
-    append=False,
-    drop=(),
-    language=None,
-    error_handling=None,
-    transform_name="MapToFields",
-    **language_keywords):
-  if isinstance(explode, str):
-    explode = [explode]
-  if cross_product is None:
-    if len(explode) > 1:
-      # TODO(robertwb): Consider if true is an OK default.
-      raise ValueError(
-          'cross_product must be specified true or false '
-          'when exploding multiple fields')
-    else:
-      # Doesn't matter.
-      cross_product = True
-
-  input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+  if isinstance(drop, str):
+    drop = [drop]
   if drop and not append:
     raise ValueError("Can only drop fields if append is true.")
   for name in drop:
     if name not in input_schema:
       raise ValueError(f'Dropping unknown field "{name}"')
-  for name in explode:
-    if not (name in fields or (append and name in input_schema)):
-      raise ValueError(f'Exploding unknown field "{name}"')
   if append:
     for name in fields:
       if name in input_schema and name not in drop:
         raise ValueError(f'Redefinition of field "{name}"')
 
+  if language == 'generic':
+    for expr in fields.values():
+      if not isinstance(expr, str):
+        raise ValueError("Missing language specification.")
+    missing = set(fields.values()) - set(input_schema.keys())
+    if missing:
+      raise ValueError(
+          f"Missing language specification or unkown input fields: {missing}")
+
   if append:
-    fields = {
+    return input_schema, {
         **{name: name
            for name in input_schema.keys() if name not in drop},
         **fields
     }
+  else:
+    return input_schema, fields
 
-  if language is None:
-    for name, expr in fields.items():
-      if not isinstance(expr, str) or expr not in input_schema:
-        # TODO(robertw): Could consider defaulting to SQL, or another
-        # lowest-common-denominator expression language.
-        raise ValueError("Missing language specification.")
 
-    # We should support this for all languages.
-    language = "python"
-
-  if language in ("sql", "calcite"):
-    if error_handling:
-      raise ValueError('Error handling unsupported for sql.')
-    selects = [f'{expr} AS {name}' for (name, expr) in fields.items()]
-    query = "SELECT " + ", ".join(selects) + " FROM PCOLLECTION"
-    if keep:
-      query += " WHERE " + keep
-
-    result = pcoll | yaml_create_transform({
-        'type': 'Sql',
-        'config': {
-            'query': query, **language_keywords
-        },
-    }, [pcoll])
-    if explode:
-      # TODO(yaml): Implement via unnest.
-      result = result | _Explode(explode, cross_product)
-
-    return result
-
-  elif language == 'python' or language == 'javascript':
-    return pcoll | yaml_create_transform({
-        'type': 'PyTransform',
-        'config': {
-            'constructor': __name__ + '._PythonProjectionTransform',
-            'kwargs': {
-                'fields': fields,
-                'transform_name': transform_name,
-                'language': language,
-                'keep': keep,
-                'explode': explode,
-                'cross_product': cross_product,
-                'error_handling': error_handling,
-            },
-            **language_keywords
-        },
-    }, [pcoll])
+@beam.ptransform.ptransform_fn
+@maybe_with_exception_handling_transform_fn
+def _PyJsMapToFields(pcoll, language='generic', **mapping_args):
+  input_schema, fields = normalize_fields(
+      pcoll, language=language, **mapping_args)
+  original_fields = list(input_schema.keys())
 
-  else:
-    # TODO(yaml): Support javascript expressions and UDFs.
-    # TODO(yaml): Support java by fully qualified name.
-    # TODO(yaml): Maybe support java lambdas?
-    raise ValueError(
-        f'Unknown language: {language}. '
-        'Supported languages are "sql" (alias calcite) and "python."')
+  return pcoll | beam.Select(
+      **{
+          name: _as_callable(original_fields, expr, name, language)
+          for (name, expr) in fields.items()
+      })
+
+
+class SqlMappingProvider(yaml_provider.Provider):
+  def __init__(self, sql_provider=None):
+    if sql_provider is None:
+      print('yaml_provider', yaml_provider)
+      print('yaml_provider.beam_jar', yaml_provider.beam_jar)
+      sql_provider = yaml_provider.beam_jar(
+          urns={'Sql': 'beam:external:java:sql:v1'},
+          gradle_target='sdks:java:extensions:sql:expansion-service:shadowJar')

Review Comment:
   Should this have the version as well, similar to what is defined in the `standard_providers.yaml`? Also, can't the `standard_providers.yaml` file be removed since there is now an explicit provider?



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -190,189 +253,169 @@ def with_exception_handling(self, **kwargs):
     return self
 
 
-# TODO(yaml): Should Filter and Explode be distinct operations from Project?
-# We'll want these per-language.
 @beam.ptransform.ptransform_fn
-def _PythonProjectionTransform(
-    pcoll,
-    *,
-    fields,
-    transform_name,
-    language,
-    keep=None,
-    explode=(),
-    cross_product=True,
-    error_handling=None):
-  original_fields = [
-      name for (name, _) in named_fields_from_element_type(pcoll.element_type)
-  ]
+@maybe_with_exception_handling_transform_fn
+def _PyJsFilter(
+    pcoll, keep: Union[str, Dict[str, str]], language: Optional[str] = None):
 
-  if error_handling is None:
-    error_handling_args = None
+  input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+  if isinstance(keep, str) and keep in input_schema:
+    keep_fn = lambda row: getattr(row, keep)
   else:
-    error_handling_args = {
-        'dead_letter_tag' if k == 'output' else k: v
-        for (k, v) in error_handling.items()
-    }
+    keep_fn = _as_callable(list(input_schema.keys()), keep, "keep", language)
+  return pcoll | beam.Filter(keep_fn)

Review Comment:
   Should these functions not map the input back to `Row`? They used to run through `beam.Select` when `MapToFields` was one function, but with it separated out, why not carry the `Select` transform over too, or at least utilize the Map transform you included in the test file?



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -190,189 +253,169 @@ def with_exception_handling(self, **kwargs):
     return self
 
 
-# TODO(yaml): Should Filter and Explode be distinct operations from Project?
-# We'll want these per-language.
 @beam.ptransform.ptransform_fn
-def _PythonProjectionTransform(
-    pcoll,
-    *,
-    fields,
-    transform_name,
-    language,
-    keep=None,
-    explode=(),
-    cross_product=True,
-    error_handling=None):
-  original_fields = [
-      name for (name, _) in named_fields_from_element_type(pcoll.element_type)
-  ]
+@maybe_with_exception_handling_transform_fn
+def _PyJsFilter(
+    pcoll, keep: Union[str, Dict[str, str]], language: Optional[str] = None):
 
-  if error_handling is None:
-    error_handling_args = None
+  input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+  if isinstance(keep, str) and keep in input_schema:
+    keep_fn = lambda row: getattr(row, keep)
   else:
-    error_handling_args = {
-        'dead_letter_tag' if k == 'output' else k: v
-        for (k, v) in error_handling.items()
-    }
+    keep_fn = _as_callable(list(input_schema.keys()), keep, "keep", language)
+  return pcoll | beam.Filter(keep_fn)
 
-  pcoll = beam.core._MaybePValueWithErrors(pcoll, error_handling_args)
 
-  if keep:
-    if isinstance(keep, str) and keep in original_fields:
-      keep_fn = lambda row: getattr(row, keep)
-    else:
-      keep_fn = _as_callable(original_fields, keep, transform_name, language)
-    filtered = pcoll | beam.Filter(keep_fn)
-  else:
-    filtered = pcoll
+def is_expr(v):
+  return isinstance(v, str) or (isinstance(v, dict) and 'expression' in v)
 
-  projected = filtered | beam.Select(
-      **{
-          name: _as_callable(original_fields, expr, transform_name, language)
-          for (name, expr) in fields.items()
-      })
-
-  if explode:
-    result = projected | _Explode(explode, cross_product=cross_product)
-  else:
-    result = projected
-
-  return result.as_result(
-      # TODO(https://github.com/apache/beam/issues/24755): Switch to MapTuple.
-      beam.Map(
-          lambda x: beam.Row(
-              element=x[0], msg=str(x[1][1]), stack=str(x[1][2]))))
 
+def normalize_fields(pcoll, fields, drop=(), append=False, language='generic'):
+  try:
+    input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+  except ValueError as exn:
+    if drop:
+      raise ValueError("Can only drop fields on a schema'd input.") from exn
+    if append:
+      raise ValueError("Can only append fields on a schema'd input.") from exn
+    elif any(is_expr(x) for x in fields.values()):
+      raise ValueError("Can only use expressions on a schema'd input.") from exn
+    input_schema = {}
 
-@beam.ptransform.ptransform_fn
-def MapToFields(
-    pcoll,
-    yaml_create_transform,
-    *,
-    fields,
-    keep=None,
-    explode=(),
-    cross_product=None,
-    append=False,
-    drop=(),
-    language=None,
-    error_handling=None,
-    transform_name="MapToFields",
-    **language_keywords):
-  if isinstance(explode, str):
-    explode = [explode]
-  if cross_product is None:
-    if len(explode) > 1:
-      # TODO(robertwb): Consider if true is an OK default.
-      raise ValueError(
-          'cross_product must be specified true or false '
-          'when exploding multiple fields')
-    else:
-      # Doesn't matter.
-      cross_product = True
-
-  input_schema = dict(named_fields_from_element_type(pcoll.element_type))
+  if isinstance(drop, str):
+    drop = [drop]
   if drop and not append:
     raise ValueError("Can only drop fields if append is true.")
   for name in drop:
     if name not in input_schema:
       raise ValueError(f'Dropping unknown field "{name}"')
-  for name in explode:
-    if not (name in fields or (append and name in input_schema)):
-      raise ValueError(f'Exploding unknown field "{name}"')
   if append:
     for name in fields:
       if name in input_schema and name not in drop:
         raise ValueError(f'Redefinition of field "{name}"')
 
+  if language == 'generic':
+    for expr in fields.values():
+      if not isinstance(expr, str):
+        raise ValueError("Missing language specification.")
+    missing = set(fields.values()) - set(input_schema.keys())
+    if missing:
+      raise ValueError(
+          f"Missing language specification or unkown input fields: {missing}")

Review Comment:
   "Unknown" is misspelled, though, for what it's worth...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org