Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/02/24 02:57:46 UTC

[GitHub] [beam] udim commented on a change in pull request #13839: [BEAM-11719] Use deterministic coders for grouping keys.

udim commented on a change in pull request #13839:
URL: https://github.com/apache/beam/pull/13839#discussion_r581544889



##########
File path: sdks/python/apache_beam/transforms/ptransform_test.py
##########
@@ -500,6 +503,94 @@ def process(self, gbk_result):
           | 'Reiteration-Sum' >> beam.ParDo(MyDoFn()))
       assert_that(result, equal_to([(1, 170)]))
 
+  def test_group_by_key_determanistic_coder(self):

Review comment:
       spelling: "determanistic" here and below

##########
File path: sdks/python/apache_beam/transforms/ptransform_test.py
##########
@@ -500,6 +503,94 @@ def process(self, gbk_result):
           | 'Reiteration-Sum' >> beam.ParDo(MyDoFn()))
       assert_that(result, equal_to([(1, 170)]))
 
+  def test_group_by_key_determanistic_coder(self):
+    # pylint: disable=global-variable-not-assigned
+    global MyObject  # for pickling of the class instance
+
+    class MyObject:
+      def __init__(self, value):
+        self.value = value
+
+      def __eq__(self, other):
+        return self.value == other.value
+
+      def __hash__(self):
+        return hash(self.value)
+
+    class MyObjectCoder(beam.coders.Coder):
+      def encode(self, o):
+        return pickle.dumps((o.value, random.random()))
+
+      def decode(self, encoded):
+        return MyObject(pickle.loads(encoded)[0])
+
+      def as_deterministic_coder(self, *args):
+        return MyDetermanisticObjectCoder()
+
+      def to_type_hint(self):
+        return MyObject
+
+    class MyDetermanisticObjectCoder(beam.coders.Coder):
+      def encode(self, o):
+        return pickle.dumps(o.value)
+
+      def decode(self, encoded):
+        return MyObject(pickle.loads(encoded))
+
+      def is_deterministic(self):
+        return True
+
+    beam.coders.registry.register_coder(MyObject, MyObjectCoder)
+
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | beam.Create([(MyObject(k % 2), k) for k in range(10)])
+      grouped = pcoll | beam.GroupByKey() | beam.MapTuple(
+          lambda k, vs: (k.value, sorted(vs)))
+      combined = pcoll | beam.CombinePerKey(sum) | beam.MapTuple(
+          lambda k, v: (k.value, v))
+      assert_that(
+          grouped,
+          equal_to([(0, [0, 2, 4, 6, 8]), (1, [1, 3, 5, 7, 9])]),
+          'CheckGrouped')
+      assert_that(combined, equal_to([(0, 20), (1, 25)]), 'CheckCombined')
+
+  def test_group_by_key_non_determanistic_coder(self):
+    with self.assertRaises(Exception):
+      with TestPipeline() as pipeline:
+        _ = (
+            pipeline
+            | beam.Create([(PickledObject(10), None)])
+            | beam.GroupByKey()
+            | beam.MapTuple(lambda k, v: list(v)))
+
+  def test_group_by_key_allow_non_determanistic_coder(self):
+    with TestPipeline() as pipeline:
+      # The GroupByKey below would fail without this option.
+      pipeline._options.view_as(
+          TypeOptions).allow_non_deterministic_key_coders = True
+      grouped = (
+          pipeline
+          | beam.Create([(PickledObject(10), None)])
+          | beam.GroupByKey()
+          | beam.MapTuple(lambda k, v: list(v)))
+    assert_that(grouped, equal_to([[None]]))

Review comment:
       I'm not sure, but I think that this `assert_that` (and the one below) doesn't work because it's outside the `with` block, i.e. it is only attached after the pipeline has already run on exit.
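   A minimal sketch of the fix this implies (a hypothetical rearrangement, not
   the PR's final code): attach the assertion while the pipeline is still being
   built, i.e. inside the `with` block, since exiting the block is what runs
   the pipeline. `PickledObject` is assumed to come from elsewhere in this test
   module.

       import apache_beam as beam
       from apache_beam.options.pipeline_options import TypeOptions
       from apache_beam.testing.test_pipeline import TestPipeline
       from apache_beam.testing.util import assert_that, equal_to

       with TestPipeline() as pipeline:
         # The GroupByKey below would fail without this option.
         pipeline._options.view_as(
             TypeOptions).allow_non_deterministic_key_coders = True
         grouped = (
             pipeline
             | beam.Create([(PickledObject(10), None)])
             | beam.GroupByKey()
             | beam.MapTuple(lambda k, v: list(v)))
         # Applied before the with block exits, so the check is part of the
         # pipeline that actually runs.
         assert_that(grouped, equal_to([[None]]))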

##########
File path: sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
##########
@@ -858,9 +859,13 @@ def _load_data(
             lambda x,
             deleting_tables: deleting_tables,
             pvalue.AsIter(temp_tables_pc))
-        | "RemoveTempTables/AddUselessValue" >> beam.Map(lambda x: (x, None))
+        # TableReference has no deterministic coder, but as this de-duplication

Review comment:
       From what I can tell, `parse_table_reference` is used to normalize and populate TableReference instances. They are essentially a "(project, dataset, table) tuple."
   It should be possible to do `beam.Map(lambda x: (x.tableId, x.datasetId, x.projectId)) | beam.Distinct() | beam.Map(lambda x: parse_table_reference(*x))`; see the sketch below.
   
   cc: @pabloem 
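   Expanded into a self-contained sketch of that suggestion (transform labels
   and the helper name `deduplicate_table_refs` are illustrative;
   `parse_table_reference` is the normalizer from
   `apache_beam.io.gcp.bigquery_tools`):

       import apache_beam as beam
       from apache_beam.io.gcp.bigquery_tools import parse_table_reference

       def deduplicate_table_refs(table_refs):
         """De-duplicates a PCollection of TableReference objects."""
         return (
             table_refs
             # Project to a plain tuple, which has a deterministic coder.
             | 'ToTuple' >> beam.Map(
                 lambda x: (x.tableId, x.datasetId, x.projectId))
             | 'Dedup' >> beam.Distinct()
             # Rebuild a normalized TableReference from the tuple fields.
             | 'ToTableReference' >> beam.Map(
                 lambda x: parse_table_reference(*x)))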

##########
File path: sdks/python/apache_beam/transforms/ptransform_test.py
##########
@@ -500,6 +503,94 @@ def process(self, gbk_result):
           | 'Reiteration-Sum' >> beam.ParDo(MyDoFn()))
       assert_that(result, equal_to([(1, 170)]))
 
+  def test_group_by_key_determanistic_coder(self):
+    # pylint: disable=global-variable-not-assigned
+    global MyObject  # for pickling of the class instance
+
+    class MyObject:
+      def __init__(self, value):
+        self.value = value
+
+      def __eq__(self, other):
+        return self.value == other.value
+
+      def __hash__(self):
+        return hash(self.value)
+
+    class MyObjectCoder(beam.coders.Coder):
+      def encode(self, o):
+        return pickle.dumps((o.value, random.random()))
+
+      def decode(self, encoded):
+        return MyObject(pickle.loads(encoded)[0])
+
+      def as_deterministic_coder(self, *args):
+        return MyDetermanisticObjectCoder()
+
+      def to_type_hint(self):
+        return MyObject
+
+    class MyDetermanisticObjectCoder(beam.coders.Coder):
+      def encode(self, o):
+        return pickle.dumps(o.value)
+
+      def decode(self, encoded):
+        return MyObject(pickle.loads(encoded))
+
+      def is_deterministic(self):
+        return True
+
+    beam.coders.registry.register_coder(MyObject, MyObjectCoder)
+
+    with TestPipeline() as pipeline:
+      pcoll = pipeline | beam.Create([(MyObject(k % 2), k) for k in range(10)])
+      grouped = pcoll | beam.GroupByKey() | beam.MapTuple(
+          lambda k, vs: (k.value, sorted(vs)))
+      combined = pcoll | beam.CombinePerKey(sum) | beam.MapTuple(
+          lambda k, v: (k.value, v))
+      assert_that(
+          grouped,
+          equal_to([(0, [0, 2, 4, 6, 8]), (1, [1, 3, 5, 7, 9])]),
+          'CheckGrouped')
+      assert_that(combined, equal_to([(0, 20), (1, 25)]), 'CheckCombined')
+
+  def test_group_by_key_non_determanistic_coder(self):
+    with self.assertRaises(Exception):

Review comment:
       Could this be `self.assertRaisesRegex(ValueError, r'deterministic')`? Otherwise some other exception might creep in and we wouldn't know.
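   A sketch of the tightened test (the exception type and message fragment are
   assumptions about what the key-coder check raises; spelling fixed per the
   earlier comment):

       def test_group_by_key_non_deterministic_coder(self):
         # Pin the expected failure so an unrelated exception can't make the
         # test pass by accident.
         with self.assertRaisesRegex(ValueError, r'deterministic'):
           with TestPipeline() as pipeline:
             _ = (
                 pipeline
                 | beam.Create([(PickledObject(10), None)])
                 | beam.GroupByKey()
                 | beam.MapTuple(lambda k, v: list(v)))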

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -352,6 +360,8 @@ def visit_transform(self, transform_node):
               # access pattern to appease Dataflow.
               side_input.pvalue.element_type = typehints.coerce_to_kv_type(
                   side_input.pvalue.element_type, transform_node.full_label)
+              side_input.pvalue.requires_deterministic_key_coder = (
+                  deterministic_key_coders and transform_node.full_label)

Review comment:
       Nice! I like that the error message will have the transform name.
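   This works because `a and b` evaluates to `b` whenever `a` is truthy, so the
   attribute holds either a falsy value or the offending transform's label:

       deterministic_key_coders = True
       full_label = 'MyStep/GroupByKey'  # illustrative label
       print(deterministic_key_coders and full_label)  # prints the label
       print(False and full_label)                     # prints False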

##########
File path: sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
##########
@@ -80,22 +80,17 @@ def test_basics_without_type_check(self):
     # therefore any custom coders will not be used. The default coder (pickler)
     # will be used instead.
     temp_path = self.create_temp_file(self.SAMPLE_RECORDS)
-    group_with_coder.run([

Review comment:
       This means that the example will no longer be run without type checking. The example doesn't mention type checking at all (in group_with_coder.py), so it's probably okay to assume people will be using the defaults.
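   For context, pipeline-level type checking is on by default and is toggled
   through `TypeOptions`; a minimal sketch, assuming the flag and attribute
   names match the SDK's pipeline options:

       from apache_beam.options.pipeline_options import (
           PipelineOptions, TypeOptions)

       # Type checking defaults to on; --no_pipeline_type_check turns it off,
       # which is the mode the removed test used to exercise.
       options = PipelineOptions(['--no_pipeline_type_check'])
       assert options.view_as(TypeOptions).pipeline_type_check is False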

##########
File path: sdks/python/apache_beam/runners/pipeline_context.py
##########
@@ -233,12 +233,20 @@ def requirements(self):
   # rather than an actual coder. The element type is required for some runners,
   # as well as performing a round-trip through protos.
   # TODO(BEAM-2717): Remove once this is no longer needed.
-  def coder_id_from_element_type(self, element_type):
-    # type: (Any) -> str
+  def coder_id_from_element_type(
+      self, element_type, requires_deterministic_key_coder=None):
+    # type: (Any, Optional[str]) -> str
     if self.use_fake_coders:
       return pickler.dumps(element_type).decode('ascii')
     else:
-      return self.coders.get_id(coders.registry.get_coder(element_type))
+      coder = coders.registry.get_coder(element_type)
+      if requires_deterministic_key_coder:
+        coder = coders.TupleCoder([

Review comment:
       I don't understand this part. Why is it safe to assume that any coder here will have a key and value coder?
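   To illustrate the concern (a sketch, not the PR's code): `get_coder` only
   yields a coder with key and value components when the element type is a
   two-element tuple; for any other element type there is nothing to split.

       from apache_beam import coders
       from apache_beam.typehints import typehints

       kv_coder = coders.registry.get_coder(typehints.Tuple[int, str])
       print(kv_coder.is_kv_coder())  # True: key_coder()/value_coder() exist
       print(kv_coder.key_coder())    # the coder for the int key

       scalar_coder = coders.registry.get_coder(int)
       print(scalar_coder.is_kv_coder())  # False: no key/value components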



