Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/22 23:41:59 UTC

[GitHub] [airflow] Taragolis opened a new pull request, #25248: WIP: Serialized object property

Taragolis opened a new pull request, #25248:
URL: https://github.com/apache/airflow/pull/25248

   At the moment, Airflow can't serialize a DAG with dynamic (mapped) tasks if `operator_extra_links` is defined as a property.
   Related: #25243, #25215, #24676
   
   I've tried to resolve the actual value of the property.
   
   ---
   It works in simple cases (#25215, #24676):
   
   ```python
   from pendulum import datetime
   
   from airflow.decorators import dag
   from airflow.sensors.external_task import ExternalTaskSensor
   
   
   @dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
   def external_task_sensor():
       ExternalTaskSensor.partial(
           task_id='wait',
       ).expand(external_dag_id=["dag_1", "dag_2", "dag_3"])
   
   _ = external_task_sensor()
   
   
   ```
   ![image](https://user-images.githubusercontent.com/3998685/180580272-b9011b08-4156-4518-9929-8fbc7133340e.png)
   
   However, in these cases there is no reason to define `operator_extra_links` as a property.
   
   ---
   
   It still doesn't fully help when the property is used to select links dynamically, as in #25243:
   
   ```python
   from pendulum import datetime
   
   from airflow.decorators import dag
   from airflow.providers.amazon.aws.operators.batch import BatchOperator
   
   
   @dag(start_date=datetime(2022, 1, 1), schedule_interval=None)
   def batchop_dtm():
       BatchOperator.partial(
           task_id='submit_batch_job',
           job_queue="batch_job_queue_name",
           job_definition="batch_job_definition_name",
           overrides={},
           # Set this flag to False, so we can test the sensor below
           wait_for_completion=False,
       ).expand(job_name=["job_1", "job_2", "job_3"])
   
   
   _ = batchop_dtm()
   ```
   
   With this PoC the error changes: `airflow.models.mappedoperator.MappedOperator` doesn't have access to the task's attributes.
   
   ```
   Broken DAG: [/files/dags/batch_mapping.py] Traceback (most recent call last):
     File "/opt/airflow/airflow/utils/helpers.py", line 390, in resolve_property_value
       return prop.fget(obj)
     File "/opt/airflow/airflow/providers/amazon/aws/operators/batch.py", line 117, in operator_extra_links
       if self.wait_for_completion:
   AttributeError: type object 'SerializedBaseOperator' has no attribute 'wait_for_completion'
   
   During handling of the above exception, another exception occurred:
   
   Traceback (most recent call last):
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1178, in to_dict
       json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
     File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1086, in serialize_dag
       raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
   airflow.exceptions.SerializationError: Failed to serialize DAG 'batchop_dtm': type object 'SerializedBaseOperator' has no attribute 'wait_for_completion'
   ```
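To make the failure above concrete, here is a minimal sketch. It reuses the `resolve_property_value` helper proposed in this PR and assumes, as the traceback suggests, that the getter is called with a class rather than an operator instance. `StaticLinksOperator`, `DynamicLinksOperator` and `SerializedStub` are hypothetical stand-ins, not Airflow classes. A getter that ignores `self` resolves fine against a class, while one that reads instance state (like `wait_for_completion`) raises the same kind of `AttributeError`.

```python
# `resolve_property_value` is copied from the helper proposed in this PR.
# The classes below are hypothetical stand-ins, not Airflow classes.


def resolve_property_value(obj, prop):
    """Return the property's value if ``prop`` is a property, else ``prop`` itself."""
    if isinstance(prop, property):
        return prop.fget(obj)
    return prop


class StaticLinksOperator:
    # The getter ignores `self`, so it can be evaluated against any object.
    @property
    def operator_extra_links(self):
        return ("LinkA",)


class DynamicLinksOperator:
    def __init__(self, wait_for_completion=True):
        self.wait_for_completion = wait_for_completion

    # The getter reads instance state, similar to BatchOperator in the traceback.
    @property
    def operator_extra_links(self):
        if self.wait_for_completion:
            return ("JobLink", "LogsLink")
        return ("JobLink",)


class SerializedStub:
    """Stands in for a class-level object that has no operator attributes."""


# Accessing a property on the class returns the raw `property` object.
static_prop = StaticLinksOperator.operator_extra_links
dynamic_prop = DynamicLinksOperator.operator_extra_links

print(resolve_property_value(SerializedStub, static_prop))  # ('LinkA',)

try:
    # The getter runs with `self` bound to a class lacking `wait_for_completion`.
    resolve_property_value(SerializedStub, dynamic_prop)
except AttributeError as exc:
    print(f"AttributeError: {exc}")
```

The same property resolves correctly when it is given a real instance, e.g. `resolve_property_value(DynamicLinksOperator(False), dynamic_prop)`, which is why the problem only shows up when serialization works at class level.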
   
   cc: @josh-fell 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] Taragolis commented on pull request #25248: WIP: Serialized object property

Posted by GitBox <gi...@apache.org>.
Taragolis commented on PR #25248:
URL: https://github.com/apache/airflow/pull/25248#issuecomment-1195361223

   I found that Extra Links do not work with dynamic tasks at all.
   Extra links are assigned to the parent task instance (I don't know the correct name for this TI), not to the actual mapped TIs.
   As a result, we only get `number of extra links defined in the operator` instead of `(number of extra links defined in the operator) x (number of mapped TIs)`.
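To make the expected count concrete, here is a minimal sketch of the behaviour one would expect, assuming each mapped TI should get its own copy of every extra link. `expected_link_slots` and the link names are hypothetical; this illustrates the arithmetic, not what Airflow currently does.

```python
from itertools import product


def expected_link_slots(link_names, map_indexes):
    """Return one (map_index, link_name) pair for every link of every mapped TI."""
    return list(product(map_indexes, link_names))


links = ["LinkA", "LinkB"]  # hypothetical: links defined on the operator
map_indexes = range(3)  # e.g. expand() over three external_dag_id values

slots = expected_link_slots(links, map_indexes)
print(len(slots))  # 6, i.e. 2 links x 3 mapped TIs
```

What is observed instead is only `len(links)` links, all attached to the parent task instance.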
   
   ### Sample DAG
   
   ```python
   from pendulum import datetime
   
   from airflow.decorators import dag
   from airflow.sensors.external_task import ExternalTaskSensor
   from airflow.operators.empty import EmptyOperator
   
   EXTERNAL_DAG_IDS = [f"example_external_dag_{ix:02d}" for ix in range(3)]
   DAG_KWARGS = {
       "start_date": datetime(2022, 7, 1),
       "schedule_interval": "@daily",
       "catchup": False,
       "tags": ["mapped_extra_links", "AIP-42", "serialization"],
   }
   
   
   def external_dags():
       EmptyOperator(task_id="dummy")
   
   
   @dag(**DAG_KWARGS)
   def external_regular_task_sensor():
       for external_dag_id in EXTERNAL_DAG_IDS:
           ExternalTaskSensor(
               task_id=f'wait_for_{external_dag_id}',
               external_dag_id=external_dag_id,
               poke_interval=5,
           )
   
   
   @dag(**DAG_KWARGS)
   def external_mapped_task_sensor():
       ExternalTaskSensor.partial(
           task_id='wait',
           poke_interval=5,
       ).expand(external_dag_id=EXTERNAL_DAG_IDS)
   
   
   dag_external_regular_task_sensor = external_regular_task_sensor()
   dag_external_mapped_task_sensor = external_mapped_task_sensor()
   
   for dag_id in EXTERNAL_DAG_IDS:
       globals()[dag_id] = dag(dag_id=dag_id, **DAG_KWARGS)(external_dags)()
   
   ```
   
   <details>
     <summary>Video Demo</summary>
   
   https://user-images.githubusercontent.com/3998685/180994213-847b3fd3-d351-4836-b246-b54056f34ad6.mp4
   
   </details>
   
   Might it be better not to serialize Extra Links for dynamic tasks at all?
   WDYT @potiuk @uranusjr 



[GitHub] [airflow] Taragolis commented on a diff in pull request #25248: WIP: Serialized object property

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #25248:
URL: https://github.com/apache/airflow/pull/25248#discussion_r932015431


##########
airflow/utils/helpers.py:
##########
@@ -376,3 +376,16 @@ def is_empty(x):
         return new_list
     else:
         return val
+
+
+def resolve_property_value(obj, prop):
+    """Return class property value.
+
+    :param obj: Reference to class.
+    :param prop: Reference to class property.
+    :returns: If ``prop`` property than return property value,
+        otherwise it returns class attribute value.
+    """
+    if isinstance(prop, property):
+        return prop.fget(obj)
+    return prop

Review Comment:
   Sorry for the confusion. Initially I thought you meant adding an implementation of `__serialized_fields` to BatchOperator.
   Now I understand that you meant placing it in the `serialized_objects` module.




[GitHub] [airflow] potiuk closed pull request #25248: WIP: Serialized object property

Posted by GitBox <gi...@apache.org>.
potiuk closed pull request #25248: WIP: Serialized object property
URL: https://github.com/apache/airflow/pull/25248



[GitHub] [airflow] Taragolis commented on pull request #25248: WIP: Serialized object property

Posted by GitBox <gi...@apache.org>.
Taragolis commented on PR #25248:
URL: https://github.com/apache/airflow/pull/25248#issuecomment-1193011773

   BTW, it seems that none of the XCom-based extra links will work with dynamic tasks.
   



[GitHub] [airflow] uranusjr commented on a diff in pull request #25248: WIP: Serialized object property

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25248:
URL: https://github.com/apache/airflow/pull/25248#discussion_r930630835


##########
airflow/utils/helpers.py:
##########
@@ -376,3 +376,16 @@ def is_empty(x):
         return new_list
     else:
         return val
+
+
+def resolve_property_value(obj, prop):
+    """Return class property value.
+
+    :param obj: Reference to class.
+    :param prop: Reference to class property.
+    :returns: If ``prop`` property than return property value,
+        otherwise it returns class attribute value.
+    """
+    if isinstance(prop, property):
+        return prop.fget(obj)
+    return prop

Review Comment:
   This can live directly in `serialized_objects` instead (and be a private function)




[GitHub] [airflow] potiuk commented on pull request #25248: WIP: Serialized object property

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25248:
URL: https://github.com/apache/airflow/pull/25248#issuecomment-1203795870

   Yeah. I think @uranusjr is working on fixing this problem "properly" in https://github.com/apache/airflow/pull/25500 . Closing this one.



[GitHub] [airflow] Taragolis commented on pull request #25248: WIP: Serialized object property

Posted by GitBox <gi...@apache.org>.
Taragolis commented on PR #25248:
URL: https://github.com/apache/airflow/pull/25248#issuecomment-1203747159

   @potiuk @uranusjr Do we need this PR, or can I close it?
   
   The only reason I created this PR was to try to resolve the values of the `operator_extra_links` property, but this still doesn't work for `BatchOperator`. It seems it might be solved by #25332.
   
   Alternatively, I could change `BatchOperator` to define `operator_extra_links` as a tuple. In that case, with some operator arguments, some links would always be greyed out, but that might be better than a serialization error on mapped tasks.
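The tuple alternative can be sketched with hypothetical placeholder classes (these are not the real `BatchOperator` or its link classes). Because `operator_extra_links` is a plain class attribute, it can be read from the class without creating an instance, which is what a class-level serializer needs. The trade-off is that every link is always listed, even when an argument such as `wait_for_completion=False` leaves a link with nothing to point to.

```python
class JobLink:
    """Hypothetical link to the submitted job."""

    name = "Job"


class LogsLink:
    """Hypothetical link to the job logs; only useful when waiting for completion."""

    name = "Logs"


class TupleLinksOperator:
    # Plain class attribute: no instance state is needed to enumerate the links.
    operator_extra_links = (JobLink(), LogsLink())

    def __init__(self, wait_for_completion=True):
        self.wait_for_completion = wait_for_completion


def link_names(cls):
    """Read link names from the class alone, as a class-level serializer would."""
    return [link.name for link in cls.operator_extra_links]


print(link_names(TupleLinksOperator))  # ['Job', 'Logs']
```

With `wait_for_completion=False` the "Logs" link is still listed; in the UI it would presumably be greyed out, which is the cost mentioned above.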
   



[GitHub] [airflow] uranusjr commented on a diff in pull request #25248: WIP: Serialized object property

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #25248:
URL: https://github.com/apache/airflow/pull/25248#discussion_r931984660


##########
airflow/utils/helpers.py:
##########
@@ -376,3 +376,16 @@ def is_empty(x):
         return new_list
     else:
         return val
+
+
+def resolve_property_value(obj, prop):
+    """Return class property value.
+
+    :param obj: Reference to class.
+    :param prop: Reference to class property.
+    :returns: If ``prop`` property than return property value,
+        otherwise it returns class attribute value.
+    """
+    if isinstance(prop, property):
+        return prop.fget(obj)
+    return prop

Review Comment:
   What do you mean? Not sure I understand the question.




[GitHub] [airflow] Taragolis commented on pull request #25248: WIP: Serialized object property

Posted by GitBox <gi...@apache.org>.
Taragolis commented on PR #25248:
URL: https://github.com/apache/airflow/pull/25248#issuecomment-1193017352

   List of operators with extra links that define `operator_extra_links` as a property:
   - `airflow.sensors.external_task.ExternalTaskSensor`  - #25215
   - `airflow.sensors.external_task.ExternalTaskMarker` - #25215
   - `airflow.providers.amazon.aws.operators.batch.BatchOperator` - #25243
   - `airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator` - marked as deprecated
   - `tests.test_utils.mock_operators.CustomOperator` - used in tests
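A list like this could be built by checking each class statically. Here is a minimal sketch using the standard library's `inspect.getattr_static`, which looks the attribute up through the MRO without triggering the property getter. The classes are hypothetical, not the operators listed above.

```python
import inspect


def defines_links_as_property(cls, attr="operator_extra_links"):
    """Return True if `attr` on `cls` (or an ancestor) is a property object."""
    # getattr_static returns the raw class attribute, so the getter never runs.
    raw = inspect.getattr_static(cls, attr, None)
    return isinstance(raw, property)


class BaseOp:
    operator_extra_links = ()


class TupleOp(BaseOp):
    operator_extra_links = ("LinkA",)


class PropertyOp(BaseOp):
    @property
    def operator_extra_links(self):
        return ("LinkA",)


class InheritsPropertyOp(PropertyOp):
    """Inherits the property without redefining it."""


candidates = [BaseOp, TupleOp, PropertyOp, InheritsPropertyOp]
print([c.__name__ for c in candidates if defines_links_as_property(c)])
# ['PropertyOp', 'InheritsPropertyOp']
```

Using a static lookup matters here: a plain `getattr` on an instance would call the getter, which is exactly what fails for getters that depend on instance state.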



[GitHub] [airflow] potiuk commented on pull request #25248: WIP: Serialized object property

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #25248:
URL: https://github.com/apache/airflow/pull/25248#issuecomment-1193156526

   Nice. I saw those issues too. I know it's a draft, but it might be worth a look, @uranusjr :)



[GitHub] [airflow] Taragolis commented on a diff in pull request #25248: WIP: Serialized object property

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #25248:
URL: https://github.com/apache/airflow/pull/25248#discussion_r931305254


##########
airflow/utils/helpers.py:
##########
@@ -376,3 +376,16 @@ def is_empty(x):
         return new_list
     else:
         return val
+
+
+def resolve_property_value(obj, prop):
+    """Return class property value.
+
+    :param obj: Reference to class.
+    :param prop: Reference to class property.
+    :returns: If ``prop`` property than return property value,
+        otherwise it returns class attribute value.
+    """
+    if isinstance(prop, property):
+        return prop.fget(obj)
+    return prop

Review Comment:
   Just to clarify - is it related to Operator?


