Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/03/08 11:15:56 UTC
[GitHub] [airflow] tanelk opened a new issue #22078: ExternalTaskSensorLink does not respect execution_delta and execution_date_fn
tanelk opened a new issue #22078:
URL: https://github.com/apache/airflow/issues/22078
### Apache Airflow version
main (development)
### What happened
`ExternalTaskSensorLink` points to a DAG run with `execution_date` same as the `ExternalTaskSensor`.
### What you expected to happen
`ExternalTaskSensorLink` should point to the actual DAG run it is waiting on.
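Here is a minimal sketch of the date the link would need to resolve. It assumes the sensor's documented semantics: `execution_delta` is subtracted from the sensor's logical date, while `execution_date_fn` maps that date to the target date. The helper name `resolve_external_date` is hypothetical. The real `execution_date_fn` may also receive the task context, which this sketch ignores.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional


def resolve_external_date(
    dttm: datetime,
    execution_delta: Optional[timedelta] = None,
    execution_date_fn: Optional[Callable[[datetime], datetime]] = None,
) -> datetime:
    """Hypothetical helper: logical date of the external DAG run the sensor waits on."""
    if execution_delta is not None and execution_date_fn is not None:
        # ExternalTaskSensor rejects this combination; mirror that here.
        raise ValueError("Pass only one of execution_delta or execution_date_fn")
    if execution_delta is not None:
        # Fixed offset backwards from the sensor's own logical date.
        return dttm - execution_delta
    if execution_date_fn is not None:
        # User-supplied mapping; only the one-argument form is handled in this sketch.
        return execution_date_fn(dttm)
    # Fallback: same logical date as the sensor (what the link currently always uses).
    return dttm


sensor_date = datetime(2015, 1, 1, tzinfo=timezone.utc)
print(resolve_external_date(sensor_date, execution_delta=timedelta(days=1)).date())  # 2014-12-31
```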
### How to reproduce
For example, by modifying this unit test:
```diff
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index 6e7222ed4..9aa941f9e 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -413,6 +413,7 @@ def test_external_task_sensor_templated(dag_maker, app):
             task_id='templated_task',
             external_dag_id='dag_{{ ds }}',
             external_task_id='task_{{ ds }}',
+            execution_delta=timedelta(days=1)
         )
     dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, execution_date=DEFAULT_DATE)
@@ -427,7 +428,7 @@ def test_external_task_sensor_templated(dag_maker, app):
     with app.app_context():
         url = instance.task.get_extra_links(instance, "External DAG")
-        assert f"tree?dag_id=dag_{DEFAULT_DATE.date()}" in url
+        assert f"tree?dag_id=dag_{DEFAULT_DATE.date()}&execution_date={(DEFAULT_DATE - timedelta(days=1)).date()}" in url
```
Currently it fails with `E AssertionError: assert 'tree?dag_id=dag_2015-01-01&execution_date=2014-12-31' in 'http:///tree?dag_id=dag_2015-01-01&execution_date=2015-01-01T00%3A00%3A00%2B00%3A00'`, but it should pass.
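The percent-encoded tail in that message is the ISO timestamp after URL query encoding. The rough stand-in below uses `urllib.parse.urlencode`, which is assumed (as the output suggests) to be how the link's query string is encoded; it is not Airflow's own URL helper. It shows why the test's date-only substring would match a correctly shifted link: the `execution_date=2014-12-31` prefix survives encoding unchanged. The function name `link_query` is hypothetical.

```python
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode

DEFAULT_DATE = datetime(2015, 1, 1, tzinfo=timezone.utc)


def link_query(dag_id: str, execution_date: datetime) -> str:
    # Hypothetical stand-in for the query part of the link; "tree" mirrors the test's expectation.
    return "tree?" + urlencode({"dag_id": dag_id, "execution_date": execution_date.isoformat()})


current = link_query("dag_2015-01-01", DEFAULT_DATE)
expected_fragment = f"tree?dag_id=dag_2015-01-01&execution_date={(DEFAULT_DATE - timedelta(days=1)).date()}"

print(current)  # tree?dag_id=dag_2015-01-01&execution_date=2015-01-01T00%3A00%3A00%2B00%3A00
print(expected_fragment in current)  # False: the link still uses the sensor's own date
print(expected_fragment in link_query("dag_2015-01-01", DEFAULT_DATE - timedelta(days=1)))  # True
```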
### Operating System
Arch Linux
### Versions of Apache Airflow Providers
_No response_
### Deployment
Virtualenv installation
### Deployment details
_No response_
### Anything else
Should modify the `ExternalTaskSensorLink.get_link` method.
The `operator` in there is a `SerializedBaseOperator` and I did not figure out how to access the `execution_delta` and `execution_date_fn` fields from there.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] tanelk edited a comment on issue #22078: ExternalTaskSensorLink does not respect execution_delta and execution_date_fn
Posted by GitBox <gi...@apache.org>.
tanelk edited a comment on issue #22078:
URL: https://github.com/apache/airflow/issues/22078#issuecomment-1067650427
Meanwhile these unit tests pass:
```python
from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.types import DagRunType

# DEFAULT_DATE and the dag_maker/app fixtures are assumed to come from the
# existing test module (tests/sensors/test_external_task_sensor.py) and conftest.


def test_external_task_sensor_link_execution_delta(dag_maker, app):
    with dag_maker():
        ExternalTaskSensor(
            task_id='external_task_sensor',
            external_dag_id='external_dag_id',
            execution_delta=timedelta(days=1),
        )
    dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, execution_date=DEFAULT_DATE)
    (instance,) = dagrun.task_instances
    instance.render_templates()

    # Verify that the operator link has correct execution_date
    app.config['SERVER_NAME'] = ""
    with app.app_context():
        url = instance.task.get_extra_links(instance, "External DAG")
        assert f"tree?dag_id=external_dag_id&execution_date={(DEFAULT_DATE - timedelta(days=1)).date()}" in url


def test_external_task_sensor_link_execution_date_fn(dag_maker, app):
    with dag_maker():
        ExternalTaskSensor(
            task_id='external_task_sensor',
            external_dag_id='external_dag_id',
            execution_date_fn=lambda date: date - timedelta(days=1),
        )
    dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, execution_date=DEFAULT_DATE)
    (instance,) = dagrun.task_instances
    instance.render_templates()

    # Verify that the operator link has correct execution_date
    app.config['SERVER_NAME'] = ""
    with app.app_context():
        url = instance.task.get_extra_links(instance, "External DAG")
        assert f"tree?dag_id=external_dag_id&execution_date={(DEFAULT_DATE - timedelta(days=1)).date()}" in url
```
[GitHub] [airflow] uranusjr commented on issue #22078: ExternalTaskSensorLink does not respect execution_delta and execution_date_fn
Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #22078:
URL: https://github.com/apache/airflow/issues/22078#issuecomment-1066038666
I believe those attributes are set directly on the `SerializedBaseOperator` instance. They should be; otherwise, the deserialization has a bug…
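Here is a concrete version of this hypothesis. Suppose deserialization sets onto the instance only the fields written out at serialization time. Then any sensor-specific attribute left out of that set simply does not exist on the restored object. The toy round-trip below uses hypothetical classes and a hypothetical `SERIALIZED_FIELDS` set, not Airflow's actual serializer, whose field list is not shown in this thread.

```python
import json
from datetime import timedelta
from types import SimpleNamespace

# Hypothetical whitelist of operator attributes that survive serialization.
SERIALIZED_FIELDS = {"task_id", "external_dag_id", "external_task_id"}


class ToySensor:
    """Hypothetical stand-in for ExternalTaskSensor; not the real class."""

    def __init__(self, task_id, external_dag_id, external_task_id=None, execution_delta=None):
        self.task_id = task_id
        self.external_dag_id = external_dag_id
        self.external_task_id = external_task_id
        self.execution_delta = execution_delta


def serialize(op) -> str:
    # Only whitelisted attributes are written out; execution_delta is dropped.
    return json.dumps({name: getattr(op, name) for name in SERIALIZED_FIELDS})


def deserialize(payload: str) -> SimpleNamespace:
    # Each stored field is set directly on a fresh instance.
    obj = SimpleNamespace()
    for name, value in json.loads(payload).items():
        setattr(obj, name, value)
    return obj


sensor = ToySensor("external_task_sensor", "external_dag_id", execution_delta=timedelta(days=1))
restored = deserialize(serialize(sensor))
print(restored.external_dag_id)              # external_dag_id
print(hasattr(restored, "execution_delta"))  # False: it was never serialized
```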
[GitHub] [airflow] potiuk commented on issue #22078: ExternalTaskSensorLink does not respect execution_delta and execution_date_fn
Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #22078:
URL: https://github.com/apache/airflow/issues/22078#issuecomment-1061676752
Would you like to attempt a PR for that one, @tanelk? It sounds like a rather simple fix, and you should be able to test it locally.
[GitHub] [airflow] tanelk commented on issue #22078: ExternalTaskSensorLink does not respect execution_delta and execution_date_fn
Posted by GitBox <gi...@apache.org>.
tanelk commented on issue #22078:
URL: https://github.com/apache/airflow/issues/22078#issuecomment-1061681406
I'll quote myself:
> Should modify the `ExternalTaskSensorLink.get_link` method.
> The `operator` in there is a `SerializedBaseOperator` and I did not figure out how to access the `execution_delta` and `execution_date_fn` fields from there.

So, yeah, I did take a look, but I'm not sure how to access the relevant information.
[GitHub] [airflow] tanelk commented on issue #22078: ExternalTaskSensorLink does not respect execution_delta and execution_date_fn
Posted by GitBox <gi...@apache.org>.
tanelk commented on issue #22078:
URL: https://github.com/apache/airflow/issues/22078#issuecomment-1067649663
With this change
```diff
diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index b0ffed69b..0aafb7541 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -42,7 +42,18 @@ class ExternalTaskSensorLink(BaseOperatorLink):
     def get_link(self, operator, dttm):
         ti = TaskInstance(task=operator, execution_date=dttm)
         operator.render_template_fields(ti.get_template_context())
-        query = {"dag_id": operator.external_dag_id, "execution_date": dttm.isoformat()}
+
+        if operator.execution_delta:
+            execution_date = dttm - operator.execution_delta
+        elif operator.execution_date_fn:
+            execution_date = operator._handle_execution_date_fn(context=ti.get_template_context())
+        else:
+            execution_date = dttm
+
+        query = {
+            "dag_id": operator.external_dag_id,
+            "execution_date": execution_date.isoformat()
+        }
         return build_airflow_url_with_query(query)
```
I get this exception when opening the task instance pop-up in the UI:
```
[2022-03-15 07:10:14,468] {app.py:1892} ERROR - Exception on /extra_links [GET]
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python3.7/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/opt/airflow/airflow/www/auth.py", line 51, in decorated
    return func(*args, **kwargs)
  File "/opt/airflow/airflow/www/decorators.py", line 80, in wrapper
    return f(*args, **kwargs)
  File "/opt/airflow/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/airflow/airflow/www/views.py", line 3241, in extra_links
    url = task.get_extra_links(ti, link_name)
  File "/opt/airflow/airflow/models/abstractoperator.py", line 268, in get_extra_links
    return link.get_link(self, ti.dag_run.logical_date)  # type: ignore[misc]
  File "/opt/airflow/airflow/sensors/external_task.py", line 46, in get_link
    if operator.execution_delta:
AttributeError: 'SerializedBaseOperator' object has no attribute 'execution_delta'
```
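The crash itself can be avoided by reading the optional fields with `getattr(..., None)`, as in the minimal sketch below. It uses `SimpleNamespace` objects as hypothetical stand-ins for a deserialized operator and a live sensor. The function name `target_date` is hypothetical, and only the one-argument form of `execution_date_fn` is handled. Note that this only removes the `AttributeError`. If the fields are never serialized, the link silently falls back to the sensor's own date, so the serialization question remains open.

```python
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace


def target_date(operator, dttm: datetime) -> datetime:
    # getattr with a default tolerates operators that lack these attributes.
    execution_delta = getattr(operator, "execution_delta", None)
    execution_date_fn = getattr(operator, "execution_date_fn", None)
    if execution_delta:
        return dttm - execution_delta
    if execution_date_fn:
        return execution_date_fn(dttm)  # one-argument form only, for illustration
    return dttm


dttm = datetime(2015, 1, 1, tzinfo=timezone.utc)
deserialized = SimpleNamespace(external_dag_id="external_dag_id")  # no execution_delta attribute
live = SimpleNamespace(external_dag_id="external_dag_id", execution_delta=timedelta(days=1))
print(target_date(deserialized, dttm).date())  # 2015-01-01 (falls back, no crash)
print(target_date(live, dttm).date())          # 2014-12-31
```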
[GitHub] [airflow] potiuk commented on issue #22078: ExternalTaskSensorLink does not respect execution_delta and execution_date_fn
Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #22078:
URL: https://github.com/apache/airflow/issues/22078#issuecomment-1061687083
Ah - can you use `run_id` for that instead?
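One way to read this suggestion is sketched below under stated assumptions. If the external run was created by the scheduler, its `run_id` is assumed to follow Airflow's `scheduled__<ISO timestamp>` convention, so the link could carry a `run_id` derived from the target date. Manually triggered or backfilled runs use other prefixes. Whether the target view accepts a `run_id` query parameter is not established in this thread, and the helper `scheduled_run_id` is hypothetical. This approach still needs the target date, so it does not by itself remove the need to read `execution_delta` or `execution_date_fn`.

```python
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode


def scheduled_run_id(logical_date: datetime) -> str:
    # Assumed convention for scheduler-created runs: "<run type>__<ISO timestamp>".
    return f"scheduled__{logical_date.isoformat()}"


sensor_date = datetime(2015, 1, 1, tzinfo=timezone.utc)
target = sensor_date - timedelta(days=1)  # e.g. execution_delta=timedelta(days=1)
query = {"dag_id": "external_dag_id", "run_id": scheduled_run_id(target)}
print(scheduled_run_id(target))  # scheduled__2014-12-31T00:00:00+00:00
print(urlencode(query))  # dag_id=external_dag_id&run_id=scheduled__2014-12-31T00%3A00%3A00%2B00%3A00
```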