Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/08 02:14:03 UTC

[GitHub] [beam] chamikaramj commented on a change in pull request #16923: [BEAM-10529] nullable xlang coder

chamikaramj commented on a change in pull request #16923:
URL: https://github.com/apache/beam/pull/16923#discussion_r821263272



##########
File path: sdks/python/apache_beam/typehints/typehints.py
##########
@@ -507,6 +507,9 @@ def _inner_types(self):
       for t in self.union_types:
         yield t
 
+    def contains_type(self, maybe_type):

Review comment:
       Does the order matter (for nullable and other cases)?

##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -1081,6 +1081,14 @@ message StandardCoders {
     // Components: the user key coder.
     // Experimental.
     SHARDED_KEY = 15 [(beam_urn) = "beam:coder:sharded_key:v1"];
+
+    //Wraps a coder of a potentially null value
+    //
+    // A Nullable coder encodes nullable values of wrapped coder value that does
+    // not tolerate null values. A Nullable coder uses exactly 1 byte per entry
+    // to indicate whether the value is null, then adds the encoding of the
+    // inner coder for non-null values.

Review comment:
       Do you mean that a single byte (\u0000) implies null? I might be misunderstanding something.
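
For reference, a minimal sketch of the layout the proto comment describes. It assumes the marker byte is 0x00 for null and 0x01 for a present value, which is what the standard_coders.yaml examples in this PR suggest. The helper names and the fixed-width inner coder are hypothetical stand-ins, not Beam APIs.

```python
from typing import Callable, Optional

NULL_MARKER = b"\x00"     # assumed: value is null, nothing follows
PRESENT_MARKER = b"\x01"  # assumed: value is present, inner encoding follows


def encode_nullable(value: Optional[int],
                    encode_inner: Callable[[int], bytes]) -> bytes:
    """Hypothetical helper: prefix the inner encoding with a 1-byte marker."""
    if value is None:
        # The inner coder is never invoked for null values.
        return NULL_MARKER
    return PRESENT_MARKER + encode_inner(value)


def encode_fixed_int(value: int) -> bytes:
    """Stand-in inner coder for illustration: 4-byte big-endian signed int."""
    return value.to_bytes(4, "big", signed=True)
```

Under these assumptions a null value costs exactly one byte, and a non-null value costs one byte more than the inner coder would use on its own.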

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -776,11 +777,11 @@ private static Coder resolveCoder(Class deserializer) {
               continue;
             }
             if (returnType.equals(byte[].class)) {
-              return ByteArrayCoder.of();
+              return NullableCoder.of(ByteArrayCoder.of());
             } else if (returnType.equals(Integer.class)) {
-              return VarIntCoder.of();
+              return NullableCoder.of(VarIntCoder.of());
             } else if (returnType.equals(Long.class)) {
-              return VarLongCoder.of();
+              return NullableCoder.of(VarLongCoder.of());

Review comment:
       Sounds good. Thanks.

##########
File path: model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
##########
@@ -569,3 +569,15 @@ coder:
 examples:
   "\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0067\u0080\u0000\u0001\u0052\u009a\u00a4\u009b\u0068\u0080\u00dd\u00db\u0001" : {window: {end: 1454293425000, span: 3600000}}
   "\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0075\u007f\u00df\u003b\u0064\u005a\u001c\u00ad\u0076\u00ed\u0002" : {window: {end: -9223372036854410, span: 365}}
+
+
+---
+coder:
+  urn: "beam:coder:nullable:v1"
+  components: [{urn: "beam:coder:bytes:v1"}]
+nested: true
+
+examples:
+  "\u0001\u0003\u0061\u0062\u0063" : "abc"
+  "\u0001\u000a\u006d\u006f\u0072\u0065\u0020\u0062\u0079\u0074\u0065\u0073" : "more bytes"
+  "\u0000" : null

Review comment:
       To clarify, will an encoded element whose value is null contain only the last byte shown here (\u0000)?
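
One way to read the three examples above, as a sketch only: it assumes the nested bytes coder writes a base-128 varint length prefix before the payload, and that the leading byte is the null marker (0x00 for null, 0x01 otherwise). The function names are hypothetical and this is not the Beam implementation.

```python
from typing import Optional, Tuple


def read_varint(data: bytes, pos: int) -> Tuple[int, int]:
    """Read an unsigned base-128 varint; return (value, next position)."""
    result = 0
    shift = 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def decode_nullable_bytes(data: bytes) -> Optional[bytes]:
    """Hypothetical decoder for nullable(bytes) in the nested context."""
    if not data:
        raise ValueError("empty input")
    marker = data[0]
    if marker == 0:
        # Assumed: for null, the marker byte is the entire encoding.
        if len(data) != 1:
            raise ValueError("unexpected bytes after null marker")
        return None
    if marker != 1:
        raise ValueError("unknown marker byte: %d" % marker)
    length, pos = read_varint(data, 1)
    payload = data[pos:pos + length]
    if len(payload) != length or pos + length != len(data):
        raise ValueError("length prefix does not match payload")
    return payload
```

With this reading, "\u0001\u0003\u0061\u0062\u0063" splits into marker 0x01, length 0x03, and the payload "abc", while "\u0000" on its own is the whole encoding of null.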

##########
File path: sdks/python/apache_beam/coders/typecoders.py
##########
@@ -138,6 +137,10 @@ def get_coder(self, typehint):
         return coders.IterableCoder.from_type_hint(typehint, self)
       elif isinstance(typehint, typehints.ListConstraint):
         return coders.ListCoder.from_type_hint(typehint, self)
+      elif (isinstance(typehint, typehints.UnionConstraint) and
+            typehint.contains_type(type(None) and
+                                   len(list(typehint._inner_types())) == 2)):

Review comment:
       Let's make sure this is covered by a unit test.

##########
File path: sdks/python/apache_beam/coders/typecoders.py
##########
@@ -138,6 +137,10 @@ def get_coder(self, typehint):
         return coders.IterableCoder.from_type_hint(typehint, self)
       elif isinstance(typehint, typehints.ListConstraint):
         return coders.ListCoder.from_type_hint(typehint, self)
+      elif (isinstance(typehint, typehints.UnionConstraint) and
+            typehint.contains_type(type(None) and
+                                   len(list(typehint._inner_types())) == 2)):
+        return coders.NullableCoder.from_type_hint(typehint, self)

Review comment:
       Did you mean to use the inner type in this call?
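
A minimal sketch of the check in question, written against the standard typing module rather than Beam's typehints so that it runs on its own; optional_inner_type is a hypothetical name. It treats a hint as nullable only when it is a two-member union that includes NoneType, and returns the other member, which is the type an inner coder would be resolved from. Note that in the hunk as quoted, the length comparison appears to sit inside the contains_type(...) argument list; the sketch keeps the two conditions separate.

```python
from typing import Optional, Union, get_args, get_origin

NoneType = type(None)


def optional_inner_type(typehint):
    """Return T if typehint is a two-member Union of T and None, else None."""
    if get_origin(typehint) is not Union:
        return None
    args = get_args(typehint)
    # Two separate conditions: exactly two members, and one of them is NoneType.
    if len(args) != 2 or NoneType not in args:
        return None
    # The position of NoneType within the union does not matter here.
    return next(t for t in args if t is not NoneType)
```

A unit test along these lines could cover both member orders, a union without None, and a union with more than two members.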



