Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/11/08 17:00:55 UTC

[GitHub] [airflow] turbaszek opened a new pull request #12184: Create DAG-level cluster policy

turbaszek opened a new pull request #12184:
URL: https://github.com/apache/airflow/pull/12184


   This commit adds a new concept, `dag_policy`, which is checked
   once for every DAG when the DagBag is created. It also improves
   the documentation around cluster policies.
   
   closes: #12179
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] kaxil commented on a change in pull request #12184: Create DAG-level cluster policy

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #12184:
URL: https://github.com/apache/airflow/pull/12184#discussion_r520091963



##########
File path: docs/concepts.rst
##########
@@ -1194,72 +1194,67 @@ state.
 
 Cluster Policy
 ==============
-Cluster policies provide an interface for taking action on every Airflow task
-either at DAG load time or just before task execution.
-
-Cluster Policies for Task Mutation
------------------------------------
-In case you want to apply cluster-wide mutations to the Airflow tasks,
-you can either mutate the task right after the DAG is loaded or
-mutate the task instance before task execution.
-
-Mutate tasks after DAG loaded
------------------------------
-
-To mutate the task right after the DAG is parsed, you can define
-a ``policy`` function in ``airflow_local_settings.py`` that mutates the
-task based on other task or DAG attributes (through ``task.dag``).
-It receives a single argument as a reference to the task object and you can alter
-its attributes.
 
-For example, this function could apply a specific queue property when
-using a specific operator, or enforce a task timeout policy, making sure
-that no tasks run for more than 48 hours. Here's an example of what this
-may look like inside your ``airflow_local_settings.py``:
+Cluster policies provide an interface for taking action on every Airflow task
+or DAG either at DAG load time or just before task execution. In this way users
+are able to do the following:
 
+- set default arguments on each DAG/task
+- checks that DAG/task meets required standards
+- perform custom logic of routing task to a queue
 
-.. code-block:: python
+Any many other options. To use cluster-wide policies users can define in their

Review comment:
       ```suggestion
   And many other options. To use cluster-wide policies users can define in their
   ```
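
To make the three bullet points in the quoted docs concrete, here is a minimal sketch of what a `dag_policy` and a `task_policy` might look like in `airflow_local_settings.py`. The 48-hour timeout cap and the per-operator queue come from the removed doc text quoted above; the tag requirement, the `sensor_queue` name, the `ValueError`, and the `SimpleNamespace` stand-ins are assumptions made so the snippet runs without Airflow installed.

```python
from datetime import timedelta
from types import SimpleNamespace


def dag_policy(dag):
    """Check that a DAG meets a (hypothetical) standard: it must have tags."""
    if not dag.tags:
        # ValueError is a stand-in; a real deployment may prefer an Airflow exception.
        raise ValueError(f"DAG {dag.dag_id} has no tags")


def task_policy(task):
    """Cap the task timeout at 48 hours and route one operator type to a queue."""
    max_timeout = timedelta(hours=48)
    if task.execution_timeout is None or task.execution_timeout > max_timeout:
        task.execution_timeout = max_timeout
    if task.task_type == "HivePartitionSensor":
        task.queue = "sensor_queue"  # hypothetical queue name


# Stand-in objects that only mimic the attributes used above.
task = SimpleNamespace(
    task_type="HivePartitionSensor", execution_timeout=None, queue="default"
)
dag = SimpleNamespace(dag_id="example", tags=["team-a"], tasks=[task])

# Same order as the bag_dag change in this PR: the DAG first, then each task.
dag_policy(dag)
for t in dag.tasks:
    task_policy(t)
```

After this runs, the stand-in task has a 48-hour `execution_timeout` and its `queue` is `sensor_queue`; a DAG with an empty `tags` list would be rejected by `dag_policy` before any task is touched.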







[GitHub] [airflow] turbaszek commented on a change in pull request #12184: Create DAG-level cluster policy

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #12184:
URL: https://github.com/apache/airflow/pull/12184#discussion_r520097076



##########
File path: UPDATING.md
##########
@@ -50,6 +50,14 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+### Rename policy to task_policy
+
+Because Airflow introduced DAG level policy (`dag_policy`) we decided to rename existing `policy`
+function to `task_policy` to make the distinction more profound and avoid any confusion.
+
+Users using cluster policy need to rename their `policy` functions in `airflow_local_settings.py`
+to `task_policy`.

Review comment:
       I think this information creates no confusion: users have to take action (now or later), and the sooner the better in my opinion. IMHO we should encourage users to adjust. However, if you think this may create some doubts, I can change the entry, but it will have to be adjusted again once we remove the deprecated name.










[GitHub] [airflow] kaxil commented on a change in pull request #12184: Create DAG-level cluster policy

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #12184:
URL: https://github.com/apache/airflow/pull/12184#discussion_r520091247



##########
File path: UPDATING.md
##########
@@ -50,6 +50,14 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+### Rename policy to task_policy
+
+Because Airflow introduced DAG level policy (`dag_policy`) we decided to rename existing `policy`
+function to `task_policy` to make the distinction more profound and avoid any confusion.
+
+Users using cluster policy need to rename their `policy` functions in `airflow_local_settings.py`
+to `task_policy`.

Review comment:
       `policy` would still work but raise a deprecation warning, and it will be completely removed in Airflow 2.1.







[GitHub] [airflow] turbaszek commented on a change in pull request #12184: Create DAG-level cluster policy

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #12184:
URL: https://github.com/apache/airflow/pull/12184#discussion_r522142497



##########
File path: airflow/models/dagbag.py
##########
@@ -47,6 +47,16 @@
 from airflow.utils.session import provide_session
 from airflow.utils.timeout import timeout
 
+# TODO: Remove once deprecated
+if hasattr(settings, "policy"):
+    warnings.warn(
+        "Using `policy` in airflow_local_settings.py is deprecated. "
+        "Please rename your `policy` to `task_policy`.",
+        DeprecationWarning,
+        stacklevel=2,
+    )
+    setattr(settings, "task_policy", settings.policy)  # pylint: disable=no-member

Review comment:
       That was my initial idea but somehow I decided to do this in dagbag 🤷‍♂️  Done in a250f41







[GitHub] [airflow] github-actions[bot] commented on pull request #12184: Create DAG-level cluster policy

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12184:
URL: https://github.com/apache/airflow/pull/12184#issuecomment-726328135


   The PR needs to run all tests because it modifies core of Airflow! Please rebase it to latest master or ask committer to re-run it!





[GitHub] [airflow] turbaszek merged pull request #12184: Create DAG-level cluster policy

Posted by GitBox <gi...@apache.org>.
turbaszek merged pull request #12184:
URL: https://github.com/apache/airflow/pull/12184


   





[GitHub] [airflow] github-actions[bot] commented on pull request #12184: Create DAG-level cluster policy

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12184:
URL: https://github.com/apache/airflow/pull/12184#issuecomment-725951222


   The PR needs to run all tests because it modifies core of Airflow! Please rebase it to latest master or ask committer to re-run it!





[GitHub] [airflow] ashb commented on a change in pull request #12184: Create DAG-level cluster policy

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #12184:
URL: https://github.com/apache/airflow/pull/12184#discussion_r522167065



##########
File path: airflow/models/dagbag.py
##########
@@ -47,6 +47,16 @@
 from airflow.utils.session import provide_session
 from airflow.utils.timeout import timeout
 
+# TODO: Remove once deprecated
+if hasattr(settings, "policy"):
+    warnings.warn(
+        "Using `policy` in airflow_local_settings.py is deprecated. "
+        "Please rename your `policy` to `task_policy`.",
+        DeprecationWarning,
+        stacklevel=2,
+    )
+    setattr(settings, "task_policy", settings.policy)  # pylint: disable=no-member

Review comment:
       I'm just conscious that DagBag may go away at some point soon -- it does a lot less than it used to.
   
   Also top-level code is a bit of a smell to me.







[GitHub] [airflow] turbaszek commented on pull request #12184: Create DAG-level cluster policy

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #12184:
URL: https://github.com/apache/airflow/pull/12184#issuecomment-723637696


   CC @olchas @TobKed @jaketf 





[GitHub] [airflow] turbaszek commented on a change in pull request #12184: Create DAG-level cluster policy

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #12184:
URL: https://github.com/apache/airflow/pull/12184#discussion_r519465132



##########
File path: airflow/models/dagbag.py
##########
@@ -383,8 +383,11 @@ def bag_dag(self, dag, root_dag):
         dag.resolve_template_files()
         dag.last_loaded = timezone.utcnow()
 
+        # Check policies
+        settings.dag_policy(dag)
+
         for task in dag.tasks:
-            settings.policy(task)
+            settings.task_policy(task)

Review comment:
       Done, this is what users will see:
   ```
   root@817b1471dabe:/opt/airflow# airflow scheduler
   /opt/airflow/airflow/models/dag.py:61: DeprecationWarning: Using `policy` in airflow_local_settings.py is deprecated. Please rename your `policy` to `task_policy`.
     from airflow.models.dagbag import DagBag
     ____________       _____________
    ____    |__( )_________  __/__  /________      __
   ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
   ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
    _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
   [2020-11-08 19:09:09,005] {scheduler_job.py:1248} INFO - Starting the scheduler
   [2020-11-08 19:09:09,005] {scheduler_job.py:1253} INFO - Processing each file at most -1 times
   [2020-11-08 19:09:09,114] {scheduler_job.py:1275} INFO - Resetting orphaned tasks for active dag runs
   ```







[GitHub] [airflow] ashb commented on a change in pull request #12184: Create DAG-level cluster policy

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #12184:
URL: https://github.com/apache/airflow/pull/12184#discussion_r522098413



##########
File path: airflow/models/dagbag.py
##########
@@ -47,6 +47,16 @@
 from airflow.utils.session import provide_session
 from airflow.utils.timeout import timeout
 
+# TODO: Remove once deprecated
+if hasattr(settings, "policy"):
+    warnings.warn(
+        "Using `policy` in airflow_local_settings.py is deprecated. "
+        "Please rename your `policy` to `task_policy`.",
+        DeprecationWarning,
+        stacklevel=2,
+    )
+    setattr(settings, "task_policy", settings.policy)  # pylint: disable=no-member

Review comment:
       This should be done in `airflow.settings.import_local_settings` function please.







[GitHub] [airflow] kaxil commented on a change in pull request #12184: Create DAG-level cluster policy

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #12184:
URL: https://github.com/apache/airflow/pull/12184#discussion_r522169900



##########
File path: airflow/settings.py
##########
@@ -350,6 +377,17 @@ def import_local_settings():
                 if not k.startswith("__"):
                     globals()[k] = v
 
+        # TODO: Remove once deprecated
+        if "policy" in globals():

Review comment:
       Good Point







[GitHub] [airflow] ashb commented on a change in pull request #12184: Create DAG-level cluster policy

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #12184:
URL: https://github.com/apache/airflow/pull/12184#discussion_r522167930



##########
File path: airflow/settings.py
##########
@@ -350,6 +377,17 @@ def import_local_settings():
                 if not k.startswith("__"):
                     globals()[k] = v
 
+        # TODO: Remove once deprecated
+        if "policy" in globals():

Review comment:
       Worth supporting people who use 1.10 and 2.0 with the same file?
   
   ```suggestion
           if "policy" in globals() and "task_policy" not in globals():
   ```
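
The effect of the extra `"task_policy" not in globals()` check can be sketched on a plain dictionary. This is an illustration only, not the code that was merged: `apply_policy_rename` is a hypothetical helper that takes a dict in place of the module globals, and the warning text is copied from the diff under review.

```python
import warnings


def apply_policy_rename(namespace):
    """Alias a legacy ``policy`` to ``task_policy`` unless the new name exists."""
    if "policy" in namespace and "task_policy" not in namespace:
        warnings.warn(
            "Using `policy` in airflow_local_settings.py is deprecated. "
            "Please rename your `policy` to `task_policy`.",
            DeprecationWarning,
            stacklevel=2,
        )
        namespace["task_policy"] = namespace["policy"]
    return namespace


def legacy(task):
    """Stand-in for a 1.10-style ``policy`` function."""


def new(task):
    """Stand-in for a 2.0-style ``task_policy`` function."""


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # A file that defines only `policy`: aliased, with a warning.
    only_legacy = apply_policy_rename({"policy": legacy})
    # A file shared between 1.10 and 2.0 that defines both: left alone, no warning.
    both = apply_policy_rename({"policy": legacy, "task_policy": new})
```

With the extra condition, a settings file that defines both names keeps its own `task_policy` and triggers no warning, which is what makes sharing one file between 1.10 and 2.0 workable.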







[GitHub] [airflow] potiuk commented on a change in pull request #12184: Create DAG-level cluster policy

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #12184:
URL: https://github.com/apache/airflow/pull/12184#discussion_r519458218



##########
File path: UPDATING.md
##########
@@ -50,6 +50,14 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+### Rename policy to task_policy
+
+Because Airflow introduced DAG level policy (`dag_policy`) we decided to rename existing `policy`
+function to `task_policy` to make the distinction more profound and avoid any confusion.
+
+Users using cluster policy need to rename their `policy` functions in `airflow_local_settings.py`
+to `task_policy`.
+

Review comment:
       I think we should deprecate it rather than remove it.

##########
File path: airflow/models/dagbag.py
##########
@@ -383,8 +383,11 @@ def bag_dag(self, dag, root_dag):
         dag.resolve_template_files()
         dag.last_loaded = timezone.utcnow()
 
+        # Check policies
+        settings.dag_policy(dag)
+
         for task in dag.tasks:
-            settings.policy(task)
+            settings.task_policy(task)

Review comment:
       Here we should check whether `policy` is defined and run it as well, but with a deprecation warning.







[GitHub] [airflow] kaxil commented on a change in pull request #12184: Create DAG-level cluster policy

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #12184:
URL: https://github.com/apache/airflow/pull/12184#discussion_r520099154



##########
File path: UPDATING.md
##########
@@ -50,6 +50,14 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+### Rename policy to task_policy
+
+Because Airflow introduced DAG level policy (`dag_policy`) we decided to rename existing `policy`
+function to `task_policy` to make the distinction more profound and avoid any confusion.
+
+Users using cluster policy need to rename their `policy` functions in `airflow_local_settings.py`
+to `task_policy`.

Review comment:
       hmm... 🤔 yeah maybe keep it as it is



