Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/20 00:08:55 UTC

[GitHub] [beam] robertwb commented on a diff in pull request #22802: Updates ExpansionService to support dynamically discovering and expanding SchemaTransforms

robertwb commented on code in PR #22802:
URL: https://github.com/apache/beam/pull/22802#discussion_r950617150


##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -104,6 +105,44 @@ def payload(self):
     """
     return self.build().SerializeToString()
 
+  def get_schema_proto_and_payload(self, *args, **kwargs):
+    named_fields = []
+    fields_to_values = OrderedDict()
+    next_field_id = 0
+    for value in args:
+      if value is None:
+        raise ValueError(
+            'Received value None. None values are currently not supported')
+      named_fields.append(
+          ((JavaClassLookupPayloadBuilder.IGNORED_ARG_FORMAT % next_field_id),

Review Comment:
   This code isn't specific to JavaClassLookupPayload anymore, is it?



##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto:
##########
@@ -106,4 +109,13 @@ message BuilderMethod {
   bytes payload = 3;
 }
 
+message SchemaTransformPayload {
+  // Identifier of the SchemaTransform
+  string identifier = 1;
 
+  // The config of the SchemaTransform.
+  // Should be decodable via beam:coder:row:v1.
+  // The schema of the Row should be compatible with the schema of the
+  // SchemaTransform denoted by the identifier.
+  bytes config_row = 2;

Review Comment:
   Should we also provide the schema that this row is encoded against, in case the schema evolves (in a compatible way, of course) between when discovery happens and when this request is made?
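   A minimal sketch of why carrying the schema helps, under assumptions: row encodings such as `beam:coder:row:v1` lay values out by field position, so a receiver that knows only its current schema cannot reliably line values up if fields were added or reordered after discovery. Below, plain Python tuples and lists stand in for the real schema proto and encoded row. The `resolve_config` function and the rule that newly added fields must be nullable are hypothetical.

```python
from typing import Any, Dict, Sequence, Tuple

# Hypothetical stand-in for a schema: ordered (field_name, nullable) pairs.
Schema = Sequence[Tuple[str, bool]]


def resolve_config(
    sender_schema: Schema,
    sender_values: Sequence[Any],
    current_schema: Schema,
) -> Dict[str, Any]:
  """Maps positionally encoded values onto the receiver's current schema.

  Values are matched by field name using the schema the sender encoded
  against. Fields present only in the current schema must be nullable and
  are filled with None.
  """
  if len(sender_schema) != len(sender_values):
    raise ValueError('Value count does not match the sender schema')
  by_name = {
      name: value for (name, _), value in zip(sender_schema, sender_values)}
  current_names = {name for name, _ in current_schema}
  unknown = set(by_name) - current_names
  if unknown:
    raise ValueError('Fields not in current schema: %s' % sorted(unknown))
  config = {}
  for name, nullable in current_schema:
    if name in by_name:
      config[name] = by_name[name]
    elif nullable:
      config[name] = None
    else:
      raise ValueError('Missing required field: %s' % name)
  return config
```

   With a sender schema of `(topic, retries)` and a current schema of `(retries, topic, timeout)`, the values `['events', 3]` resolve to `retries=3, topic='events', timeout=None`. Decoding positionally against the current schema alone would instead assign `'events'` to `retries`.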



##########
sdks/python/apache_beam/transforms/external.py:
##########
@@ -104,6 +105,44 @@ def payload(self):
     """
     return self.build().SerializeToString()
 
+  def get_schema_proto_and_payload(self, *args, **kwargs):
+    named_fields = []
+    fields_to_values = OrderedDict()
+    next_field_id = 0
+    for value in args:
+      if value is None:
+        raise ValueError(
+            'Received value None. None values are currently not supported')
+      named_fields.append(
+          ((JavaClassLookupPayloadBuilder.IGNORED_ARG_FORMAT % next_field_id),
+           convert_to_typing_type(instance_to_type(value))))
+      fields_to_values[(
+          JavaClassLookupPayloadBuilder.IGNORED_ARG_FORMAT %
+          next_field_id)] = value
+      next_field_id += 1
+    for key, value in kwargs.items():
+      if not key:
+        raise ValueError('Parameter name cannot be empty')
+      if value is None:
+        raise ValueError(
+            'Received value None for key %s. None values are currently not '
+            'supported' % key)
+      named_fields.append(
+          (key, convert_to_typing_type(instance_to_type(value))))
+      fields_to_values[key] = value
+
+    schema_proto = named_fields_to_schema(named_fields)
+    row = named_tuple_from_schema(schema_proto)(**fields_to_values)
+
+    logging.error('********* xyz123 kwargs: %r', kwargs)

Review Comment:
   Extra logging.



##########
model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/external_transforms.proto:
##########
@@ -106,4 +109,13 @@ message BuilderMethod {
   bytes payload = 3;
 }
 
+message SchemaTransformPayload {
+  // Identifier of the SchemaTransform
+  string identifier = 1;

Review Comment:
   Should this typically be a URN as well?
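   If identifiers were expected to be URNs, the expansion service could reject malformed ones early. This is a minimal sketch; the pattern below is hypothetical and only loosely modeled on colon-separated, versioned URNs such as `beam:coder:row:v1` (referenced in the proto comment above). It is not an official Beam rule, and `is_urn_like` is an illustrative name.

```python
import re

# Hypothetical check: at least two colon-separated segments followed by a
# trailing version segment like ':v1'. Not an official Beam URN rule.
_URN_PATTERN = re.compile(r'^[A-Za-z0-9_.\-]+(:[A-Za-z0-9_.\-]+)+:v\d+$')


def is_urn_like(identifier: str) -> bool:
  """Returns True if identifier looks like a versioned, colon-separated URN."""
  return bool(_URN_PATTERN.match(identifier))
```

   Under this rule `beam:coder:row:v1` passes, while a bare class name such as `com.example.MyTransform` does not.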



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
