Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/08/12 14:30:33 UTC

[GitHub] [airflow] sanxore opened a new pull request, #25691: don't convert date to iso string format if export format is parquet in PostgresToGCSOperator

sanxore opened a new pull request, #25691:
URL: https://github.com/apache/airflow/pull/25691

   
   - Issue description:
   In the `PostgresToGCSOperator` we can't write a DATE column when `export_format="parquet"`: the `convert_type` method converts the date values coming from Postgres to ISO strings, so pyarrow raises this exception: `pyarrow.lib.ArrowTypeError: object of type <class 'str'> cannot be converted to int`
   
   - Solution:
   Don't convert dates to strings when the export format is parquet (a minimal reproduction with pyarrow is sketched below).
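
   For reference, the failure can be reproduced with pyarrow alone, outside the operator (a sketch assuming pyarrow is installed; the column name and schema here are illustrative, not the operator's internals):

   ```python
   import datetime

   import pyarrow as pa

   schema = pa.schema([pa.field("current_date", pa.date32())])

   # Passing the date as an ISO string (what convert_type currently returns)
   # fails: pyarrow will not coerce a plain str into a date32 column.
   try:
       pa.Table.from_pydict({"current_date": ["2022-08-12"]}, schema=schema)
   except (pa.lib.ArrowTypeError, pa.lib.ArrowInvalid) as exc:
       print(exc)

   # Passing the native datetime.date (what this PR keeps for parquet) works.
   pa.Table.from_pydict({"current_date": [datetime.date(2022, 8, 12)]}, schema=schema)
   ```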
   
   
   - Here is my solution and a screenshot of the DagRun:
   
   ```python
   import datetime
   import json
   import os
   import time
   from decimal import Decimal
   
   import pendulum
   from airflow import DAG
   from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator
   from common_v2.settings import GCS_BUCKET
   
   DEFAULT_ARGS = {
       "start_date": datetime.datetime(2022, 8, 12),
       "retries": 1,
       "retry_delay": datetime.timedelta(minutes=1),
   }
   
   
   class PostgresToGCSFixedOperator(PostgresToGCSOperator):
       def __init__(self, **kwargs):
           super().__init__(**kwargs)
   
       def convert_type(self, value, schema_type, stringify_dict=True):
           """
           Takes a value from Postgres, and converts it to a value that's safe for
           JSON/Google Cloud Storage/BigQuery.
           Timezone aware Datetime are converted to UTC seconds.
           Unaware Datetime, Date and Time are converted to ISO formatted strings.
           Decimals are converted to floats.
   
           :param value: Postgres column value.
           :param schema_type: BigQuery data type.
           :param stringify_dict: Specify whether to convert dict to string.
           """
           if isinstance(value, datetime.datetime):
               iso_format_value = value.isoformat()
               if value.tzinfo is None:
                   return iso_format_value
               return pendulum.parse(iso_format_value).float_timestamp
   
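           # Only stringify date/time values for text-based exports (csv/json);
           # for parquet, keep native Python date/time objects so pyarrow can
           # map them to the corresponding parquet logical types.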
           if self.export_format != "parquet":
               if isinstance(value, datetime.date):
                   return value.isoformat()
               if isinstance(value, datetime.time):
                   formatted_time = time.strptime(str(value), "%H:%M:%S")
                   time_delta = datetime.timedelta(
                       hours=formatted_time.tm_hour, minutes=formatted_time.tm_min, seconds=formatted_time.tm_sec
                   )
                   return str(time_delta)
   
           if stringify_dict and isinstance(value, dict):
               return json.dumps(value)
           if isinstance(value, Decimal):
               return float(value)
           return value
   
   
   with DAG(
       "date_parquet_issue",
       default_args=DEFAULT_ARGS,
       schedule_interval=None,
   ) as dag:
       date_pg_to_gcs_issue = PostgresToGCSOperator(
           task_id="date_pg_to_gcs_issue",
           postgres_conn_id="postgres",
           sql="SELECT CURRENT_DATE;",
           bucket=GCS_BUCKET,
           filename=os.path.join("data/date_pg_to_gcs_issue", "part_{}.parquet"),
           export_format="parquet",
       )
   
       date_pg_to_gcs_fixed = PostgresToGCSFixedOperator(
           task_id="date_pg_to_gcs_fixed",
           postgres_conn_id="postgres",
           sql="SELECT CURRENT_DATE;",
           bucket=GCS_BUCKET,
           filename=os.path.join("data/date_pg_to_gcs_fixed", "part_{}.parquet"),
           export_format="parquet",
       )
   ```
   ![image](https://user-images.githubusercontent.com/14028677/184374167-033771c6-40a0-4e9e-8863-63e471117533.png)
   
   - The fixed operator writes data to Parquet with the right types (physical_type: INT32, logical_type: Date):
   ![image](https://user-images.githubusercontent.com/14028677/184374885-59fef294-0cd3-43b3-b4d2-faed17f86cbe.png)
   




[GitHub] [airflow] github-actions[bot] closed pull request #25691: don't convert date to iso string format if export format is parquet in PostgresToGCSOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #25691: don't convert date to iso string format if export format is parquet in PostgresToGCSOperator
URL: https://github.com/apache/airflow/pull/25691




[GitHub] [airflow] potiuk commented on pull request #25691: don't convert date to iso string format if export format is parquet in PostgresToGCSOperator

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25691:
URL: https://github.com/apache/airflow/pull/25691#issuecomment-1228994485

   needs rebase and test fixes.




[GitHub] [airflow] sanxore commented on pull request #25691: don't convert date to iso string format if export format is parquet in PostgresToGCSOperator

Posted by GitBox <gi...@apache.org>.
sanxore commented on PR #25691:
URL: https://github.com/apache/airflow/pull/25691#issuecomment-1220286818

   > > If I remember correctly we had to modify things to be able to export them to `BigQuery` with the correct field type.
   > > 
   > > * You are updating the `date` case, do we also want to not isoformat `datetime` in this case? [here](https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/transfers/postgres_to_gcs.py#L143)
   > > * Is it possible to confirm that these fields are getting exported with the correct type to bigquery with this modification? (DATETIME, DATE and TIME)
   > 
   > * We have the issue with datetime too. (I will add the fix to the PR.)
   > * This operator has no interaction with BigQuery. Its goal is to write data to GCS with the right type when writing to a binary format (like parquet) or with the right string representation when writing to a text format (like csv).
   > 
   > Converting time, datetime and date to a string format had broken the operator for parquet as export format.
   
   Yes, I confirm this field will have the right type in BigQuery. As you can see in my terminal screenshot, I parsed the schema of the parquet file I exported.
   The physical type is INT32 and the logical type is Date, which is exactly what BigQuery expects for a DATE column when we use `bq load` on parquet files for managed and external tables.
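
   For reference, the exported file's schema can be inspected locally with pyarrow (a sketch; the file path is illustrative and assumes the part file has been downloaded):

   ```python
   import pyarrow.parquet as pq

   # The DATE column should show physical type INT32 with a Date logical
   # type, and date32[day] on the arrow side.
   pf = pq.ParquetFile("part_0.parquet")  # illustrative local copy of the export
   print(pf.schema)        # low-level parquet schema (physical/logical types)
   print(pf.schema_arrow)  # arrow view, e.g. current_date: date32[day]
   ```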




[GitHub] [airflow] github-actions[bot] commented on pull request #25691: don't convert date to iso string format if export format is parquet in PostgresToGCSOperator

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #25691:
URL: https://github.com/apache/airflow/pull/25691#issuecomment-1273935705

   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.




[GitHub] [airflow] sanxore commented on pull request #25691: don't convert date to iso string format if export format is parquet in PostgresToGCSOperator

Posted by GitBox <gi...@apache.org>.
sanxore commented on PR #25691:
URL: https://github.com/apache/airflow/pull/25691#issuecomment-1220290640

   > > * This operator has no interaction with BigQuery. Its goal is to write data to GCS with the right type when writing to a binary format (like parquet) or with the right string representation when writing to a text format (like csv).
   > 
   > The schema generated by this operator uses types that are safe for BigQuery (`_write_local_schema_file` will for instance use `field_to_bigquery`, using the underlying `type_map` mapping db types to BigQuery types). This makes sure we can load into `BigQuery` all data exported with a `BaseSQLToGCSOperator`.
   > 
   > On top of that, parquet has an additional mapping from BigQuery types to pyarrow types (see `_convert_parquet_schema`).
   > 
   > I would expect the parquet export to succeed when columns are dates, but also to be importable into bigquery with a correct schema definition. (This is how it works for the `csv` and `json` export formats.)
   > 
   > _Note: I took a quick look, and couldn’t find what changed. On my bucket I found a working extract from 19 April 2022, made with `PostgresToGCSOperator` to parquet format with `date` and `datetime` in the schema. 🤔_
   
   But unfortunately we don't have unit tests for the parquet format that could confirm it was working :(
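
   A minimal test along these lines could look like the following (a sketch that assumes the `convert_type` change from this PR; the constructor arguments are illustrative):

   ```python
   import datetime

   from airflow.providers.google.cloud.transfers.postgres_to_gcs import PostgresToGCSOperator


   def test_convert_type_keeps_date_for_parquet():
       # With export_format="parquet", convert_type should return the date
       # unchanged instead of an ISO string (the behaviour this PR introduces).
       op = PostgresToGCSOperator(
           task_id="test_task",
           sql="SELECT CURRENT_DATE;",
           bucket="test-bucket",
           filename="part_{}.parquet",
           export_format="parquet",
       )
       value = datetime.date(2022, 8, 12)
       assert op.convert_type(value, "DATE") == value
   ```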




[GitHub] [airflow] sanxore commented on pull request #25691: don't convert date to iso string format if export format is parquet in PostgresToGCSOperator

Posted by GitBox <gi...@apache.org>.
sanxore commented on PR #25691:
URL: https://github.com/apache/airflow/pull/25691#issuecomment-1219989590

   > If I remember correctly we had to modify things to be able to export them to `BigQuery` with the correct field type.
   > 
   > * You are updating the `date` case, do we also want to not isoformat `datetime` in this case? [here](https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/transfers/postgres_to_gcs.py#L143)
   > * Is it possible to confirm that these fields are getting exported with the correct type to bigquery with this modification? (DATETIME, DATE and TIME)
   
   - I didn't check whether we have the issue with timestamp too.
   
   - This operator has no interaction with BigQuery. Its goal is to write data to GCS with the right type when writing to a binary format (like parquet) or with the right string representation when writing to a text format (like csv).




[GitHub] [airflow] pierrejeambrun commented on pull request #25691: don't convert date to iso string format if export format is parquet in PostgresToGCSOperator

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on PR #25691:
URL: https://github.com/apache/airflow/pull/25691#issuecomment-1220097978

   I took a look and can't find what changed. I found a working extract from August 2022, made with `PostgresToGCSOperator` to parquet format with `date` and `datetime` in the schema. :thinking:
   
   The schema generated by this operator uses types that are safe for BigQuery (`_write_local_schema_file` will for instance use `field_to_bigquery`, using the underlying `type_map` mapping db types to BigQuery types). This makes sure we can import into `BigQuery` anything exported to `GCS`.
   
   On top of that, parquet has an additional mapping from BigQuery types to pyarrow types (see `_convert_parquet_schema`).
   
   I would expect the parquet export to succeed when columns are dates, but also to be importable into bigquery with a correct schema definition. (This is how it works for the csv and json export formats.)
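
   The gist of that second mapping is roughly the following (an illustrative sketch only, not the provider's actual code; the dictionary contents and the string fallback are assumptions):

   ```python
   import pyarrow as pa

   # Illustrative sketch of mapping BigQuery field types to pyarrow types
   # before writing parquet (the real logic lives in BaseSQLToGCSOperator).
   BQ_TO_PYARROW = {
       "STRING": pa.string(),
       "INTEGER": pa.int64(),
       "FLOAT": pa.float64(),
       "BOOLEAN": pa.bool_(),
       "DATE": pa.date32(),
       "TIMESTAMP": pa.timestamp("s"),
   }


   def to_parquet_schema(bq_fields):
       """bq_fields: list of dicts like {"name": ..., "type": ...}."""
       return pa.schema(
           [pa.field(f["name"], BQ_TO_PYARROW.get(f["type"], pa.string())) for f in bq_fields]
       )
   ```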




[GitHub] [airflow] pierrejeambrun commented on pull request #25691: don't convert date to iso string format if export format is parquet in PostgresToGCSOperator

Posted by GitBox <gi...@apache.org>.
pierrejeambrun commented on PR #25691:
URL: https://github.com/apache/airflow/pull/25691#issuecomment-1219925046

   If I remember correctly we had to modify things here to be able to put the data into BigQuery with the correct format.
   
   - Can you confirm that these fields are getting exported with the correct type to bigquery with this modification ?
   - You are updating the `date` case, do we also want to not isoformat `datetime` in this case? [here](https://github.com/apache/airflow/pull/25691/files#diff-94ba72b037053ca871b197e5071ad2b5904ab176ebbb62c8ed75dadc7f8e597fL143) 
   




[GitHub] [airflow] sanxore commented on pull request #25691: don't convert date to iso string format if export format is parquet in PostgresToGCSOperator

Posted by GitBox <gi...@apache.org>.
sanxore commented on PR #25691:
URL: https://github.com/apache/airflow/pull/25691#issuecomment-1220298819

   > > * This operator has no interaction with BigQuery. Its goal is to write data to GCS with the right type when writing to a binary format (like parquet) or with the right string representation when writing to a text format (like csv).
   > 
   > The schema generated by this operator uses types that are safe for BigQuery (`_write_local_schema_file` will for instance use `field_to_bigquery`, using the underlying `type_map` mapping db types to BigQuery types). This makes sure we can load into `BigQuery` all data exported with a `BaseSQLToGCSOperator`.
   > 
   > On top of that, parquet has an additional mapping from BigQuery types to pyarrow types (see `_convert_parquet_schema`).
   > 
   > I would expect the parquet export to succeed when columns are dates, but also to be importable into bigquery with a correct schema definition. (This is how it works for the `csv` and `json` export formats.)
   > 
   > _Note: I took a quick look, and couldn’t find what changed. On my bucket I found a working extract from 19 April 2022, made with `PostgresToGCSOperator` to parquet format with `date` and `datetime` in the schema. 🤔_
   
   We should not compare how the json and csv formats work with the parquet format, because they are different file types: json and csv are string-serialized formats, while parquet is a binary format, which makes parquet type-aware.
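
   To illustrate the difference (a sketch; the file paths and column name are made up for the example):

   ```python
   import csv
   import datetime

   import pyarrow as pa
   import pyarrow.parquet as pq

   value = datetime.date(2022, 8, 12)

   # csv: everything is serialized as text, so an ISO string is the right representation.
   with open("/tmp/example.csv", "w", newline="") as f:
       csv.writer(f).writerow([value.isoformat()])

   # parquet: columns carry a type, so the writer needs the native object
   # (or an already typed array), not a pre-stringified value.
   table = pa.table({"d": pa.array([value], type=pa.date32())})
   pq.write_table(table, "/tmp/example.parquet")
   print(pq.read_schema("/tmp/example.parquet"))  # d: date32[day]
   ```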




[GitHub] [airflow] boring-cyborg[bot] commented on pull request #25691: don't convert date to iso string format if export format is parquet in PostgresToGCSOperator

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on PR #25691:
URL: https://github.com/apache/airflow/pull/25691#issuecomment-1213177083

   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org