Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/10/17 00:44:07 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #15410: [BEAM-10277] Encoding position initial implementation

TheNeuralBit commented on a change in pull request #15410:
URL: https://github.com/apache/beam/pull/15410#discussion_r730135297



##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -169,6 +170,7 @@ def __init__(self, schema, components):
         field.type.nullable for field in self.schema.fields)
 
   def encode_to_stream(self, value, out, nested):
+    self.schema = SCHEMA_REGISTRY.get_schema_by_id(self.schema.id)

Review comment:
       Is this necessary?

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -190,7 +193,10 @@ def encode_to_stream(self, value, out, nested):
               "Attempted to encode null for non-nullable field \"{}\".".format(
                   field.name))
         continue
-      c.encode_to_stream(attr, out, True)
+      attrs_enc_pos.append((c, field.encoding_position, attr))

Review comment:
       I think we should only need to read the encoding positions once, when constructing the coder. We don't need to inspect the encoding positions and sort for every element.
   
   Instead, this should read the encoding positions in `__init__`, and transform them into a list of indexes that will encode in the proper order (`numpy.argsort` should do this). For the case where encoding positions aren't specified the list could just be `list(range(len(self.schema.fields)))`.
   
   It may also make sense to validate the encoding positions when constructing the coder (they should all be unique values in the range `[0, len(self.schema.fields))`), to be safe.

##########
File path: sdks/python/apache_beam/typehints/schemas.py
##########
@@ -138,8 +138,9 @@ def named_fields_to_schema(names_and_types):
   # type: (Sequence[Tuple[str, type]]) -> schema_pb2.Schema
   return schema_pb2.Schema(
       fields=[
-          schema_pb2.Field(name=name, type=typing_to_runner_api(type))
-          for (name, type) in names_and_types
+          schema_pb2.Field(
+              name=name, type=typing_to_runner_api(type), encoding_position=idx)
+          for idx, (name, type) in enumerate(names_and_types)

Review comment:
       I think we should actually leave the encoding positions undefined when we create schemas in this file. The encoding position is something that the runner will manipulate to ensure update compatibility, the SDK doesn't need to worry about it.

##########
File path: sdks/go/test/regression/coders/fromyaml/fromyaml.go
##########
@@ -83,11 +88,13 @@ func (s *Spec) testStandardCoder() (err error) {
 		log.Printf("skipping unimplemented coder urn: %v", s.Coder.Urn)
 		return nil
 	}
-	// TODO(BEAM-9615): Support Logical types, and produce a better error message.
-	if strings.Contains(s.Coder.Payload, "logical") {
-		log.Printf("skipping coder with logical type. Unsupported in the Go SDK for now. Payload: %v", s.Coder.Payload)
-		return nil
+	for _, c := range filteredCases {
+		if strings.Contains(s.Coder.Payload, c.filter) {
+			log.Printf("skipping coder case. Unsupported in the Go SDK for now: %v Payload: %v", c.reason, s.Coder.Payload)
+			return nil
+		}

Review comment:
       I have very limited knowledge of Go, but this LGTM and works. Does it look ok to you @lostluck?

##########
File path: model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
##########
@@ -409,6 +409,26 @@ examples:
   "\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f": {f_map: {"foo": 9001, "bar": 9223372036854775807}}
   "\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00": {f_map: {"everything": null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
 
+---
+# Binary data generated with the python SDK:
+# 
+# fields = [("foo", str), ("bar", str)]
+# schema = typing.NamedTuple( "test", fields)
+# coder = RowCoder.from_type_hint(schema, None)
+# examples = (
+#             coder.encode(schema(foo="str1",bar="str2")),
+#             coder.encode(schema(bar="str2",foo="str1"))
+#            )
+# print("schema = %s" % coder.schema.SerializeToString())

Review comment:
       You may need to use a different approach to generate test cases here. We'll want to have some test cases where the encoding position is different from the field order. In order to do that I think you'll need to manually construct a Schema proto object (as is done in some places in `schemas_test.py`), and make sure it has encoding positions that are out of order. Then you can serialize it with `SerializeToString()`. Does that make sense?
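A hypothetical generator along those lines, for a test case where the declared field order (foo, bar) differs from the encoding order (bar, foo). The proto module path and field construction mirror what `schemas_test.py` does by hand, but the details below are assumptions, not code from the PR; the snippet degrades gracefully when Beam is not installed:

```python
# Manually construct a Schema proto with out-of-order encoding
# positions, then serialize it for use in standard_coders.yaml.
try:
    from apache_beam.portability.api import schema_pb2

    schema = schema_pb2.Schema(
        id="row-coder-encoding-position-test",  # hypothetical id
        fields=[
            # Declared first, but encoded second.
            schema_pb2.Field(
                name="foo",
                type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
                encoding_position=1),
            # Declared second, but encoded first.
            schema_pb2.Field(
                name="bar",
                type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
                encoding_position=0),
        ])
    print("schema = %r" % schema.SerializeToString())
except ImportError:
    print("apache_beam not installed; skipping serialization")

# Independent of Beam: the effective encode order implied above.
declared_order = ["foo", "bar"]
positions = {"foo": 1, "bar": 0}
encode_order = sorted(declared_order, key=positions.__getitem__)
```

The serialized bytes would go in the yaml's `payload`, and the expected encodings would then exercise decoders that honor `encoding_position` rather than declaration order.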



