Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/25 21:06:47 UTC

[GitHub] [airflow] arubafna opened a new issue #13898: S3 to GCS to BQ daily incremental data transfer pipeline. Append data from only the incremental (newly added) files from GCS into BQ table?

arubafna opened a new issue #13898:
URL: https://github.com/apache/airflow/issues/13898


   **Description**
   Set up an automated Airflow pipeline for data transfer from S3 to GCS to BQ. 
   
   **Use case / motivation**
   A major requirement of the solution is to trigger the DAG daily, so that each day's S3 data is loaded into BigQuery external, Hive-partitioned tables.
   
   **Current approach**
   For the first part, we use the **"S3ToGoogleCloudStorageOperator"**, which copies files from the source S3 bucket into the destination GCS bucket. The operator copies only the newly added (incremental) files and returns a list of their filenames, which is pushed to XCom. We then pass that list to the
   **"BigQueryInsertJobOperator"** by **pulling the XCom**.
   This ensures that only the data from the incremental files is appended to the BQ tables.
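
   A minimal sketch of this setup (assuming the Airflow 2 provider-package imports; on 1.10 the same operator is `S3ToGoogleCloudStorageOperator` under `airflow.contrib`. The bucket, project, dataset, and table names are placeholders, and turning the returned object keys into full `gs://` URIs is elided):
   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
   from airflow.providers.google.cloud.transfers.s3_to_gcs import S3ToGCSOperator

   with DAG(
       dag_id="s3_to_gcs_to_bq",
       schedule_interval="@daily",
       start_date=datetime(2021, 1, 1),
       catchup=False,
   ) as dag:
       # With replace=False, files already present in GCS are skipped, so the
       # operator's return value (pushed to XCom) lists only the new files.
       s3_to_gcs = S3ToGCSOperator(
           task_id="s3_to_gcs",
           bucket="my-s3-bucket",           # placeholder source bucket
           prefix="partner=doubleverify/",  # placeholder prefix
           dest_gcs="gs://my-gcs-bucket/",  # placeholder destination bucket
           replace=False,
       )

       # The file list is pulled from XCom into the templated job configuration.
       # Reshaping the raw object keys into full gs:// source URIs is elided.
       load_to_bq = BigQueryInsertJobOperator(
           task_id="load_to_bq",
           configuration={
               "load": {
                   "sourceUris": "{{ ti.xcom_pull(task_ids='s3_to_gcs') }}",
                   "destinationTable": {
                       "projectId": "my-project",  # placeholder project/dataset/table
                       "datasetId": "my_dataset",
                       "tableId": "my_table",
                   },
                   "writeDisposition": "WRITE_APPEND",
               }
           },
       )

       s3_to_gcs >> load_to_bq
   ```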
   
    **Issue:**
   This works fine for most of my S3 buckets; however, for some I receive this error:
   **ERROR - (_mysql_exceptions.DataError) (1406, "Data too long for column 'value' at row 1")
   [SQL: INSERT INTO xcom (`key`, value, timestamp, execution_date, task_id, dag_id) VALUES (%s, %s, %s, %s, %s, %s)]**
   I realize this is due to the **XCom size limitation**: the list of filename strings exceeds what the `xcom.value` column can hold.
   An example of the list returned: ["partner=doubleverify/sfmt=v1/table=blk/seat=607671/dt=2021-01-15/file=ATT_BLK_ImpID-20210115-07.csv.gz/part-00000-10065608-4a8e-45e3-99df-3f1c7765ed3f-c000.snappy.parquet", .....500 more elements ]
   
   ### Is there any way to override the XCom size limitation? If not, what changes to the DAG architecture would make the pipeline scalable and ensure that **only the newly added files in GCS are identified and loaded into BQ**?
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] eladkal edited a comment on issue #13898: S3 to GCS to BQ daily incremental data transfer pipeline. Append data from only the incremental (newly added) files from GCS into BQ table?

Posted by GitBox <gi...@apache.org>.
eladkal edited a comment on issue #13898:
URL: https://github.com/apache/airflow/issues/13898#issuecomment-768041034


   XCom has size limits. In Airflow 2 you can define a custom XCom backend;
   see the [documentation](https://airflow.apache.org/docs/apache-airflow/stable/concepts.html?highlight=xcom#custom-xcom-backend) and this [Polidea article](https://www.polidea.com/blog/airflow-2-0-dag-authoring-redesigned/).
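
   A rough sketch of what such a backend could look like, assuming the Google provider's `GCSHook` and a dedicated bucket (the bucket name and `gcs_xcom::` prefix below are placeholders):
   ```python
   import json
   import uuid

   from airflow.models.xcom import BaseXCom
   from airflow.providers.google.cloud.hooks.gcs import GCSHook


   class GCSXComBackend(BaseXCom):
       """Stores large XCom values in GCS, keeping only a reference in the DB."""

       PREFIX = "gcs_xcom::"
       BUCKET = "my-xcom-bucket"  # placeholder bucket

       @staticmethod
       def serialize_value(value):
           # Offload lists (e.g. long file-name lists) to GCS instead of
           # writing them into the metadata database's xcom.value column.
           if isinstance(value, list):
               key = f"xcom/{uuid.uuid4()}.json"
               GCSHook().upload(bucket_name=GCSXComBackend.BUCKET,
                                object_name=key, data=json.dumps(value))
               value = GCSXComBackend.PREFIX + key
           return BaseXCom.serialize_value(value)

       @staticmethod
       def deserialize_value(result):
           value = BaseXCom.deserialize_value(result)
           # Transparently resolve references created by serialize_value().
           if isinstance(value, str) and value.startswith(GCSXComBackend.PREFIX):
               key = value[len(GCSXComBackend.PREFIX):]
               data = GCSHook().download(bucket_name=GCSXComBackend.BUCKET,
                                         object_name=key)
               value = json.loads(data)
           return value
   ```
   You would then point Airflow at it with `xcom_backend = my_package.GCSXComBackend` in the `[core]` section of `airflow.cfg` (or via the `AIRFLOW__CORE__XCOM_BACKEND` environment variable).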
   
   I don't know your full use case, but from what you described I'm not sure XComs are even needed here. Why not simply write the delta (increment) to a known path in GCS, for example one templated with `{{ ds }}` for daily runs or `{{ ts_nodash }}` for hourly runs? The loading job can then read the data from that known path without looking at XComs, as sketched below.
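
   Roughly, with placeholder bucket/table names and the operators you already use:
   ```python
   # Each run writes its delta under a date-stamped prefix derived from {{ ds }}...
   s3_to_gcs = S3ToGCSOperator(
       task_id="s3_to_gcs",
       bucket="my-s3-bucket",                          # placeholder
       prefix="incoming/",                             # placeholder
       dest_gcs="gs://my-gcs-bucket/delta/{{ ds }}/",  # one folder per daily run
       replace=False,
   )

   # ...and the load job reads a wildcard on that same templated path, so no
   # file list ever has to travel through XCom.
   load_to_bq = BigQueryInsertJobOperator(
       task_id="load_to_bq",
       configuration={
           "load": {
               "sourceUris": ["gs://my-gcs-bucket/delta/{{ ds }}/*"],
               "destinationTable": {
                   "projectId": "my-project",  # placeholders
                   "datasetId": "my_dataset",
                   "tableId": "my_table",
               },
               "writeDisposition": "WRITE_APPEND",
           }
       },
   )
   ```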
   
   In any case, since this is not a bug/feature request but rather a troubleshooting question for a specific ETL, I'm closing this issue. If you need further assistance, please ask on https://stackoverflow.com/
   
   


[GitHub] [airflow] boring-cyborg[bot] commented on issue #13898: S3 to GCS to BQ daily incremental data transfer pipeline. Append data from only the incremental (newly added) files from GCS into BQ table?

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #13898:
URL: https://github.com/apache/airflow/issues/13898#issuecomment-767112637


   Thanks for opening your first issue here! Be sure to follow the issue template!
   



[GitHub] [airflow] eladkal closed issue #13898: S3 to GCS to BQ daily incremental data transfer pipeline. Append data from only the incremental (newly added) files from GCS into BQ table?

Posted by GitBox <gi...@apache.org>.
eladkal closed issue #13898:
URL: https://github.com/apache/airflow/issues/13898


   




