You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2021/12/16 19:03:24 UTC
[beam] branch master updated: [BEAM-10277] Initial implementation for encoding position in Python RowCoder (#15410)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 673507a [BEAM-10277] Initial implementation for encoding position in Python RowCoder (#15410)
673507a is described below
commit 673507a3c541715dc7c62ba073e4195d01be0899
Author: AlikRodriguez <74...@users.noreply.github.com>
AuthorDate: Thu Dec 16 13:01:08 2021 -0600
[BEAM-10277] Initial implementation for encoding position in Python RowCoder (#15410)
* add test to encoding position
* respect encoding position
* enforcing encoding position on trivial test
* add coder_yaml test
* fix types for go fromyaml_test
* skip unimplemented contents
* encoding position from init
* revert encoding position in schemas
* added payload id on testcase in yaml file
* change go test exception in coders yaml
* change helper methods to coders test
* add condition to check encoding_position_set
* fix pylint
* encoding position in code decode with argsort
* check encoding position start at 0 and has no duplicates
* fix test, precompute argsort encoding position
---
.../beam/model/fnexecution/v1/standard_coders.yaml | 33 ++++++++++
.../go/test/regression/coders/fromyaml/fromyaml.go | 15 +++--
sdks/python/apache_beam/coders/row_coder.py | 39 +++++++++---
sdks/python/apache_beam/coders/row_coder_test.py | 73 ++++++++++++++++++++++
4 files changed, 147 insertions(+), 13 deletions(-)
diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
index ef37772..df119ba 100644
--- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
+++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
@@ -410,6 +410,39 @@ examples:
"\x01\x00\x00\x00\x00\x04\neverything\x00\x02is\x00\x05null!\x00\r\xc2\xaf\\_(\xe3\x83\x84)_/\xc2\xaf\x00": {f_map: {"everything": null, "is": null, "null!": null, "¯\\_(ツ)_/¯": null}}
---
+# Binary data generated with the python SDK:
+# schema1 = schema_pb2.Schema(
+# id="30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9",
+# fields=[
+# schema_pb2.Field(
+# name="str",
+# type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+# encoding_position=1),
+# schema_pb2.Field(
+# name="f_bool",
+# type=schema_pb2.FieldType(atomic_type=schema_pb2.BOOLEAN),
+# encoding_position=2),
+# schema_pb2.Field(
+# name="i32",
+# type=schema_pb2.FieldType(
+# atomic_type=schema_pb2.INT32, nullable=True),
+# encoding_position=0)
+# ],
+# encoding_positions_set=True)
+#
+# coder = RowCoder(schema1)
+# c = coder.schema.SerializeToString()
+# print("payload = %s" % c)
+# test = typing.NamedTuple("test", [ ("f_bool", bool), ("i32", np.int32), ("str", str) ])
+# example = coder.encode(test(False,21,"str2"))
+# print("example = %s" % example)
+coder:
+ urn: "beam:coder:row:v1"
+ payload: "\n\x0b\n\x03str\x1a\x02\x10\x07(\x01\n\x0e\n\x06f_bool\x1a\x02\x10\x08(\x02\n\x0b\n\x03i32\x1a\x04\x08\x01\x10\x03\x12$30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9 \x01"
+examples:
+ "\x03\x00\x15\x04str2\x00": {f_bool: False, i32: 21, str: "str2"}
+
+---
coder:
urn: "beam:coder:row:v1"
diff --git a/sdks/go/test/regression/coders/fromyaml/fromyaml.go b/sdks/go/test/regression/coders/fromyaml/fromyaml.go
index 337c48e..9147834 100644
--- a/sdks/go/test/regression/coders/fromyaml/fromyaml.go
+++ b/sdks/go/test/regression/coders/fromyaml/fromyaml.go
@@ -48,6 +48,11 @@ var unimplementedCoders = map[string]bool{
"beam:coder:custom_window:v1": true,
}
+var filteredCases = []struct{ filter, reason string }{
+ {"logical", "BEAM-9615: Support logical types"},
+ {"30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9", "BEAM-13043: Support encoding position."},
+}
+
// Coder is a representation a serialized beam coder.
type Coder struct {
Urn string `yaml:"urn,omitempty"`
@@ -83,11 +88,13 @@ func (s *Spec) testStandardCoder() (err error) {
log.Printf("skipping unimplemented coder urn: %v", s.Coder.Urn)
return nil
}
- // TODO(BEAM-9615): Support Logical types, and produce a better error message.
- if strings.Contains(s.Coder.Payload, "logical") {
- log.Printf("skipping coder with logical type. Unsupported in the Go SDK for now. Payload: %v", s.Coder.Payload)
- return nil
+ for _, c := range filteredCases {
+ if strings.Contains(s.Coder.Payload, c.filter) {
+ log.Printf("skipping coder case. Unsupported in the Go SDK for now: %v Payload: %v", c.reason, s.Coder.Payload)
+ return nil
+ }
}
+
// Construct the coder proto equivalents.
// Only nested tests need to be run, since nestedness is a pre-portability
diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py
index 20fa867..09df61c 100644
--- a/sdks/python/apache_beam/coders/row_coder.py
+++ b/sdks/python/apache_beam/coders/row_coder.py
@@ -20,6 +20,8 @@
import itertools
from array import array
+import numpy as np
+
from apache_beam.coders import typecoders
from apache_beam.coders.coder_impl import StreamCoderImpl
from apache_beam.coders.coders import BooleanCoder
@@ -164,7 +166,21 @@ class RowCoderImpl(StreamCoderImpl):
def __init__(self, schema, components):
self.schema = schema
self.constructor = named_tuple_from_schema(schema)
- self.components = list(c.get_impl() for c in components)
+ self.encoding_positions = list(range(len(self.schema.fields)))
+ if self.schema.encoding_positions_set:
+ # should never be duplicate encoding positions.
+ enc_posx = list(
+ set(field.encoding_position for field in self.schema.fields))
+ if len(enc_posx) != len(self.schema.fields):
+ raise ValueError(
+ f'''Schema with id {schema.id} has encoding_positions_set=True,
+ but not all fields have encoding_position set''')
+ self.encoding_positions = list(
+ field.encoding_position for field in self.schema.fields)
+ self.encoding_positions_argsort = np.argsort(self.encoding_positions)
+ self.components = list(
+ components[self.encoding_positions.index(i)].get_impl()
+ for i in self.encoding_positions)
self.has_nullable_fields = any(
field.type.nullable for field in self.schema.fields)
@@ -183,14 +199,14 @@ class RowCoderImpl(StreamCoderImpl):
self.NULL_MARKER_CODER.encode_to_stream(words.tobytes(), out, True)
- for c, field, attr in zip(self.components, self.schema.fields, attrs):
- if attr is None:
- if not field.type.nullable:
+ for i in self.encoding_positions_argsort:
+ if attrs[i] is None:
+ if not self.schema.fields[i].type.nullable:
raise ValueError(
"Attempted to encode null for non-nullable field \"{}\".".format(
- field.name))
+ self.schema.fields[i].name))
continue
- c.encode_to_stream(attr, out, True)
+ self.components[i].encode_to_stream(attrs[i], out, True)
def decode_from_stream(self, in_stream, nested):
nvals = self.SIZE_CODER.decode_from_stream(in_stream, True)
@@ -213,10 +229,15 @@ class RowCoderImpl(StreamCoderImpl):
# Note that if this coder's schema has *fewer* attributes than the encoded
# value, we just need to ignore the additional values, which will occur
# here because we only decode as many values as we have coders for.
+
+ sorted_components = [
+ None if is_null else self.components[c].decode_from_stream(
+ in_stream, True) for c,
+ is_null in zip(self.encoding_positions_argsort, nulls)
+ ]
+
return self.constructor(
- *(
- None if is_null else c.decode_from_stream(in_stream, True) for c,
- is_null in zip(self.components, nulls)))
+ *[sorted_components[i] for i in self.encoding_positions])
def _make_value_coder(self, nulls=itertools.repeat(False)):
components = [
diff --git a/sdks/python/apache_beam/coders/row_coder_test.py b/sdks/python/apache_beam/coders/row_coder_test.py
index 2fdf7f8..74a33b4 100644
--- a/sdks/python/apache_beam/coders/row_coder_test.py
+++ b/sdks/python/apache_beam/coders/row_coder_test.py
@@ -31,6 +31,8 @@ from apache_beam.portability.api import schema_pb2
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
+from apache_beam.typehints.schemas import SCHEMA_REGISTRY
+from apache_beam.typehints.schemas import named_tuple_to_schema
from apache_beam.typehints.schemas import typing_to_runner_api
from apache_beam.utils.timestamp import Timestamp
@@ -282,6 +284,61 @@ class RowCoderTest(unittest.TestCase):
self.assertEqual(value, coder.decode(coder.encode(value)))
+ def test_encoding_position_reorder_fields(self):
+ fields = [("field1", str), ("field2", int), ("field3", int)]
+
+ expected = typing.NamedTuple('expected', fields)
+ reorder = schema_pb2.Schema(
+ id="new_order",
+ fields=[
+ schema_pb2.Field(
+ name="field3",
+ type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+ encoding_position=2),
+ schema_pb2.Field(
+ name="field2",
+ type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32),
+ encoding_position=1),
+ schema_pb2.Field(
+ name="field1",
+ type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32),
+ encoding_position=0)
+ ])
+
+ old_coder = RowCoder.from_type_hint(expected, None)
+ new_coder = RowCoder(reorder)
+
+ encode_expected = old_coder.encode(expected("foo", 7, 12))
+ encode_reorder = new_coder.encode(expected(12, 7, "foo"))
+ self.assertEqual(encode_expected, encode_reorder)
+
+ def test_encoding_position_add_fields(self):
+ fields = [("field1", str), ("field2", str)]
+
+ Old = typing.NamedTuple("Old", fields[:-1])
+ New = typing.NamedTuple("New", fields)
+
+ old_coder = RowCoder.from_type_hint(Old, None)
+ new_coder = RowCoder.from_type_hint(New, None)
+
+ self.assertEqual(
+ New("bar", None), new_coder.decode(old_coder.encode(Old("bar"))))
+
+ def test_encoding_position_add_fields_and_reorder(self):
+ fields = [("field1", typing.Optional[str]), ("field2", str),
+ ("field3", typing.Optional[str])]
+
+ Old = typing.NamedTuple("Old", fields[:-1])
+ New = typing.NamedTuple("New", fields)
+
+ old_coder = RowCoder.from_type_hint(Old, None)
+ new_coder = RowCoder.from_type_hint(New, None)
+ set_encoding_position(New, [("field3", 2), ("field2", 1), ("field1", 0)])
+
+ self.assertEqual(
+ New("foo", "baz", None),
+ new_coder.decode(old_coder.encode(Old("foo", "baz"))))
+
def test_row_coder_fail_early_bad_schema(self):
schema_proto = schema_pb2.Schema(
fields=[
@@ -294,6 +351,22 @@ class RowCoderTest(unittest.TestCase):
ValueError, "type_with_no_typeinfo", lambda: RowCoder(schema_proto))
+def get_encoding_position(schema):
+ return [f.encoding_position for f in schema.fields]
+
+
+def set_encoding_position(type_, values):
+ beam_schema_id = "_beam_schema_id"
+ if hasattr(type_, beam_schema_id):
+ schema = SCHEMA_REGISTRY.get_schema_by_id(getattr(type_, beam_schema_id))
+ else:
+ schema = named_tuple_to_schema(type_)
+ val = dict(values)
+ for idx, field in enumerate(schema.fields):
+ schema.fields[idx].encoding_position = val[field.name]
+ SCHEMA_REGISTRY.add(type_, schema)
+
+
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
unittest.main()