Posted to issues@arrow.apache.org by "ericlin4 (via GitHub)" <gi...@apache.org> on 2023/03/06 22:14:29 UTC

[GitHub] [arrow] ericlin4 opened a new issue, #34474: Table.join() produces incorrect results for large inputs

ericlin4 opened a new issue, #34474:
URL: https://github.com/apache/arrow/issues/34474

   ### Describe the bug, including details regarding any error messages, version, and platform.
   
   PyArrow's join does not produce the same results as Pandas when the input tables are large. I am observing this with industry data I am working with, and the reproducible example below mimics that data.
   
   In this example, we have 72 million unique rows in each table with 9 join key columns of various types. The tables are identical except for the 'val' column in the second table.
   
   PyArrow's join produces null values for 'val' where there should be actual values from the second table. The same join performed in Pandas produces the expected result.
   
   I can produce the same result as Pandas by splitting each table into pieces, joining each left piece to each right piece, coalescing 'val', and concatenating the outputs (e.g., pa.concat_tables([tbl1_a.join(tbl2_a), tbl1_a.join(tbl2_b), tbl1_b.join(tbl2_a), tbl1_b.join(tbl2_b)])); a rough sketch of this workaround appears after the example below.
   
   Apologies for the long-running example. The first section, which generates the join key data, takes about an hour on my machine (an AWS r5.24xlarge EC2 instance), and the rest takes about 30 minutes. Around 100 GB of memory is needed to run the code.
   
   ```
   import pyarrow as pa #11.0.0
   import pandas as pd  #1.5.3
   import numpy as np   #1.23.5
   
   #Generate join key data
   n_rows = 72000000
   join_keys = [f'col{i}' for i in range(1,10)]
   
   col_str = [str(i) for i in range(n_rows)]
   col_date = [pd.to_datetime('2000-01-01') for i in range(n_rows)]
   col_int = [i for i in range(n_rows)]
   
   #Create dataframes -- df1 and df2 are identical except for the 'val' column
   df1 = pd.DataFrame({'col1': col_str,
                       'col2': col_str,
                       'col3': col_str,
                       'col4': col_str,
                       'col5': col_str,
                       'col6': col_date,
                       'col7': col_date,
                       'col8': col_date,
                       'col9': col_int})
   
   df2 = pd.DataFrame({'col1': col_str,
                       'col2': col_str,
                       'col3': col_str,
                       'col4': col_str,
                       'col5': col_str,
                       'col6': col_date,
                       'col7': col_date,
                       'col8': col_date,
                       'col9': col_int,
                       'val': [i for i in range(n_rows - 10000000)] + [np.nan for i in range(10000000)]})
   
   #Create Pyarrow Tables and merge
   df1_pa = pa.Table.from_pandas(df1)
   df2_pa = pa.Table.from_pandas(df2)
   
   merge_pa = df1_pa.join(df2_pa, keys = join_keys, join_type = 'left outer')
   merge_pa_df = merge_pa.to_pandas()
   
   #Merge dataframes analogously in Pandas
   merge_pd_df = pd.merge(df1, df2, on = join_keys, how = 'left')
   
   #Compare results -- should have the same number of non-null values in 'val'
   print(f"Pyarrow join non-null rows: {sum(merge_pa_df['val'].notnull())}")
   print(f"Pandas merge non-null rows: {sum(merge_pd_df['val'].notnull())}")
   
   #Returns
   #"Pyarrow join non-null rows: 37317087" (also changes from run to run)
   #"Pandas merge non-null rows: 62000000"
   
   #Expected
   #"Pyarrow join non-null rows: 62000000"
   #"Pandas merge non-null rows: 62000000"
   
   
   
   #Example row of unexpected output
   #merge_pd_df.rename({'val':'val_pd'}, axis = 1, inplace = True)
   #merge_pa_df.rename({'val':'val_pa'}, axis = 1, inplace = True)
   #comp = pd.merge(merge_pd_df, merge_pa_df, on = join_keys, how = 'left')
   
   #col1   col2   col3   col4   col5   col6       col7       col8       col9   val_pd   val_pa
   #0      0      0      0      0      2000-01-01 2000-01-01 2000-01-01 0      0.0      NaN
   ```
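
   For reference, the splitting workaround described above could look roughly like the sketch below. This is not from the original report: the helper name chunked_left_join is made up, it assumes the join keys are unique in both tables, and it uses a temporary row-index column to realign rows because Table.join() does not preserve input order.

   ```
   import pyarrow as pa
   import pyarrow.compute as pc

   def chunked_left_join(left, right, keys, val_col='val', n_chunks=2):
       """Left-join `left` to `right` by joining row slices of both tables and
       coalescing `val_col` across the per-slice results (assumes unique keys)."""
       def pieces(tbl):
           step = -(-tbl.num_rows // n_chunks)   #ceiling division
           return [tbl.slice(i * step, step) for i in range(n_chunks)]

       out = []
       for lpiece in pieces(left):
           #Tag each left row so results can be realigned after every join
           lpiece = lpiece.append_column('_row', pa.array(range(lpiece.num_rows), pa.int64()))
           merged_val = None
           for rpiece in pieces(right):
               joined = lpiece.join(rpiece, keys=keys, join_type='left outer').sort_by('_row')
               val = joined.column(val_col).combine_chunks()
               #Keep the first non-null value seen for each left row
               merged_val = val if merged_val is None else pc.coalesce(merged_val, val)
           out.append(lpiece.append_column(val_col, merged_val).drop(['_row']))
       return pa.concat_tables(out)

   #merge_chunked = chunked_left_join(df1_pa, df2_pa, join_keys)
   ```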
   
   ### Component(s)
   
   Python


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] jorisvandenbossche closed issue #34474: [Python] Table.join() produces incorrect results for large inputs

Posted by "jorisvandenbossche (via GitHub)" <gi...@apache.org>.
jorisvandenbossche closed issue #34474: [Python] Table.join() produces incorrect results for large inputs
URL: https://github.com/apache/arrow/issues/34474




[GitHub] [arrow] westonpace commented on issue #34474: [Python] Table.join() produces incorrect results for large inputs

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on issue #34474:
URL: https://github.com/apache/arrow/issues/34474#issuecomment-1506009490

   I managed to look into this today.  The bad news is that this join isn't supported.
   
   There are 9 key columns.  The date and int columns are 8 bytes each.  The string columns are variable but at least 4 bytes and average out close enough to 8 bytes that we can just use 8.
   
   72,000,000 rows * 8 bytes * 9 columns ~ 5.2 GB of key data.  We store key data in a structure that we index with uint32_t, which means we can have at most 4 GiB of key data.
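
   Spelling that estimate out (a quick check, not part of the original comment; 4 GiB here is just 2^32 bytes, the range addressable with a uint32_t offset):

   ```
   n_rows = 72_000_000
   n_key_cols = 9
   bytes_per_key = 8   #dates/int64 are 8 bytes; the digit strings average close to 8

   key_bytes = n_rows * n_key_cols * bytes_per_key
   limit = 2 ** 32
   print(key_bytes, limit, key_bytes > limit)   #5184000000 4294967296 True
   ```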
   
   The current behavior is that we trigger an overflow and clobber existing data in our keys array, which leads to the results you are seeing (incorrect data).
   
   I'm working on a fix that will detect this condition and fail the join when it encounters more than 4 GiB of key data.  My guess is that implementing hash join spilling (e.g. https://github.com/apache/arrow/pull/13669) would naturally increase this limit.  Until then, the best we can do is fail.
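
   Until the engine itself rejects this case, a caller-side guard could look something like the sketch below. This is not an official API: it just sums the Arrow buffer sizes of the key columns as a rough proxy for the engine's internal row-encoded key size, which is not exactly the same quantity.

   ```
   import pyarrow as pa

   def key_data_too_large(table, keys, limit=2**32):
       """Rough check of whether a table's join-key data may exceed the
       uint32_t-indexed key storage (an approximation, not the engine's own accounting)."""
       key_nbytes = sum(table.column(k).nbytes for k in keys)
       return key_nbytes > limit

   #if key_data_too_large(df2_pa, join_keys):
   #    ...  #e.g. join in pieces instead of calling Table.join() directly
   ```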




[GitHub] [arrow] westonpace commented on issue #34474: [Python] Table.join() produces incorrect results for large inputs

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on issue #34474:
URL: https://github.com/apache/arrow/issues/34474#issuecomment-1458582856

   Thank you very much for the report!
   
   > #"Pyarrow join non-null rows: 37317087" (also changes from run to run)
   
   Hmm, there isn't much that is non-deterministic in a hash-join.  So my guess would be that this is some sort of race condition.  Perhaps we are scheduling more tasks at the larger input size and that is leading to the issue.  I was able to come up with a reproducer that runs in under a minute and _should_ be runnable with 32 GB of RAM:
   
   ```
   import pyarrow as pa #11.0.0
   import pandas as pd  #1.5.3
   import numpy as np   #1.23.5
   import pyarrow.compute as pc
   
   #Generate join key data
   n_rows = 72_000_000
   n_nan_rows = 10_000_000
   join_keys = [f'col{i}' for i in range(1,10)]
   
   some_date = pd.to_datetime('2000-01-01')
   col_date = pa.array([some_date for i in range(n_rows)])
   col_int = pa.array([i for i in range(n_rows)])
   col_str = pc.cast(col_int, pa.string())
   
   #Create tables -- df1_pa and df2_pa are identical except for the 'val' column
   df1_pa = pa.Table.from_pydict({'col1': col_str,
                       'col2': col_str,
                       'col3': col_str,
                       'col4': col_str,
                       'col5': col_str,
                       'col6': col_date,
                       'col7': col_date,
                       'col8': col_date,
                       'col9': col_int})
   print(f'left nbytes: {df1_pa.nbytes}')
   
   values = pa.array([i for i in range(n_rows - n_nan_rows)] + [np.nan for i in range(n_nan_rows)])
   
   df2_pa = pa.Table.from_pydict({'col1': col_str,
                       'col2': col_str,
                       'col3': col_str,
                       'col4': col_str,
                       'col5': col_str,
                       'col6': col_date,
                       'col7': col_date,
                       'col8': col_date,
                       'col9': col_int,
                       'val': values})
   print(f'right nbytes: {df2_pa.nbytes}')
   
   merge_pa = df1_pa.join(df2_pa, keys = join_keys, join_type = 'left outer')
   vals_merged = merge_pa.column('val')
   
   non_null_count = pc.count(merge_pa.column('val'))
   nan_count = len(vals_merged.filter(pc.is_nan(vals_merged)))
   print(f'Expected: {n_nan_rows} Actual: {nan_count}')
   ```
   
   This will be a tricky one to get to the bottom of, I think.




[GitHub] [arrow] westonpace commented on issue #34474: [Python] Table.join() produces incorrect results for large inputs

Posted by "westonpace (via GitHub)" <gi...@apache.org>.
westonpace commented on issue #34474:
URL: https://github.com/apache/arrow/issues/34474#issuecomment-1458906889

   Ok, so it seems the non-determinism is from garbage memory and not threading.  This code triggers a segmentation fault when run in debug mode.  The error is somewhere in the hash table, and that code is pretty complex.  That's about as far as I can get today, but I'll try to find a day to really dive into this before the release.  This needs to be fixed.
   
   For future reference, I'm attaching the stack trace I am getting.
   [snippet.txt](https://github.com/apache/arrow/files/10914114/snippet.txt)
   
   
   

