Posted to github@beam.apache.org by "chamikaramj (via GitHub)" <gi...@apache.org> on 2023/04/05 19:48:09 UTC

[GitHub] [beam] chamikaramj commented on a diff in pull request #26100: Use external config schema to construct Python SchemaTransform payload

chamikaramj commented on code in PR #26100:
URL: https://github.com/apache/beam/pull/26100#discussion_r1157573798


##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -180,14 +180,52 @@ def _get_named_tuple_instance(self):
 
 
 class SchemaTransformPayloadBuilder(PayloadBuilder):
-  def __init__(self, identifier, **kwargs):
-    self._identifier = identifier
+  def __init__(self, schematransform_config, strict_schema=False, **kwargs):

Review Comment:
   I think it should be possible to use SchemaTransforms without the full config or the schema (i.e., with just the SchemaTransform ID and a set of kwargs). Can you adjust the change so that the additional validation is optional?
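
For illustration, here is a minimal sketch of a builder where only the identifier and kwargs are required, and the config-based validation runs only when configuration field names are supplied. The class name `HypotheticalSchemaTransformPayloadBuilder` and the `expected_fields` parameter are hypothetical, not part of Beam's API, and the sketch does not build the actual schema proto or payload.

```python
from typing import Any, Dict, Optional, Sequence


class HypotheticalSchemaTransformPayloadBuilder:
  """Illustrative stand-in, not Beam's SchemaTransformPayloadBuilder."""

  def __init__(
      self,
      identifier: str,
      expected_fields: Optional[Sequence[str]] = None,
      **kwargs: Any):
    self._identifier = identifier
    # None means "no config schema known": validation is skipped entirely.
    self._expected_fields = (
        tuple(expected_fields) if expected_fields is not None else None)
    self._kwargs = kwargs

  def validated_kwargs(self) -> Dict[str, Any]:
    """Returns the kwargs, checking names only if fields were supplied."""
    if self._expected_fields is not None:
      unknown = sorted(set(self._kwargs) - set(self._expected_fields))
      missing = sorted(set(self._expected_fields) - set(self._kwargs))
      if unknown or missing:
        raise ValueError(
            "Unknown config fields: %s; missing config fields: %s" %
            (unknown, missing))
    return dict(self._kwargs)
```

With this shape, `HypotheticalSchemaTransformPayloadBuilder("example:schematransform:v1", topic="t")` needs neither a config nor a schema, while callers that do have the field names can opt into the check.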



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -180,14 +180,52 @@ def _get_named_tuple_instance(self):
 
 
 class SchemaTransformPayloadBuilder(PayloadBuilder):
-  def __init__(self, identifier, **kwargs):
-    self._identifier = identifier
+  def __init__(self, schematransform_config, strict_schema=False, **kwargs):
+    self._schematransform_config = schematransform_config
+    self._strict_schema = strict_schema
     self._kwargs = kwargs
 
+  def _get_schema_proto_and_payload(self, **kwargs):

Review Comment:
   Can we move the additional checks before this call and continue to use the existing external._get_schema_proto_and_payload() method?
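
A minimal sketch of that ordering, assuming the checks only need to compare field names: validation runs first, and the unchanged kwargs are then handed to the existing encoding helper. `check_config_fields` and `build_payload` are illustrative names, not Beam functions, and `encode` is a hypothetical parameter standing in for external._get_schema_proto_and_payload(). Unlike the order-sensitive tuple comparison in this PR's strict_schema check, this sketch compares names only.

```python
from typing import Any, Callable, Dict, Sequence, Tuple


def check_config_fields(
    kwargs: Dict[str, Any], expected_fields: Sequence[str]) -> None:
  """Raises ValueError if the kwargs names differ from expected_fields."""
  # Names are compared as sets, so the order of kwargs does not matter here.
  if set(kwargs) != set(expected_fields):
    raise ValueError(
        "Parameters in kwargs: %s do not match the external "
        "SchemaTransform's configuration fields: %s" %
        (tuple(kwargs), tuple(expected_fields)))


def build_payload(
    kwargs: Dict[str, Any],
    expected_fields: Sequence[str],
    encode: Callable[..., Tuple[Any, Any]]) -> Tuple[Any, Any]:
  """Validates first, then delegates to an existing encoding helper."""
  check_config_fields(kwargs, expected_fields)
  # `encode` stands in for external._get_schema_proto_and_payload().
  return encode(**kwargs)
```

Keeping validation in a separate function leaves the existing helper untouched, so the no-validation path can simply call it directly.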



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -180,14 +180,52 @@ def _get_named_tuple_instance(self):
 
 
 class SchemaTransformPayloadBuilder(PayloadBuilder):
-  def __init__(self, identifier, **kwargs):
-    self._identifier = identifier
+  def __init__(self, schematransform_config, strict_schema=False, **kwargs):
+    self._schematransform_config = schematransform_config
+    self._strict_schema = strict_schema
     self._kwargs = kwargs
 
+  def _get_schema_proto_and_payload(self, **kwargs):
+    named_fields = []
+    fields_to_values = OrderedDict()
+    external_config_schema_fields = \
+      self._schematransform_config.configuration_schema._fields
+    kwargs_fields = tuple(self._kwargs.keys())
+
+    if self._strict_schema and external_config_schema_fields != kwargs_fields:
+      raise ValueError(
+          "Parameters in kwargs: %s do not match the external "
+          "SchemaTransform's configuration fields: %s" %
+          (kwargs_fields, external_config_schema_fields))
+
+    # The discover API allows us to obtain an ordered configuration schema

Review Comment:
   I think that instead of the "strict_schema" option, we should add a "rearrange_based_on_discovery" option. If the option is not set, we use kwargs as is, without the overhead of the additional discovery RPC (this will work for anything other than TypedSchemaTransformProvider). For TypedSchemaTransformProvider, we would set "rearrange_based_on_discovery" to true and rearrange kwargs based on a discovery call before invoking "_get_schema_proto_and_payload". WDYT?
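
A minimal sketch of the proposed option, assuming the discovery call returns the configuration field names in schema order. `arrange_kwargs` and `discover_fields` are hypothetical names; `discover_fields` stands in for the discovery RPC and is only invoked when `rearrange_based_on_discovery` is true. The sketch omits fields the caller did not set; whether they should instead be filled in (for example with None) is left open.

```python
from collections import OrderedDict
from typing import Any, Callable, Dict, Optional, Sequence


def arrange_kwargs(
    kwargs: Dict[str, Any],
    rearrange_based_on_discovery: bool = False,
    discover_fields: Optional[Callable[[], Sequence[str]]] = None,
) -> "OrderedDict[str, Any]":
  """Returns kwargs, optionally reordered to match a discovered schema."""
  if not rearrange_based_on_discovery:
    # Default path: no extra RPC; kwargs are used in the order given.
    return OrderedDict(kwargs)
  if discover_fields is None:
    raise ValueError(
        "rearrange_based_on_discovery=True requires a discovery callable")
  # Hypothetical stand-in for the discovery RPC.
  ordered_names = list(discover_fields())
  unknown = [name for name in kwargs if name not in ordered_names]
  if unknown:
    raise ValueError("Unknown configuration fields: %s" % unknown)
  # Follow the discovered schema order; fields the caller did not set
  # are omitted.
  return OrderedDict(
      (name, kwargs[name]) for name in ordered_names if name in kwargs)
```

With the default `rearrange_based_on_discovery=False`, the discovery callable is never invoked, so callers whose kwargs already match the schema order pay no extra RPC cost.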



-- 
This is an automated message from the Apache Git Service.