Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/07/20 21:52:25 UTC

[GitHub] [airflow] shashwatsrivastava94 opened a new issue, #25207: Create custom DAG classes with custom `clear` etc.

shashwatsrivastava94 opened a new issue, #25207:
URL: https://github.com/apache/airflow/issues/25207

   ### Description
   
   I have a use case where a task creates a Spark cluster for all the other tasks to use. This task is the root of all tasks in the DAG, so it runs first when the DAG is kicked off. If anything fails in the DAG, I need to clear the failed task as well as the root task, so that a cluster is created which can be reused. This means I have to clear two tasks every time.
   
   I would love to be able to override the `clear` method in DAG (https://github.com/apache/airflow/blob/main/airflow/models/dag.py#L1821) in a custom DAG class so that it always clears the root task too. I tried doing this in my custom task, but it didn't work. After some digging, the reason is the way DAGs are serialized for storage in the metadata DB: they lose information about their original type, so we are unable to override any methods.
   This happens in this method: https://github.com/apache/airflow/blob/main/airflow/serialization/serialized_objects.py#L1000
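To make the problem concrete, here is a toy model in plain Python. It is not Airflow's serializer: `ToyDAG`, `ClusterDAG`, `serialize` and `deserialize` are hypothetical names. If only plain fields are stored and the loader always rebuilds the base class, the subclass's overridden `clear` is silently lost.

```python
import json


class ToyDAG:
    """Toy stand-in for Airflow's DAG; not the real class."""

    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id

    def clear(self) -> list:
        # Base behaviour: clear only the failed task
        return ["failed_task"]


class ClusterDAG(ToyDAG):
    """Custom subclass that also clears the cluster-creating root task."""

    def clear(self) -> list:
        return super().clear() + ["create_cluster"]


def serialize(dag: ToyDAG) -> str:
    # Only plain fields are written; the concrete class is not recorded
    return json.dumps({"dag_id": dag.dag_id})


def deserialize(data: str) -> ToyDAG:
    # Always rebuilds the base class, so any subclass override is lost
    return ToyDAG(**json.loads(data))


original = ClusterDAG("spark_pipeline")
restored = deserialize(serialize(original))
print(original.clear())  # ['failed_task', 'create_cluster']
print(restored.clear())  # ['failed_task']
print(type(restored).__name__)  # ToyDAG
```

The override works on the original object but not on the round-tripped copy, which is the behaviour described above.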
   
   Does anyone have an easier way of accomplishing this?
   
   
   One way to solve this would be to store the class information in the serialized object: add `serialize_dag["_class"] = dag.__class__.__module__ + '.' + dag.__class__.__name__` in `serialize_dag`, and update https://github.com/apache/airflow/blob/main/airflow/serialization/schema.json#L109 to include `_class`. The problem then comes down to deserializing the DAG, which I'm struggling with.
   We could run the following here https://github.com/apache/airflow/blob/main/airflow/serialization/serialized_objects.py#L1120:
   ```python
   from pydoc import locate

   # Resolve the stored dotted path back to the class object
   dag_class = locate(encoded_dag["_class"])
   # Open question: which attributes to pass to the constructor
   dag_class(dag.__dict__)
   ```
   but I'm getting tripped up on how to define the attributes we pass to the `dag_class` constructor. Any help is appreciated 🙏
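One possible way around the constructor question, sketched under assumptions: create the instance with `cls.__new__(cls)` (skipping `__init__`) and then restore the stored attributes one by one, so no constructor signature has to be rebuilt. The helpers `class_path`, `encode` and `decode` are hypothetical, not part of `serialized_objects.py`, and `types.SimpleNamespace` stands in for a custom DAG subclass.

```python
from pydoc import locate
from types import SimpleNamespace


def class_path(cls: type) -> str:
    # Fully qualified name, e.g. "types.SimpleNamespace"
    return f"{cls.__module__}.{cls.__qualname__}"


def encode(obj: object) -> dict:
    # Store the plain attributes plus the class path under a "_class" key
    return {"_class": class_path(type(obj)), **vars(obj)}


def decode(encoded: dict) -> object:
    cls = locate(encoded["_class"])
    if not isinstance(cls, type):
        raise ValueError(f"cannot resolve class {encoded['_class']!r}")
    # Create the instance without calling __init__, then restore attributes
    # one by one, so no constructor signature has to be reconstructed
    obj = cls.__new__(cls)
    for key, value in encoded.items():
        if key != "_class":
            setattr(obj, key, value)
    return obj


encoded = encode(SimpleNamespace(dag_id="spark_pipeline", catchup=False))
restored = decode(encoded)
print(encoded["_class"])  # types.SimpleNamespace
print(type(restored) is SimpleNamespace, restored.dag_id)  # True spark_pipeline
```

Two caveats apply to this design: any attribute that only `__init__` would set must be present in the encoded dict, and `locate` imports whatever module the stored path names, which means executing that module's code in the deserializing process.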
   
   
   ### Use case/motivation
   
   I would like to be able to override methods of the DAG class to customize the behaviour of my own DAG classes that inherit from DAG.
   
   ### Related issues
   
   I didn't find any :(
   
   ### Are you willing to submit a PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] potiuk commented on issue #25207: Create custom DAG classes with custom `clear` etc.

Posted by GitBox <gi...@apache.org>.
potiuk commented on issue #25207:
URL: https://github.com/apache/airflow/issues/25207#issuecomment-1191926593

   Also, a very important part of this is security. We are working on improving the security of Airflow, and we are gradually fixing all the places where code provided by DAG authors, in particular, could potentially "escape" the sandboxes we provide. https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation introduced the possibility of separating the code provided by DAG authors from the Scheduler. With this change, the code provided by DAG authors can be run only in a separate process, which can run on a separate machine. In the future it will be isolated further (so that code from different teams executes in separate sandboxes, processes, or even machines); this is how we want to introduce multi-tenancy.
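The general principle behind running DAG-author code in a separate process can be sketched with the standard library. This is not how Airflow's DAG processor is implemented; `run_user_code` is a hypothetical helper. A crash or hang stays inside the child interpreter, and the parent only sees an exit code or a timeout.

```python
import subprocess
import sys
from typing import Optional


def run_user_code(source: str, timeout: float) -> Optional[int]:
    """Run untrusted Python source in a child interpreter.

    Returns the child's exit code, or None if it was killed after `timeout`.
    A crash or hang in the child cannot take the parent process down with it.
    """
    try:
        result = subprocess.run(
            [sys.executable, "-c", source],
            timeout=timeout,
            capture_output=True,
        )
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child before re-raising TimeoutExpired
        return None
    return result.returncode


print(run_user_code("print('parsed ok')", timeout=30))            # 0
print(run_user_code("import os; os._exit(3)", timeout=30))        # 3
print(run_user_code("import time; time.sleep(30)", timeout=1))    # None
```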




[GitHub] [airflow] shashwatsrivastava94 commented on issue #25207: Create custom DAG classes with custom `clear` etc.

Posted by GitBox <gi...@apache.org>.
shashwatsrivastava94 commented on issue #25207:
URL: https://github.com/apache/airflow/issues/25207#issuecomment-1190862748

   I could override this method (https://github.com/apache/airflow/blob/main/airflow/models/dag.py#L2692) in my class, but then schema.json would need to be dynamic, based on the results of this method.
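Why a static schema gets in the way can be shown with a toy model. Assumptions: `serialized_fields` is a hypothetical classmethod returning the attribute names written to the DB, and the schema is modelled as a fixed set of allowed keys that rejects anything else (as a JSON schema with `additionalProperties: false` would).

```python
from typing import FrozenSet

# Stand-in for a static schema that only accepts a fixed set of DAG keys
# (assumption: unknown keys are rejected, as with additionalProperties: false)
SCHEMA_KEYS: FrozenSet[str] = frozenset({"dag_id", "schedule", "tasks"})


class ToyDAG:
    @classmethod
    def serialized_fields(cls) -> FrozenSet[str]:
        # Hypothetical: the names of the attributes written to the DB
        return frozenset({"dag_id", "schedule", "tasks"})


class ClusterDAG(ToyDAG):
    @classmethod
    def serialized_fields(cls) -> FrozenSet[str]:
        # The subclass adds a field of its own to the serialized form
        return super().serialized_fields() | {"cluster_task_id"}


def validate(fields: FrozenSet[str]) -> None:
    unknown = fields - SCHEMA_KEYS
    if unknown:
        raise ValueError(f"not allowed by schema: {sorted(unknown)}")


validate(ToyDAG.serialized_fields())  # passes silently
try:
    validate(ClusterDAG.serialized_fields())
except ValueError as err:
    print(err)  # not allowed by schema: ['cluster_task_id']
```

Every subclass that adds a field would need a matching schema change, which is why the schema would have to become dynamic.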




[GitHub] [airflow] boring-cyborg[bot] commented on issue #25207: Create custom DAG classes with custom `clear` etc.

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on issue #25207:
URL: https://github.com/apache/airflow/issues/25207#issuecomment-1190798722

   Thanks for opening your first issue here! Be sure to follow the issue template!
   




[GitHub] [airflow] uranusjr commented on issue #25207: Create custom DAG classes with custom `clear` etc.

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #25207:
URL: https://github.com/apache/airflow/issues/25207#issuecomment-1190913015

   Instead of overriding the class, it seems like an _on-clear hook_ would be more suitable. Something like this (illustrative only):
   
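   ```python
   from airflow import DAG

   def on_dag_clear(dag: DAG) -> None:
       # Do something here...
       ...

   # Illustrative only: `on_clear` is a proposed argument, not an existing DAG parameter
   dag = DAG(..., on_clear=on_dag_clear)
   ```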
   
   The function would then be called whenever the DAG is cleared. We generally don’t want to allow users to override behaviour in DAG, because it’s executed directly in the Airflow process, and bad code can easily hang or even crash Airflow entirely.
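How such a hook could cover the original use case can be sketched with a toy class. Everything here is hypothetical (`ToyDAG`, its `on_clear` argument, `also_clear_root`); it is not Airflow's DAG API. The core clearing logic stays in the class, and the user function is only notified afterwards, here asking for the root task to be cleared too.

```python
from typing import Callable, List, Optional


class ToyDAG:
    """Toy DAG with a hypothetical `on_clear` hook; not Airflow's DAG API."""

    def __init__(self, dag_id: str, root_task_id: str,
                 on_clear: Optional[Callable[["ToyDAG", List[str]], None]] = None) -> None:
        self.dag_id = dag_id
        self.root_task_id = root_task_id
        self.on_clear = on_clear
        self.cleared: List[str] = []

    def clear(self, task_ids: List[str]) -> None:
        # Core clearing logic stays in the class itself
        self.cleared.extend(task_ids)
        # The user-supplied hook is only notified afterwards
        if self.on_clear is not None:
            self.on_clear(self, task_ids)


def also_clear_root(dag: ToyDAG, task_ids: List[str]) -> None:
    # The use case from this issue: always re-run the cluster-creating root task
    if dag.root_task_id not in task_ids:
        dag.clear([dag.root_task_id])


dag = ToyDAG("spark_pipeline", root_task_id="create_cluster", on_clear=also_clear_root)
dag.clear(["transform"])
print(dag.cleared)  # ['transform', 'create_cluster']
```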




[GitHub] [airflow] uranusjr commented on issue #25207: Create custom DAG classes with custom `clear` etc.

Posted by GitBox <gi...@apache.org>.
uranusjr commented on issue #25207:
URL: https://github.com/apache/airflow/issues/25207#issuecomment-1191106777

   > I cant figure out where we deserialize or recreate the callback methods for `on_success/failure`
   
   The actual callbacks are _not set_ on the deserialized/recreated DAG, because that is impossible without importing the original DAG files. Airflow only imports the DAG files in worker processes, so user code is never run in the scheduler (which only ever uses deserialized DAGs). Those callbacks are instead executed by the DAG processor process, which imports the DAG files and thus has access to the actual Python function objects.
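The split described here can be modelled in a few lines. This is a toy, not Airflow code: the serialized form keeps only a boolean flag (mirroring the `has_on_success_callback` key mentioned in this thread), and a dict lookup stands in for the DAG processor importing the DAG file to get the real function. `notify_success`, `DAG_FILES`, `scheduler_side` and `processor_side` are hypothetical names.

```python
import json
from typing import Callable, Dict, Optional


def notify_success(dag_id: str) -> str:
    # Hypothetical user callback defined in a DAG file
    return f"{dag_id} succeeded"


# Stand-in for "import the DAG file": maps dag_id to the real callback object
DAG_FILES: Dict[str, Callable[[str], str]] = {"spark_pipeline": notify_success}


def serialize(dag_id: str, on_success_callback: Optional[Callable]) -> str:
    # Functions are not JSON-serializable, so only a boolean flag is stored
    return json.dumps({"dag_id": dag_id,
                       "has_on_success_callback": on_success_callback is not None})


def scheduler_side(serialized: str) -> bool:
    # The scheduler only sees the flag; it never holds the user function
    return json.loads(serialized)["has_on_success_callback"]


def processor_side(dag_id: str) -> str:
    # The processor "re-imports" the DAG file to get the actual function object
    return DAG_FILES[dag_id](dag_id)


blob = serialize("spark_pipeline", notify_success)
if scheduler_side(blob):
    print(processor_side("spark_pipeline"))  # spark_pipeline succeeded
```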




[GitHub] [airflow] HugoCornu commented on issue #25207: Create custom DAG classes with custom `clear` etc.

Posted by GitBox <gi...@apache.org>.
HugoCornu commented on issue #25207:
URL: https://github.com/apache/airflow/issues/25207#issuecomment-1219610259

   @shashwatsrivastava94
   Did you find a solution in the end?
   I am also looking for ways to override some methods of my custom operators.




[GitHub] [airflow] shashwatsrivastava94 commented on issue #25207: Create custom DAG classes with custom `clear` etc.

Posted by GitBox <gi...@apache.org>.
shashwatsrivastava94 commented on issue #25207:
URL: https://github.com/apache/airflow/issues/25207#issuecomment-1191009142

   That makes sense.
   
   > We generally don’t want to allow the user to override behaviour in DAG because it’s executed directly in the Airflow process
   
   This reasoning makes sense, but if I'm understanding this correctly, the callback functions also run in the Airflow process, correct? I can see a broken callback being less common than a broken method override, but can you confirm that it would have the same effect? I want to make sure that I'm following the code correctly.
   
   Can you also point me to where we re-create the `on_success_callback` method when deserializing a DAG from the metadata DB? I see this code https://github.com/apache/airflow/blob/cff7d9194f549d801947f47dfce4b5d6870bfaaa/airflow/serialization/serialized_objects.py#L1091 that adds `has_on_success_callback`, but I can't figure out where we deserialize or recreate the callback methods for `on_success/failure`.

