Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/22 23:41:59 UTC
[GitHub] [airflow] Taragolis opened a new pull request, #25248: WIP: Serialized object property
Taragolis opened a new pull request, #25248:
URL: https://github.com/apache/airflow/pull/25248
At the moment, Airflow can't serialize a DAG with dynamic tasks if `operator_extra_links` is defined as a property.
related: #25243, #25215, #24676
I've tried to add a way to get the actual value of the property.
---
It works in simple cases: #25215, #24676
```python
from pendulum import datetime

from airflow.decorators import dag
from airflow.sensors.external_task import ExternalTaskSensor


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def external_task_sensor():
    ExternalTaskSensor.partial(
        task_id='wait',
    ).expand(external_dag_id=["dag_1", "dag_2", "dag_3"])


_ = external_task_sensor()
```
![image](https://user-images.githubusercontent.com/3998685/180580272-b9011b08-4156-4518-9929-8fbc7133340e.png)
However, there is no reason to define `operator_extra_links` as a property in these cases.
---
It still doesn't fully help when the property is used for dynamic link selection, as in #25243:
```python
from pendulum import datetime

from airflow.decorators import dag
from airflow.providers.amazon.aws.operators.batch import BatchOperator


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
def batchop_dtm():
    BatchOperator.partial(
        task_id='submit_batch_job',
        job_queue="batch_job_queue_name",
        job_definition="batch_job_definition_name",
        overrides={},
        # Set this flag to False, so we can test the sensor below
        wait_for_completion=False,
    ).expand(job_name=["job_1", "job_2", "job_3"])


_ = batchop_dtm()
```
With this PoC the error changes: `airflow.models.mappedoperator.MappedOperator` doesn't have access to the attributes of the task, so the property getter fails.
```
Broken DAG: [/files/dags/batch_mapping.py] Traceback (most recent call last):
  File "/opt/airflow/airflow/utils/helpers.py", line 390, in resolve_property_value
    return prop.fget(obj)
  File "/opt/airflow/airflow/providers/amazon/aws/operators/batch.py", line 117, in operator_extra_links
    if self.wait_for_completion:
AttributeError: type object 'SerializedBaseOperator' has no attribute 'wait_for_completion'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1178, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1086, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'batchop_dtm': type object 'SerializedBaseOperator' has no attribute 'wait_for_completion'
```
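The failure above can be reproduced outside Airflow. The following is only a minimal sketch: `SketchOperator` and its link names are hypothetical stand-ins, not Airflow classes, and `resolve_property_value` copies the logic of the helper in this PoC. It shows that calling the property getter with a class instead of an instance fails as soon as the getter reads instance state.

```python
class SketchOperator:
    """Hypothetical stand-in for an operator; not an Airflow class."""

    def __init__(self, wait_for_completion=True):
        self.wait_for_completion = wait_for_completion

    @property
    def operator_extra_links(self):
        # The getter depends on instance state, like the one in the traceback.
        if self.wait_for_completion:
            return ("job_details", "logs")
        return ("job_details",)


def resolve_property_value(obj, prop):
    """Same logic as the PoC helper: call the getter if ``prop`` is a property."""
    if isinstance(prop, property):
        return prop.fget(obj)
    return prop


# Fetch the raw property object without triggering the descriptor.
prop = SketchOperator.__dict__["operator_extra_links"]

# With an instance, the getter can read ``wait_for_completion``.
links_from_instance = resolve_property_value(
    SketchOperator(wait_for_completion=False), prop
)

# With only the class, ``self`` is the class itself and the attribute is missing.
try:
    resolve_property_value(SketchOperator, prop)
except AttributeError as exc:
    class_error = str(exc)
else:
    class_error = None
```

In this sketch `class_error` mentions `wait_for_completion`, which mirrors the `AttributeError` in the traceback, where the getter received `SerializedBaseOperator` rather than a task instance.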
cc: @josh-fell
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] Taragolis commented on pull request #25248: WIP: Serialized object property
Posted by GitBox <gi...@apache.org>.
Taragolis commented on PR #25248:
URL: https://github.com/apache/airflow/pull/25248#issuecomment-1195361223
I found that Extra Links do not work with dynamic tasks at all.
Extra links are assigned to the parent task instance (I don't know the correct name for this TI), not to the actual mapped TIs.
As a result, we only have `number of extra links defined in operator`, not `(number of extra links defined in operator) x number of mapped TIs`.
### Sample DAG
```python
from pendulum import datetime

from airflow.decorators import dag
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.empty import EmptyOperator

EXTERNAL_DAG_IDS = [f"example_external_dag_{ix:02d}" for ix in range(3)]
DAG_KWARGS = {
    "start_date": datetime(2022, 7, 1),
    "schedule_interval": "@daily",
    "catchup": False,
    "tags": ["mapped_extra_links", "AIP-42", "serialization"],
}


def external_dags():
    EmptyOperator(task_id="dummy")


@dag(**DAG_KWARGS)
def external_regular_task_sensor():
    for external_dag_id in EXTERNAL_DAG_IDS:
        ExternalTaskSensor(
            task_id=f'wait_for_{external_dag_id}',
            external_dag_id=external_dag_id,
            poke_interval=5,
        )


@dag(**DAG_KWARGS)
def external_mapped_task_sensor():
    ExternalTaskSensor.partial(
        task_id='wait',
        poke_interval=5,
    ).expand(external_dag_id=EXTERNAL_DAG_IDS)


dag_external_regular_task_sensor = external_regular_task_sensor()
dag_external_mapped_task_sensor = external_mapped_task_sensor()

for dag_id in EXTERNAL_DAG_IDS:
    globals()[dag_id] = dag(dag_id=dag_id, **DAG_KWARGS)(external_dags)()
```
<details>
<summary>Video Demo</summary>
https://user-images.githubusercontent.com/3998685/180994213-847b3fd3-d351-4836-b246-b54056f34ad6.mp4
</details>
Might it be better not to serialize Extra Links for dynamic tasks at all?
WDYT @potiuk @uranusjr
[GitHub] [airflow] Taragolis commented on a diff in pull request #25248: WIP: Serialized object property
Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #25248:
URL: https://github.com/apache/airflow/pull/25248#discussion_r932015431
##########
airflow/utils/helpers.py:
##########
@@ -376,3 +376,16 @@ def is_empty(x):
         return new_list
     else:
         return val
+
+
+def resolve_property_value(obj, prop):
+    """Return class property value.
+
+    :param obj: Reference to class.
+    :param prop: Reference to class property.
+    :returns: If ``prop`` property than return property value,
+        otherwise it returns class attribute value.
+    """
+    if isinstance(prop, property):
+        return prop.fget(obj)
+    return prop
Review Comment:
Sorry for the confusion. Initially I thought you meant adding some implementation of `__serialized_fields` in BatchOperator.
Now I understand that you meant placing the helper in the `serialized_objects` module.
[GitHub] [airflow] potiuk closed pull request #25248: WIP: Serialized object property
Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #25248: WIP: Serialized object property
URL: https://github.com/apache/airflow/pull/25248
[GitHub] [airflow] Taragolis commented on pull request #25248: WIP: Serialized object property
Posted by GitBox <gi...@apache.org>.
Taragolis commented on PR #25248:
URL: https://github.com/apache/airflow/pull/25248#issuecomment-1193011773
BTW, it seems that all XCom-based External Links won't work with dynamic tasks.
[GitHub] [airflow] uranusjr commented on a diff in pull request #25248: WIP: Serialized object property
Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25248:
URL: https://github.com/apache/airflow/pull/25248#discussion_r930630835
##########
airflow/utils/helpers.py:
##########
@@ -376,3 +376,16 @@ def is_empty(x):
         return new_list
     else:
         return val
+
+
+def resolve_property_value(obj, prop):
+    """Return class property value.
+
+    :param obj: Reference to class.
+    :param prop: Reference to class property.
+    :returns: If ``prop`` property than return property value,
+        otherwise it returns class attribute value.
+    """
+    if isinstance(prop, property):
+        return prop.fget(obj)
+    return prop
Review Comment:
This can live directly in `serialized_objects` instead (and be a private function)
[GitHub] [airflow] potiuk commented on pull request #25248: WIP: Serialized object property
Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25248:
URL: https://github.com/apache/airflow/pull/25248#issuecomment-1203795870
Yeah. I think @uranusjr is working on fixing this problem "properly" in https://github.com/apache/airflow/pull/25500. Closing this one.
[GitHub] [airflow] Taragolis commented on pull request #25248: WIP: Serialized object property
Posted by GitBox <gi...@apache.org>.
Taragolis commented on PR #25248:
URL: https://github.com/apache/airflow/pull/25248#issuecomment-1203747159
@potiuk @uranusjr Do we need this PR, or can I close it?
The only reason I created this PR was to try to get the value of the `operator_extra_links` property, but this still doesn't work for `BatchOperator`. It seems this might be solved by #25332.
Alternatively, I could change `BatchOperator` to define `operator_extra_links` as a tuple. In that case, with some operator arguments certain links would always be greyed out, but that might be better than a serialization error on mapped tasks.
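To make the tuple option concrete, here is a rough sketch under stated assumptions: the link classes and both operator classes are hypothetical and only mimic the shape of the problem, they are not the real `BatchOperator` or its links. The point is that a class-level tuple can be read from the class alone, while a property cannot.

```python
class JobDetailsLink:
    """Hypothetical link class used only for this sketch."""
    name = "Job Details"


class LogsLink:
    """Hypothetical link class used only for this sketch."""
    name = "Logs"


class PropertyLinksOperator:
    """Selects links dynamically, so the value needs an instance."""

    def __init__(self, wait_for_completion=True):
        self.wait_for_completion = wait_for_completion

    @property
    def operator_extra_links(self):
        links = [JobDetailsLink()]
        if self.wait_for_completion:
            links.append(LogsLink())
        return tuple(links)


class TupleLinksOperator:
    """Declares every possible link up front as a class-level tuple."""

    operator_extra_links = (JobDetailsLink(), LogsLink())

    def __init__(self, wait_for_completion=True):
        self.wait_for_completion = wait_for_completion


def link_names(operator_cls):
    """Read link names from the class alone, without creating an instance."""
    return [link.name for link in operator_cls.operator_extra_links]


tuple_names = link_names(TupleLinksOperator)

# On the class, the property object itself is returned and is not iterable.
try:
    link_names(PropertyLinksOperator)
except TypeError:
    property_readable_from_class = False
else:
    property_readable_from_class = True
```

The trade-off is the one described above: with the tuple, a link that does not apply for a given set of arguments is still listed, so it would presumably show up greyed out instead of being hidden.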
[GitHub] [airflow] uranusjr commented on a diff in pull request #25248: WIP: Serialized object property
Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25248:
URL: https://github.com/apache/airflow/pull/25248#discussion_r931984660
##########
airflow/utils/helpers.py:
##########
@@ -376,3 +376,16 @@ def is_empty(x):
         return new_list
     else:
         return val
+
+
+def resolve_property_value(obj, prop):
+    """Return class property value.
+
+    :param obj: Reference to class.
+    :param prop: Reference to class property.
+    :returns: If ``prop`` property than return property value,
+        otherwise it returns class attribute value.
+    """
+    if isinstance(prop, property):
+        return prop.fget(obj)
+    return prop
Review Comment:
What do you mean? Not sure I understand the question.
[GitHub] [airflow] Taragolis commented on pull request #25248: WIP: Serialized object property
Posted by GitBox <gi...@apache.org>.
Taragolis commented on PR #25248:
URL: https://github.com/apache/airflow/pull/25248#issuecomment-1193017352
List of operators which define `operator_extra_links` as a property:
- `airflow.sensors.external_task.ExternalTaskSensor` - #25215
- `airflow.sensors.external_task.ExternalTaskMarker` - #25215
- `airflow.providers.amazon.aws.operators.batch.BatchOperator` - #25243
- `airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator` - marked as deprecated
- `tests.test_utils.mock_operators.CustomOperator` - used in tests
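A list like the one above could be produced with a small check along these lines. This is just a sketch: the three classes are hypothetical placeholders rather than real operators, and `defines_links_as_property` is a name made up for illustration. It relies on `inspect.getattr_static` from the standard library, which looks up the attribute without invoking the property getter.

```python
import inspect


class StaticLinksOperator:
    """Hypothetical operator with links as a plain class attribute."""
    operator_extra_links = ()


class PropertyLinksOperator:
    """Hypothetical operator with links defined as a property."""

    @property
    def operator_extra_links(self):
        return ()


class InheritedPropertyOperator(PropertyLinksOperator):
    """Inherits the property from its parent class."""


def defines_links_as_property(cls):
    """Return True if ``operator_extra_links`` resolves to a property on ``cls``."""
    attr = inspect.getattr_static(cls, "operator_extra_links", None)
    return isinstance(attr, property)


candidates = (StaticLinksOperator, PropertyLinksOperator, InheritedPropertyOperator)
found = [cls.__name__ for cls in candidates if defines_links_as_property(cls)]
```

Because the lookup walks the MRO, subclasses that only inherit the property are reported as well.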
[GitHub] [airflow] potiuk commented on pull request #25248: WIP: Serialized object property
Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25248:
URL: https://github.com/apache/airflow/pull/25248#issuecomment-1193156526
Nice. I saw those issues too. I know it's a draft, but a look at it might be useful @uranusjr :)
[GitHub] [airflow] Taragolis commented on a diff in pull request #25248: WIP: Serialized object property
Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #25248:
URL: https://github.com/apache/airflow/pull/25248#discussion_r931305254
##########
airflow/utils/helpers.py:
##########
@@ -376,3 +376,16 @@ def is_empty(x):
         return new_list
     else:
         return val
+
+
+def resolve_property_value(obj, prop):
+    """Return class property value.
+
+    :param obj: Reference to class.
+    :param prop: Reference to class property.
+    :returns: If ``prop`` property than return property value,
+        otherwise it returns class attribute value.
+    """
+    if isinstance(prop, property):
+        return prop.fget(obj)
+    return prop
Review Comment:
Just to clarify - is it related to Operator?