Posted to commits@airflow.apache.org by po...@apache.org on 2022/02/06 19:07:47 UTC

[airflow] branch main updated: Fix to check if values are integer or float and convert accordingly. (#21277)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a6ea57  Fix to check if values are integer or float and convert accordingly. (#21277)
0a6ea57 is described below

commit 0a6ea572fb5340a904e9cefaa656ac0127b15216
Author: Sasan Ahmadi <sa...@msn.com>
AuthorDate: Sun Feb 6 11:07:09 2022 -0800

    Fix to check if values are integer or float and convert accordingly. (#21277)
    
    This change prevents data loss: a column whose values are floats is converted to float, and otherwise to int. Floats now use pd.Float64Dtype() instead of pd.Int64Dtype(). Because the array can contain floating-point values, this fixes the exception raised when safely casting the array to the integer data type.
    Fixes the error when using mysql_to_s3 (TypeError: cannot safely cast non-equivalent object to int64), #16919.
---
 airflow/providers/amazon/aws/transfers/sql_to_s3.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/amazon/aws/transfers/sql_to_s3.py b/airflow/providers/amazon/aws/transfers/sql_to_s3.py
index 696b4fe..8fe44de 100644
--- a/airflow/providers/amazon/aws/transfers/sql_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/sql_to_s3.py
@@ -127,10 +127,14 @@ class SqlToS3Operator(BaseOperator):
             if "float" in df[col].dtype.name and df[col].hasnans:
                 # inspect values to determine if dtype of non-null values is int or float
                 notna_series = df[col].dropna().values
-                if np.isclose(notna_series, notna_series.astype(int)).all():
+                if np.equal(notna_series, notna_series.astype(int)).all():
                     # set to dtype that retains integers and supports NaNs
                     df[col] = np.where(df[col].isnull(), None, df[col])
                     df[col] = df[col].astype(pd.Int64Dtype())
+                elif np.isclose(notna_series, notna_series.astype(int)).all():
+                    # set to float dtype that retains floats and supports NaNs
+                    df[col] = np.where(df[col].isnull(), None, df[col])
+                    df[col] = df[col].astype(pd.Float64Dtype())
 
     def execute(self, context: 'Context') -> None:
         sql_hook = self._get_hook()
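
The branching in the hunk above can be tried outside Airflow with a small standalone sketch. The helper name convert_float_column is hypothetical and not part of the provider. The sketch also assumes that calling astype directly on the float column is an acceptable stand-in for the np.where(..., None, ...) step used in the operator. It mirrors the two checks from the diff: np.equal selects the nullable integer dtype, and np.isclose selects the nullable float dtype for values that are only approximately whole.

```python
import numpy as np
import pandas as pd


def convert_float_column(series: pd.Series) -> pd.Series:
    """Hypothetical helper mirroring the dtype checks in the diff above."""
    if "float" in series.dtype.name and series.hasnans:
        # Inspect only the non-null values, as the operator does.
        notna_values = series.dropna().values
        as_int = notna_values.astype(int)
        if np.equal(notna_values, as_int).all():
            # Every non-null value is exactly whole: nullable integer dtype.
            return series.astype(pd.Int64Dtype())
        elif np.isclose(notna_values, as_int).all():
            # Values are only approximately whole: nullable float dtype.
            return series.astype(pd.Float64Dtype())
    # Anything else (no NaNs, non-float, clearly fractional) is left as-is.
    return series


whole = convert_float_column(pd.Series([1.0, 2.0, np.nan]))
nearly_whole = convert_float_column(pd.Series([1.0, 2.0000000001, np.nan]))
fractional = convert_float_column(pd.Series([1.5, np.nan]))

print(whole.dtype, nearly_whole.dtype, fractional.dtype)
```

Under these assumptions, a column such as [1.0, 2.0000000001, NaN] no longer reaches the Int64 cast, which is the case the commit message associates with the "cannot safely cast" TypeError. Clearly fractional columns match neither branch and keep their original float dtype.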