Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/06 18:00:48 UTC

[GitHub] [airflow] josh-fell opened a new pull request #16837: Updating example DAG files to latest syntax via Taskflow API

josh-fell opened a new pull request #16837:
URL: https://github.com/apache/airflow/pull/16837


   Closes #10285
   
   - Updated example DAG files so that `xcom_pull()` calls use an operator's `.output` property, and so that `TaskInstance` objects are accessed from the context via the `get_current_context()` function
   - Added comments noting which task dependencies, if any, are handled and/or created via `XComArgs`, for transparency
   - Removed or refactored the `default_args` pattern where necessary, as requested by @ashb (i.e. removed a separate `default_args` declaration in favor of declaring it as part of the `DAG` object)
   - Other miscellaneous updates based on `.output` refactoring
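
The reason explicit dependencies become redundant once a task's `.output` is passed to another task can be sketched with a toy model. This is only an illustration: `ToyTask` and `ToyOutput` are hypothetical classes, not Airflow's implementation, and the task names are made up. The sketch assumes that receiving an output reference as an argument is what registers the upstream relationship.

```python
# Toy model only: ToyOutput and ToyTask are hypothetical, NOT Airflow classes.
class ToyOutput:
    """Reference to the value a task will produce at run time."""

    def __init__(self, task):
        self.task = task


class ToyTask:
    def __init__(self, task_id, **kwargs):
        self.task_id = task_id
        self.kwargs = kwargs
        self.upstream_task_ids = set()
        # Any argument that is an output reference implies an upstream
        # dependency, so no separate "a >> b" declaration is needed.
        for value in kwargs.values():
            if isinstance(value, ToyOutput):
                self.upstream_task_ids.add(value.task.task_id)

    @property
    def output(self):
        return ToyOutput(self)


# Hypothetical task names, used only for illustration.
create_job = ToyTask("create_job")
wait_for_job = ToyTask("wait_for_job", job_id=create_job.output)

print(wait_for_job.upstream_task_ids)
```

In this model, `wait_for_job` ends up downstream of `create_job` purely because it was handed `create_job.output`, which is the kind of dependency the added comments in the example DAGs call out.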
   
   >**Note:** There are several instances where the `xcom_pull()` call was not updated.  These instances involve accessing a specific value within the `XCom` or calling user-defined macros with an `XCom` value.  See #16618 for an open issue to enhance the `XComArg` functionality to provide behavior similar to the classic `xcom_pull()` method.
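
A rough way to picture the limitation described in the note is the toy model below. `ToyXComArg` and the `store` dictionary are hypothetical, and the step IDs are made up. The sketch rests on an assumption about the behavior at the time of this PR: subscripting an output reference selects a differently keyed `XCom` rather than an element inside the returned value.

```python
# Toy model only: ToyXComArg is hypothetical, NOT Airflow's XComArg.
class ToyXComArg:
    def __init__(self, task_id, key="return_value"):
        self.task_id = task_id
        self.key = key

    def __getitem__(self, item):
        # Assumed behavior: subscripting switches the XCom key;
        # it does not index into the stored value.
        return ToyXComArg(self.task_id, key=item)

    def resolve(self, store):
        return store.get((self.task_id, self.key))


# Pretend XCom storage keyed by (task_id, key); the values are illustrative.
store = {("add_steps", "return_value"): ["s-123", "s-456"]}

whole = ToyXComArg("add_steps")
first = whole[0]

print(whole.resolve(store))  # the full list pushed by the task
print(first.resolve(store))  # no XCom is stored under key 0
print(store[("add_steps", "return_value")][0])  # what the templated xcom_pull(...)[0] yields
```

Under that assumption, the reference can hand over the whole value but cannot pick out `[0]` or `['name']`, which is why the templated `xcom_pull()` expressions listed in the table were left as they were.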
   
   > **Note:** Not all DAGs were tested functionally (i.e. with hard integrations to source systems and executed); however, each DAG was tested locally to confirm that it compiles and generates the expected DAG graph.
   
   A detailed summary of all changes made as part of this PR can be found below:
   | DAG File | Converted `xcom_pull()`? | Other Updates? | Comments |
   | ---------| ------------------------ | -------------- | -------- |
   | airflow/example_dags/example_bash_operator.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/example_dags/example_branch_datetime_operator.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/example_dags/example_branch_day_of_week_operator.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/example_dags/example_branch_operator.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/example_dags/example_branch_python_dop_operator_3.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/example_dags/example_dag_decorator.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/example_dags/example_kubernetes_executor_config.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/example_dags/example_kubernetes_executor.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/example_dags/example_passing_params_via_test_command.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/example_dags/example_python_operator.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/example_dags/example_short_circuit_operator.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/example_dags/example_skip_dag.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/example_dags/example_trigger_controller_dag.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/example_dags/example_trigger_target_dag.py | No | Yes | Removed unneeded `default_args` pattern. <br/><br/>Updated to use `get_current_context()`.|
   | airflow/example_dags/example_xcom.py | Yes | Yes | Included examples for context retrieval using `get_current_context()` based on previous PR comments to show new and old means of `XCom` access.<br/><br/>Removed explicit task dependencies that are created via `XComArgs`. <br/><br/>Removed unneeded `default_args` pattern. |
   | airflow/example_dags/example_xcomargs.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/example_dags/tutorial_etl_dag.py | No | No | Not updated based on earlier comment to keep this file unchanged during initial PR. |
   | airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/example_dags/tutorial_taskflow_api_etl.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`.<br/><br/>Removed unneeded `default_args` pattern. |
   | airflow/providers/amazon/aws/example_dags/example_dms_full_load_task.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`.<br/><br/>Refactored `default_args` pattern. |
   | airflow/providers/amazon/aws/example_dags/example_dms_full_load_task.py | No | Yes | Refactored  `default_args` pattern. |
   | airflow/providers/amazon/aws/example_dags/example_emr_job_flow_automatic_steps.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. <br/><br/>Refactored `default_args` pattern. |
   | airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.py | Yes ** | Yes | ** Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update this occurrence: <br/>`"{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}"`. <br/><br/> All other `xcom_pull()` calls have been updated.<br/><br/>Removed explicit task dependencies that are created via `XComArgs`. <br/><br/>Refactored `default_args` pattern. |
   | airflow/providers/amazon/aws/example_dags/example_glacier_to_gcs.py | No | No | Current `XComArg` object does not support accessing specific values of an iterable `XCom` value.  Could not update any occurrences in this DAG. |
   | airflow/providers/amazon/aws/example_dags/example_google_api_to_s3_transfer_advanced.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. <br/><br/>Updated to use `get_current_context()`. |
   | airflow/providers/apache/beam/example_dags/example_beam.py | No | No | Current `XComArg` object does not support accessing specific values of an iterable `XCom` value.  Could not update any occurrences in this DAG. |
   | airflow/providers/apache/cassandra/example_dags/example_cassandra_dag.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/providers/apache/hive/example_dags/example_twitter_dag.py | No | Yes | Refactored `default_args` pattern. |
   | airflow/providers/apache/kylin/example_dags/example_kylin_dag.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. <br/><br/>Removed unneeded `default_args` pattern. |
   | airflow/providers/apache/pig/example_dags/example_pig.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/providers/apache/spark/example_dags/example_spark_dag.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/providers/asana/example_dags/example_asana.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/providers/cncf/kubernetes/example_dags/example_kubernetes.py | No | Yes | Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update this occurrence:<br/>`"echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\""`. <br/><br/>Removed unneeded `default_args` pattern. |
   | airflow/providers/cncf/kubernetes/example_dags/example_spark_kubernetes.py | No | Yes | Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update this occurrence:<br/>`"{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}"`. <br/><br/>Refactored `default_args` pattern. |
   | airflow/providers/dingding/example_dags/example_dingding.py | No | Yes | Refactored `default_args` pattern. |
   | airflow/providers/docker/example_dags/example_docker_copy_data.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. <br/><br/>Refactored `default_args` pattern. <br/><br/>Refactored `ShortCircuitOperator` callable logic. |
   | airflow/providers/docker/example_dags/example_docker_swarm.py | No | Yes | Refactored `default_args` pattern. |
   | airflow/providers/docker/example_dags/example_docker.py | No | Yes | Refactored `default_args` pattern. |
   | airflow/providers/google/cloud/example_dags/example_automl_nl_text_classification.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. |
   | airflow/providers/google/cloud/example_dags/example_automl_nl_text_extraction.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. |
   | airflow/providers/google/cloud/example_dags/example_automl_nl_text_sentiment.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. |
   | airflow/providers/google/cloud/example_dags/example_automl_tables.py | Yes ** | Yes | ** Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update the following occurrences: <br/>`"{{ extract_object_id(task_instance.xcom_pull('list_tables_spec_task')[0]) }}"`<br/>`'{{ task_instance.xcom_pull("create_dataset_task")["name"] }}'`<br/>`"{{ get_target_column_spec(task_instance.xcom_pull('list_columns_spec_task'), target) }}"`<br/><br/>All other `xcom_pull()` calls have been updated. <br/><br/> Removed explicit task dependencies that are created via `XComArgs`. |
   | airflow/providers/google/cloud/example_dags/example_automl_translation.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. |
   | airflow/providers/google/cloud/example_dags/example_automl_video_intelligence_classification.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. |
   | airflow/providers/google/cloud/example_dags/example_automl_video_intelligence_tracking.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. |
   | airflow/providers/google/cloud/example_dags/example_automl_vision_classification.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. |
   | airflow/providers/google/cloud/example_dags/example_automl_vision_object_detection.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. |
   | airflow/providers/google/cloud/example_dags/example_azure_fileshare_to_gcs.py | No | Yes | Refactored `default_args` pattern. |
   | airflow/providers/google/cloud/example_dags/example_bigquery_dts.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. |
   | airflow/providers/google/cloud/example_dags/example_bigquery_operations.py | No | No | Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update any occurrences in this DAG. |
   | airflow/providers/google/cloud/example_dags/example_bigquery_queries.py | Yes | Yes | Added globals assignment so both example DAGs are exposed rather than only the last one in the loop. |
   | airflow/providers/google/cloud/example_dags/example_cloud_build.py | No | No | Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update any occurrences in this DAG. |
   | airflow/providers/google/cloud/example_dags/example_cloud_memorystore.py | Yes ** | No | ** Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update this occurrence: <br/>`"user-{{ task_instance.xcom_pull('get-instance')['persistence_iam_identity'] }}"` <br/><br/>All other `xcom_pull()` calls have been updated. |
   | airflow/providers/google/cloud/example_dags/example_datacatalog.py | Yes ** | Yes | ** Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update this occurrence: <br/>`"echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\""` <br/><br/>All other `xcom_pull()` calls have been updated. |
   | airflow/providers/google/cloud/example_dags/example_cloud_sql.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. <br/><br/>Removed duplicate/redundant task. |
   | airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_aws.py | No | Yes | Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update any occurrences in this DAG. <br/><br/>Removed unused `default_args` variable. |
   | airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py | No | No | Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update any occurrences in this DAG. |
   | airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_gcp.py | Yes ** | Yes | ** Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update this occurrence: <br/>`"echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\""` <br/><br/>All other `xcom_pull()` calls have been updated. |
   | airflow/providers/google/cloud/example_dags/example_dataflow.py| No | No | Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update any occurrences in this DAG. |
   | airflow/providers/google/cloud/example_dags/example_dataproc.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. |
   | airflow/providers/google/cloud/example_dags/example_datastore.py | Yes ** | Yes | ** Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update these occurrences: <br/>` "{{task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}"` <br/>`"{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}"` <br/><br/>All other `xcom_pull()` calls have been updated. <br/><br/>Removed explicit task dependencies that are created via `XComArgs`. |
   | airflow/providers/google/cloud/example_dags/example_functions.py | No | Yes | Added `default_args` to the DAG, as there was logic to populate the dict but it was never applied. |
   | airflow/providers/google/cloud/example_dags/example_dlp.py | No | No | Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update any occurrences in this DAG. |
   | airflow/providers/google/cloud/example_dags/example_gcs.py | Yes | No ||
   | airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py | No | No | Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update any occurrences in this DAG. |
   | airflow/providers/google/cloud/example_dags/example_mlengine.py | Yes | Yes | Removed unneeded `default_args` pattern. |
   | airflow/providers/google/cloud/example_dags/example_natural_language.py | Yes | No ||
   | airflow/providers/google/cloud/example_dags/example_pubsub.py | Yes ** | Yes | ** Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update the following occurrence: <br/>`"""{% for m in task_instance.xcom_pull('pull_messages') %} echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}" {% endfor %}"""`<br/><br/>All other `xcom_pull()` calls have been updated. <br/><br/> Removed explicit task dependencies that are created via `XComArgs`. |
   | airflow/providers/google/cloud/example_dags/example_stackdriver.py | No | No | Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update any occurrences in this DAG. |
   | airflow/providers/google/cloud/example_dags/example_tasks.py | Yes | No ||
   | airflow/providers/google/cloud/example_dags/example_translate.py | No | No | Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update any occurrences in this DAG. |
   | airflow/providers/google/cloud/example_dags/example_video_intelligence.py | No | No | Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update any occurrences in this DAG. |
   | airflow/providers/google/cloud/example_dags/example_vision.py | Yes ** | Yes | ** Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update the following occurrences: <br/>`"echo {{ task_instance.xcom_pull('annotate_image')['logoAnnotations'][0]['description'] }}"` <br/>`"echo {{ task_instance.xcom_pull('detect_text')['textAnnotations'][0] }}"` <br/>`"echo {{ task_instance.xcom_pull('document_detect_text')['textAnnotations'][0] }}"` <br/>`"echo {{ task_instance.xcom_pull('detect_labels')['labelAnnotations'][0] }}"`<br/><br/>All other `xcom_pull()` calls have been updated. <br/><br/> Removed explicit task dependencies that are created via `XComArgs`. |
   | airflow/providers/google/cloud/example_dags/example_workflows.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. |
   | airflow/providers/google/leveldb/example_dags/example_leveldb.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/providers/google/marketing_platform/example_dags/example_campaign_manager.py | No | No | Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update any occurrences in this DAG. |
   | airflow/providers/google/marketing_platform/example_dags/example_display_video.py | Yes ** | Yes | ** Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update the following occurrence: <br/>`'{{ task_instance.xcom_pull("create_sdf_download_task")["name"] }}'`<br/><br/>All other `xcom_pull()` calls have been updated. <br/><br/> Removed explicit task dependencies that are created via `XComArgs`. <br/><br/>Updated this instance to reference the correct task rather than a non-existent one:<br/>`'{{ task_instance.xcom_pull("upload_sdf_to_bigquery")}}'`|
   | airflow/providers/google/marketing_platform/example_dags/example_search_ads.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. |
   | airflow/providers/google/suite/example_dags/example_gcs_to_sheets.py | No | No | Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update any occurrences in this DAG. |
   | airflow/providers/google/suite/example_dags/example_sheets.py | Yes ** | No | ** Current `XComArg` object does not support accessing specific values of an iterable `XCom` value elegantly. Did not update the following occurrences: <br/>`"{{ task_instance.xcom_pull('upload_sheet_to_gcs')[0] }}"`<br/><br/>All other `xcom_pull()` calls have been updated. |
   | airflow/providers/http/example_dags/example_http.py | No | Yes | Refactored `default_args` pattern. |
   | airflow/providers/jdbc/example_dags/example_jdbc_queries.py | No | Yes | Refactored `default_args` pattern. |
   | airflow/providers/jenkins/example_dags/example_jenkins_job_trigger.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. <br/><br/>Refactored `grab_artifact_from_jenkins()` function to take an input `XComArgs` from a previous task. <br/><br/>Refactored `default_args` pattern. |
   | airflow/providers/microsoft/azure/example_dags/example_azure_container_instances.py | No | Yes | Refactored `default_args` pattern. |
   | airflow/providers/microsoft/azure/example_dags/example_azure_cosmosdb.py | No | Yes | Refactored `default_args` pattern. |
   | airflow/providers/microsoft/winrm/example_dags/example_winrm.py | No | Yes | Removed `default_args` pattern. |
   | airflow/providers/mysql/example_dags/example_mysql.py | No | Yes | Removed `default_args` pattern. |
   | airflow/providers/neo4j/example_dags/example_neo4j.py | No | Yes | Removed `default_args` pattern. |
   | airflow/providers/papermill/example_dags/example_papermill.py | No | Yes | Removed `default_args` pattern. |
   | airflow/providers/plexus/example_dags/example_plexus.py | No | Yes | Refactored `default_args` pattern. |
   | airflow/providers/postgres/example_dags/example_postgres.py | No | Yes | Removed `default_args` pattern. |
   | airflow/providers/qubole/example_dags/example_qubole.py | No | Yes | Updated to use `get_current_context()`. |
   | airflow/providers/singularity/example_dags/example_singularity.py | No | Yes | Refactored `default_args` pattern. |
   | airflow/providers/snowflake/example_dags/example_snowflake.py | No | Yes | Removed `default_args` pattern. |
   | airflow/providers/sqlite/example_dags/example_sqlite.py | No | Yes | Removed `default_args` pattern. |
   | airflow/providers/tableau/example_dags/example_tableau_refresh_workbook.py | No | Yes | Refactored `default_args` pattern. |
   | airflow/providers/telegram/example_dags/example_telegram.py | No | Yes | Removed `default_args` pattern. |
   | airflow/providers/yandex/example_dags/example_yandexcloud_dataproc.py | No | Yes | Removed `default_args` pattern. |
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell closed pull request #16837: Updating example DAG files to latest syntax via Taskflow API

Posted by GitBox <gi...@apache.org>.
josh-fell closed pull request #16837:
URL: https://github.com/apache/airflow/pull/16837


   





[GitHub] [airflow] ephraimbuddy commented on pull request #16837: Updating example DAG files to latest syntax via Taskflow API

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #16837:
URL: https://github.com/apache/airflow/pull/16837#issuecomment-875004304


   I think this PR should be broken down into smaller PRs. 
   My suggestion:
   1 PR for core examples.
   1 PR for each of the provider's example dags.
   
   Reviewing 86 files is not going to be easy.





[GitHub] [airflow] josh-fell commented on pull request #16837: Updating example DAG files to latest syntax via Taskflow API

Posted by GitBox <gi...@apache.org>.
josh-fell commented on pull request #16837:
URL: https://github.com/apache/airflow/pull/16837#issuecomment-875011456


   > I think this PR should be broken down into smaller PRs.
   > My suggestion:
   > 1 PR for core examples.
   > 1 PR for each of the provider's example dags.
   > 
   > Reviewing 86 files is not going to be easy.
   
   Solid suggestion. I will close this PR and open smaller, more manageable ones.  Thanks for the feedback!

