Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/03/08 11:15:56 UTC

[GitHub] [airflow] tanelk opened a new issue #22078: ExternalTaskSensorLink does not respect execution_delta and execution_date_fn

tanelk opened a new issue #22078:
URL: https://github.com/apache/airflow/issues/22078


   ### Apache Airflow version
   
   main (development)
   
   ### What happened
   
   `ExternalTaskSensorLink` points to a DAG run with the same `execution_date` as the `ExternalTaskSensor`.
   
   ### What you expected to happen
   
   `ExternalTaskSensorLink` should point to the actual DAG run it is waiting on.
   
   ### How to reproduce
   
   For example, by modifying this unit test:
   ```diff
   diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
   index 6e7222ed4..9aa941f9e 100644
   --- a/tests/sensors/test_external_task_sensor.py
   +++ b/tests/sensors/test_external_task_sensor.py
   @@ -413,6 +413,7 @@ def test_external_task_sensor_templated(dag_maker, app):
                task_id='templated_task',
                external_dag_id='dag_{{ ds }}',
                external_task_id='task_{{ ds }}',
   +            execution_delta=timedelta(days=1)
            )
   
        dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, execution_date=DEFAULT_DATE)
   @@ -427,7 +428,7 @@ def test_external_task_sensor_templated(dag_maker, app):
        with app.app_context():
            url = instance.task.get_extra_links(instance, "External DAG")
   
   -        assert f"tree?dag_id=dag_{DEFAULT_DATE.date()}" in url
   +        assert f"tree?dag_id=dag_{DEFAULT_DATE.date()}&execution_date={(DEFAULT_DATE - timedelta(days=1)).date()}" in url
   ```
   
   Currently it fails with `E           AssertionError: assert 'tree?dag_id=dag_2015-01-01&execution_date=2014-12-31' in 'http:///tree?dag_id=dag_2015-01-01&execution_date=2015-01-01T00%3A00%3A00%2B00%3A00'`, but it should succeed.
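
To make the mismatch in the failure message concrete, here is a minimal standalone sketch that rebuilds both query strings with the standard library only. The `build_query` helper is hypothetical and merely stands in for Airflow's URL builder, and `DEFAULT_DATE` is assumed to be 2015-01-01 UTC, as the failure message suggests.

```python
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode

# Assumption: the test's DEFAULT_DATE is midnight UTC on 2015-01-01.
DEFAULT_DATE = datetime(2015, 1, 1, tzinfo=timezone.utc)


def build_query(dag_id, execution_date):
    """Hypothetical stand-in for the link builder: URL-encode the two parameters."""
    query = {"dag_id": dag_id, "execution_date": execution_date.isoformat()}
    return "tree?" + urlencode(query)


# What the link contains today: the sensor's own execution date.
current = build_query("dag_2015-01-01", DEFAULT_DATE)

# What the modified test expects: the date shifted back by execution_delta.
expected = build_query("dag_2015-01-01", DEFAULT_DATE - timedelta(days=1))

print(current)
print(expected)
```

The test only checks for the date-only prefix `execution_date=2014-12-31`, so it would match the second string but not the first.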
   
   ### Operating System
   
   Arch Linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   Should modify the `ExternalTaskSensorLink.get_link` method.
   The `operator` in there is a `SerializedBaseOperator` and I did not figure out how to access the `execution_delta` and `execution_date_fn` fields from there.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] uranusjr commented on issue #22078: ExternalTaskSensorLink does not respect execution_delta and execution_date_fn

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #22078:
URL: https://github.com/apache/airflow/issues/22078#issuecomment-1066038666


   I believe those attributes are set on the `SerializedBaseOperator` instance directly. They should be; otherwise the deserialization has a bug…





[GitHub] [airflow] potiuk commented on issue #22078: ExternalTaskSensorLink does not respect execution_delta and execution_date_fn

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #22078:
URL: https://github.com/apache/airflow/issues/22078#issuecomment-1061676752


   Would you like to attempt a PR for this one, @tanelk? It sounds like a rather simple fix, and you should be able to test it locally.





[GitHub] [airflow] tanelk commented on issue #22078: ExternalTaskSensorLink does not respect execution_delta and execution_date_fn

Posted by GitBox <gi...@apache.org>.
tanelk commented on issue #22078:
URL: https://github.com/apache/airflow/issues/22078#issuecomment-1061681406


   I'll quote myself:
   
   Should modify the `ExternalTaskSensorLink.get_link` method.
   The operator in there is a `SerializedBaseOperator` and I did not figure out how to access the `execution_delta` and `execution_date_fn` fields from there.
   
   So, yeah, I did take a look, but I'm not sure how to access the relevant information.
   
   





[GitHub] [airflow] tanelk commented on issue #22078: ExternalTaskSensorLink does not respect execution_delta and execution_date_fn

Posted by GitBox <gi...@apache.org>.
tanelk commented on issue #22078:
URL: https://github.com/apache/airflow/issues/22078#issuecomment-1067649663


   With this change
   
   ```diff
   diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
   index b0ffed69b..0aafb7541 100644
   --- a/airflow/sensors/external_task.py
   +++ b/airflow/sensors/external_task.py
   @@ -42,7 +42,18 @@ class ExternalTaskSensorLink(BaseOperatorLink):
        def get_link(self, operator, dttm):
            ti = TaskInstance(task=operator, execution_date=dttm)
            operator.render_template_fields(ti.get_template_context())
   -        query = {"dag_id": operator.external_dag_id, "execution_date": dttm.isoformat()}
   +
   +        if operator.execution_delta:
   +            execution_date = dttm - operator.execution_delta
   +        elif operator.execution_date_fn:
   +            execution_date = operator._handle_execution_date_fn(context=ti.get_template_context())
   +        else:
   +            execution_date = dttm
   +
   +        query = {
   +            "dag_id": operator.external_dag_id,
   +            "execution_date": execution_date.isoformat()
   +        }
            return build_airflow_url_with_query(query)
   ```
   
   I get this exception when opening the task instance pop-up in the UI:
   ```
   [2022-03-15 07:10:14,468] {app.py:1892} ERROR - Exception on /extra_links [GET]
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2447, in wsgi_app
       response = self.full_dispatch_request()
     File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1952, in full_dispatch_request
       rv = self.handle_user_exception(e)
     File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1821, in handle_user_exception
       reraise(exc_type, exc_value, tb)
     File "/usr/local/lib/python3.7/site-packages/flask/_compat.py", line 39, in reraise
       raise value
     File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1950, in full_dispatch_request
       rv = self.dispatch_request()
     File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1936, in dispatch_request
       return self.view_functions[rule.endpoint](**req.view_args)
     File "/opt/airflow/airflow/www/auth.py", line 51, in decorated
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/www/decorators.py", line 80, in wrapper
       return f(*args, **kwargs)
     File "/opt/airflow/airflow/utils/session.py", line 71, in wrapper
       return func(*args, session=session, **kwargs)
     File "/opt/airflow/airflow/www/views.py", line 3241, in extra_links
       url = task.get_extra_links(ti, link_name)
     File "/opt/airflow/airflow/models/abstractoperator.py", line 268, in get_extra_links
       return link.get_link(self, ti.dag_run.logical_date)  # type: ignore[misc]
     File "/opt/airflow/airflow/sensors/external_task.py", line 46, in get_link
       if operator.execution_delta:
   AttributeError: 'SerializedBaseOperator' object has no attribute 'execution_delta'
   ```
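
The traceback shows that the deserialized operator simply has no `execution_delta` attribute. As a minimal sketch, assuming the attributes may be absent, the date computation could tolerate that by reading them with `getattr` and a default. The `resolve_target_date` helper and the `SimpleNamespace` stand-ins are hypothetical, the date function is assumed to take a single argument, and this is not Airflow's actual fix: when the attributes are missing it only falls back to the sensor's own date, which is the current behaviour.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace


def resolve_target_date(operator, dttm):
    """Return the date the sensor waits on, tolerating missing attributes."""
    # getattr with a default avoids the AttributeError seen in the traceback.
    delta = getattr(operator, "execution_delta", None)
    date_fn = getattr(operator, "execution_date_fn", None)
    if delta is not None:
        return dttm - delta
    if date_fn is not None:
        # Assumption: the callable accepts just the date.
        return date_fn(dttm)
    # Nothing usable on the operator: fall back to the sensor's own date.
    return dttm


dttm = datetime(2015, 1, 1, tzinfo=timezone.utc)

# Stand-in for an operator that still carries its attributes.
full = SimpleNamespace(execution_delta=timedelta(days=1), execution_date_fn=None)
# Stand-in for the deserialized operator from the traceback.
stripped = SimpleNamespace()

print(resolve_target_date(full, dttm).isoformat())
print(resolve_target_date(stripped, dttm).isoformat())
```

A fallback like this would stop the pop-up from erroring, but the link would still point at the wrong run until the fields are available on the deserialized operator.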





[GitHub] [airflow] potiuk commented on issue #22078: ExternalTaskSensorLink does not respect execution_delta and execution_date_fn

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #22078:
URL: https://github.com/apache/airflow/issues/22078#issuecomment-1061687083


   Ah - can you use `run_id` for that instead? 





[GitHub] [airflow] tanelk commented on issue #22078: ExternalTaskSensorLink does not respect execution_delta and execution_date_fn

Posted by GitBox <gi...@apache.org>.
tanelk commented on issue #22078:
URL: https://github.com/apache/airflow/issues/22078#issuecomment-1067650427


   Meanwhile these unit tests pass:
   
   ```python
   def test_external_task_sensor_link_execution_delta(dag_maker, app):
       with dag_maker():
           ExternalTaskSensor(
               task_id='external_task_sensor',
               external_dag_id='external_dag_id',
               execution_delta=timedelta(days=1),
           )
   
       dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, execution_date=DEFAULT_DATE)
       (instance,) = dagrun.task_instances
       instance.render_templates()
   
       # Verify that the operator link has correct execution_date
       app.config['SERVER_NAME'] = ""
       with app.app_context():
           url = instance.task.get_extra_links(instance, "External DAG")
   
           assert f"tree?dag_id=external_dag_id&execution_date={(DEFAULT_DATE - timedelta(days=1)).date()}" in url
   
   
   def test_external_task_sensor_link_execution_date_fn(dag_maker, app):
       with dag_maker():
           ExternalTaskSensor(
               task_id='external_task_sensor',
               external_dag_id='external_dag_id',
               execution_date_fn=lambda date: date - timedelta(days=1),
           )
   
       dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, execution_date=DEFAULT_DATE)
       (instance,) = dagrun.task_instances
       instance.render_templates()
   
       # Verify that the operator link has correct execution_date
       app.config['SERVER_NAME'] = ""
       with app.app_context():
           url = instance.task.get_extra_links(instance, "External DAG")
   
           assert f"tree?dag_id=external_dag_id&execution_date={(DEFAULT_DATE - timedelta(days=1)).date()}" in url
   ```

