You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/09 17:08:17 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #14135: [BEAM-11881] Fix partitioning ordering for DataFrames

TheNeuralBit commented on a change in pull request #14135:
URL: https://github.com/apache/beam/pull/14135#discussion_r590552721



##########
File path: sdks/python/apache_beam/dataframe/partitionings_test.py
##########
@@ -36,8 +36,8 @@ class PartitioningsTest(unittest.TestCase):
   }).set_index(['shape', 'color', 'size'])
 
   def test_index_is_subpartition(self):
-    ordered_list = [Nothing(), Index([3]), Index([1, 3]), Index(), Singleton()]
-    for loose, strict in zip(ordered_list[:1], ordered_list[1:]):
+    ordered_list = [Singleton(), Index([3]), Index([1, 3]), Index(), Nothing()]
+    for loose, strict in zip(ordered_list[:-1], ordered_list[1:]):

Review comment:
       Note the change to make this `-1`. This test was only verifying `Singleton()` vs `Index([3])` then exiting the loop. The checks between different `Index()` partitionings would have failed, but they weren't being executed.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -130,7 +130,7 @@ def fillna(self, value, method, axis, **kwargs):
             lambda df,
             value: df.fillna(value, method=method, axis=axis, **kwargs),
             [self._expr, value_expr],
-            preserves_partition_by=partitionings.Singleton(),
+            preserves_partition_by=partitionings.Index(),

Review comment:
       In general, what we previously called `preserves=Singleton()` (or equivalently `preserves=Index()`), i.e. "this expression will preserve any partitioning", now must be indicated by `preserves=Index()`.
   
   Also what we previously called `preserves=Nothing()`, i.e. "this expression cannot preserve any partitioning", is now represented with `preserves=Singleton()`. This is perhaps a little surprising but I think it's easy to reason about, just think of it as "the only thing this expression preserves is Singleton() parititioning, which is trivially preserved".
   
   All the `preserves=` updates are applying this mapping

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -331,8 +339,7 @@ def expr_to_stages(expr):
       required_partitioning = expr.requires_partition_by()
       for stage in common_stages([expr_to_stages(arg) for arg in expr.args()
                                   if arg not in inputs]):
-        if all(output_is_partitioned_by(arg, stage, required_partitioning)
-               for arg in expr.args() if not is_scalar(arg)):
+        if output_partitioning_in_stage(expr, stage) is not None:

Review comment:
       Instead of duplicating a portion of the logic from `output_is_partitioned_by`, this new implementation moves all of the logic into `output_partitioning_in_stage`. Here we just check for a None output indicating this expression cannot be computed in this stage.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org