You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/09/29 01:34:08 UTC

[GitHub] [beam] chamikaramj commented on a diff in pull request #22802: Updates ExpansionService to support dynamically discovering and expanding SchemaTransforms

chamikaramj commented on code in PR #22802:
URL: https://github.com/apache/beam/pull/22802#discussion_r982698371


##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto:
##########
@@ -106,4 +109,13 @@ message BuilderMethod {
   bytes payload = 3;
 }
 
+message SchemaTransformPayload {
+  // Identifier of the SchemaTransform
+  string identifier = 1;

Review Comment:
   I think so (though it's not specifically mentioned in the Schema-Aware Transform design doc)



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -104,6 +105,44 @@ def payload(self):
     """
     return self.build().SerializeToString()
 
+  def get_schema_proto_and_payload(self, *args, **kwargs):
+    named_fields = []
+    fields_to_values = OrderedDict()
+    next_field_id = 0
+    for value in args:
+      if value is None:
+        raise ValueError(
+            'Received value None. None values are currently not supported')
+      named_fields.append(
+          ((JavaClassLookupPayloadBuilder.IGNORED_ARG_FORMAT % next_field_id),
+           convert_to_typing_type(instance_to_type(value))))
+      fields_to_values[(
+          JavaClassLookupPayloadBuilder.IGNORED_ARG_FORMAT %
+          next_field_id)] = value
+      next_field_id += 1
+    for key, value in kwargs.items():
+      if not key:
+        raise ValueError('Parameter name cannot be empty')
+      if value is None:
+        raise ValueError(
+            'Received value None for key %s. None values are currently not '
+            'supported' % key)
+      named_fields.append(
+          (key, convert_to_typing_type(instance_to_type(value))))
+      fields_to_values[key] = value
+
+    schema_proto = named_fields_to_schema(named_fields)
+    row = named_tuple_from_schema(schema_proto)(**fields_to_values)
+
+    logging.error('********* xyz123 kwargs: %r', kwargs)

Review Comment:
   Removed.



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -104,6 +105,44 @@ def payload(self):
     """
     return self.build().SerializeToString()
 
+  def get_schema_proto_and_payload(self, *args, **kwargs):
+    named_fields = []
+    fields_to_values = OrderedDict()
+    next_field_id = 0
+    for value in args:
+      if value is None:
+        raise ValueError(
+            'Received value None. None values are currently not supported')
+      named_fields.append(
+          ((JavaClassLookupPayloadBuilder.IGNORED_ARG_FORMAT % next_field_id),

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org