You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/02/24 17:51:43 UTC

[GitHub] [beam] robertwb commented on a change in pull request #13839: [BEAM-11719 ] Use deterministic coders for grouping keys.

robertwb commented on a change in pull request #13839:
URL: https://github.com/apache/beam/pull/13839#discussion_r582169419



##########
File path: sdks/python/apache_beam/transforms/ptransform_test.py
##########
@@ -500,6 +503,94 @@ def process(self, gbk_result):
           | 'Reiteration-Sum' >> beam.ParDo(MyDoFn()))
       assert_that(result, equal_to([(1, 170)]))
 
+  def test_group_by_key_determanistic_coder(self):
+    # pylint: disable=global-variable-not-assigned
+    global MyObject  # for pickling of the class instance
+
+    class MyObject:
+      def __init__(self, value):
+        self.value = value
+
+      def __eq__(self, other):
+        return self.value == other.value
+
+      def __hash__(self):
+        return hash(self.value)
+
+    class MyObjectCoder(beam.coders.Coder):
+      def encode(self, o):
+        return pickle.dumps((o.value, random.random()))
+
+      def decode(self, encoded):
+        return MyObject(pickle.loads(encoded)[0])
+
+      def as_deterministic_coder(self, *args):
+        return MyDetermanisticObjectCoder()
+
+      def to_type_hint(self):
+        return MyObject
+
+    class MyDetermanisticObjectCoder(beam.coders.Coder):
+      def encode(self, o):
+        return pickle.dumps(o.value)
+
+      def decode(self, encoded):
+        return MyObject(pickle.loads(encoded))
+
+      def is_deterministic(self):
+        return True
+
+    beam.coders.registry.register_coder(MyObject, MyObjectCoder)
+
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | beam.Create([(MyObject(k % 2), k) for k in range(10)])
+      grouped = pcoll | beam.GroupByKey() | beam.MapTuple(
+          lambda k, vs: (k.value, sorted(vs)))
+      combined = pcoll | beam.CombinePerKey(sum) | beam.MapTuple(
+          lambda k, v: (k.value, v))
+      assert_that(
+          grouped,
+          equal_to([(0, [0, 2, 4, 6, 8]), (1, [1, 3, 5, 7, 9])]),
+          'CheckGrouped')
+      assert_that(combined, equal_to([(0, 20), (1, 25)]), 'CheckCombined')
+
+  def test_group_by_key_non_determanistic_coder(self):
+    with self.assertRaises(Exception):

Review comment:
       Done. I'm leaving the error type as Exception in case remote runners don't propagate the error type. 

##########
File path: sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
##########
@@ -80,22 +80,17 @@ def test_basics_without_type_check(self):
     # therefore any custom coders will not be used. The default coder (pickler)
     # will be used instead.
     temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
-    group_with_coder.run([

Review comment:
       Correct. This was explicitly testing something (non-default) that is no longer supported. 

##########
File path: sdks/python/apache_beam/transforms/ptransform_test.py
##########
@@ -500,6 +503,94 @@ def process(self, gbk_result):
           | 'Reiteration-Sum' >> beam.ParDo(MyDoFn()))
       assert_that(result, equal_to([(1, 170)]))
 
+  def test_group_by_key_determanistic_coder(self):
+    # pylint: disable=global-variable-not-assigned
+    global MyObject  # for pickling of the class instance
+
+    class MyObject:
+      def __init__(self, value):
+        self.value = value
+
+      def __eq__(self, other):
+        return self.value == other.value
+
+      def __hash__(self):
+        return hash(self.value)
+
+    class MyObjectCoder(beam.coders.Coder):
+      def encode(self, o):
+        return pickle.dumps((o.value, random.random()))
+
+      def decode(self, encoded):
+        return MyObject(pickle.loads(encoded)[0])
+
+      def as_deterministic_coder(self, *args):
+        return MyDetermanisticObjectCoder()
+
+      def to_type_hint(self):
+        return MyObject
+
+    class MyDetermanisticObjectCoder(beam.coders.Coder):
+      def encode(self, o):
+        return pickle.dumps(o.value)
+
+      def decode(self, encoded):
+        return MyObject(pickle.loads(encoded))
+
+      def is_deterministic(self):
+        return True
+
+    beam.coders.registry.register_coder(MyObject, MyObjectCoder)
+
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | beam.Create([(MyObject(k % 2), k) for k in range(10)])
+      grouped = pcoll | beam.GroupByKey() | beam.MapTuple(
+          lambda k, vs: (k.value, sorted(vs)))
+      combined = pcoll | beam.CombinePerKey(sum) | beam.MapTuple(
+          lambda k, v: (k.value, v))
+      assert_that(
+          grouped,
+          equal_to([(0, [0, 2, 4, 6, 8]), (1, [1, 3, 5, 7, 9])]),
+          'CheckGrouped')
+      assert_that(combined, equal_to([(0, 20), (1, 25)]), 'CheckCombined')
+
+  def test_group_by_key_non_determanistic_coder(self):
+    with self.assertRaises(Exception):
+      with TestPipeline() as pipeline:
+        _ = (
+            pipeline
+            | beam.Create([(PickledObject(10), None)])
+            | beam.GroupByKey()
+            | beam.MapTuple(lambda k, v: list(v)))
+
+  def test_group_by_key_allow_non_determanistic_coder(self):
+    with TestPipeline() as pipeline:
+      # The GroupByKey below would fail without this option.
+      pipeline._options.view_as(
+          TypeOptions).allow_non_deterministic_key_coders = True
+      grouped = (
+          pipeline
+          | beam.Create([(PickledObject(10), None)])
+          | beam.GroupByKey()
+          | beam.MapTuple(lambda k, v: list(v)))
+    assert_that(grouped, equal_to([[None]]))

Review comment:
       Oh, nice catch!

##########
File path: sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
##########
@@ -858,9 +859,13 @@ def _load_data(
             lambda x,
             deleting_tables: deleting_tables,
             pvalue.AsIter(temp_tables_pc))
-        | "RemoveTempTables/AddUselessValue" >> beam.Map(lambda x: (x, None))
+        # TableReference has no deterministic coder, but as this de-duplication

Review comment:
       Assuming exactly these three fields would not be as future-proof. 

##########
File path: sdks/python/apache_beam/runners/pipeline_context.py
##########
@@ -233,12 +233,20 @@ def requirements(self):
   # rather than an actual coder. The element type is required for some runners,
   # as well as performing a round-trip through protos.
   # TODO(BEAM-2717): Remove once this is no longer needed.
-  def coder_id_from_element_type(self, element_type):
-    # type: (Any) -> str
+  def coder_id_from_element_type(
+      self, element_type, requires_deterministic_key_coder=None):
+    # type: (Any, Optional[str]) -> str
     if self.use_fake_coders:
       return pickler.dumps(element_type).decode('ascii')
     else:
-      return self.coders.get_id(coders.registry.get_coder(element_type))
+      coder = coders.registry.get_coder(element_type)
+      if requires_deterministic_key_coder:
+        coder = coders.TupleCoder([

Review comment:
       If it requires a deterministic key, it must be a coder with a key (and value). 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org