You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/08/03 06:06:45 UTC
[airflow] branch main updated: Fix BaseSQLToGCSOperator approx_max_file_size_bytes (#25469)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 803c0e252f Fix BaseSQLToGCSOperator approx_max_file_size_bytes (#25469)
803c0e252f is described below
commit 803c0e252fc78a424a181a34a93e689fa9aaaa09
Author: dclandau <94...@users.noreply.github.com>
AuthorDate: Wed Aug 3 07:06:26 2022 +0100
Fix BaseSQLToGCSOperator approx_max_file_size_bytes (#25469)
* Fix BaseSQLToGCSOperator approx_max_file_size_bytes
When using the parquet file_format, `tmp_file_handle.tell()`
always points to the beginning of the file after the data has been saved,
and is therefore not a good indicator of the file's current size.
Save the current file pointer position and set the file pointer position
to `os.SEEK_END`. file_size is set to the new position, and the file
pointer's position goes back to the saved position.
Currently, after a parquet write operation the pointer is set to 0,
and therefore, simply executing `tmp_file_handle.tell()` is not
sufficient to determine the current size. This sequence is added to
allow file splitting when the export format is set to parquet.
---
airflow/providers/google/cloud/transfers/sql_to_gcs.py | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/google/cloud/transfers/sql_to_gcs.py b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
index c204479024..bfee9dd1d2 100644
--- a/airflow/providers/google/cloud/transfers/sql_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/sql_to_gcs.py
@@ -198,6 +198,8 @@ class BaseSQLToGCSOperator(BaseOperator):
names in GCS, and values are file handles to local files that
contain the data for the GCS objects.
"""
+ import os
+
org_schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
schema = [column for column in org_schema if column not in self.exclude_columns]
@@ -250,7 +252,12 @@ class BaseSQLToGCSOperator(BaseOperator):
tmp_file_handle.write(b'\n')
# Stop if the file exceeds the file size limit.
- if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
+ fppos = tmp_file_handle.tell()
+ tmp_file_handle.seek(0, os.SEEK_END)
+ file_size = tmp_file_handle.tell()
+ tmp_file_handle.seek(fppos, os.SEEK_SET)
+
+ if file_size >= self.approx_max_file_size_bytes:
file_no += 1
if self.export_format == 'parquet':