You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/11/05 17:44:06 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #15410: [BEAM-10277] Encoding position initial implementation

TheNeuralBit commented on a change in pull request #15410:
URL: https://github.com/apache/beam/pull/15410#discussion_r743844253



##########
File path: model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
##########
@@ -409,6 +409,40 @@ examples:
   "\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f": {f_map: {"foo": 9001, "bar": 9223372036854775807}}
   "\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00": {f_map: {"everything": null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
 
+---
+# Binary data generated with the python SDK:
+# 
+# schema1 = schema_pb2.Schema(
+#    id="testcase",
+#    fields=[
+#      schema_pb2.Field(
+#        name="str",
+#        type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+#        encoding_position=2
+#      ),
+#      schema_pb2.Field(
+#        name="f_bool",
+#        type=schema_pb2.FieldType(atomic_type=schema_pb2.BOOLEAN),
+#        encoding_position=0),
+#      schema_pb2.Field(
+#        name="i32",
+#        type=schema_pb2.FieldType(
+#          atomic_type=schema_pb2.INT32, nullable=True),
+#        encoding_position=1)

Review comment:
       What I meant here is that you will need to add `encoding_positions_set=True` to the `Schema`, since when it's not True, the encoding positions should be ignored.

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -181,9 +192,11 @@ def encode_to_stream(self, value, out, nested):
         for i, is_null in enumerate(nulls):
           words[i // 8] |= is_null << (i % 8)
 
+    attrs = self._enc_pos_idx(attrs)
+    fields = self._enc_pos_idx(self.schema.fields)

Review comment:
       We need to avoid using logic like this to determine the field order in the encode_to_stream method. This implementation is essentially determining the field order dynamically for every encoded element. It should be possible to use something like `np.argsort` to determine the order one time (likely in the `__init__` method), and then reference that order in the encode_to_stream method.
   
   Please note we'll also need to reference that order in the decode_from_stream method. RowCoder should respect the encoding positions when encoding _and_ decoding.

##########
File path: sdks/python/apache_beam/coders/row_coder_test.py
##########
@@ -267,6 +324,54 @@ def test_row_coder_fail_early_bad_schema(self):
     self.assertRaisesRegex(
         ValueError, "type_with_no_typeinfo", lambda: RowCoder(schema_proto))
 
+  def test_yaml(self):
+    schema1 = schema_pb2.Schema(
+        id="30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9",
+        fields=[
+            schema_pb2.Field(
+                name="str",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+                encoding_position=2),
+            schema_pb2.Field(
+                name="f_bool",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.BOOLEAN),
+                encoding_position=3),
+            schema_pb2.Field(
+                name="i32",
+                type=schema_pb2.FieldType(
+                    atomic_type=schema_pb2.INT32, nullable=True),
+                encoding_position=1)
+        ],
+        encoding_positions_set=True)
+
+    coder = RowCoder(schema1)
+    c = coder.schema.SerializeToString()
+    print("payload = %s" % c)
+    test = typing.NamedTuple(
+        "test", [
+            ("i32", np.int32),
+            ("str", str),
+            ("f_bool", bool),
+        ])
+    example = coder.encode(test(21, "str2", False))
+    print("example = %s" % example)

Review comment:
       It looks like this was added just to generate the payload for `standard_coders.yaml`, please remove

##########
File path: sdks/python/apache_beam/coders/row_coder.py
##########
@@ -164,7 +164,13 @@ class RowCoderImpl(StreamCoderImpl):
   def __init__(self, schema, components):
     self.schema = schema
     self.constructor = named_tuple_from_schema(schema)
-    self.components = list(c.get_impl() for c in components)
+    self.encoding_positions = list(range(len(self.schema.fields)))
+    if any(field.encoding_position for field in self.schema.fields):
+      self.encoding_positions = list(
+          field.encoding_position for field in self.schema.fields)

Review comment:
       Please note the last part from the comment above:
   > If this Field is part of a Schema where encoding_positions_set is True then encoding_position must be defined, *otherwise this field is ignored.*
   
   What you have implemented now still respects the `encoding_position` fields even if  `encoding_positions_set` is False.

##########
File path: model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
##########
@@ -409,6 +409,40 @@ examples:
   "\x01\x00\x00\x00\x00\x02\x03foo\x01\xa9F\x03bar\x01\xff\xff\xff\xff\xff\xff\xff\xff\x7f": {f_map: {"foo": 9001, "bar": 9223372036854775807}}
   "\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00": {f_map: {"everything": null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
 
+---
+# Binary data generated with the python SDK:
+# 
+# schema1 = schema_pb2.Schema(
+#    id="testcase",
+#    fields=[
+#      schema_pb2.Field(
+#        name="str",
+#        type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+#        encoding_position=2
+#      ),
+#      schema_pb2.Field(
+#        name="f_bool",
+#        type=schema_pb2.FieldType(atomic_type=schema_pb2.BOOLEAN),
+#        encoding_position=3),
+#      schema_pb2.Field(
+#        name="i32",
+#        type=schema_pb2.FieldType(
+#          atomic_type=schema_pb2.INT32, nullable=True),
+#        encoding_position=1)
+#   ])
+#
+# coder = RowCoder(schema1)
+# c = coder.schema.SerializeToString()
+# print("payload = %s" % c)
+# test = typing.NamedTuple("test", [ ("f_bool", bool), ("i32", np.int32), ("str", str) ])
+# example = coder.encode(test(False,21,"str2"))
+# print("example = %s" % example)
+coder:
+  urn: "beam:coder:row:v1"
+  payload: "\n\x0b\n\x03str\x1a\x02\x10\x07(\x02\n\x0c\n\x06f_bool\x1a\x02\x10\x08\n\r\n\x03i32\x1a\x04\x08\x01\x10\x03(\x01\x12$30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9"
+examples:
+  "\x03\x00\x04str2\x00\x15": {f_bool: False, i32: 21, str: "str2"}

Review comment:
       Since this example is passing, I think it indicates your implementation is flawed. Looking at the example, the fields appear to be encoded in the natural order, _not_ the order specified by encoding positions as they should:
   
   ```
   str2\x00\x15
   ```
   
   - str2 = value for str
   - \x00 (false) = value for f_bool
   - \x15 (21) = value for i32




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org