You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2022/04/08 20:47:05 UTC

[beam] branch master updated: [BEAM-14281] add as_deterministic_coder to nullable coder (#17322)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2dc2ad455d1 [BEAM-14281] add as_deterministic_coder to nullable coder (#17322)
2dc2ad455d1 is described below

commit 2dc2ad455d1a5a6877da851e05ad90d67c084871
Author: johnjcasey <95...@users.noreply.github.com>
AuthorDate: Fri Apr 8 16:46:58 2022 -0400

    [BEAM-14281] add as_deterministic_coder to nullable coder (#17322)
    
    * [BEAM-14281] add as_deterministic_coder to nullable coder
    
    * Update coders_test.py
    
    Co-authored-by: Robert Bradshaw <ro...@gmail.com>
---
 sdks/python/apache_beam/coders/coders.py      |  8 ++++++++
 sdks/python/apache_beam/coders/coders_test.py | 16 ++++++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index fce397df626..19463443386 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -627,6 +627,14 @@ class NullableCoder(FastCoder):
     # type: () -> bool
     return self._value_coder.is_deterministic()
 
+  def as_deterministic_coder(self, step_label, error_message=None):
+    if self.is_deterministic():
+      return self
+    else:
+      deterministic_value_coder = self._value_coder.as_deterministic_coder(
+          step_label, error_message)
+      return NullableCoder(deterministic_value_coder)
+
   def __eq__(self, other):
     return (
         type(self) == type(other) and self._value_coder == other._value_coder)
diff --git a/sdks/python/apache_beam/coders/coders_test.py b/sdks/python/apache_beam/coders/coders_test.py
index 49ad33202f0..0a30a320e90 100644
--- a/sdks/python/apache_beam/coders/coders_test.py
+++ b/sdks/python/apache_beam/coders/coders_test.py
@@ -21,7 +21,9 @@ import logging
 import unittest
 
 import proto
+import pytest
 
+from apache_beam import typehints
 from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
 from apache_beam.coders import coders
 from apache_beam.coders.avro_record import AvroRecord
@@ -220,6 +222,20 @@ class FallbackCoderTest(unittest.TestCase):
     self.assertEqual(DummyClass(), coder.decode(coder.encode(DummyClass())))
 
 
+class NullableCoderTest(unittest.TestCase):
+  def test_determinism(self):
+    deterministic = coders_registry.get_coder(typehints.Optional[int])
+    deterministic.as_deterministic_coder('label')
+
+    complex_deterministic = coders_registry.get_coder(
+        typehints.Optional[DummyClass])
+    complex_deterministic.as_deterministic_coder('label')
+
+    nondeterministic = coders.NullableCoder(coders.Base64PickleCoder())
+    with pytest.raises(ValueError):
+      nondeterministic.as_deterministic_coder('label')
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()