Posted to commits@beam.apache.org by bh...@apache.org on 2021/12/16 19:03:24 UTC

[beam] branch master updated: [BEAM-10277] Initial implementation for encoding position in Python RowCoder (#15410)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 673507a  [BEAM-10277] Initial implementation for encoding position in Python RowCoder (#15410)
673507a is described below

commit 673507a3c541715dc7c62ba073e4195d01be0899
Author: AlikRodriguez <74...@users.noreply.github.com>
AuthorDate: Thu Dec 16 13:01:08 2021 -0600

    [BEAM-10277] Initial implementation for encoding position in Python RowCoder (#15410)
    
    * add test for encoding position
    
    * respect encoding position
    
    * enforcing encoding position on trivial test
    
    * add coder_yaml test
    
    * fix types for go fromyaml_test
    
    * skip unimplemented contents
    
    * encoding position from init
    
    * revert encoding position in schemas
    
    * added payload id on test case in yaml file
    
    * change go test exception in coders yaml
    
    * change helper methods to coders test
    
    * add condition to check encoding_position_set
    
    * fix pylint
    
    * encoding position in encode/decode with argsort (see the sketch below)
    
    * check encoding positions start at 0 and have no duplicates
    
    * fix test, precompute argsort encoding position
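
A minimal, hypothetical sketch of the argsort-based reordering this change introduces in RowCoderImpl (simplified names, not the actual Beam code): given the declared field order and each field's encoding_position, np.argsort yields the order in which field values are written to and read from the stream.

    import numpy as np

    # Declaration order: str, f_bool, i32; their encoding positions: 1, 2, 0.
    encoding_positions = [1, 2, 0]

    # argsort maps each encoding slot back to a declared field index:
    # slot 0 <- field 2 (i32), slot 1 <- field 0 (str), slot 2 <- field 1 (f_bool).
    order = np.argsort(encoding_positions)  # array([2, 0, 1])

    attrs = ["str2", False, 21]              # values in declaration order
    print([attrs[i] for i in order])         # wire order: [21, 'str2', False]
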
---
 .../beam/model/fnexecution/v1/standard_coders.yaml | 33 ++++++++++
 .../go/test/regression/coders/fromyaml/fromyaml.go | 15 +++--
 sdks/python/apache_beam/coders/row_coder.py        | 39 +++++++++---
 sdks/python/apache_beam/coders/row_coder_test.py   | 73 ++++++++++++++++++++++
 4 files changed, 147 insertions(+), 13 deletions(-)

diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
index ef37772..df119ba 100644
--- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
+++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
@@ -410,6 +410,39 @@ examples:
   "\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00": {f_map: {"everything": null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
 
 ---
+# Binary data generated with the python SDK:
+# schema1 = schema_pb2.Schema(
+# id="30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9",
+# fields=[
+#     schema_pb2.Field(
+#         name="str",
+#         type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+#         encoding_position=1),
+#     schema_pb2.Field(
+#         name="f_bool",
+#         type=schema_pb2.FieldType(atomic_type=schema_pb2.BOOLEAN),
+#         encoding_position=2),
+#     schema_pb2.Field(
+#         name="i32",
+#         type=schema_pb2.FieldType(
+#             atomic_type=schema_pb2.INT32, nullable=True),
+#         encoding_position=0)
+#   ],
+#   encoding_positions_set=True)
+#
+# coder = RowCoder(schema1)
+# c = coder.schema.SerializeToString()
+# print("payload = %s" % c)
+# test = typing.NamedTuple("test", [ ("f_bool", bool), ("i32", np.int32), ("str", str) ])
+# example = coder.encode(test(False,21,"str2"))
+# print("example = %s" % example)
+coder:
+  urn: "beam:coder:row:v1"
+  payload: "\n\x0b\n\x03str\x1a\x02\x10\x07(\x01\n\x0e\n\x06f_bool\x1a\x02\x10\x08(\x02\n\x0b\n\x03i32\x1a\x04\x08\x01\x10\x03\x12$30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9 \x01"
+examples:
+  "\x03\x00\x15\x04str2\x00": {f_bool: False, i32: 21, str: "str2"}
+
+---
 
 coder:
   urn: "beam:coder:row:v1"
diff --git a/sdks/go/test/regression/coders/fromyaml/fromyaml.go b/sdks/go/test/regression/coders/fromyaml/fromyaml.go
index 337c48e..9147834 100644
--- a/sdks/go/test/regression/coders/fromyaml/fromyaml.go
+++ b/sdks/go/test/regression/coders/fromyaml/fromyaml.go
@@ -48,6 +48,11 @@ var unimplementedCoders = map[string]bool{
 	"beam:coder:custom_window:v1":        true,
 }
 
+var filteredCases = []struct{ filter, reason string }{
+	{"logical", "BEAM-9615: Support logical types"},
+	{"30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9", "BEAM-13043: Support encoding position."},
+}
+
 // Coder is a representation a serialized beam coder.
 type Coder struct {
 	Urn              string  `yaml:"urn,omitempty"`
@@ -83,11 +88,13 @@ func (s *Spec) testStandardCoder() (err error) {
 		log.Printf("skipping unimplemented coder urn: %v", s.Coder.Urn)
 		return nil
 	}
-	// TODO(BEAM-9615): Support Logical types, and produce a better error message.
-	if strings.Contains(s.Coder.Payload, "logical") {
-		log.Printf("skipping coder with logical type. Unsupported in the Go SDK for now. Payload: %v", s.Coder.Payload)
-		return nil
+	for _, c := range filteredCases {
+		if strings.Contains(s.Coder.Payload, c.filter) {
+			log.Printf("skipping coder case. Unsupported in the Go SDK for now: %v Payload: %v", c.reason, s.Coder.Payload)
+			return nil
+		}
 	}
+
 	// Construct the coder proto equivalents.
 
 	// Only nested tests need to be run, since nestedness is a pre-portability
diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py
index 20fa867..09df61c 100644
--- a/sdks/python/apache_beam/coders/row_coder.py
+++ b/sdks/python/apache_beam/coders/row_coder.py
@@ -20,6 +20,8 @@
 import itertools
 from array import array
 
+import numpy as np
+
 from apache_beam.coders import typecoders
 from apache_beam.coders.coder_impl import StreamCoderImpl
 from apache_beam.coders.coders import BooleanCoder
@@ -164,7 +166,21 @@ class RowCoderImpl(StreamCoderImpl):
   def __init__(self, schema, components):
     self.schema = schema
     self.constructor = named_tuple_from_schema(schema)
-    self.components = list(c.get_impl() for c in components)
+    self.encoding_positions = list(range(len(self.schema.fields)))
+    if self.schema.encoding_positions_set:
+      # should never be duplicate encoding positions.
+      enc_posx = list(
+          set(field.encoding_position for field in self.schema.fields))
+      if len(enc_posx) != len(self.schema.fields):
+        raise ValueError(
+            f'''Schema with id {schema.id} has encoding_positions_set=True,
+            but not all fields have encoding_position set''')
+      self.encoding_positions = list(
+          field.encoding_position for field in self.schema.fields)
+    self.encoding_positions_argsort = np.argsort(self.encoding_positions)
+    self.components = list(
+        components[self.encoding_positions.index(i)].get_impl()
+        for i in self.encoding_positions)
     self.has_nullable_fields = any(
         field.type.nullable for field in self.schema.fields)
 
@@ -183,14 +199,14 @@ class RowCoderImpl(StreamCoderImpl):
 
     self.NULL_MARKER_CODER.encode_to_stream(words.tobytes(), out, True)
 
-    for c, field, attr in zip(self.components, self.schema.fields, attrs):
-      if attr is None:
-        if not field.type.nullable:
+    for i in self.encoding_positions_argsort:
+      if attrs[i] is None:
+        if not self.schema.fields[i].type.nullable:
           raise ValueError(
               "Attempted to encode null for non-nullable field \"{}\".".format(
-                  field.name))
+                  self.schema.fields[i].name))
         continue
-      c.encode_to_stream(attr, out, True)
+      self.components[i].encode_to_stream(attrs[i], out, True)
 
   def decode_from_stream(self, in_stream, nested):
     nvals = self.SIZE_CODER.decode_from_stream(in_stream, True)
@@ -213,10 +229,15 @@ class RowCoderImpl(StreamCoderImpl):
     # Note that if this coder's schema has *fewer* attributes than the encoded
     # value, we just need to ignore the additional values, which will occur
     # here because we only decode as many values as we have coders for.
+
+    sorted_components = [
+        None if is_null else self.components[c].decode_from_stream(
+            in_stream, True) for c,
+        is_null in zip(self.encoding_positions_argsort, nulls)
+    ]
+
     return self.constructor(
-        *(
-            None if is_null else c.decode_from_stream(in_stream, True) for c,
-            is_null in zip(self.components, nulls)))
+        *[sorted_components[i] for i in self.encoding_positions])
 
   def _make_value_coder(self, nulls=itertools.repeat(False)):
     components = [
diff --git a/sdks/python/apache_beam/coders/row_coder_test.py b/sdks/python/apache_beam/coders/row_coder_test.py
index 2fdf7f8..74a33b4 100644
--- a/sdks/python/apache_beam/coders/row_coder_test.py
+++ b/sdks/python/apache_beam/coders/row_coder_test.py
@@ -31,6 +31,8 @@ from apache_beam.portability.api import schema_pb2
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
+from apache_beam.typehints.schemas import SCHEMA_REGISTRY
+from apache_beam.typehints.schemas import named_tuple_to_schema
 from apache_beam.typehints.schemas import typing_to_runner_api
 from apache_beam.utils.timestamp import Timestamp
 
@@ -282,6 +284,61 @@ class RowCoderTest(unittest.TestCase):
 
     self.assertEqual(value, coder.decode(coder.encode(value)))
 
+  def test_encoding_position_reorder_fields(self):
+    fields = [("field1", str), ("field2", int), ("field3", int)]
+
+    expected = typing.NamedTuple('expected', fields)
+    reorder = schema_pb2.Schema(
+        id="new_order",
+        fields=[
+            schema_pb2.Field(
+                name="field3",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+                encoding_position=2),
+            schema_pb2.Field(
+                name="field2",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32),
+                encoding_position=1),
+            schema_pb2.Field(
+                name="field1",
+                type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32),
+                encoding_position=0)
+        ])
+
+    old_coder = RowCoder.from_type_hint(expected, None)
+    new_coder = RowCoder(reorder)
+
+    encode_expected = old_coder.encode(expected("foo", 7, 12))
+    encode_reorder = new_coder.encode(expected(12, 7, "foo"))
+    self.assertEqual(encode_expected, encode_reorder)
+
+  def test_encoding_position_add_fields(self):
+    fields = [("field1", str), ("field2", str)]
+
+    Old = typing.NamedTuple("Old", fields[:-1])
+    New = typing.NamedTuple("New", fields)
+
+    old_coder = RowCoder.from_type_hint(Old, None)
+    new_coder = RowCoder.from_type_hint(New, None)
+
+    self.assertEqual(
+        New("bar", None), new_coder.decode(old_coder.encode(Old("bar"))))
+
+  def test_encoding_position_add_fields_and_reorder(self):
+    fields = [("field1", typing.Optional[str]), ("field2", str),
+              ("field3", typing.Optional[str])]
+
+    Old = typing.NamedTuple("Old", fields[:-1])
+    New = typing.NamedTuple("New", fields)
+
+    old_coder = RowCoder.from_type_hint(Old, None)
+    new_coder = RowCoder.from_type_hint(New, None)
+    set_encoding_position(New, [("field3", 2), ("field2", 1), ("field1", 0)])
+
+    self.assertEqual(
+        New("foo", "baz", None),
+        new_coder.decode(old_coder.encode(Old("foo", "baz"))))
+
   def test_row_coder_fail_early_bad_schema(self):
     schema_proto = schema_pb2.Schema(
         fields=[
@@ -294,6 +351,22 @@ class RowCoderTest(unittest.TestCase):
         ValueError, "type_with_no_typeinfo", lambda: RowCoder(schema_proto))
 
 
+def get_encoding_position(schema):
+  return [f.encoding_position for f in schema.fields]
+
+
+def set_encoding_position(type_, values):
+  beam_schema_id = "_beam_schema_id"
+  if hasattr(type_, beam_schema_id):
+    schema = SCHEMA_REGISTRY.get_schema_by_id(getattr(type_, beam_schema_id))
+  else:
+    schema = named_tuple_to_schema(type_)
+  val = dict(values)
+  for idx, field in enumerate(schema.fields):
+    schema.fields[idx].encoding_position = val[field.name]
+  SCHEMA_REGISTRY.add(type_, schema)
+
+
 if __name__ == "__main__":
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
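
For reference, a minimal, illustrative round-trip sketch (values are made up; it mirrors the schema in the standard_coders.yaml comment above and assumes the apache_beam Python SDK with this change applied):

    from apache_beam.coders.row_coder import RowCoder
    from apache_beam.portability.api import schema_pb2
    from apache_beam.typehints.schemas import named_tuple_from_schema

    # Schema from the standard_coders.yaml comment above: i32 is encoded first
    # (position 0), then str (position 1), then f_bool (position 2).
    schema = schema_pb2.Schema(
        id="30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9",
        fields=[
            schema_pb2.Field(
                name="str",
                type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
                encoding_position=1),
            schema_pb2.Field(
                name="f_bool",
                type=schema_pb2.FieldType(atomic_type=schema_pb2.BOOLEAN),
                encoding_position=2),
            schema_pb2.Field(
                name="i32",
                type=schema_pb2.FieldType(
                    atomic_type=schema_pb2.INT32, nullable=True),
                encoding_position=0),
        ],
        encoding_positions_set=True)

    coder = RowCoder(schema)
    Row = named_tuple_from_schema(schema)  # fields in declaration order: str, f_bool, i32
    row = Row(str="str2", f_bool=False, i32=21)

    # Round-trips regardless of the wire (encoding-position) order.
    assert coder.decode(coder.encode(row)) == row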