You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/12/30 18:11:54 UTC

[GitHub] [beam] ibzib commented on a change in pull request #13634: [BEAM-11532] Fix edge case in merge where left_on and right_on contain equivalent column names

ibzib commented on a change in pull request #13634:
URL: https://github.com/apache/beam/pull/13634#discussion_r550282012



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1218,15 +1219,32 @@ def merge(
     merged = frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'merge',
-            lambda left, right: left.merge(
-                right, left_index=True, right_index=True, **kwargs),
+            lambda left, right: left.merge(right,
+                                           left_index=True,
+                                           right_index=True,
+                                           suffixes=suffixes,
+                                           **kwargs),
             [indexed_left._expr, indexed_right._expr],
             preserves_partition_by=partitionings.Singleton(),
             requires_partition_by=partitionings.Index()))
 
     if left_index or right_index:
       return merged
     else:
+      common_cols = set(left_on).intersection(right_on)
+      if len(common_cols):
+        # When merging on the same column name from both dfs, merged will have
+        # two duplicate columns, one with lsuffix and one with rsuffix.
+        # Normally pandas de-dupes these into a single column with no suffix.
+        # This replicates that logic by dropping the _right_ dupe, and removing
+        # the suffix from the _left_ dupe.
+        lsuffix, rsuffix = suffixes
+        merged = merged.drop(

Review comment:
       What if a column named `{col}{rsuffix}` or `{col}{lsuffix}` already exists in the input?

##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -124,7 +124,6 @@ def test_groupby(self):
 
     self._run_test(lambda df: df.groupby(['second', 'A']).sum(), df)
 
-  @unittest.skipIf(sys.version_info <= (3, ), 'differing signature')

Review comment:
       Thanks for the cleanup 👍 

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1218,15 +1219,32 @@ def merge(
     merged = frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'merge',
-            lambda left, right: left.merge(
-                right, left_index=True, right_index=True, **kwargs),
+            lambda left, right: left.merge(right,
+                                           left_index=True,
+                                           right_index=True,
+                                           suffixes=suffixes,
+                                           **kwargs),
             [indexed_left._expr, indexed_right._expr],
             preserves_partition_by=partitionings.Singleton(),
             requires_partition_by=partitionings.Index()))
 
     if left_index or right_index:
       return merged
     else:
+      common_cols = set(left_on).intersection(right_on)
+      if len(common_cols):
+        # When merging on the same column name from both dfs, merged will have
+        # two duplicate columns, one with lsuffix and one with rsuffix.
+        # Normally pandas de-dupes these into a single column with no suffix.
+        # This replicates that logic by dropping the _right_ dupe, and removing
+        # the suffix from the _left_ dupe.
+        lsuffix, rsuffix = suffixes
+        merged = merged.drop(
+            columns=[f'{col}{rsuffix}' for col in common_cols])
+        merged = merged.rename(
+            columns={f'{col}{lsuffix}': col for col in common_cols})
+

Review comment:
       Nit: Why two newlines? Shouldn't it be just one?

##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -152,6 +151,45 @@ def test_merge(self):
           df1,
           df2)
 
+  def test_merge_same_key(self):
+    df1 = pd.DataFrame({
+        'key': ['foo', 'bar', 'baz', 'foo'], 'value': [1, 2, 3, 5]
+    })
+    df2 = pd.DataFrame({
+        'key': ['foo', 'bar', 'baz', 'foo'], 'value': [5, 6, 7, 8]
+    })
+    with beam.dataframe.allow_non_parallel_operations():
+      self._run_test(
+          lambda df1,
+          df2: df1.merge(df2, on='key').rename(index=lambda x: '*').sort_values(
+              ['value_x', 'value_y']),
+          df1,
+          df2)
+      self._run_test(
+          lambda df1,
+          df2: df1.merge(df2, on='key', suffixes=('_left', '_right')).rename(
+              index=lambda x: '*').sort_values(['value_left', 'value_right']),
+          df1,
+          df2)
+
+    df1 = pd.DataFrame({'a': ['foo', 'bar'], 'b': [1, 2]})
+    df2 = pd.DataFrame({'a': ['foo', 'baz'], 'c': [3, 4]})
+
+    with beam.dataframe.allow_non_parallel_operations():
+      self._run_test(
+          lambda df1,
+          df2: df1.merge(df2, how='left', on='a').rename(index=lambda x: '*').
+          sort_values(['b', 'c']),
+          df1,
+          df2)
+      # Test without specifying 'on'

Review comment:
       Can we split `test_merge_same_key` into a separate test method for each invocation of `_run_test`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org