Posted to commits@airflow.apache.org by po...@apache.org on 2022/08/03 06:06:45 UTC

[airflow] branch main updated: Fix BaseSQLToGCSOperator approx_max_file_size_bytes (#25469)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 803c0e252f Fix BaseSQLToGCSOperator approx_max_file_size_bytes (#25469)
803c0e252f is described below

commit 803c0e252fc78a424a181a34a93e689fa9aaaa09
Author: dclandau <94...@users.noreply.github.com>
AuthorDate: Wed Aug 3 07:06:26 2022 +0100

    Fix BaseSQLToGCSOperator approx_max_file_size_bytes (#25469)
    
    * Fix BaseSQLToGCSOperator approx_max_file_size_bytes
    
    When using the parquet file_format, `tmp_file_handle.tell()`
    always points to the beginning of the file after the data has been
    written, and is therefore not a reliable indicator of the file's
    current size.
    
    The fix saves the current file pointer position, seeks to the end
    of the file (`os.SEEK_END`), records that offset as file_size, and
    then restores the saved position.
    
    Currently, after a parquet write operation the pointer is left at
    0, so `tmp_file_handle.tell()` alone cannot determine the file's
    current size. This sequence allows file splitting to work when the
    export format is set to parquet.
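
To illustrate the underlying behaviour, here is a minimal, self-contained
sketch (not part of the commit; the helper name and temp-file setup are
illustrative). It shows how an offset of 0 can coexist with a non-empty
file when the data is written through a second handle opened by name, and
how the seek-to-end measurement recovers the real size:

    import os
    import tempfile

    def file_size(handle) -> int:
        """Return the file's size without disturbing the caller's offset."""
        pos = handle.tell()            # remember where the caller left off
        handle.seek(0, os.SEEK_END)    # jump to the end of the file
        size = handle.tell()           # offset at the end == current size
        handle.seek(pos, os.SEEK_SET)  # restore the original position
        return size

    # POSIX-only demo: reopening a NamedTemporaryFile by name fails on Windows.
    with tempfile.NamedTemporaryFile() as tmp_file_handle:
        # Write through a *separate* handle opened by name, so the original
        # handle's offset never moves (comparable to a writer that opens the
        # temp file itself instead of reusing the existing handle).
        with open(tmp_file_handle.name, "wb") as writer:
            writer.write(b"x" * 1024)

        print(tmp_file_handle.tell())      # 0    -- misleading as a size check
        print(file_size(tmp_file_handle))  # 1024 -- actual size on disk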
---
 airflow/providers/google/cloud/transfers/sql_to_gcs.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/transfers/sql_to_gcs.py b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
index c204479024..bfee9dd1d2 100644
--- a/airflow/providers/google/cloud/transfers/sql_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
@@ -198,6 +198,8 @@ class BaseSQLToGCSOperator(BaseOperator):
             names in GCS, and values are file handles to local files that
             contain the data for the GCS objects.
         """
+        import os
+
         org_schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
         schema = [column for column in org_schema if column not in self.exclude_columns]
 
@@ -250,7 +252,12 @@ class BaseSQLToGCSOperator(BaseOperator):
                 tmp_file_handle.write(b'\n')
 
             # Stop if the file exceeds the file size limit.
-            if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
+            fppos = tmp_file_handle.tell()
+            tmp_file_handle.seek(0, os.SEEK_END)
+            file_size = tmp_file_handle.tell()
+            tmp_file_handle.seek(fppos, os.SEEK_SET)
+
+            if file_size >= self.approx_max_file_size_bytes:
                 file_no += 1
 
                 if self.export_format == 'parquet':
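
As an aside (not what the patch uses): `os.fstat` on the handle's file
descriptor yields the same number without moving the file pointer at all,
though it only counts bytes already flushed to the OS, whereas seeking
flushes the handle's own write buffer first. Reusing tmp_file_handle from
the sketch above:

    import os

    # Equivalent size check; unflushed buffered writes on this handle are
    # not yet visible to fstat, while seek(0, os.SEEK_END) flushes them.
    file_size = os.fstat(tmp_file_handle.fileno()).st_size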