Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/30 21:58:07 UTC

[GitHub] [beam] robertwb commented on a change in pull request #12426: [BEAM-7996] Add support for MapType and Nulls in container types for Python RowCoder

robertwb commented on a change in pull request #12426:
URL: https://github.com/apache/beam/pull/12426#discussion_r463291968



##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -855,10 +855,21 @@ message StandardCoders {
     //     BOOLEAN:   beam:coder:bool:v1
     //     BYTES:     beam:coder:bytes:v1
     //   ArrayType:   beam:coder:iterable:v1 (always has a known length)
-    //   MapType:     not yet a standard coder (BEAM-7996)
+    //   MapType:     not a standard coder, specification defined below.
     //   RowType:     beam:coder:row:v1
     //   LogicalType: Uses the coder for its representation.
     //
+    // The MapType is encoded by:
+    //   - An INT32 representing the size of the map (N)
+    //   - Followed by N interleaved keys and values, encoded with their
+    //     corresponding coder.
+    //
+    // Nullable types in container types (ArrayType, MapType) are encoded by:
+    //   - A one byte null indicator, 0x00 for null values, or 0x01 for present
+    //     values.
+    //   - For present values the null indicator is followed by the value
+    //     encoded with it's corresponding coder.
+    //

Review comment:
       For maps specifically, do we want to allow null keys? And is it valuable to have null values, as distinct from keys that are simply not present? I lean towards disallowing nulls for now and possibly allowing them later if we have good reason to; that change would be forward compatible.
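
To make the question concrete, here is a minimal sketch of the layout described in the proto comment: an INT32 size, then interleaved keys and values, with a one-byte null indicator in front of each value. It uses only the standard library, not Beam's coders. The helper names (`encode_str`, `encode_nullable_str`, `encode_map`) and the length-prefixed UTF-8 string encoding are assumptions made for illustration.

```python
import struct


def encode_str(s):
    """Toy string encoding (an assumption): int32 length, then UTF-8 bytes."""
    data = s.encode("utf-8")
    return struct.pack(">i", len(data)) + data


def encode_nullable_str(s):
    """One-byte null indicator: 0x00 for null, 0x01 followed by the value."""
    if s is None:
        return b"\x00"
    return b"\x01" + encode_str(s)


def encode_map(mapping):
    """INT32 size, then N interleaved keys and (nullable) values."""
    out = [struct.pack(">i", len(mapping))]
    for key, val in mapping.items():
        out.append(encode_str(key))           # keys are never null here
        out.append(encode_nullable_str(val))  # values may be null
    return b"".join(out)


absent = encode_map({})                # key "a" not present at all
null_value = encode_map({"a": None})   # key "a" present with a null value
present = encode_map({"a": "x"})       # key "a" present with a real value
```

In this sketch the three cases produce three different byte strings, so a null value is distinguishable from a missing key on the wire. Whether that distinction is worth supporting is the open question.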

##########
File path: sdks/python/apache_beam/coders/coder_impl.py
##########
@@ -530,6 +530,88 @@ def estimate_size(self, unused_value, nested=False):
     return 1
 
 
+class MapCoderImpl(StreamCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A coder for typing.Mapping objects."""
+  def __init__(
+      self,
+      key_coder,  # type: CoderImpl
+      value_coder  # type: CoderImpl
+  ):
+    self._key_coder = key_coder
+    self._value_coder = value_coder
+
+  def encode_to_stream(self, value, out, nested):
+    size = len(value)
+    out.write_bigendian_int32(size)

Review comment:
       Do we not care about maps too large for an int32 size? Also, why not a varint? (If a fixed int32 is what's conventionally used elsewhere, I'm fine with that, but the iterable coder uses varint64.)
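
For a rough sense of the trade-off, the sketch below compares a fixed 4-byte big-endian int32 with a generic base-128 varint. The varint here is a textbook implementation written for illustration, not Beam's actual stream code, and the function names are hypothetical.

```python
import struct


def encode_int32_be(n):
    """Fixed-width 4-byte big-endian signed int32."""
    return struct.pack(">i", n)


def encode_varint(n):
    """Base-128 varint for non-negative integers: 7 bits per byte, low bits first."""
    if n < 0:
        raise ValueError("this sketch only handles non-negative sizes")
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)  # more bytes follow
        else:
            out.append(bits)         # final byte, high bit clear
            return bytes(out)


for size in (3, 300, 2**31 - 1, 2**31):
    try:
        fixed_len = len(encode_int32_be(size))
    except struct.error:
        fixed_len = None  # size does not fit in a signed int32
    print(size, fixed_len, len(encode_varint(size)))
```

Small sizes take one or two bytes as a varint instead of four, and a size of 2**31 or more cannot be written as a signed int32 at all, while the varint simply grows.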

##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -855,10 +855,21 @@ message StandardCoders {
     //     BOOLEAN:   beam:coder:bool:v1
     //     BYTES:     beam:coder:bytes:v1
     //   ArrayType:   beam:coder:iterable:v1 (always has a known length)
-    //   MapType:     not yet a standard coder (BEAM-7996)
+    //   MapType:     not a standard coder, specification defined below.
     //   RowType:     beam:coder:row:v1
     //   LogicalType: Uses the coder for its representation.
     //
+    // The MapType is encoded by:
+    //   - An INT32 representing the size of the map (N)
+    //   - Followed by N interleaved keys and values, encoded with their
+    //     corresponding coder.
+    //
+    // Nullable types in container types (ArrayType, MapType) are encoded by:
+    //   - A one byte null indicator, 0x00 for null values, or 0x01 for present
+    //     values.
+    //   - For present values the null indicator is followed by the value
+    //     encoded with it's corresponding coder.
+    //

Review comment:
       Is this just the encoding of a nullable type? (Why does it have to be called out specially?)

##########
File path: sdks/python/apache_beam/coders/coder_impl.py
##########
@@ -530,6 +530,88 @@ def estimate_size(self, unused_value, nested=False):
     return 1
 
 
+class MapCoderImpl(StreamCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  A coder for typing.Mapping objects."""
+  def __init__(
+      self,
+      key_coder,  # type: CoderImpl
+      value_coder  # type: CoderImpl
+  ):
+    self._key_coder = key_coder
+    self._value_coder = value_coder
+
+  def encode_to_stream(self, value, out, nested):
+    size = len(value)
+    out.write_bigendian_int32(size)
+    for i, kv in enumerate(value.items()):
+      key, value = kv
+      last = i == size - 1

Review comment:
       Just encode everything as nested, including the last element. That'll simplify the logic and the definition of the coder.
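
A minimal sketch of the difference, under the assumption that the `last` flag in the diff exists so the final value can be written without a length prefix (the rest of the method is not shown here). `encode_bytes` is a toy stand-in for a component coder that length-prefixes only in a nested context; none of these names come from Beam.

```python
import struct


def encode_bytes(payload, nested):
    """Toy component coder: length-prefixed only when nested."""
    if nested:
        return struct.pack(">i", len(payload)) + payload
    return payload


def encode_map_last_unnested(mapping):
    """Tracks the last element so its value can skip the length prefix."""
    size = len(mapping)
    out = [struct.pack(">i", size)]
    for i, (key, val) in enumerate(mapping.items()):
        last = i == size - 1
        out.append(encode_bytes(key, nested=True))
        out.append(encode_bytes(val, nested=not last))
    return b"".join(out)


def encode_map_all_nested(mapping):
    """Every key and value is nested, so no per-element bookkeeping is needed."""
    out = [struct.pack(">i", len(mapping))]
    for key, val in mapping.items():
        out.append(encode_bytes(key, nested=True))
        out.append(encode_bytes(val, nested=True))
    return b"".join(out)


sample = {b"k1": b"v1", b"k2": b"v2"}
saved = len(encode_map_all_nested(sample)) - len(encode_map_last_unnested(sample))
```

In this toy setup the unnested variant saves only the one length prefix on the final value, at the cost of an extra branch in both the encoder and the decoder and a coder definition that depends on element position.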



