Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/26 01:08:58 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #21829: Raise import error if a task uses a non-existent pool

ephraimbuddy opened a new pull request #21829:
URL: https://github.com/apache/airflow/pull/21829


   Closes: https://github.com/apache/airflow/issues/20788
   
   This PR proposes raising an import error if a task in a DAG uses
   a non-existent pool. This frees the scheduler from continuously
   trying to schedule a task whose pool does not exist.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish edited a comment on pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
dstandish edited a comment on pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#issuecomment-1057646262


   > I may not be understanding fully well the idea you are proposing. If you don't mind, you can create a PR so we can compare the two?
   
   I'm just concerned that making this an import error will negatively impact developer experience, and I'm trying to help think about alternatives that achieve similar goals.  Locally it's common not to have all the pools, so it feels like it should be a warning of some kind but not a hard error (and if you just do a warning, presumably queries could be modified to filter out TIs with bad pools).
   
   But I think (1) being able to disable consideration of pools would make this work.  And another thing that I think would be really good would be (2) to make it so you didn't need to create the pools in the db e.g. if they could be defined in `airflow.cfg` or if you could do `AIRFLOW_POOL_MY_POOL=1`.  With either or both of these in place, I think raising would be ok.  It's easier to manage for local developers than having to add the pools to the db, and it would not be affected negatively by resetting the db.





[GitHub] [airflow] ephraimbuddy commented on pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#issuecomment-1057445922


   > Ah i see. Well, alternatively we could amend the query to filter on pool in pools or join to pools table.
   
   In this case, the user won't know why the scheduler is not scheduling the task with non-existent pool.
   
   > I'm not sure we'd need to throw an error there. Couldn't we just not create the dag run? Or in any case, if we filter on "has existent pool" in the query above, it wouldn't even be considered for scheduling.
   
   The DAG would keep coming up for the scheduler to create a dagrun even when ignored, and would block other eligible DAGs from creating dagruns.
   
   > Perhaps alongside import_errors we could add an attribute configuration_errors or something to DagBag and then use this to bubble up a flash alert like [here](https://github.com/apache/airflow/blob/08575ddd8a72f96a3439f73e973ee9958188eb83/airflow/www/views.py#L784-L788).
   
   > If we had a configuration_errors thing, this would not be a hard error but something we'd want to warn user about. In this scenario we could also warn if pool has size 0. Thinking out loud a bit here.
   
   We currently have an import error if pool size is less than 0, see [here](https://github.com/apache/airflow/blob/5b45a78dca284e504280964951f079fca1866226/airflow/models/baseoperator.py#L783-L785). That's why I still think we should have this as an import error. I could have verified the pool name right below that check, but because we still commit when we use `provide_session`, the scheduler would crash.
   
   I may not be understanding fully well the idea you are proposing. If you don't mind, you can create a PR so we can compare the two?
   
   
   





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#discussion_r815459948



##########
File path: airflow/models/dag.py
##########
@@ -2625,6 +2625,18 @@ def validate_schedule_and_params(self):
                     "DAG Schedule must be None, if there are any required params without default values"
                 )
 
+    @provide_session
+    def validate_task_pools(self, session=NEW_SESSION):
+        """Validates and raise exception if any task in a dag is using a non-existent pool"""
+        from airflow.models.pool import Pool
+
+        pools = [p.pool for p in Pool.get_pools(session)]

Review comment:
       Makes sense. Have used set to also get all the wrong pools







[GitHub] [airflow] uranusjr commented on a change in pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#discussion_r815897214



##########
File path: airflow/models/dag.py
##########
@@ -2625,6 +2625,17 @@ def validate_schedule_and_params(self):
                     "DAG Schedule must be None, if there are any required params without default values"
                 )
 
+    @provide_session
+    def validate_task_pools(self, session=NEW_SESSION):
+        """Validates and raise exception if any task in a dag is using a non-existent pool"""
+        from airflow.models.pool import Pool
+
+        pools = {p.pool for p in Pool.get_pools(session)}
+        task_pools = {task.pool for task in self.tasks}
+        diff = task_pools - pools
+        if diff:
+            raise PoolNotFound(f"The following pools: `{list(diff)}` does not exist in the database")

Review comment:
       ```suggestion
               raise PoolNotFound(f"The following pools: `{sorted(diff)}` does not exist in the database")
   ```
   
   For predictability.

##########
File path: airflow/models/dag.py
##########
@@ -2625,6 +2625,17 @@ def validate_schedule_and_params(self):
                     "DAG Schedule must be None, if there are any required params without default values"
                 )
 
+    @provide_session
+    def validate_task_pools(self, session=NEW_SESSION):
+        """Validates and raise exception if any task in a dag is using a non-existent pool"""
+        from airflow.models.pool import Pool
+
+        pools = {p.pool for p in Pool.get_pools(session)}
+        task_pools = {task.pool for task in self.tasks}
+        diff = task_pools - pools
+        if diff:
+            raise PoolNotFound(f"The following pools: `{list(diff)}` does not exist in the database")

Review comment:
       ```suggestion
               raise PoolNotFound(f"The following pools: `{sorted(diff)}` do not exist in the database")
   ```
   
   For predictability.
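
A minimal, Airflow-free sketch of the check discussed in this review thread: the set difference collects every missing pool at once, and `sorted` keeps the order in the message stable between runs. `PoolNotFound` is a local stand-in class and `find_missing_pools` is a hypothetical helper; neither is taken from Airflow itself.

```python
class PoolNotFound(Exception):
    """Local stand-in for the exception raised in the diff (assumption)."""


def find_missing_pools(task_pools, existing_pools):
    """Return pools referenced by tasks but absent from the known pools, sorted."""
    return sorted(set(task_pools) - set(existing_pools))


def validate_task_pools(task_pools, existing_pools):
    """Raise PoolNotFound listing every missing pool in a deterministic order."""
    missing = find_missing_pools(task_pools, existing_pools)
    if missing:
        raise PoolNotFound(
            f"The following pools: `{missing}` do not exist in the database"
        )
```

Sorting matters mostly for tests and log comparison: iteration order over a set of strings can differ between interpreter runs, so `list(diff)` could produce a different message each time.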







[GitHub] [airflow] ephraimbuddy commented on pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#issuecomment-1057371224


   > I think it's a good idea to improve messaging around this but I'm not sure the best way. I think that ideally we allow the dag to exist, but maybe just bubble up a "flash" message alerting with the offending dag and task.
   
   The problem with the dag existing is that the scheduler would keep on trying to queue it. If you check the linked issue, it blocks some other tasks from being queued.
   
   > One reason is, if you are developing locally, you might want to use airflow dags list to verify your dag parses ok. Or you might want to run airflow dags test or airflow tasks test to run your task, without creating the pool. And what if perhaps you remove the pool to "temporarily disable" a set of tasks. Maybe in this case it's better to alert about "misconfiguration" but allow the dag to remain in the system.
   
   I thought about not creating dagrun if a task has wrong pool but it looks like raising error at that point could keep the scheduler crash looping. Also, since by default, if a task doesn't have a pool, it's assigned the `default_pool`, I think it's OK to raise import error if a task has a wrong pool. 
   This is something that can cause issues for the scheduler and it is a user error, that's why I think raising the error is better.





[GitHub] [airflow] dstandish edited a comment on pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
dstandish edited a comment on pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#issuecomment-1057646262


   > I may not be understanding fully well the idea you are proposing. If you don't mind, you can create a PR so we can compare the two?
   
   I'm just concerned that making this an import error will negatively impact developer experience, and I'm trying to help think about alternatives that achieve similar goals.  Locally it's common not to have all the pools, so it feels like it should be a warning of some kind but not a hard error (and if you just do a warning, presumably the concerns in the scheduler can be addressed by modifying queries to filter out TIs with bad pools).
   
   But I think (1) being able to disable consideration of pools would make this work.  And another thing that I think would be really good would be (2) to make it so you didn't need to create the pools in the db e.g. if they could be defined in `airflow.cfg` or if you could do `AIRFLOW_POOL_MY_POOL=1`.  With either or both of these in place, I think raising would be ok.  It's easier to manage for local developers than having to add the pools to the db, and it would not be affected negatively by resetting the db.





[GitHub] [airflow] SamWheating commented on a change in pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#discussion_r815249494



##########
File path: airflow/models/dag.py
##########
@@ -2625,6 +2625,18 @@ def validate_schedule_and_params(self):
                     "DAG Schedule must be None, if there are any required params without default values"
                 )
 
+    @provide_session
+    def validate_task_pools(self, session=NEW_SESSION):
+        """Validates and raise exception if any task in a dag is using a non-existent pool"""
+        from airflow.models.pool import Pool
+
+        pools = [p.pool for p in Pool.get_pools(session)]
+        for task in self.tasks:
+            if task.pool not in pools:
+                raise PoolNotFound(

Review comment:
       In the event of multiple tasks with non-existent pools, is it preferable to fail as fast as possible and only warn about the first such occurrence, or should we first check all of the tasks and then raise with an error which reports all of the issues?
   







[GitHub] [airflow] SamWheating commented on a change in pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#discussion_r815347502



##########
File path: airflow/models/dag.py
##########
@@ -2625,6 +2625,18 @@ def validate_schedule_and_params(self):
                     "DAG Schedule must be None, if there are any required params without default values"
                 )
 
+    @provide_session
+    def validate_task_pools(self, session=NEW_SESSION):
+        """Validates and raise exception if any task in a dag is using a non-existent pool"""
+        from airflow.models.pool import Pool
+
+        pools = [p.pool for p in Pool.get_pools(session)]

Review comment:
       Do you think it would be more performant to use a set instead of a list here since we're doing potentially a lot of `task not in pools`? 
   
   The list is pretty small so I'm not sure how significant the difference would be.
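
A rough sketch for measuring the question above. Both containers give the same answer; only lookup cost differs (a linear scan for the list, an average constant-time hash lookup for the set). The pool names, pool count and task count are made-up values for illustration, and `count_unknown` is a hypothetical helper.

```python
import timeit


def count_unknown(task_pools, known_pools):
    """Count tasks whose pool is not among the known pools."""
    return sum(1 for pool in task_pools if pool not in known_pools)


# Hypothetical sizes: 50 pools defined, 2000 tasks to check.
pools_list = [f"pool_{i}" for i in range(50)]
pools_set = set(pools_list)
task_pools = ["pool_49", "missing_pool"] * 1000

if __name__ == "__main__":
    list_time = timeit.timeit(lambda: count_unknown(task_pools, pools_list), number=10)
    set_time = timeit.timeit(lambda: count_unknown(task_pools, pools_set), number=10)
    print(f"list: {list_time:.4f}s  set: {set_time:.4f}s")
```

With only a handful of pools the difference is likely negligible, as the comment suggests; the set mainly pays off because it also enables the set-difference form of the check.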







[GitHub] [airflow] dstandish edited a comment on pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
dstandish edited a comment on pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#issuecomment-1057411344


   Another comment along the same theme as my other comments: at a previous company I had unit tests checking that "all dags parse without error".  If we make this change, then we'd have to add a step to our CI testing process to create the pools (and recreate them locally to run tests locally). It's a small thing, but it could be inconvenient for folks.  Disableable pools (or autocreate pools) would avoid this.





[GitHub] [airflow] ephraimbuddy commented on pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#issuecomment-1058194655


   > The local development story could be (partially?) fixed by having `dags test` or `tasks test` not care about pools, but "normal" parsing should care and error?
   
   Just saw that `dags test` requires a pool: if the pool does not exist, the task is not run. It fails at https://github.com/apache/airflow/blob/0d856b1ab9804dd5d4dc95537e3bbff18488aaa3/airflow/jobs/backfill_job.py#L545-L547
   
   `tasks test` also reports that dependencies are not met if the pool does not exist:
   
   ```log
   [2022-03-03 15:51:31,106] {taskinstance.py:1044} INFO - Dependencies not met for <TaskInstance: example_bash_operator.run_this_last __airflow_in_memory_dagrun__ [None]>, dependency 'Pool Slots Available' FAILED: ("Tasks using non-existent pool '%s' will not be scheduled", 'test-pool')
   ```
   Tested on main
   
   





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#discussion_r815274573



##########
File path: airflow/models/dag.py
##########
@@ -2625,6 +2625,18 @@ def validate_schedule_and_params(self):
                     "DAG Schedule must be None, if there are any required params without default values"
                 )
 
+    @provide_session
+    def validate_task_pools(self, session=NEW_SESSION):
+        """Validates and raise exception if any task in a dag is using a non-existent pool"""
+        from airflow.models.pool import Pool
+
+        pools = [p.pool for p in Pool.get_pools(session)]
+        for task in self.tasks:
+            if task.pool not in pools:
+                raise PoolNotFound(

Review comment:
       I thought about that and feel that failing as fast as possible would be faster because a dag may have a lot of tasks and walking through all the tasks is likely to take time. WDYT?
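
The two strategies being weighed here can be sketched side by side, without any Airflow dependency. `first_missing_pool` and `all_missing_pools` are hypothetical helper names, and the inputs are plain lists of pool names rather than real task objects.

```python
def first_missing_pool(task_pools, existing_pools):
    """Fail-fast: return the first unknown pool encountered, or None."""
    existing = set(existing_pools)
    for pool in task_pools:
        if pool not in existing:
            return pool
    return None


def all_missing_pools(task_pools, existing_pools):
    """Collect-all: return every unknown pool once, in sorted order."""
    return sorted(set(task_pools) - set(existing_pools))
```

In the valid case both variants have to look at every task, so fail-fast only saves work when a DAG is already broken. Collect-all lets the user fix every bad pool name in one pass.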







[GitHub] [airflow] uranusjr commented on a change in pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#discussion_r815896292



##########
File path: airflow/models/dag.py
##########
@@ -2625,6 +2625,17 @@ def validate_schedule_and_params(self):
                     "DAG Schedule must be None, if there are any required params without default values"
                 )
 
+    @provide_session
+    def validate_task_pools(self, session=NEW_SESSION):

Review comment:
       Let's annotate this? Also probably want to add `:meta private:` so this does not become public API.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#discussion_r815274377



##########
File path: tests/dags/test_non_existing_pool.py
##########
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime
+
+from airflow.models import DAG
+from airflow.operators.bash import BashOperator
+
+DEFAULT_DATE = datetime(2016, 1, 1)

Review comment:
       Not needed...copy paste :(







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#discussion_r815460895



##########
File path: tests/dags/test_issue_1225.py
##########
@@ -43,7 +43,7 @@ def fail():
 dag1_task1 = DummyOperator(
     task_id='test_backfill_pooled_task',
     dag=dag1,
-    pool='test_backfill_pooled_task_pool',
+    pool='default_pool',

Review comment:
       I don't think the name of the pool is relevant here, so I changed this to an existing pool name. With git blame, I saw that when this file was created (https://github.com/apache/airflow/pull/1271), there was no `default_pool` for tasks.
   
   Happy to hear your thoughts on this
   







[GitHub] [airflow] dstandish commented on pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#issuecomment-1057646262


   > > Ah i see. Well, alternatively we could amend the query to filter on pool in pools or join to pools table.
   >
   > In this case, the user won't know why the scheduler is not scheduling the task with non-existent pool.
   
   We'd still want to alert users to the bad configuration.  Was just saying maybe don't fail the dag parse.  E.g. along with `import_errors`, store `configuration_errors` or something.
   
   > > I'm not sure we'd need to throw an error there. Couldn't we just not create the dag run? Or in any case, if we filter on "has existent pool" in the query above, it wouldn't even be considered for scheduling.
   >
   > The Dag would keep coming up for scheduler to create dagrun even when ignored and would block other eligible dags from creating dagruns
   
   I guess it's not dag run (since they don't have pools) but task instance... But anyway if we join to pool they'd be filtered out right?  Anyway just an idea.
   
   > > Perhaps alongside import_errors we could add an attribute configuration_errors or something to DagBag and then use this to bubble up a flash alert like [here](https://github.com/apache/airflow/blob/08575ddd8a72f96a3439f73e973ee9958188eb83/airflow/www/views.py#L784-L788).
   > > If we had a configuration_errors thing, this would not be a hard error but something we'd want to warn user about. In this scenario we could also warn if pool has size 0. Thinking out loud a bit here.
   > 
   > We currently have an import error if pool size is less than 0, see [here](https://github.com/apache/airflow/blob/5b45a78dca284e504280964951f079fca1866226/airflow/models/baseoperator.py#L783-L785). That's why I still think we should have this as an import error. I could have verified the pool name right below that check, but because we still commit when we use `provide_session`, the scheduler would crash.
   
   That is something slightly different.  That's how many slots that particular task should be configured to occupy.  E.g. if you have 10 slots in your "sql server" queue, and you have a very heavy query, you could have it take up 5 slots -- but it's different from pool size.
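
To make the distinction concrete, here is a deliberately simplified model of the arithmetic, not Airflow's scheduler logic. `can_run` is a hypothetical helper: `pool_size` is the pool's total capacity, while `task_pool_slots` is how many of those slots one task occupies.

```python
def can_run(pool_size, occupied_slots, task_pool_slots):
    """Return True if the task fits in the pool's remaining capacity."""
    if task_pool_slots < 1:
        # Mirrors the idea of rejecting a task that asks for fewer than one slot.
        raise ValueError("task_pool_slots must be at least 1")
    return occupied_slots + task_pool_slots <= pool_size


# A pool with 10 slots and a heavy query configured to take 5 of them.
heavy_fits_when_empty = can_run(pool_size=10, occupied_slots=0, task_pool_slots=5)
heavy_fits_when_busy = can_run(pool_size=10, occupied_slots=6, task_pool_slots=5)
```

Under these assumptions the heavy query fits in an empty pool but not once 6 slots are taken, even though the pool size itself never changes.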
   
   > I may not be understanding fully well the idea you are proposing. If you don't mind, you can create a PR so we can compare the two?
   
   I'm just concerned that making this an import error will negatively impact developer experience, and I'm trying to help think about alternatives that achieve similar goals.  Locally it's common not to have all the pools, so it feels like it should be a warning of some kind but not a hard error (and if you just do a warning, presumably queries could be modified to filter out TIs with bad pools).
   
   But I think (1) being able to disable consideration of pools would make this work.  And another thing that I think would be really good would be (2) to make it so you didn't need to create the pools in the db e.g. if they could be defined in `airflow.cfg` or if you could do `AIRFLOW_POOL_MY_POOL=1`.  With either or both of these in place, I think raising would be ok.  It's easier to manage for local developers than having to add the pools to the db, and it would not be affected negatively by resetting the db.





[GitHub] [airflow] ashb commented on pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#issuecomment-1058097682


   The local development story could be (partially?) fixed by having `dags test` or `tasks test` not care about pools, but "normal" parsing should care and error?





[GitHub] [airflow] SamWheating commented on a change in pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#discussion_r815347290



##########
File path: airflow/models/dag.py
##########
@@ -2625,6 +2625,18 @@ def validate_schedule_and_params(self):
                     "DAG Schedule must be None, if there are any required params without default values"
                 )
 
+    @provide_session
+    def validate_task_pools(self, session=NEW_SESSION):
+        """Validates and raise exception if any task in a dag is using a non-existent pool"""
+        from airflow.models.pool import Pool
+
+        pools = [p.pool for p in Pool.get_pools(session)]
+        for task in self.tasks:
+            if task.pool not in pools:
+                raise PoolNotFound(

Review comment:
       I think either way is fine, but looking into it more, I think that most other DAG parser failure modes (ImportError, syntax errors, AirflowClusterPolicyViolation, etc.) will fail on the first error, so we should probably do the same here.







[GitHub] [airflow] uranusjr commented on a change in pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#discussion_r815895910



##########
File path: tests/dags/test_issue_1225.py
##########
@@ -43,7 +43,7 @@ def fail():
 dag1_task1 = DummyOperator(
     task_id='test_backfill_pooled_task',
     dag=dag1,
-    pool='test_backfill_pooled_task_pool',
+    pool='default_pool',

Review comment:
       Does anyone know the context when #1225 was filed? Was it possible to schedule a task _without_ a pool back then?







[GitHub] [airflow] dstandish edited a comment on pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
dstandish edited a comment on pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#issuecomment-1057411344


   Another comment along the same theme as my other comments: at a previous company I had unit tests checking that "all dags parse without error".  If we make this change, then we'd have to add a step to our CI testing process to create the pools. It's a small thing, but it could be inconvenient for folks.  Disableable pools (or autocreate pools) would avoid this.





[GitHub] [airflow] dstandish commented on a change in pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#discussion_r818024580



##########
File path: airflow/models/dag.py
##########
@@ -2630,6 +2630,21 @@ def validate_schedule_and_params(self):
                     "DAG Schedule must be None, if there are any required params without default values"
                 )
 
+    @provide_session
+    def validate_task_pools(self, session: Session = NEW_SESSION):
+        """
+        Validates and raise exception if any task in a dag is using a non-existent pool
+
+        :meta private:
+        """
+        from airflow.models.pool import Pool
+
+        pools = {p.pool for p in Pool.get_pools(session)}
+        task_pools = {task.pool for task in self.tasks}
+        diff = task_pools - pools
+        if diff:
+            raise PoolNotFound(f"The following pools: `{sorted(diff)}` do not exist in the database")

Review comment:
       ```suggestion
               raise PoolNotFound(f"The following pools do not exist in the database: `{sorted(diff)}`")
   ```
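
The check in the diff above boils down to a set difference between the pools that tasks reference and the pools that exist. A minimal sketch of that logic without any Airflow dependency follows; `find_missing_pools` and the plain-list arguments are hypothetical stand-ins for `self.tasks` and `Pool.get_pools(session)`, and `PoolNotFound` is redefined locally only so the snippet runs on its own.

```python
class PoolNotFound(Exception):
    """Local stand-in for the exception raised in the diff above."""


def find_missing_pools(task_pools, existing_pools):
    """Return the sorted pool names that tasks reference but that do not exist."""
    return sorted(set(task_pools) - set(existing_pools))


def validate_task_pools(task_pools, existing_pools):
    """Raise PoolNotFound if any referenced pool is missing."""
    missing = find_missing_pools(task_pools, existing_pools)
    if missing:
        # Uses the message wording proposed in the review suggestion.
        raise PoolNotFound(
            f"The following pools do not exist in the database: `{missing}`"
        )
```

Sorting the difference keeps the error message stable across runs, which matters if a test asserts on the message text.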







[GitHub] [airflow] dstandish commented on pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
dstandish commented on pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#issuecomment-1057405118


   > > I think it's a good idea to improve messaging around this but I'm not sure the best way. I think that ideally we allow the dag to exist, but maybe just bubble up a "flash" message alerting with the offending dag and task.
   > 
   > The problem with the dag existing is that the scheduler would keep on trying to queue it. If you check the linked issue, it blocks some other tasks from being queued.
   
   Ah, I see.  Well, alternatively we could amend the query to filter on `pool in pools` or join to the pools table.
   
   > > One reason is, if you are developing locally, you might want to use airflow dags list to verify your dag parses ok. Or you might want to run airflow dags test or airflow tasks test to run your task, without creating the pool. And what if perhaps you remove the pool to "temporarily disable" a set of tasks. Maybe in this case it's better to alert about "misconfiguration" but allow the dag to remain in the system.
   > 
   > I thought about not creating dagrun if a task has wrong pool but it looks like raising error at that point could keep the scheduler crash looping. 
   
   I'm not sure we'd need to throw an error there.  Couldn't we just not create the dag run?  Or in any case, if we filter on "has existent pool" in the query above, it wouldn't even be _considered_ for scheduling.
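
To make the "join to pools table" idea concrete, here is a rough sketch using the standard-library `sqlite3` module. The two-table schema is a simplified assumption for illustration, not Airflow's real schema or scheduler query, and `build_demo_db` and `schedulable_task_ids` are hypothetical helper names.

```python
import sqlite3


def build_demo_db():
    """Create an in-memory database with one pool and two scheduled tasks."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE slot_pool (pool TEXT PRIMARY KEY, slots INTEGER)")
    conn.execute("CREATE TABLE task_instance (task_id TEXT, pool TEXT, state TEXT)")
    conn.execute("INSERT INTO slot_pool VALUES ('default_pool', 128)")
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?, ?)",
        [
            ("task_a", "default_pool", "scheduled"),
            ("task_b", "missing_pool", "scheduled"),  # pool was never created
        ],
    )
    return conn


def schedulable_task_ids(conn):
    """Return scheduled task ids whose pool exists, via an inner join."""
    rows = conn.execute(
        """
        SELECT ti.task_id
        FROM task_instance AS ti
        JOIN slot_pool AS sp ON sp.pool = ti.pool
        WHERE ti.state = 'scheduled'
        ORDER BY ti.task_id
        """
    ).fetchall()
    return [row[0] for row in rows]
```

With the inner join, `task_b` never reaches the candidate list, so a task pointing at a missing pool would not hold up others in this simplified model.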
   
   Perhaps alongside `import_errors` we could add an attribute `configuration_errors` or something to DagBag and then use this to bubble up a flash alert like [here](https://github.com/apache/airflow/blob/08575ddd8a72f96a3439f73e973ee9958188eb83/airflow/www/views.py#L784-L788).  
   
   If we had a `configuration_errors` thing, this would not be a hard error but something we'd want to warn user about.  In this scenario we could also warn if pool has size 0. Thinking out loud a bit here.
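
A sketch of what such a warning channel might look like, purely as an assumption: `DagBagSketch`, `check_pools`, and the mapping of pool name to slot count are hypothetical and do not reflect the real `DagBag` API.

```python
from dataclasses import dataclass, field


@dataclass
class DagBagSketch:
    """Toy container keeping hard errors and soft warnings separate."""

    import_errors: dict = field(default_factory=dict)
    configuration_errors: dict = field(default_factory=dict)

    def check_pools(self, fileloc, task_pools, existing_pools):
        """Record a warning for each missing or zero-size pool.

        existing_pools maps pool name to slot count (an assumed shape).
        """
        for name in sorted(set(task_pools)):
            if name not in existing_pools:
                message = f"pool `{name}` does not exist"
            elif existing_pools[name] == 0:
                message = f"pool `{name}` has size 0"
            else:
                continue
            self.configuration_errors.setdefault(fileloc, []).append(message)
```

The DAG file stays importable because nothing is raised; a UI layer could then read `configuration_errors` and show a flash message per file.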
   
   > Also, since by default, if a task doesn't have a pool, it's assigned the `default_pool`, I think it's OK to raise import error if a task has a wrong pool. This is something that can cause issues for the scheduler and it is a user error, that's why I think raising the error is better.
   
   Yeah I'm just thinking about the local developer experience.  
   
   ### idea
   
   You know, maybe another way to address this concern while keeping your approach would be something like `AIRFLOW__SCHEDULER__IGNORE_POOLS`, which you could set to True for local dev, and then this would be a non-issue for local dev.  Then locally the dag parses, and it runs, and there's no need to worry about pools.
   
   ### another idea
   
   What if instead of failing in this scenario, we _create_ the pool with default size = 10 or something?





[GitHub] [airflow] uranusjr commented on a change in pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
uranusjr commented on a change in pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#discussion_r815896292



##########
File path: airflow/models/dag.py
##########
@@ -2625,6 +2625,17 @@ def validate_schedule_and_params(self):
                     "DAG Schedule must be None, if there are any required params without default values"
                 )
 
+    @provide_session
+    def validate_task_pools(self, session=NEW_SESSION):

Review comment:
       Let's annotate this?







[GitHub] [airflow] dstandish edited a comment on pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
dstandish edited a comment on pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#issuecomment-1057646262


   > I may not be understanding fully well the idea you are proposing. If you don't mind, you can create a PR so we can compare the two?
   
   I'm just concerned that making this an import error will negatively impact developer experience, and I'm trying to help think about alternatives that achieve similar goals.  Because I think locally it's common you won't have all the pools, it feels like it should be a warning of some kind but not a hard error (and if you just do a warning, presumably the concerns in the scheduler can be addressed by modifying queries to filter out TIs with bad pools).
   
   But I think (1) being able to disable consideration (in the scheduler) of pools would make this work.  And another thing that I think would work, and which would be really nice in general, would be (2) to make it so you didn't need to create the pools _in the db_ e.g. if they could be defined in `airflow.cfg` or if you could do `AIRFLOW_POOL_MY_POOL=1`.  With either or both of these in place, I think raising would be ok.  It's easier to manage for local developers than having to add the pools to the db, and it would not be affected negatively by resetting the db.
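
Idea (2) could look roughly like the sketch below. No such feature exists in the source; the `AIRFLOW_POOL_` prefix comes from the example in the comment, while `pools_from_env` and the rule that the pool name is the lowercased suffix are assumptions made for illustration.

```python
import os

PREFIX = "AIRFLOW_POOL_"  # prefix taken from the example in the comment


def pools_from_env(environ=None):
    """Build a {pool_name: slots} mapping from AIRFLOW_POOL_<NAME>=<slots> variables."""
    environ = os.environ if environ is None else environ
    pools = {}
    for key, value in environ.items():
        if key.startswith(PREFIX) and len(key) > len(PREFIX):
            # Assumed naming rule: AIRFLOW_POOL_MY_POOL -> "my_pool".
            pools[key[len(PREFIX):].lower()] = int(value)
    return pools
```

Because the pools would come from the environment, resetting the database would not lose them. A non-integer value raises `ValueError` in this sketch; a real implementation would need to decide how to report that.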





[GitHub] [airflow] SamWheating commented on a change in pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#discussion_r815249086



##########
File path: tests/dags/test_non_existing_pool.py
##########
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datetime import datetime
+
+from airflow.models import DAG
+from airflow.operators.bash import BashOperator
+
+DEFAULT_DATE = datetime(2016, 1, 1)

Review comment:
       Is this value needed?







[GitHub] [airflow] ephraimbuddy commented on pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#issuecomment-1057717954


   > I'm just concerned that making this an import error will negatively impact developer experience, and I'm trying to help think about alternatives that achieve similar goals. Because I think locally it's common you won't have all the pools, it feels like it should be a warning of some kind but not a hard error (and if you just do a warning, presumably the concerns in the scheduler can be addressed by modifying queries to filter out TIs with bad pools).
   
   
   I understand and I'm not arguing about it.
   
   > But I think (1) being able to disable consideration (in the scheduler) of pools would make this work. And another thing that I think would work, and which would be really nice in general, would be (2) to make it so you didn't need to create the pools in the db e.g. if they could be defined in airflow.cfg or if you could do AIRFLOW_POOL_MY_POOL=1. With either or both of these in place, I think raising would be ok. It's easier to manage for local developers than having to add the pools to the db, and it would not be affected negatively by resetting the db.
   
   
   I'll spend some time on it today to make different PRs based on the suggestions.
   
   





[GitHub] [airflow] ephraimbuddy edited a comment on pull request #21829: Raise import error if a task uses a non-existent pool

Posted by GitBox <gi...@apache.org>.
ephraimbuddy edited a comment on pull request #21829:
URL: https://github.com/apache/airflow/pull/21829#issuecomment-1058194655


   > The local development story could be (partially?) fixed by having `dags test` or `tasks test` not care about pools, but "normal" parsing should care and error?
   
   Just saw that `dags test` requires a pool: if the pool does not exist, the task is not run. It fails at https://github.com/apache/airflow/blob/0d856b1ab9804dd5d4dc95537e3bbff18488aaa3/airflow/jobs/backfill_job.py#L545-L547
   
   `tasks test` also reports that dependencies are not met if the pool does not exist:
   
   ```log
   [2022-03-03 15:51:31,106] {taskinstance.py:1044} INFO - Dependencies not met for <TaskInstance: example_bash_operator.run_this_last __airflow_in_memory_dagrun__ [None]>, dependency 'Pool Slots Available' FAILED: ("Tasks using non-existent pool '%s' will not be scheduled", 'test-pool')
   ```
   Tested on main
   
   However, `dags list` is different. It shows all dags even if they use a non-existent pool.




