Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/11/02 13:37:06 UTC

[GitHub] [airflow] ephraimbuddy opened a new pull request #19367: Use `dag.max_active_runs` instead of `dag_model.max_active_runs`

ephraimbuddy opened a new pull request #19367:
URL: https://github.com/apache/airflow/pull/19367


   `dag_model.max_active_runs` should be set when the DAG is parsed
   or when the DagModel is instantiated, but for some reason that does not always seem to happen.
   This PR uses `dag.max_active_runs` instead.
   
   Slack conversation: https://apache-airflow.slack.com/archives/CCQ7EGB1P/p1635853475404200
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] ashb commented on a change in pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r754296698



##########
File path: docs/apache-airflow/migrations-ref.rst
##########
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``7b2661a43ba3`` (head)        | ``142555e44c17`` | ``2.2.0``       | Change ``TaskInstance`` and ``TaskReschedule`` tables from execution_date to run_id.  |
+| ``be2bfac3da23`` (head)        | ``7b2661a43ba3`` | ``2.2.3``       | Add has_import_errors column to DagModel                                              |

Review comment:
       I'm tempted to say this should be 2.3.0 -- it's maybe a bug fix, but this behaviour has been the same for all of 2.x so far.
   
   What do others think here?







[GitHub] [airflow] jedcunningham commented on a change in pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r754519696



##########
File path: tests/dag_processing/test_processor.py
##########
@@ -481,6 +481,34 @@ def test_add_unparseable_zip_file_creates_import_error(self, tmpdir):
             assert import_error.stacktrace == f"invalid syntax ({TEMP_DAG_FILENAME}, line 1)"
             session.rollback()
 
+    @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
+    def test_dag_model_has_import_error_is_true_when_import_error_exists(self, tmpdir):
+        dag_file = os.path.join(TEST_DAGS_FOLDER, "test_example_bash_operator.py")
+        temp_dagfile = os.path.join(tmpdir, TEMP_DAG_FILENAME)
+        with open(dag_file) as main_dag, open(temp_dagfile, 'w') as next_dag:
+            for line in main_dag:
+                next_dag.write(line)
+        session = settings.Session()
+        # first we parse the dag
+        self._process_file(temp_dagfile, session)
+        # assert DagModel.has_import_errors is false
+        dm = session.query(DagModel).filter(DagModel.fileloc == temp_dagfile).first()
+        assert not dm.has_import_errors
+        # corrupt the file
+        with open(temp_dagfile, 'a') as file:
+            file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+
+        self._process_file(temp_dagfile, session)
+        import_errors = session.query(errors.ImportError).all()
+
+        assert len(import_errors) == 1
+        import_error = import_errors[0]
+        assert import_error.filename == temp_dagfile
+        assert import_error.stacktrace == f"invalid syntax ({TEMP_DAG_FILENAME}, line 53)"

Review comment:
       At least in my view, it's needlessly fragile. Since we append to the file, that line number could change whenever the test DAG is updated.
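
One way to avoid hard-coding the line number is to derive it from the file contents before corrupting them. The sketch below uses only the standard library and is not the test from this PR; `syntax_error_line` and the sample sources are hypothetical names chosen for illustration.

```python
def syntax_error_line(source, filename="<dag>"):
    """Return the line number of the first SyntaxError in source, or None."""
    try:
        compile(source, filename, "exec")
    except SyntaxError as exc:
        return exc.lineno
    return None


# Stand-in for the copied DAG file; its length can change freely.
valid_source = "x = 1\ny = 2\n"
# Stand-in for appending unparseable content to the end of the file.
broken_source = valid_source + "z = = 3\n"

# The appended line is the first one after the valid content.
expected_line = valid_source.count("\n") + 1
```

Because `expected_line` is computed from the valid content, the check keeps working if the base file grows or shrinks.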










[GitHub] [airflow] jedcunningham commented on a change in pull request #19367: Use `dag.max_active_runs` instead of `dag_model.max_active_runs`

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r742143431



##########
File path: airflow/models/dag.py
##########
@@ -2743,7 +2743,9 @@ def __init__(self, concurrency=None, **kwargs):
             else:
                 self.max_active_tasks = conf.getint('core', 'max_active_tasks_per_dag')
         if self.max_active_runs is None:
-            self.max_active_runs = conf.getint('core', 'max_active_runs_per_dag')
+            self.max_active_runs = kwargs.get(
+                'max_active_runs', conf.getint('core', 'max_active_runs_per_dag', fallback=16)
+            )

Review comment:
       Wait, why do we need the fallback? It should get it from the default config anyways.







[GitHub] [airflow] ephraimbuddy commented on pull request #19367: Do not send DAGs with null DagModel.max_active_runs to the scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#issuecomment-961486413


   > This feels like only a partial fix.
   > 
   > If a DAG is currently working (and has max_active_runs) and _then_ the file is updated and now has an error, the DAG will still be scheduled, but it probably shouldn't be.
   > 
   > To do that without needing to join against ImportError perhaps we need to add a new column to DAG -- we have `is_active` which is close, but setting that to false hides it from the UI which isn't the behaviour we want here.
   
   I have updated it and it looks good. Thanks for the review!!





[GitHub] [airflow] ashb commented on pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#issuecomment-976531162


   Yeah, we _could_ do it with a join, but this is in the hot path for the main scheduler loop so it seems better to denormalise it here.
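
The trade-off can be sketched with the standard library's `sqlite3`. The table and column names below are simplified stand-ins, not Airflow's real schema, and the queries are illustrative rather than what the scheduler actually runs.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (
        dag_id TEXT PRIMARY KEY,
        fileloc TEXT NOT NULL,
        has_import_errors INTEGER NOT NULL DEFAULT 0
    );
    CREATE TABLE import_error (filename TEXT NOT NULL);
    INSERT INTO dag VALUES ('good', '/dags/good.py', 0);
    INSERT INTO dag VALUES ('bad', '/dags/bad.py', 1);
    INSERT INTO import_error VALUES ('/dags/bad.py');
    """
)

# Normalised: every scheduling query has to consult import_error as well.
with_join = conn.execute(
    """
    SELECT dag.dag_id FROM dag
    LEFT JOIN import_error ON import_error.filename = dag.fileloc
    WHERE import_error.filename IS NULL
    ORDER BY dag.dag_id
    """
).fetchall()

# Denormalised: the parser maintains a flag, so the hot path reads one table.
with_flag = conn.execute(
    "SELECT dag_id FROM dag WHERE has_import_errors = 0 ORDER BY dag_id"
).fetchall()
```

Both queries return the same DAGs; the denormalised form moves the cost to parse time, where the flag has to be kept in sync with the import errors.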





[GitHub] [airflow] ephraimbuddy commented on pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#issuecomment-966412214


   > > > Happy to discuss about if we still need the max_active_runs condition or not too.
   > > 
   > > 
   > > I agree we no longer need it.
   > > I restricted the creation of dagruns on the UI and API too. Let me know if I should remove these restrictions
   > 
   > If you want to reduce duplication, but keep the rollback you can go from this:
   > 
   > ```python
   >     def test_dags_needing_dagruns_not_too_early(self):
   >         ...
   >         session = settings.Session()
   > ```
   > 
   > to this:
   > 
   > ```python
   >     def test_dags_needing_dagruns_not_too_early(self, session):
   > ```
   > 
   > And remove the `session = settings.Session()`. That session pytest fixture defined in `tests/conftest.py` already does the rollback.
   > 
   > But that should probably be a separate change to this PR.
   
   That's true, will work on it





[GitHub] [airflow] ashb commented on a change in pull request #19367: Do not send DAGs with import errors to the scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r742659192



##########
File path: airflow/models/dag.py
##########
@@ -2880,12 +2883,16 @@ def dags_needing_dagruns(cls, session: Session):
         # TODO[HA]: Bake this query, it is run _A lot_
         # We limit so that _one_ scheduler doesn't try to do all the creation
         # of dag runs
+        # exclude dags with import error
+        import_errors = [c.filename for c in session.query(importerror.filename).all()]
         query = (
             session.query(cls)
             .filter(
+                cls.max_active_runs != None,  # noqa
                 cls.is_paused == expression.false(),
                 cls.is_active == expression.true(),
                 cls.next_dagrun_create_after <= func.now(),
+                cls.fileloc.notin_(import_errors),

Review comment:
       We should find another way of doing this -- this function is part of the "hot" path of the scheduler, and a) adding an extra query and b) sending a (possibly) large number of params via this "NOT IN (?,?,?)" clause is not a great idea.

##########
File path: airflow/models/dag.py
##########
@@ -2880,12 +2883,16 @@ def dags_needing_dagruns(cls, session: Session):
         # TODO[HA]: Bake this query, it is run _A lot_
         # We limit so that _one_ scheduler doesn't try to do all the creation
         # of dag runs
+        # exclude dags with import error
+        import_errors = [c.filename for c in session.query(importerror.filename).all()]
         query = (
             session.query(cls)
             .filter(
+                cls.max_active_runs != None,  # noqa

Review comment:
       What does this have to do with import errors?










[GitHub] [airflow] kaxil commented on a change in pull request #19367: Do not send DAGs with import errors to the scheduler

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r742361388



##########
File path: airflow/models/dag.py
##########
@@ -2880,12 +2883,16 @@ def dags_needing_dagruns(cls, session: Session):
         # TODO[HA]: Bake this query, it is run _A lot_
         # We limit so that _one_ scheduler doesn't try to do all the creation
         # of dag runs
+        # exclude dags with import error
+        import_errors = [c.filename for c in session.query(importerror.filename).all()]

Review comment:
       Can you test this with a zip file too, please?







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19367: Do not send DAGs with null DagModel.max_active_runs to the scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r743252849



##########
File path: tests/models/test_dag.py
##########
@@ -1848,9 +1888,6 @@ def test_dags_needing_dagruns_not_too_early(self):
         dag_models = DagModel.dags_needing_dagruns(session).all()
         assert dag_models == []
 
-        session.rollback()
-        session.close()

Review comment:
       Removed this because of the added clean_db method







[GitHub] [airflow] github-actions[bot] commented on pull request #19367: Use `dag.max_active_runs` instead of `dag_model.max_active_runs`

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#issuecomment-957958057


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.








[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19367: Use `dag.max_active_runs` instead of `dag_model.max_active_runs`

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r742146452



##########
File path: airflow/models/dag.py
##########
@@ -2743,7 +2743,9 @@ def __init__(self, concurrency=None, **kwargs):
             else:
                 self.max_active_tasks = conf.getint('core', 'max_active_tasks_per_dag')
         if self.max_active_runs is None:
-            self.max_active_runs = conf.getint('core', 'max_active_runs_per_dag')
+            self.max_active_runs = kwargs.get(
+                'max_active_runs', conf.getint('core', 'max_active_runs_per_dag', fallback=16)
+            )

Review comment:
       I just added it to be safe in case max_active_runs_per_dag is removed from the configuration.
   Can't think of any other reason why they'd get that error.
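
For reference, this is how a `fallback` behaves in the standard library's `configparser`, whose `getint` also accepts a `fallback` keyword. The sketch assumes the option has been removed from the `[core]` section; the value 16 is taken from the diff above. Airflow's own `conf` object layers a default configuration on top of the user's file, and that layering is not modelled here.

```python
import configparser

parser = configparser.ConfigParser()
# A [core] section in which max_active_runs_per_dag has been removed.
parser.read_string("[core]\nmax_active_tasks_per_dag = 16\n")

# Without a fallback, a missing option raises NoOptionError.
try:
    parser.getint("core", "max_active_runs_per_dag")
    missing_option_raised = False
except configparser.NoOptionError:
    missing_option_raised = True

# With a fallback, the supplied value is returned instead.
max_active_runs = parser.getint("core", "max_active_runs_per_dag", fallback=16)
```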







[GitHub] [airflow] jedcunningham commented on a change in pull request #19367: Do not send DAGs with null DagModel.max_active_runs to the scheduler

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r743970499



##########
File path: airflow/dag_processing/processor.py
##########
@@ -533,6 +533,10 @@ def update_import_errors(session: Session, dagbag: DagBag) -> None:
 
         # Add the errors of the processed files
         for filename, stacktrace in dagbag.import_errors.items():
+            dag_model = session.query(DagModel).filter(DagModel.fileloc == filename).first()

Review comment:
       I don't think this can be set on just the first DagModel. Don't we need to set it on all of them from that file?
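
The concern can be illustrated with `sqlite3` from the standard library. The schema below is a simplified stand-in for the DagModel table, and the file paths and DAG ids are made up for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (
        dag_id TEXT PRIMARY KEY,
        fileloc TEXT NOT NULL,
        has_import_errors INTEGER NOT NULL DEFAULT 0
    );
    INSERT INTO dag VALUES ('dag_a', '/dags/shared.py', 0);
    INSERT INTO dag VALUES ('dag_b', '/dags/shared.py', 0);
    INSERT INTO dag VALUES ('dag_c', '/dags/other.py', 0);
    """
)

# One UPDATE keyed on the file location flags every DAG defined in that file,
# rather than only the first row a query happens to return.
conn.execute(
    "UPDATE dag SET has_import_errors = 1 WHERE fileloc = ?",
    ("/dags/shared.py",),
)

flagged = [
    row[0]
    for row in conn.execute(
        "SELECT dag_id FROM dag WHERE has_import_errors = 1 ORDER BY dag_id"
    )
]
```

Here both DAGs from the shared file end up flagged, while the DAG from the other file is left alone.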







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r754517264



##########
File path: tests/models/test_dag.py
##########
@@ -822,6 +822,36 @@ def test_bulk_write_to_db_max_active_runs(self, state):
         model = session.query(DagModel).get((dag.dag_id,))
         assert model.next_dagrun_create_after is None
 
+    def test_bulk_write_to_db_has_import_error(self):
+        """
+        Test that DagModel.has_import_error is set to false if no import errors.
+        """
+        dag = DAG(dag_id='test_has_import_error', start_date=DEFAULT_DATE)
+
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session = settings.Session()

Review comment:
       Ah, yeah. That's better







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r754546911



##########
File path: tests/models/test_dag.py
##########
@@ -822,6 +822,36 @@ def test_bulk_write_to_db_max_active_runs(self, state):
         model = session.query(DagModel).get((dag.dag_id,))
         assert model.next_dagrun_create_after is None
 
+    def test_bulk_write_to_db_has_import_error(self):
+        """
+        Test that DagModel.has_import_error is set to false if no import errors.
+        """
+        dag = DAG(dag_id='test_has_import_error', start_date=DEFAULT_DATE)
+
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session = settings.Session()

Review comment:
       Turns out the class TestDag is using unittest.TestCase and not pytest. I'm wondering whether it's worth converting it to pytest in this PR so we can use the session fixture, or whether to create a session fixture inside the unittest class as shown here: https://pytest.org/en/latest/how-to/unittest.html#using-autouse-fixtures-and-accessing-other-fixtures
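
For comparison, a plain `unittest.TestCase` can get similar per-test cleanup without pytest by registering a rollback in `setUp`. This is a standard-library sketch, not the pytest approach from the linked page; `sqlite3` stands in for the SQLAlchemy session, and the class, table, and helper names are hypothetical.

```python
import io
import sqlite3
import unittest

# Shared in-memory database standing in for the test database.
CONN = sqlite3.connect(":memory:")
CONN.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY)")
CONN.commit()


class RollbackExample(unittest.TestCase):
    def setUp(self):
        self.session = CONN
        # Runs after each test, pass or fail, so uncommitted rows never leak.
        self.addCleanup(self.session.rollback)

    def test_insert_is_visible_inside_the_test(self):
        self.session.execute("INSERT INTO dag VALUES ('test_dag')")
        count = self.session.execute("SELECT COUNT(*) FROM dag").fetchone()[0]
        self.assertEqual(count, 1)


def run_example():
    """Run the test case quietly and report whether it passed."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(RollbackExample)
    result = unittest.TextTestRunner(stream=io.StringIO()).run(suite)
    return result.wasSuccessful()
```

After `run_example()` finishes, the table is empty again because the cleanup rolled the insert back.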







[GitHub] [airflow] ephraimbuddy merged pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
ephraimbuddy merged pull request #19367:
URL: https://github.com/apache/airflow/pull/19367


   





[GitHub] [airflow] ephraimbuddy commented on pull request #19367: Use `dag.max_active_runs` instead of `dag_model.max_active_runs`

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#issuecomment-959724479


   > Can we add a test for this case please?
   > 
   > Separately let's check why `max_active_runs` is not set via tests too
   
   I can't figure out a test to add.
   My guess for the issue is that they removed `max_active_runs_per_dag` from their configuration.
   To check this, I have added a fallback and also updated the DAG tests to ensure that once a DagModel is instantiated, it will have a `max_active_runs`.














[GitHub] [airflow] uranusjr commented on pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#issuecomment-977147778


   Denormalise to optimise makes sense.





[GitHub] [airflow] jedcunningham commented on a change in pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r754430619



##########
File path: tests/dag_processing/test_processor.py
##########
@@ -481,6 +481,34 @@ def test_add_unparseable_zip_file_creates_import_error(self, tmpdir):
             assert import_error.stacktrace == f"invalid syntax ({TEMP_DAG_FILENAME}, line 1)"
             session.rollback()
 
+    @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
+    def test_dag_model_has_import_error_is_true_when_import_error_exists(self, tmpdir):
+        dag_file = os.path.join(TEST_DAGS_FOLDER, "test_example_bash_operator.py")
+        temp_dagfile = os.path.join(tmpdir, TEMP_DAG_FILENAME)
+        with open(dag_file) as main_dag, open(temp_dagfile, 'w') as next_dag:
+            for line in main_dag:
+                next_dag.write(line)
+        session = settings.Session()
+        # first we parse the dag
+        self._process_file(temp_dagfile, session)
+        # assert DagModel.has_import_errors is false
+        dm = session.query(DagModel).filter(DagModel.fileloc == temp_dagfile).first()
+        assert not dm.has_import_errors
+        # corrupt the file
+        with open(temp_dagfile, 'a') as file:
+            file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+
+        self._process_file(temp_dagfile, session)
+        import_errors = session.query(errors.ImportError).all()
+
+        assert len(import_errors) == 1
+        import_error = import_errors[0]
+        assert import_error.filename == temp_dagfile
+        assert import_error.stacktrace == f"invalid syntax ({TEMP_DAG_FILENAME}, line 53)"

Review comment:
       ```suggestion
   ```
   
   nit: Do we need to assert the traceback?

##########
File path: tests/dag_processing/test_processor.py
##########
@@ -481,6 +481,34 @@ def test_add_unparseable_zip_file_creates_import_error(self, tmpdir):
             assert import_error.stacktrace == f"invalid syntax ({TEMP_DAG_FILENAME}, line 1)"
             session.rollback()
 
+    @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
+    def test_dag_model_has_import_error_is_true_when_import_error_exists(self, tmpdir):
+        dag_file = os.path.join(TEST_DAGS_FOLDER, "test_example_bash_operator.py")
+        temp_dagfile = os.path.join(tmpdir, TEMP_DAG_FILENAME)
+        with open(dag_file) as main_dag, open(temp_dagfile, 'w') as next_dag:
+            for line in main_dag:
+                next_dag.write(line)
+        session = settings.Session()
+        # first we parse the dag
+        self._process_file(temp_dagfile, session)
+        # assert DagModel.has_import_errors is false
+        dm = session.query(DagModel).filter(DagModel.fileloc == temp_dagfile).first()
+        assert not dm.has_import_errors
+        # corrupt the file
+        with open(temp_dagfile, 'a') as file:
+            file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+
+        self._process_file(temp_dagfile, session)
+        import_errors = session.query(errors.ImportError).all()
+
+        assert len(import_errors) == 1
+        import_error = import_errors[0]
+        assert import_error.filename == temp_dagfile
+        assert import_error.stacktrace == f"invalid syntax ({TEMP_DAG_FILENAME}, line 53)"
+        session.rollback()

Review comment:
       I don't think this is actually doing anything - `update_import_errors` does a commit already. That said, this should probably stay and happen at the end of this test case.
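
The point about the commit can be illustrated outside Airflow. This is a minimal sketch that uses the standard-library `sqlite3` module as a stand-in for the SQLAlchemy session; the table and file names are hypothetical. Once a write has been committed, a later rollback has nothing left to undo.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE import_error (filename TEXT)")

# Write a row and commit it, roughly what a helper that commits internally does.
conn.execute("INSERT INTO import_error VALUES ('temp_dag.py')")
conn.commit()

# A rollback after the commit has no pending changes to discard.
conn.rollback()
rows_after_commit = conn.execute("SELECT COUNT(*) FROM import_error").fetchone()[0]

# An uncommitted write, by contrast, is undone by the rollback.
conn.execute("INSERT INTO import_error VALUES ('other_dag.py')")
conn.rollback()
rows_after_rollback = conn.execute("SELECT COUNT(*) FROM import_error").fetchone()[0]
```

In both cases only the committed row remains, which is why a trailing rollback does not clean up what the commit already persisted.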

##########
File path: tests/models/test_dag.py
##########
@@ -1901,6 +1932,36 @@ def test_dags_needing_dagruns_only_unpaused(self):
         session.rollback()
         session.close()
 
+    def test_dags_needing_dagruns_doesnot_send_dagmodel_with_import_errors(self, session):
+        """
+        We check that has_import_error is false for dags
+        being set to scheduler to create dagruns
+        """
+        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            has_task_concurrency_limits=False,
+            next_dagrun=DEFAULT_DATE,
+            next_dagrun_create_after=DEFAULT_DATE + timedelta(days=1),
+            is_active=True,
+        )
+        assert not orm_dag.has_import_errors
+        session.add(orm_dag)
+        session.flush()
+
+        needed = DagModel.dags_needing_dagruns(session).all()
+        assert needed == [orm_dag]
+        orm_dag.has_import_errors = True
+        session.merge(orm_dag)
+        session.flush()
+        needed = DagModel.dags_needing_dagruns(session).all()
+        assert needed == []
+
+        session.rollback()
+        session.close()

Review comment:
       ```suggestion
   ```
   
   The session fixture already does a rollback.

##########
File path: tests/models/test_dag.py
##########
@@ -822,6 +822,36 @@ def test_bulk_write_to_db_max_active_runs(self, state):
         model = session.query(DagModel).get((dag.dag_id,))
         assert model.next_dagrun_create_after is None
 
+    def test_bulk_write_to_db_has_import_error(self):
+        """
+        Test that DagModel.has_import_error is set to false if no import errors.
+        """
+        dag = DAG(dag_id='test_has_import_error', start_date=DEFAULT_DATE)
+
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session = settings.Session()

Review comment:
       ```suggestion
       def test_bulk_write_to_db_has_import_error(self, session):
           """
           Test that DagModel.has_import_error is set to false if no import errors.
           """
           dag = DAG(dag_id='test_has_import_error', start_date=DEFAULT_DATE)
   
           DummyOperator(task_id='dummy', dag=dag, owner='airflow')
   
   ```
   
   Use the session fixture?







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19367: Use `dag.max_active_runs` instead of `dag_model.max_active_runs`

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r742146452



##########
File path: airflow/models/dag.py
##########
@@ -2743,7 +2743,9 @@ def __init__(self, concurrency=None, **kwargs):
             else:
                 self.max_active_tasks = conf.getint('core', 'max_active_tasks_per_dag')
         if self.max_active_runs is None:
-            self.max_active_runs = conf.getint('core', 'max_active_runs_per_dag')
+            self.max_active_runs = kwargs.get(
+                'max_active_runs', conf.getint('core', 'max_active_runs_per_dag', fallback=16)
+            )

Review comment:
       I just added it to be safe in case max_active_runs_per_dag is removed from the configuration. 
   Can't think of any other reason why they'd get that error.







[GitHub] [airflow] ephraimbuddy commented on pull request #19367: Do not send DAGs with import errors to the scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#issuecomment-960082182


   I discovered that this happens because the DAG has an import error and is active after upgrading, so the scheduler was trying to create dagruns for it.
   I have updated the code and commit message. 
   cc: @jedcunningham  @kaxil 





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19367: Do not send DAGs with null DagModel.max_active_runs to the scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r742712770



##########
File path: airflow/models/dag.py
##########
@@ -2880,12 +2883,16 @@ def dags_needing_dagruns(cls, session: Session):
         # TODO[HA]: Bake this query, it is run _A lot_
         # We limit so that _one_ scheduler doesn't try to do all the creation
         # of dag runs
+        # exclude dags with import error
+        import_errors = [c.filename for c in session.query(importerror.filename).all()]
         query = (
             session.query(cls)
             .filter(
+                cls.max_active_runs != None,  # noqa
                 cls.is_paused == expression.false(),
                 cls.is_active == expression.true(),
                 cls.next_dagrun_create_after <= func.now(),
+                cls.fileloc.notin_(import_errors),

Review comment:
       I have updated it to exclude those with null max_active_runs.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r754517808



##########
File path: tests/dag_processing/test_processor.py
##########
@@ -481,6 +481,34 @@ def test_add_unparseable_zip_file_creates_import_error(self, tmpdir):
             assert import_error.stacktrace == f"invalid syntax ({TEMP_DAG_FILENAME}, line 1)"
             session.rollback()
 
+    @conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
+    def test_dag_model_has_import_error_is_true_when_import_error_exists(self, tmpdir):
+        dag_file = os.path.join(TEST_DAGS_FOLDER, "test_example_bash_operator.py")
+        temp_dagfile = os.path.join(tmpdir, TEMP_DAG_FILENAME)
+        with open(dag_file) as main_dag, open(temp_dagfile, 'w') as next_dag:
+            for line in main_dag:
+                next_dag.write(line)
+        session = settings.Session()
+        # first we parse the dag
+        self._process_file(temp_dagfile, session)
+        # assert DagModel.has_import_errors is false
+        dm = session.query(DagModel).filter(DagModel.fileloc == temp_dagfile).first()
+        assert not dm.has_import_errors
+        # corrupt the file
+        with open(temp_dagfile, 'a') as file:
+            file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS)
+
+        self._process_file(temp_dagfile, session)
+        import_errors = session.query(errors.ImportError).all()
+
+        assert len(import_errors) == 1
+        import_error = import_errors[0]
+        assert import_error.filename == temp_dagfile
+        assert import_error.stacktrace == f"invalid syntax ({TEMP_DAG_FILENAME}, line 53)"

Review comment:
       I think there's no harm in asserting it?







[GitHub] [airflow] ephraimbuddy commented on pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#issuecomment-976517464


   > Do we really need a new field? I think we can instead join `ImportError` on `import_error.filename = dag.fileloc` and error if the result is not empty.
   
   It looks like it won't be that efficient. See
   https://github.com/apache/airflow/pull/19367#discussion_r742659192
   
   I'm not sure what impact the join would have.
   cc: @ashb 
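
For reference, the join-based check being discussed might look roughly like the sketch below. It uses `sqlite3` with hypothetical `dag` and `import_error` tables rather than Airflow's real models, and the helper name `has_import_error` is made up for illustration. It shows only the shape of the query and says nothing about how the join would perform on a real metadata database.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT, fileloc TEXT);
    CREATE TABLE import_error (filename TEXT);
    INSERT INTO dag VALUES ('good', '/dags/good.py'), ('broken', '/dags/broken.py');
    INSERT INTO import_error VALUES ('/dags/broken.py');
    """
)


def has_import_error(conn, dag_id):
    """Return True if the DAG's file has at least one recorded import error."""
    row = conn.execute(
        """
        SELECT 1
        FROM dag
        JOIN import_error ON import_error.filename = dag.fileloc
        WHERE dag.dag_id = ?
        LIMIT 1
        """,
        (dag_id,),
    ).fetchone()
    # A non-empty join result means the file currently fails to import.
    return row is not None
```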
   
   





[GitHub] [airflow] github-actions[bot] commented on pull request #19367: Use `dag.max_active_runs` instead of `dag_model.max_active_runs`

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#issuecomment-957958057


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] ephraimbuddy commented on pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#issuecomment-966246554


   > Happy to discuss about if we still need the max_active_runs condition or not too.
   
   I agree we no longer need it.
   
   I restricted the creation of dagruns on the UI and API too. Let me know if I should remove these restrictions





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19367: Do not send DAGs with null DagModel.max_active_runs to the scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r747386557



##########
File path: airflow/models/dag.py
##########
@@ -2880,11 +2884,14 @@ def dags_needing_dagruns(cls, session: Session):
         # TODO[HA]: Bake this query, it is run _A lot_
         # We limit so that _one_ scheduler doesn't try to do all the creation
         # of dag runs
+
         query = (
             session.query(cls)
             .filter(
+                cls.max_active_runs != None,  # noqa

Review comment:
       Yes. Will remove it







[GitHub] [airflow] jedcunningham commented on a change in pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r754601174



##########
File path: tests/models/test_dag.py
##########
@@ -822,6 +822,36 @@ def test_bulk_write_to_db_max_active_runs(self, state):
         model = session.query(DagModel).get((dag.dag_id,))
         assert model.next_dagrun_create_after is None
 
+    def test_bulk_write_to_db_has_import_error(self):
+        """
+        Test that DagModel.has_import_error is set to false if no import errors.
+        """
+        dag = DAG(dag_id='test_has_import_error', start_date=DEFAULT_DATE)
+
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session = settings.Session()

Review comment:
       I'd say do that in another PR. Plenty of other tests could benefit from it as well.







[GitHub] [airflow] jedcunningham commented on a change in pull request #19367: Use `dag.max_active_runs` instead of `dag_model.max_active_runs`

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r742143431



##########
File path: airflow/models/dag.py
##########
@@ -2743,7 +2743,9 @@ def __init__(self, concurrency=None, **kwargs):
             else:
                 self.max_active_tasks = conf.getint('core', 'max_active_tasks_per_dag')
         if self.max_active_runs is None:
-            self.max_active_runs = conf.getint('core', 'max_active_runs_per_dag')
+            self.max_active_runs = kwargs.get(
+                'max_active_runs', conf.getint('core', 'max_active_runs_per_dag', fallback=16)
+            )

Review comment:
       Wait, why do we need the fallback? It should get it from the default config anyways.

##########
File path: airflow/models/dag.py
##########
@@ -2743,7 +2743,9 @@ def __init__(self, concurrency=None, **kwargs):
             else:
                 self.max_active_tasks = conf.getint('core', 'max_active_tasks_per_dag')
         if self.max_active_runs is None:
-            self.max_active_runs = conf.getint('core', 'max_active_runs_per_dag')
+            self.max_active_runs = kwargs.get(
+                'max_active_runs', conf.getint('core', 'max_active_runs_per_dag', fallback=16)
+            )

Review comment:
       Gotcha. They'd have to remove it from the `config_templates/default_airflow.cfg` in their install, so I think that is unlikely to be the cause.
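
The reasoning about the fallback can be sketched with the standard-library `configparser`, used here only as a stand-in for Airflow's own `conf` object and assuming it layers shipped defaults under the user's file in a similar way. The option values and the `no_such_option` key are made up for illustration.

```python
from configparser import ConfigParser

conf = ConfigParser()
# Shipped defaults are loaded first...
conf.read_string("[core]\nmax_active_runs_per_dag = 16\n")
# ...then the user's file, which here does not mention the option at all.
conf.read_string("[core]\nparallelism = 32\n")

# The option is still resolved from the defaults, so the fallback is unused.
from_defaults = conf.getint("core", "max_active_runs_per_dag", fallback=99)

# The fallback only matters if the option is missing from every loaded source.
from_fallback = conf.getint("core", "no_such_option", fallback=99)
```

Under that assumption, the fallback would only take effect if the option were deleted from the defaults as well.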




[GitHub] [airflow] kaxil commented on a change in pull request #19367: Do not send DAGs with import errors to the scheduler

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r742361388



##########
File path: airflow/models/dag.py
##########
@@ -2880,12 +2883,16 @@ def dags_needing_dagruns(cls, session: Session):
         # TODO[HA]: Bake this query, it is run _A lot_
         # We limit so that _one_ scheduler doesn't try to do all the creation
         # of dag runs
+        # exclude dags with import error
+        import_errors = [c.filename for c in session.query(importerror.filename).all()]

Review comment:
       Can you test this with zip file too please







[GitHub] [airflow] ashb commented on a change in pull request #19367: Do not send DAGs with import errors to the scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r742659192



##########
File path: airflow/models/dag.py
##########
@@ -2880,12 +2883,16 @@ def dags_needing_dagruns(cls, session: Session):
         # TODO[HA]: Bake this query, it is run _A lot_
         # We limit so that _one_ scheduler doesn't try to do all the creation
         # of dag runs
+        # exclude dags with import error
+        import_errors = [c.filename for c in session.query(importerror.filename).all()]
         query = (
             session.query(cls)
             .filter(
+                cls.max_active_runs != None,  # noqa
                 cls.is_paused == expression.false(),
                 cls.is_active == expression.true(),
                 cls.next_dagrun_create_after <= func.now(),
+                cls.fileloc.notin_(import_errors),

Review comment:
       We should find another way of doing this -- this function is part of the "hot" path of the scheduler, and a) adding an extra query and b) sending (possibly) a large number of params via this "NOT IN (?,?,?)" etc is not a great idea. 
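
To make the concern concrete, here is a rough sketch using `sqlite3` with hypothetical `dag` and `import_error` tables (not Airflow's real schema or query). The first variant needs an extra query plus one bound parameter per broken file. The second filters on a boolean column, in the spirit of the `has_import_errors` flag, so it stays a single query with no parameter list.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT, fileloc TEXT, has_import_errors INTEGER DEFAULT 0);
    CREATE TABLE import_error (filename TEXT);
    INSERT INTO dag VALUES ('good', '/dags/good.py', 0), ('broken', '/dags/broken.py', 1);
    INSERT INTO import_error VALUES ('/dags/broken.py');
    """
)

# Variant 1: an extra round trip, then a NOT IN list that grows with the number of errors.
broken_files = [row[0] for row in conn.execute("SELECT filename FROM import_error")]
placeholders = ",".join("?" * len(broken_files))
not_in_result = [
    row[0]
    for row in conn.execute(
        f"SELECT dag_id FROM dag WHERE fileloc NOT IN ({placeholders})", broken_files
    )
]

# Variant 2: a single query filtering on a flag that is maintained when files are parsed.
flag_result = [
    row[0] for row in conn.execute("SELECT dag_id FROM dag WHERE has_import_errors = 0")
]
```

Both variants select the same DAGs here; the difference is the work done per scheduler loop.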







[GitHub] [airflow] jedcunningham commented on a change in pull request #19367: Use `dag.max_active_runs` instead of `dag_model.max_active_runs`

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r742150958



##########
File path: airflow/models/dag.py
##########
@@ -2743,7 +2743,9 @@ def __init__(self, concurrency=None, **kwargs):
             else:
                 self.max_active_tasks = conf.getint('core', 'max_active_tasks_per_dag')
         if self.max_active_runs is None:
-            self.max_active_runs = conf.getint('core', 'max_active_runs_per_dag')
+            self.max_active_runs = kwargs.get(
+                'max_active_runs', conf.getint('core', 'max_active_runs_per_dag', fallback=16)
+            )

Review comment:
       Gotcha. They'd have to remove it from the `config_templates/default_airflow.cfg` in their install, so I think that is unlikely to be the cause.







[GitHub] [airflow] ephraimbuddy commented on pull request #19367: Do not send DAGs with null DagModel.max_active_runs to the scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#issuecomment-961486413


   > This feels like only a partial fix.
   > 
   > If a DAG is currently working (and has max_active_runs) and _then_ the file is updated and now has an error, the DAG will still be scheduled, but it probably shouldn't be.
   > 
   > To do that without needing to join against ImportError perhaps we need to add a new column to DAG -- we have `is_active` which is close, but setting that to false hides it from the UI which isn't the behaviour we want here.
   
   I have updated it and it looks good. Thanks for the review!!





[GitHub] [airflow] ashb commented on a change in pull request #19367: Do not send DAGs with null DagModel.max_active_runs to the scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r747354277



##########
File path: docs/apache-airflow/migrations-ref.rst
##########
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``7b2661a43ba3`` (head)        | ``142555e44c17`` | ``2.2.0``       | Change ``TaskInstance`` and ``TaskReschedule`` tables from execution_date to run_id.  |
+| ``be2bfac3da23`` (head)        | ``7b2661a43ba3`` | ``2.2.2``       | Add has_import_errors column to DagModel                                              |

Review comment:
       ```suggestion
   | ``be2bfac3da23`` (head)        | ``7b2661a43ba3`` | ``2.2.3``       | Add has_import_errors column to DagModel                                              |
   ```

##########
File path: tests/models/test_dag.py
##########
@@ -1898,8 +1951,65 @@ def test_dags_needing_dagruns_only_unpaused(self):
         dag_models = DagModel.dags_needing_dagruns(session).all()
         assert dag_models == []
 
-        session.rollback()
-        session.close()
+    def test_dags_needing_dagruns_doesnot_send_dagmodel_with_null_max_active_runs(self):
+        """
+        We check that max_active_runs must not be null for dags
+        being set to scheduler to create dagruns
+        """
+        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session = settings.Session()
+
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            has_task_concurrency_limits=False,
+            next_dagrun=DEFAULT_DATE,
+            next_dagrun_create_after=DEFAULT_DATE + timedelta(days=1),
+            is_active=True,
+        )
+        assert orm_dag.max_active_runs is not None
+        session.add(orm_dag)
+        session.flush()
+
+        needed = DagModel.dags_needing_dagruns(session).all()
+        assert needed == [orm_dag]
+        # set to None and ensure it's not sent to scheduler
+        orm_dag.max_active_runs = None
+        session.merge(orm_dag)
+        session.flush()
+        needed = DagModel.dags_needing_dagruns(session).all()
+        assert needed == []
+
+    def test_dags_needing_dagruns_doesnot_send_dagmodel_with_import_errors(self):
+        """
+        We check that has_import_error is false for dags
+        being set to scheduler to create dagruns
+        """
+        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session = settings.Session()
+
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            has_task_concurrency_limits=False,
+            next_dagrun=DEFAULT_DATE,
+            next_dagrun_create_after=DEFAULT_DATE + timedelta(days=1),
+            is_active=True,
+        )
+        assert not orm_dag.has_import_errors
+        session.add(orm_dag)
+        session.flush()
+
+        needed = DagModel.dags_needing_dagruns(session).all()
+        assert needed == [orm_dag]
+        # set to None and ensure it's not sent to scheduler

Review comment:
       ```suggestion
   ```

##########
File path: tests/models/test_dag.py
##########
@@ -1898,8 +1951,65 @@ def test_dags_needing_dagruns_only_unpaused(self):
         dag_models = DagModel.dags_needing_dagruns(session).all()
         assert dag_models == []
 
-        session.rollback()
-        session.close()
+    def test_dags_needing_dagruns_doesnot_send_dagmodel_with_null_max_active_runs(self):
+        """
+        We check that max_active_runs must not be null for dags
+        being set to scheduler to create dagruns
+        """
+        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session = settings.Session()
+
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            has_task_concurrency_limits=False,
+            next_dagrun=DEFAULT_DATE,
+            next_dagrun_create_after=DEFAULT_DATE + timedelta(days=1),
+            is_active=True,
+        )
+        assert orm_dag.max_active_runs is not None
+        session.add(orm_dag)
+        session.flush()
+
+        needed = DagModel.dags_needing_dagruns(session).all()
+        assert needed == [orm_dag]
+        # set to None and ensure it's not sent to scheduler
+        orm_dag.max_active_runs = None
+        session.merge(orm_dag)
+        session.flush()
+        needed = DagModel.dags_needing_dagruns(session).all()
+        assert needed == []
+

Review comment:
       ```suggestion
   ```
   
   I don't think we need this behaviour _and_ the explicit flag, so since we have added the flag we should just use that.
   
   In which case we should also re-title this PR please.

##########
File path: tests/models/test_dag.py
##########
@@ -1848,9 +1888,6 @@ def test_dags_needing_dagruns_not_too_early(self):
         dag_models = DagModel.dags_needing_dagruns(session).all()
         assert dag_models == []
 
-        session.rollback()
-        session.close()

Review comment:
       But with these rollbacks it _shouldn't_ need a clean method.

##########
File path: tests/models/test_dag.py
##########
@@ -1863,13 +1900,29 @@ def test_max_active_runs_not_none(self):
             next_dagrun_create_after=None,
             is_active=True,
         )
+        # assert max_active_runs updated
+        assert orm_dag.max_active_runs == 16
         session.add(orm_dag)
         session.flush()
-
         assert orm_dag.max_active_runs is not None
 
-        session.rollback()
-        session.close()
+    def test_dagmodel_has_import_error_is_false(self):
+        dag = DAG(dag_id='test_dag', start_date=timezone.datetime(2020, 1, 1))
+        DummyOperator(task_id='dummy', dag=dag, owner='airflow')
+
+        session = settings.Session()
+        orm_dag = DagModel(
+            dag_id=dag.dag_id,
+            next_dagrun=None,
+            next_dagrun_create_after=None,
+            is_active=True,
+        )
+        # assert has_import_error is false
+        assert not orm_dag.has_import_errors
+        session.add(orm_dag)
+        session.flush()
+
+        assert not orm_dag.has_import_errors

Review comment:
       This test isn't really adding much -- it's testing `self.has_import_errors = False` in the constructor.
   
   It certainly doesn't need a whole separate test case; it can be added as an assert somewhere else if we want it. You already test basically everything here in `test_dags_needing_dagruns_doesnot_send_dagmodel_with_import_errors`, so this whole test function isn't needed.







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19367: Do not send DAGs with import errors to the scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r742678255



##########
File path: airflow/models/dag.py
##########
@@ -2880,12 +2883,16 @@ def dags_needing_dagruns(cls, session: Session):
         # TODO[HA]: Bake this query, it is run _A lot_
         # We limit so that _one_ scheduler doesn't try to do all the creation
         # of dag runs
+        # exclude dags with import error
+        import_errors = [c.filename for c in session.query(importerror.filename).all()]
         query = (
             session.query(cls)
             .filter(
+                cls.max_active_runs != None,  # noqa

Review comment:
       Actually, the cause of the issue was that an active DAG had errors after the upgrade. Normally it would have max_active_runs set, but it didn't because it failed to parse.
   
   I'll update it to only include this check and remove the import errors lookup.
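For readers wondering why the diff writes `cls.max_active_runs != None` rather than a plain inequality: SQLAlchemy renders a comparison against `None` as `IS NOT NULL`, and the `# noqa` presumably silences the linter warning about comparing to `None`. The sketch below shows why that translation matters at the SQL level. It uses the standard-library `sqlite3` module instead of SQLAlchemy, and the two-column `dag` table and its rows are hypothetical stand-ins, not Airflow's schema.

```python
import sqlite3

# Hypothetical, simplified stand-in for the dag table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, max_active_runs INTEGER)")
conn.executemany(
    "INSERT INTO dag VALUES (?, ?)",
    [("parsed_ok", 16), ("failed_to_parse", None)],
)

# A literal comparison with NULL evaluates to NULL, so no row passes the filter.
not_equal = conn.execute(
    "SELECT dag_id FROM dag WHERE max_active_runs != NULL"
).fetchall()

# IS NOT NULL excludes only the row whose value was never populated.
is_not_null = conn.execute(
    "SELECT dag_id FROM dag WHERE max_active_runs IS NOT NULL"
).fetchall()
```

Here `not_equal` comes back empty while `is_not_null` contains only the `parsed_ok` row.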







[GitHub] [airflow] ashb commented on pull request #19367: Do not send DAGs with null DagModel.max_active_runs to the scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#issuecomment-960648528


   This feels like only a partial fix.
   
   If a DAG is currently working (and has max_active_runs) and _then_ the file is updated and now has an error, the DAG will still be scheduled, but it probably shouldn't be.
   
   To do that without needing to join against ImportError perhaps we need to add a new column to DAG -- we have `is_active` which is close, but setting that to false hides it from the UI which isn't the behaviour we want here.
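The gap described in this comment can be sketched as follows. This is a minimal illustration using the standard-library `sqlite3` module, not Airflow's actual query; the table layout is a hypothetical reduction of the `dag` table, and `has_import_errors` is the column name the PR later adopts. The `schedulable` helper is invented for the example.

```python
import sqlite3

# Hypothetical, simplified stand-in for the dag table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag ("
    " dag_id TEXT PRIMARY KEY,"
    " is_active INTEGER NOT NULL,"
    " max_active_runs INTEGER,"
    " has_import_errors INTEGER NOT NULL DEFAULT 0)"
)
# A DAG that parsed fine at some point, so max_active_runs is populated.
conn.execute(
    "INSERT INTO dag (dag_id, is_active, max_active_runs) VALUES ('was_working', 1, 16)"
)


def schedulable(where_clause):
    """Return the dag_ids selected by the given WHERE clause (illustrative helper)."""
    query = f"SELECT dag_id FROM dag WHERE {where_clause}"
    return [row[0] for row in conn.execute(query)]


# The file is edited and now fails to import: only the flag changes,
# max_active_runs keeps its old value and is_active stays true.
conn.execute("UPDATE dag SET has_import_errors = 1 WHERE dag_id = 'was_working'")

null_check = schedulable("is_active = 1 AND max_active_runs IS NOT NULL")
flag_check = schedulable("is_active = 1 AND has_import_errors = 0")
```

The null check still selects the broken DAG, whereas the flag check excludes it without touching `is_active`, so the DAG would remain visible in the UI.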





[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19367: Do not send DAGs with null DagModel.max_active_runs to the scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r743252849



##########
File path: tests/models/test_dag.py
##########
@@ -1848,9 +1888,6 @@ def test_dags_needing_dagruns_not_too_early(self):
         dag_models = DagModel.dags_needing_dagruns(session).all()
         assert dag_models == []
 
-        session.rollback()
-        session.close()

Review comment:
       Removed this because of the added `clean_db` method.







[GitHub] [airflow] ashb commented on a change in pull request #19367: Do not send DAGs with null DagModel.max_active_runs to the scheduler

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r747353957



##########
File path: airflow/models/dag.py
##########
@@ -2880,11 +2884,14 @@ def dags_needing_dagruns(cls, session: Session):
         # TODO[HA]: Bake this query, it is run _A lot_
         # We limit so that _one_ scheduler doesn't try to do all the creation
         # of dag runs
+
         query = (
             session.query(cls)
             .filter(
+                cls.max_active_runs != None,  # noqa

Review comment:
       ```suggestion
   ```
   
   We don't need this condition now I think, right?







[GitHub] [airflow] ephraimbuddy commented on a change in pull request #19367: Do not send DAGs with null DagModel.max_active_runs to the scheduler

Posted by GitBox <gi...@apache.org>.
ephraimbuddy commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r747392653



##########
File path: tests/models/test_dag.py
##########
@@ -1848,9 +1888,6 @@ def test_dags_needing_dagruns_not_too_early(self):
         dag_models = DagModel.dags_needing_dagruns(session).all()
         assert dag_models == []
 
-        session.rollback()
-        session.close()

Review comment:
       Yes. I saw that it was repeated; happy to restore it if that's preferred.







[GitHub] [airflow] ashb commented on pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
ashb commented on pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#issuecomment-966375995


   > > Happy to discuss about if we still need the max_active_runs condition or not too.
   > 
   > I agree we no longer need it.
   > 
   > I restricted the creation of dagruns on the UI and API too. Let me know if I should remove these restrictions
   
   If you want to reduce duplication, but keep the rollback you can go from this:
   
   ```python
        def test_dags_needing_dagruns_not_too_early(self):
            ...
            session = settings.Session()
   ```
   to this:
   
   ```python
        def test_dags_needing_dagruns_not_too_early(self, session):
   ```
   
   And remove the `session = settings.Session()` line. The `session` pytest fixture defined in `tests/conftest.py` already does the rollback.
   
   But that should probably be a separate change from this PR.
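The pattern a rollback-on-teardown fixture follows can be sketched like this. It is an assumption-laden illustration, not the contents of `tests/conftest.py`: it uses the standard-library `sqlite3` and `contextlib` modules, and `session_scope` is a hypothetical name. In a pytest `conftest.py` the same yield-then-rollback shape would be written as a generator decorated with `@pytest.fixture`.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def session_scope(conn):
    """Hand the connection to the test body, then undo whatever it wrote."""
    try:
        yield conn
    finally:
        # Runs even if the test body raises, so no per-test rollback is needed.
        conn.rollback()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY)")
conn.commit()

with session_scope(conn) as session:
    session.execute("INSERT INTO dag VALUES ('test_dag')")
    rows_inside = session.execute("SELECT COUNT(*) FROM dag").fetchone()[0]

rows_after = conn.execute("SELECT COUNT(*) FROM dag").fetchone()[0]
```

The row is visible inside the block and gone afterwards, which is what lets each test drop its own `session.rollback()` and `session.close()` calls.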





[GitHub] [airflow] kaxil commented on a change in pull request #19367: Do not send DAGs with null DagModel.max_active_runs to the scheduler

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r743972540



##########
File path: airflow/dag_processing/processor.py
##########
@@ -533,6 +533,10 @@ def update_import_errors(session: Session, dagbag: DagBag) -> None:
 
         # Add the errors of the processed files
         for filename, stacktrace in dagbag.import_errors.items():
+            dag_model = session.query(DagModel).filter(DagModel.fileloc == filename).first()

Review comment:
       ```suggestion
               session.query(DagModel).filter(DagModel.fileloc == filename).update({"has_import_errors": True})
   ```
   
   might be better _I think_
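One reason the bulk form may be preferable: a single file can define several DAGs, and `.first()` returns at most one row, so if the following lines only set the flag on `dag_model`, the other DAGs from that file would be left untouched. The sketch below shows the equivalent single `UPDATE` statement with the standard-library `sqlite3` module; the table, file paths, and DAG ids are hypothetical.

```python
import sqlite3

# Hypothetical, simplified stand-in for the dag table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag ("
    " dag_id TEXT PRIMARY KEY,"
    " fileloc TEXT,"
    " has_import_errors INTEGER NOT NULL DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO dag (dag_id, fileloc) VALUES (?, ?)",
    [
        ("a", "/dags/shared.py"),
        ("b", "/dags/shared.py"),  # second DAG defined in the same file
        ("c", "/dags/other.py"),
    ],
)

# One statement flags every DAG that lives in the failing file.
cursor = conn.execute(
    "UPDATE dag SET has_import_errors = 1 WHERE fileloc = ?",
    ("/dags/shared.py",),
)
updated = cursor.rowcount

flagged = [
    row[0]
    for row in conn.execute(
        "SELECT dag_id FROM dag WHERE has_import_errors = 1 ORDER BY dag_id"
    )
]
```

Both DAGs from the shared file end up flagged, and no rows need to be loaded into Python first.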







[GitHub] [airflow] potiuk commented on a change in pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r754308324



##########
File path: docs/apache-airflow/migrations-ref.rst
##########
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``7b2661a43ba3`` (head)        | ``142555e44c17`` | ``2.2.0``       | Change ``TaskInstance`` and ``TaskReschedule`` tables from execution_date to run_id.  |
+| ``be2bfac3da23`` (head)        | ``7b2661a43ba3`` | ``2.2.3``       | Add has_import_errors column to DagModel                                              |

Review comment:
       I am for adding it to 2.2.3. There is hardly any use for DagRuns on DAGs with import errors, especially since you could not see via the API whether there were errors. Even if it requires a data model change, that's fine at patch level. It is OK for a bugfix to require a new field in the DB.







[GitHub] [airflow] jedcunningham commented on a change in pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#discussion_r754426660



##########
File path: docs/apache-airflow/migrations-ref.rst
##########
@@ -23,7 +23,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
 | Revision ID                    | Revises ID       | Airflow Version | Description                                                                           |
 +--------------------------------+------------------+-----------------+---------------------------------------------------------------------------------------+
-| ``7b2661a43ba3`` (head)        | ``142555e44c17`` | ``2.2.0``       | Change ``TaskInstance`` and ``TaskReschedule`` tables from execution_date to run_id.  |
+| ``be2bfac3da23`` (head)        | ``7b2661a43ba3`` | ``2.2.3``       | Add has_import_errors column to DagModel                                              |

Review comment:
       Since this issue will crash the scheduler, I lean toward `2.2.3`.







[GitHub] [airflow] uranusjr commented on pull request #19367: Do not create dagruns for DAGs with import errors

Posted by GitBox <gi...@apache.org>.
uranusjr commented on pull request #19367:
URL: https://github.com/apache/airflow/pull/19367#issuecomment-976368619


   Do we really need a new field? I think we can instead join `ImportError` on `import_error.filename = dag.fileloc` and error if the result is not empty.
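The join proposed here can be sketched in plain SQL. This is an illustration with the standard-library `sqlite3` module, not Airflow's query: the two tables are hypothetical reductions that keep only the `fileloc` and `filename` columns named in the comment, and the `NOT EXISTS` form is one assumed way to turn the join into a scheduling filter.

```python
import sqlite3

# Hypothetical, simplified stand-ins for the dag and import_error tables.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY, fileloc TEXT);
    CREATE TABLE import_error (id INTEGER PRIMARY KEY, filename TEXT);
    INSERT INTO dag VALUES ('healthy', '/dags/healthy.py');
    INSERT INTO dag VALUES ('broken', '/dags/broken.py');
    INSERT INTO import_error (filename) VALUES ('/dags/broken.py');
    """
)

# DAGs whose file currently has a recorded import error.
with_errors = [
    row[0]
    for row in conn.execute(
        "SELECT dag.dag_id FROM dag"
        " JOIN import_error ON import_error.filename = dag.fileloc"
    )
]

# The complementary set: DAGs with no matching import_error row.
without_errors = [
    row[0]
    for row in conn.execute(
        "SELECT dag_id FROM dag WHERE NOT EXISTS ("
        " SELECT 1 FROM import_error"
        " WHERE import_error.filename = dag.fileloc)"
    )
]
```

The trade-off against a dedicated column is that every scheduling query pays for the extra lookup, which is the cost the earlier comment about avoiding a join against `ImportError` was trying to sidestep.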

