Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/07/08 01:45:41 UTC

[GitHub] [airflow] josh-fell opened a new pull request #16869: Updating Apache example DAGs to use XComArgs

josh-fell opened a new pull request #16869:
URL: https://github.com/apache/airflow/pull/16869


   Related to #10285
   
   - Updated example DAG files so that `xcom_pull()` calls use an operator's `.output` property, and so that access to `TaskInstance` objects from the context uses the `get_current_context()` function
   - Added comments noting which task dependencies, if any, are handled and/or created via `XComArgs`, for transparency
   - Removed or refactored the `default_args` pattern where necessary, as requested by @ashb (i.e. removed a separate `default_args` declaration in favor of declaring the arguments as part of the `DAG` object)
   - Other miscellaneous updates based on `.output` refactoring
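
As a rough illustration of why explicit task dependencies can be dropped once an operator's `.output` is passed to another operator, here is a toy Python model. `ToyXComArg` and `ToyOperator` are hypothetical names, not Airflow classes; as we understand it, Airflow performs its own version of this scan over an operator's templated arguments, and the details differ.

```python
class ToyXComArg:
    """Hypothetical stand-in for a reference to another task's XCom."""

    def __init__(self, operator, key="return_value"):
        self.operator = operator
        self.key = key


class ToyOperator:
    """Hypothetical operator that records which tasks it depends on."""

    def __init__(self, task_id, **kwargs):
        self.task_id = task_id
        self.kwargs = kwargs
        self.upstream = set()
        # Scan the arguments (including nested dicts/lists) for XCom
        # references and register each producing operator as upstream.
        for ref in self._find_refs(kwargs):
            self.upstream.add(ref.operator.task_id)

    @property
    def output(self):
        # Toy analogue of `.output`: a reference to this task's return value.
        return ToyXComArg(self)

    @staticmethod
    def _find_refs(value):
        if isinstance(value, ToyXComArg):
            yield value
        elif isinstance(value, dict):
            for item in value.values():
                yield from ToyOperator._find_refs(item)
        elif isinstance(value, (list, tuple)):
            for item in value:
                yield from ToyOperator._find_refs(item)


producer = ToyOperator("gen_build_time")
consumer = ToyOperator("build_task", start_time=producer.output)
print(consumer.upstream)  # {'gen_build_time'}
```

The design point is that the reference object carries the producing operator with it, so the consumer can derive the dependency itself instead of relying on a separate `producer >> consumer` line.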
   
   > **Note:** There are several instances where the `xcom_pull()` call was not updated. These instances involve accessing a specific value within the `XCom` or calling user-defined macros with an `XCom` value. See #16618, an open issue to enhance `XComArg` functionality to provide behavior similar to the classic `xcom_pull()` method.
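
The limitation in the note above can be sketched with a toy model, assuming (as in Airflow 2.x, to our understanding) that subscripting an operator's `.output` selects an XCom *key* rather than indexing into the returned *value*. `XCOM_STORE`, the toy `xcom_pull()`, `KeyedRef`, and the `start_job`/`job_id` names are all hypothetical.

```python
# Hypothetical XCom store keyed by (task_id, key); the task returned a dict.
XCOM_STORE = {
    ("start_job", "return_value"): {"job_id": "abc-123"},
}


def xcom_pull(task_id, key="return_value"):
    """Toy pull of a single XCom; returns None when nothing matches."""
    return XCOM_STORE.get((task_id, key))


class KeyedRef:
    """Hypothetical reference that, like `.output[...]`, treats the
    subscript as the XCom key to pull."""

    def __init__(self, task_id, key="return_value"):
        self.task_id = task_id
        self.key = key

    def __getitem__(self, item):
        # The subscript becomes the key; it does NOT index into the value.
        return KeyedRef(self.task_id, key=item)

    def resolve(self):
        return xcom_pull(self.task_id, self.key)


# Template-style access: pull the return value, then index into it.
print(xcom_pull("start_job")["job_id"])  # abc-123

# Subscript-style access: looks for an XCom whose key is "job_id".
print(KeyedRef("start_job")["job_id"].resolve())  # None
```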
   
   > **Note:** Not all DAGs were tested functionally (i.e. executed against hard integrations with source systems); however, each DAG was tested locally to confirm that it compiles and generates the expected DAG graph.
   
   A detailed summary of all changes made as part of this PR can be found below:
   
   | DAG File | Converted `xcom_pull()`? | Other Updates? | Comments |
   | ---------| ------------------------ | -------------- | -------- |
   | airflow/providers/apache/beam/example_dags/example_beam.py | No | No | The current `XComArg` object does not elegantly support accessing specific values within an iterable `XCom` value, so no occurrences in this DAG could be updated. |
   | airflow/providers/apache/cassandra/example_dags/example_cassandra_dag.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/providers/apache/hive/example_dags/example_twitter_dag.py | No | Yes | Refactored `default_args` pattern. |
   | airflow/providers/apache/kylin/example_dags/example_kylin_dag.py | Yes | Yes | Removed explicit task dependencies that are created via `XComArgs`. <br/><br/>Removed unneeded `default_args` pattern. |
   | airflow/providers/apache/pig/example_dags/example_pig.py | No | Yes | Removed unneeded `default_args` pattern. |
   | airflow/providers/apache/spark/example_dags/example_spark_dag.py | No | Yes | Removed unneeded `default_args` pattern. |
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] josh-fell commented on pull request #16869: Updating Apache example DAGs to use XComArgs

Posted by GitBox <gi...@apache.org>.
josh-fell commented on pull request #16869:
URL: https://github.com/apache/airflow/pull/16869#issuecomment-885683207


   @kaxil Only 1 test is failing after the rebase, for the same reason mentioned in [this comment](https://github.com/apache/airflow/issues/17088#issuecomment-883032995) for other MSSQL failures. How would you like me to proceed?





[GitHub] [airflow] potiuk merged pull request #16869: Updating Apache example DAGs to use XComArgs

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #16869:
URL: https://github.com/apache/airflow/pull/16869


   





[GitHub] [airflow] kaxil commented on pull request #16869: Updating Apache example DAGs to use XComArgs

Posted by GitBox <gi...@apache.org>.
kaxil commented on pull request #16869:
URL: https://github.com/apache/airflow/pull/16869#issuecomment-885277169


   tests are failing





[GitHub] [airflow] josh-fell commented on a change in pull request #16869: Updating Apache example DAGs to use XComArgs

Posted by GitBox <gi...@apache.org>.
josh-fell commented on a change in pull request #16869:
URL: https://github.com/apache/airflow/pull/16869#discussion_r674218931



##########
File path: airflow/providers/apache/kylin/example_dags/example_kylin_dag.py
##########
@@ -57,8 +52,8 @@ def gen_build_time(**kwargs):
     project='learn_kylin',
     cube='kylin_sales_cube',
     command='build',
-    start_time="{{ task_instance.xcom_pull(task_ids='gen_build_time',key='date_start') }}",
-    end_time="{{ task_instance.xcom_pull(task_ids='gen_build_time',key='date_end') }}",
+    start_time=gen_build_time_task.output['date_start'],
+    end_time=gen_build_time_task.output['date_end'],

Review comment:
       Not specifically with Kylin, since I don't have a Kylin integration handy, but this test functions as expected:
   
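   ```python
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.utils.dates import days_ago
   
   
   def gen_build_time(**kwargs):
       """
       Gen build time and push to xcom
       :param kwargs:
       :return:
       """
       ti = kwargs["ti"]
       # Push two values under custom (non-default) XCom keys.
       ti.xcom_push(key="date_start", value="1325347200000")
       ti.xcom_push(key="date_end", value="1325433600000")
   
   
   def print_stuff(date_start, date_end):
       print(date_start)
       print(date_end)
   
   
   dag = DAG("mwe", start_date=days_ago(14))
   
   with dag:
       op = PythonOperator(task_id="gen_build_time", python_callable=gen_build_time)
       # `op.output["<key>"]` pulls the XCom pushed under that key; passing it in
       # `op_kwargs` also makes `gen_build_time` upstream of `print_stuff`.
       op2 = PythonOperator(
           task_id="print_stuff",
           python_callable=print_stuff,
           op_kwargs={"date_start": op.output["date_start"], "date_end": op.output["date_end"]},
       )
   ```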
   
   ![image](https://user-images.githubusercontent.com/48934154/126538640-67b81e31-9517-41aa-b23f-ba1fa91e74fa.png)
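
The review test can be mirrored in toy form to show what happens at run time when keyed references sit inside `op_kwargs`: each reference is swapped for the value pushed under its key before the callable is invoked. `XCOM_STORE`, `KeyedRef`, and `resolve_nested()` are hypothetical stand-ins, not Airflow APIs; the sketch assumes, as in the test, that the producer pushes `date_start` and `date_end` before the consumer runs.

```python
# Hypothetical XCom store keyed by (task_id, key).
XCOM_STORE = {}


class KeyedRef:
    """Hypothetical stand-in for `op.output["some_key"]`."""

    def __init__(self, task_id, key):
        self.task_id = task_id
        self.key = key

    def resolve(self):
        return XCOM_STORE[(self.task_id, self.key)]


def resolve_nested(value):
    """Recursively replace KeyedRef objects inside dicts and lists."""
    if isinstance(value, KeyedRef):
        return value.resolve()
    if isinstance(value, dict):
        return {k: resolve_nested(v) for k, v in value.items()}
    if isinstance(value, list):
        return [resolve_nested(v) for v in value]
    return value


def gen_build_time():
    # Same custom keys and values as in the review test.
    XCOM_STORE[("gen_build_time", "date_start")] = "1325347200000"
    XCOM_STORE[("gen_build_time", "date_end")] = "1325433600000"


def print_stuff(date_start, date_end):
    print(date_start)
    print(date_end)
    # Returned only so the toy result can be checked.
    return date_start, date_end


op_kwargs = {
    "date_start": KeyedRef("gen_build_time", "date_start"),
    "date_end": KeyedRef("gen_build_time", "date_end"),
}

gen_build_time()  # the upstream task runs first
result = print_stuff(**resolve_nested(op_kwargs))
```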
   








[GitHub] [airflow] github-actions[bot] commented on pull request #16869: Updating Apache example DAGs to use XComArgs

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #16869:
URL: https://github.com/apache/airflow/pull/16869#issuecomment-885277321


   The PR is likely OK to be merged with just a subset of tests for the default Python and database versions, without running the full test matrix, because it does not modify the core of Airflow. If the committers decide that the full test matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] potiuk commented on pull request #16869: Updating Apache example DAGs to use XComArgs

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #16869:
URL: https://github.com/apache/airflow/pull/16869#issuecomment-885779167


   > @kaxil Only 1 test failing after the rebase but for the same reason mentioned in [this comment](https://github.com/apache/airflow/issues/17088#issuecomment-883032995) for other MSSQL failures. How would you like me to proceed?
   
   You do not need to do anything. We will hopefully fix the MSSQL problem soon (MSSQL support is still a bit experimental), but in the meantime we can merge PRs whose only failures are "known" ones.





[GitHub] [airflow] josh-fell commented on pull request #16869: Updating Apache example DAGs to use XComArgs

Posted by GitBox <gi...@apache.org>.
josh-fell commented on pull request #16869:
URL: https://github.com/apache/airflow/pull/16869#issuecomment-885350906


   > tests are failing
   
   These failures look resource-related. I'm rebasing now; let's see how the new test runs turn out.






[GitHub] [airflow] kaxil commented on a change in pull request #16869: Updating Apache example DAGs to use XComArgs

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #16869:
URL: https://github.com/apache/airflow/pull/16869#discussion_r674203054



##########
File path: airflow/providers/apache/kylin/example_dags/example_kylin_dag.py
##########
@@ -57,8 +52,8 @@ def gen_build_time(**kwargs):
     project='learn_kylin',
     cube='kylin_sales_cube',
     command='build',
-    start_time="{{ task_instance.xcom_pull(task_ids='gen_build_time',key='date_start') }}",
-    end_time="{{ task_instance.xcom_pull(task_ids='gen_build_time',key='date_end') }}",
+    start_time=gen_build_time_task.output['date_start'],
+    end_time=gen_build_time_task.output['date_end'],

Review comment:
       This currently does not work if the key is different. Have you tested this?



