Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/02/29 20:59:51 UTC

[GitHub] [airflow] mik-laj opened a new pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main loop of the scheduler

mik-laj opened a new pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main loop of the scheduler
URL: https://github.com/apache/airflow/pull/7597
 
 
   ---
   Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = JIRA ID<sup>*</sup>
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   <sup>*</sup> For document-only changes commit message can start with `[AIRFLOW-XXXX]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [airflow] mik-laj commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r386425807
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -1616,26 +1633,6 @@ def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag) -> bool
         self._process_executor_events(simple_dag_bag)
         return True
 
-    def _process_and_execute_tasks(self, simple_dag_bag):
 
 Review comment:
   I will revert this change and propose it in a separate PR.


[GitHub] [airflow] kaxil commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r387810889
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -752,37 +758,35 @@ def _find_dags_to_process(self, dags: List[DAG], paused_dag_ids: Set[str]) -> Li
         return dags
 
     @provide_session
-    def kill_zombies(self, dagbag, zombies, session=None):
+    def execute_on_failure_callbacks(self, dagbag, failure_callback_requests, session=None):
         """
-        Fail given zombie tasks, which are tasks that haven't
-        had a heartbeat for too long, in the current DagBag.
+        Execute on failure callbacks. These objects can come from SchedulerJob or from
+        DagFileProcessorManager.
 
-        :param zombies: zombie task instances to kill.
-        :type zombies: List[airflow.models.taskinstance.SimpleTaskInstance]
+        :param failure_callback_requests: failure callbacks to execute
+        :type failure_callback_requests: List[airflow.utils.dag_processing.FailureCallbackRequest]
         :param session: DB session.
         """
         TI = models.TaskInstance
 
-        for zombie in zombies:
-            if zombie.dag_id in dagbag.dags:
-                dag = dagbag.dags[zombie.dag_id]
-                if zombie.task_id in dag.task_ids:
-                    task = dag.get_task(zombie.task_id)
-                    ti = TI(task, zombie.execution_date)
+        for request in failure_callback_requests:
+            if request.simple_task_instance.dag_id in dagbag.dags:
+                dag = dagbag.dags[request.simple_task_instance.dag_id]
+                if request.simple_task_instance.task_id in dag.task_ids:
+                    task = dag.get_task(request.simple_task_instance.task_id)
+                    ti = TI(task, request.simple_task_instance.execution_date)
                     # Get properties needed for failure handling from SimpleTaskInstance.
-                    ti.start_date = zombie.start_date
-                    ti.end_date = zombie.end_date
-                    ti.try_number = zombie.try_number
-                    ti.state = zombie.state
+                    ti.start_date = request.simple_task_instance.start_date
+                    ti.end_date = request.simple_task_instance.end_date
+                    ti.try_number = request.simple_task_instance.try_number
+                    ti.state = request.simple_task_instance.state
                     ti.test_mode = self.UNIT_TEST_MODE
-                    ti.handle_failure("{} detected as zombie".format(ti),
-                                      ti.test_mode, ti.get_template_context())
-                    self.log.info('Marked zombie job %s as %s', ti, ti.state)
-                    Stats.incr('zombies_killed')
 
 Review comment:
   Needs mention in API and we need to remove `zombies_killed` from the metrics list


[GitHub] [airflow] mik-laj edited a comment on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
mik-laj edited a comment on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593430117
 
 
    It is worth noting that this also solves one more problem: the modules are always reloaded.
    https://issues.apache.org/jira/projects/AIRFLOW/issues/AIRFLOW-6497?filter=allopenissues
    So when someone makes a change in an additional module, the new version is executed rather than a stale, previously imported one. Stale code can be a real problem because the handler is often stored in a helper module that is shared among many DAGs.
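    For illustration only (the file layout and function name below are hypothetical, not part of this PR), this is the kind of setup where a stale import hurts:
    ```python
    # helpers.py -- a shared failure handler imported by many DAG files
    def notify_on_failure(context):
        # "ti" is the task instance available in the callback context
        print("Task failed:", context["ti"])


    # my_dag.py -- one of many DAG files importing the shared handler
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    from helpers import notify_on_failure

    dag = DAG(
        dag_id="example_shared_callback",
        start_date=datetime(2016, 1, 1),
        default_args={"on_failure_callback": notify_on_failure},
    )
    task = DummyOperator(task_id="task", dag=dag)

    # If the callback ran in a long-lived process, editing notify_on_failure()
    # would have no effect until "helpers" was re-imported; running it in a
    # freshly started DAG file processor picks up the new code.
    ```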


[GitHub] [airflow] mik-laj commented on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-595933565
 
 
   Travis is green.


[GitHub] [airflow] tooptoop4 commented on a change in pull request #7597: [AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
tooptoop4 commented on a change in pull request #7597: [AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r386057085
 
 

 ##########
 File path: tests/dags/test_on_failure_callback.py
 ##########
 @@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.dummy_operator import DummyOperator
+
+DEFAULT_DATE = datetime(2016, 1, 1)
+
+args = {
+    'owner': 'airflow',
+    'start_date': DEFAULT_DATE,
+}
+
+dag = DAG(dag_id='test_om_failure_callback_dag', default_args=args)
+
+
+def write_data_to_callback(*arg, **kwargs):  # pylint: disable=unused-argument
+    with open(os.environ.get('AIRFLOW_CALLBACK_FILE'), "w+") as f:
+        f.write("Callbacck fired")
 
 Review comment:
   cck typo


[GitHub] [airflow] mik-laj commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r387844631
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -752,37 +758,35 @@ def _find_dags_to_process(self, dags: List[DAG], paused_dag_ids: Set[str]) -> Li
         return dags
 
     @provide_session
-    def kill_zombies(self, dagbag, zombies, session=None):
+    def execute_on_failure_callbacks(self, dagbag, failure_callback_requests, session=None):
         """
-        Fail given zombie tasks, which are tasks that haven't
-        had a heartbeat for too long, in the current DagBag.
+        Execute on failure callbacks. These objects can come from SchedulerJob or from
+        DagFileProcessorManager.
 
-        :param zombies: zombie task instances to kill.
-        :type zombies: List[airflow.models.taskinstance.SimpleTaskInstance]
+        :param failure_callback_requests: failure callbacks to execute
+        :type failure_callback_requests: List[airflow.utils.dag_processing.FailureCallbackRequest]
         :param session: DB session.
         """
         TI = models.TaskInstance
 
-        for zombie in zombies:
-            if zombie.dag_id in dagbag.dags:
-                dag = dagbag.dags[zombie.dag_id]
-                if zombie.task_id in dag.task_ids:
-                    task = dag.get_task(zombie.task_id)
-                    ti = TI(task, zombie.execution_date)
+        for request in failure_callback_requests:
+            if request.simple_task_instance.dag_id in dagbag.dags:
 
 Review comment:
   Done


[GitHub] [airflow] tooptoop4 commented on a change in pull request #7597: [AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
tooptoop4 commented on a change in pull request #7597: [AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r386057105
 
 

 ##########
 File path: tests/jobs/test_scheduler_job.py
 ##########
 @@ -1015,14 +1012,52 @@ def test_kill_zombies(self, mock_ti_handle_failure):
             session.add(ti)
             session.commit()
 
-            zombies = [SimpleTaskInstance(ti)]
-            dag_file_processor.kill_zombies(dagbag, zombies)
+            requests = [
+                FailureCallbackRequest(
+                    full_filepath="A",
+                    simple_task_instance=SimpleTaskInstance(ti),
+                    msg="Message"
+                )
+            ]
+            dag_file_processor.execute_on_failure_callbacks(dagbag, requests)
             mock_ti_handle_failure.assert_called_once_with(
-                mock.ANY,
+                "Message",
                 conf.getboolean('core', 'unit_test_mode'),
                 mock.ANY
             )
 
+    def test_process_file_should_failure_callback(self):
+        dag_file = os.path.join(
+            os.path.dirname(os.path.realpath(__file__)), '../dags/test_on_failure_callback.py'
+        )
+        dagbag = DagBag(dag_folder=dag_file, include_examples=False)
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        with create_session() as session, NamedTemporaryFile(delete=False) as callback_file:
+            session.query(TaskInstance).delete()
+            dag = dagbag.get_dag('test_om_failure_callback_dag')
+            task = dag.get_task(task_id='test_om_failure_callback_task')
+
+            ti = TaskInstance(task, DEFAULT_DATE, State.RUNNING)
+
+            session.add(ti)
+            session.commit()
+
+            requests = [
+                FailureCallbackRequest(
+                    full_filepath=dag.full_filepath,
+                    simple_task_instance=SimpleTaskInstance(ti),
+                    msg="Message"
+                )
+            ]
+            callback_file.close()
+
+            with mock.patch.dict("os.environ", {"AIRFLOW_CALLBACK_FILE": callback_file.name}):
+                dag_file_processor.process_file(dag_file, requests)
+            with open(callback_file.name) as callback_file2:
+                content = callback_file2.read()
+            self.assertEqual("Callbacck fired", content)
 
 Review comment:
   cck typo


[GitHub] [airflow] codecov-io commented on issue #7597: [AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
codecov-io commented on issue #7597: [AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593006305
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=h1) Report
   > Merging [#7597](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/063a35f58823d53eccc723b44d02c356e81ea6f9?src=pr&el=desc) will **decrease** coverage by `0.23%`.
   > The diff coverage is `83.07%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7597/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7597      +/-   ##
   ==========================================
   - Coverage   86.76%   86.52%   -0.24%     
   ==========================================
     Files         896      896              
     Lines       42653    42701      +48     
   ==========================================
   - Hits        37006    36946      -60     
   - Misses       5647     5755     +108
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `85.5% <73.52%> (-1.16%)` | :arrow_down: |
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `90.61% <93.54%> (+0.42%)` | :arrow_up: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
   | [...viders/cncf/kubernetes/operators/kubernetes\_pod.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZC5weQ==) | `69.69% <0%> (-25.26%)` | :arrow_down: |
   | [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
   | [airflow/providers/google/cloud/operators/dlp.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL29wZXJhdG9ycy9kbHAucHk=) | `96.25% <0%> (-3.75%)` | :arrow_down: |
   | [airflow/providers/amazon/aws/hooks/s3.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYW1hem9uL2F3cy9ob29rcy9zMy5weQ==) | `96.33% <0%> (-0.28%)` | :arrow_down: |
   | [airflow/providers/google/cloud/hooks/dlp.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL2hvb2tzL2RscC5weQ==) | `98.55% <0%> (-0.15%)` | :arrow_down: |
   | ... and [6 more](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=footer). Last update [063a35f...733de5c](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] codecov-io edited a comment on issue #7597: [AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7597: [AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593006305
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=h1) Report
   > Merging [#7597](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/063a35f58823d53eccc723b44d02c356e81ea6f9?src=pr&el=desc) will **decrease** coverage by `0.17%`.
   > The diff coverage is `83.07%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7597/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7597      +/-   ##
   ==========================================
   - Coverage   86.76%   86.58%   -0.18%     
   ==========================================
     Files         896      896              
     Lines       42653    43132     +479     
   ==========================================
   + Hits        37006    37345     +339     
   - Misses       5647     5787     +140
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `85.5% <73.52%> (-1.16%)` | :arrow_down: |
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `90.61% <93.54%> (+0.42%)` | :arrow_up: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
   | [airflow/utils/process\_utils.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9wcm9jZXNzX3V0aWxzLnB5) | `72.36% <0%> (-27.64%)` | :arrow_down: |
   | [...viders/cncf/kubernetes/operators/kubernetes\_pod.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZC5weQ==) | `69.69% <0%> (-25.26%)` | :arrow_down: |
   | [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
   | [airflow/providers/google/cloud/operators/dlp.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL29wZXJhdG9ycy9kbHAucHk=) | `96.25% <0%> (-3.75%)` | :arrow_down: |
   | [airflow/providers/amazon/aws/hooks/s3.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYW1hem9uL2F3cy9ob29rcy9zMy5weQ==) | `96.33% <0%> (-0.28%)` | :arrow_down: |
   | ... and [10 more](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=footer). Last update [063a35f...cb455dc](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] mik-laj commented on a change in pull request #7597: [AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7597: [AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r386065568
 
 

 ##########
 File path: tests/jobs/test_scheduler_job.py
 ##########
 @@ -1015,14 +1012,52 @@ def test_kill_zombies(self, mock_ti_handle_failure):
             session.add(ti)
             session.commit()
 
-            zombies = [SimpleTaskInstance(ti)]
-            dag_file_processor.kill_zombies(dagbag, zombies)
+            requests = [
+                FailureCallbackRequest(
+                    full_filepath="A",
+                    simple_task_instance=SimpleTaskInstance(ti),
+                    msg="Message"
+                )
+            ]
+            dag_file_processor.execute_on_failure_callbacks(dagbag, requests)
             mock_ti_handle_failure.assert_called_once_with(
-                mock.ANY,
+                "Message",
                 conf.getboolean('core', 'unit_test_mode'),
                 mock.ANY
             )
 
+    def test_process_file_should_failure_callback(self):
+        dag_file = os.path.join(
+            os.path.dirname(os.path.realpath(__file__)), '../dags/test_on_failure_callback.py'
+        )
+        dagbag = DagBag(dag_folder=dag_file, include_examples=False)
+        dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+        with create_session() as session, NamedTemporaryFile(delete=False) as callback_file:
+            session.query(TaskInstance).delete()
+            dag = dagbag.get_dag('test_om_failure_callback_dag')
+            task = dag.get_task(task_id='test_om_failure_callback_task')
+
+            ti = TaskInstance(task, DEFAULT_DATE, State.RUNNING)
+
+            session.add(ti)
+            session.commit()
+
+            requests = [
+                FailureCallbackRequest(
+                    full_filepath=dag.full_filepath,
+                    simple_task_instance=SimpleTaskInstance(ti),
+                    msg="Message"
+                )
+            ]
+            callback_file.close()
+
+            with mock.patch.dict("os.environ", {"AIRFLOW_CALLBACK_FILE": callback_file.name}):
+                dag_file_processor.process_file(dag_file, requests)
+            with open(callback_file.name) as callback_file2:
+                content = callback_file2.read()
+            self.assertEqual("Callbacck fired", content)
 
 Review comment:
   Done


[GitHub] [airflow] ashb commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r386339683
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -1616,26 +1633,6 @@ def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag) -> bool
         self._process_executor_events(simple_dag_bag)
         return True
 
-    def _process_and_execute_tasks(self, simple_dag_bag):
 
 Review comment:
   Moving code around like this makes PRs harder to review -- as you are mixing two different things here and making more work for reviewers -- and this change is unrelated to "not loading DAGs in main scheduler loop".


[GitHub] [airflow] ashb commented on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
ashb commented on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593356649
 
 
   > This change should speed up the main scheduler loop
   
   Do you have any numbers for this please?


[GitHub] [airflow] mik-laj commented on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593431538
 
 
    This also protects the scheduler from being killed by an incorrect error-handling function. If sys.exit(1) appears in the handler code, the scheduler will not be stopped; it will only affect that one DAG.
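    A rough illustration of the mechanism (simplified, not the actual Airflow code): because the callback runs in a separate child process, sys.exit(1) only terminates that child:
    ```python
    import multiprocessing
    import sys


    def buggy_on_failure_callback():
        # A user-supplied handler that calls sys.exit(1) by mistake.
        sys.exit(1)


    if __name__ == "__main__":
        child = multiprocessing.Process(target=buggy_on_failure_callback)
        child.start()
        child.join()
        # The parent process (think: scheduler) is still running.
        print("scheduler loop continues, child exit code:", child.exitcode)
    ```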


[GitHub] [airflow] mik-laj edited a comment on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
mik-laj edited a comment on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593427244
 
 
   > Do you have any numbers for this please?
   
    It is very difficult to measure because it depends on the specific DAG file. Some DAG files take 30 seconds or more to load, and during that time the scheduler loop is blocked and does not start any new tasks. I can measure how long it takes to load example_dags, but that is only a subset of cases and does not give real-world numbers. Still, I put together a spreadsheet.
   When I ran the following script:
   ```python
   import os
   import sys
   import time
   from contextlib import contextmanager
   
   import psutil
   
   from airflow.models import DagBag
   
   
   @contextmanager
   def timing_ctx():
       time1 = time.time()
       try:
           yield
       finally:
           time2 = time.time()
           diff = (time2 - time1) * 1000.0
           print('Time: %0.3f ms' % diff)
   
   
   def get_process_memory():
       process = psutil.Process(os.getpid())
       return process.memory_info().rss
   
   
   @contextmanager
   def memory_ctx():
       before = get_process_memory()
       try:
           yield
       finally:
           after = get_process_memory()
           diff = after - before
           print('Memory: %d bytes' % diff)
   
   
   filename = sys.argv[1]
   
   with timing_ctx(), memory_ctx():
       print("Filename:", filename)
       DagBag(dag_folder=filename, include_examples=False, store_serialized_dags=False)
   ```
   ```
   find  airflow/providers/google/cloud/example_dags/ -type f | sort| grep -v "__init__.py" | grep -v "__init__.py" | xargs -n 1 readlink -e  | xargs -t -n 1 python /files/performance/load_dag_perf_test.py
   ```
    I got the following values:
   https://docs.google.com/spreadsheets/d/1T0kLEQLSU5ujxU-W_PoxddbkEgWx70EQkpwjRLNWaic/edit?usp=sharing
    In this case, IPC communication should be very fast; I suspect it takes less than 3% of the total DAG loading time. So it can be assumed that the main loop becomes faster by roughly the module loading time.
   
   
   
   


[GitHub] [airflow] mik-laj commented on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593427244
 
 
   > Do you have any numbers for this please?
   
    It is very difficult to measure because it depends on the specific DAG file. Some DAG files take 30 seconds or more to load, and during that time the scheduler loop is blocked and does not start any new tasks. I can measure how long it takes to load example_dags, but that is only a subset of cases and does not give real-world numbers. Still, I put together a spreadsheet.
   When I ran the following script:
   ```python
   import os
   import sys
   import time
   from contextlib import contextmanager
   
   import psutil
   
   from airflow.models import DagBag
   
   
   @contextmanager
   def timing_ctx():
       time1 = time.time()
       try:
           yield
       finally:
           time2 = time.time()
           diff = (time2 - time1) * 1000.0
           print('Time: %0.3f ms' % diff)
   
   
   def get_process_memory():
       process = psutil.Process(os.getpid())
       return process.memory_info().rss
   
   
   @contextmanager
   def memory_ctx():
       before = get_process_memory()
       try:
           yield
       finally:
           after = get_process_memory()
           diff = after - before
           print('Memory: %d bytes' % diff)
   
   
   filename = sys.argv[1]
   
   with timing_ctx(), memory_ctx():
       print("Filename:", filename)
       DagBag(dag_folder=filename, include_examples=False, store_serialized_dags=False)
   ```
   ```
   find  airflow/providers/google/cloud/example_dags/ -type f | sort| grep -v "__init__.py" | grep -v "__init__.py" | xargs -n 1 readlink -e  | xargs -t -n 1 python /files/performance/load_dag_perf_test.py
   ```
    I got the following values:
   https://docs.google.com/spreadsheets/d/1T0kLEQLSU5ujxU-W_PoxddbkEgWx70EQkpwjRLNWaic/edit?usp=sharing
   
   
   
   


[GitHub] [airflow] mik-laj edited a comment on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
mik-laj edited a comment on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593431538
 
 
    This also protects the scheduler from being killed by an incorrect error-handling function. If sys.exit(1) appears in the handler code, the scheduler will not be stopped; it will only affect that one DAG file.


[GitHub] [airflow] mik-laj commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r386390242
 
 

 ##########
 File path: airflow/utils/dag_processing.py
 ##########
 @@ -655,6 +693,7 @@ def start(self):
             # Update number of loop iteration.
             self._num_run += 1
 
+            simple_dags = self.collect_results()
 
 Review comment:
    Yes. First we need to create the processes, and only then can we read the results. Otherwise, we would never get a value on the first iteration of the loop. Another solution: we could increase the loop iteration number: https://github.com/apache/airflow/blob/cb455dc81162680f90edcd78400e1ef46c09766d/tests/utils/test_dag_processing.py#L291
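    A toy model of why the ordering matters (plain multiprocessing, not the actual DagFileProcessorManager code): a result can only be collected after the process that produces it has been started.
    ```python
    from multiprocessing import Process, Queue


    def file_processor(results):
        # Stand-in for a DAG file processor returning a parsed SimpleDag.
        results.put("simple_dag")


    if __name__ == "__main__":
        results = Queue()
        simple_dags = []
        for iteration in range(2):             # two "manager loop" iterations
            worker = Process(target=file_processor, args=(results,))
            worker.start()                     # start the new process first...
            simple_dags.append(results.get())  # ...then collect its result, so even
            worker.join()                      # the first iteration yields a value
        print(simple_dags)                     # ['simple_dag', 'simple_dag']
    ```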


[GitHub] [airflow] codecov-io edited a comment on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593006305
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=h1) Report
   > Merging [#7597](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/063a35f58823d53eccc723b44d02c356e81ea6f9?src=pr&el=desc) will **decrease** coverage by `0.2%`.
   > The diff coverage is `82.45%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7597/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #7597      +/-   ##
   ==========================================
   - Coverage   86.76%   86.55%   -0.21%     
   ==========================================
     Files         896      897       +1     
     Lines       42653    42763     +110     
   ==========================================
   + Hits        37006    37014       +8     
   - Misses       5647     5749     +102
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `88.26% <75%> (+1.61%)` | :arrow_up: |
   | [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `90.64% <92%> (+0.45%)` | :arrow_up: |
   | [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
   | [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
   | [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
   | [airflow/utils/process\_utils.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9wcm9jZXNzX3V0aWxzLnB5) | `73.25% <0%> (-26.75%)` | :arrow_down: |
   | [...viders/cncf/kubernetes/operators/kubernetes\_pod.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZC5weQ==) | `69.69% <0%> (-25.26%)` | :arrow_down: |
   | [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
   | [airflow/providers/google/cloud/operators/dlp.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL29wZXJhdG9ycy9kbHAucHk=) | `96.25% <0%> (-3.75%)` | :arrow_down: |
   | [airflow/config\_templates/airflow\_local\_settings.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9jb25maWdfdGVtcGxhdGVzL2FpcmZsb3dfbG9jYWxfc2V0dGluZ3MucHk=) | `64.28% <0%> (-1.76%)` | :arrow_down: |
   | ... and [17 more](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=footer). Last update [063a35f...fbb647c](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [airflow] kaxil commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r387811869
 
 

 ##########
 File path: airflow/jobs/scheduler_job.py
 ##########
 @@ -752,37 +758,35 @@ def _find_dags_to_process(self, dags: List[DAG], paused_dag_ids: Set[str]) -> Li
         return dags
 
     @provide_session
-    def kill_zombies(self, dagbag, zombies, session=None):
+    def execute_on_failure_callbacks(self, dagbag, failure_callback_requests, session=None):
         """
-        Fail given zombie tasks, which are tasks that haven't
-        had a heartbeat for too long, in the current DagBag.
+        Execute on failure callbacks. These objects can come from SchedulerJob or from
+        DagFileProcessorManager.
 
-        :param zombies: zombie task instances to kill.
-        :type zombies: List[airflow.models.taskinstance.SimpleTaskInstance]
+        :param failure_callback_requests: failure callbacks to execute
+        :type failure_callback_requests: List[airflow.utils.dag_processing.FailureCallbackRequest]
         :param session: DB session.
         """
         TI = models.TaskInstance
 
-        for zombie in zombies:
-            if zombie.dag_id in dagbag.dags:
-                dag = dagbag.dags[zombie.dag_id]
-                if zombie.task_id in dag.task_ids:
-                    task = dag.get_task(zombie.task_id)
-                    ti = TI(task, zombie.execution_date)
+        for request in failure_callback_requests:
+            if request.simple_task_instance.dag_id in dagbag.dags:
 
 Review comment:
   We can create a local var `simple_ti` to avoid repeating `request.simple_task_instance` in this loop
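    For example, a sketch of that refactor (illustrative, not the final committed code):
    ```python
    for request in failure_callback_requests:
        simple_ti = request.simple_task_instance
        if simple_ti.dag_id in dagbag.dags:
            dag = dagbag.dags[simple_ti.dag_id]
            if simple_ti.task_id in dag.task_ids:
                task = dag.get_task(simple_ti.task_id)
                ti = TI(task, simple_ti.execution_date)
                # Get properties needed for failure handling from SimpleTaskInstance.
                ti.start_date = simple_ti.start_date
                ti.end_date = simple_ti.end_date
                ti.try_number = simple_ti.try_number
                ti.state = simple_ti.state
                ti.test_mode = self.UNIT_TEST_MODE
    ```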


[GitHub] [airflow] ashb commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r387294958
 
 

 ##########
 File path: airflow/utils/dag_processing.py
 ##########
 @@ -655,6 +693,7 @@ def start(self):
             # Update number of loop iteration.
             self._num_run += 1
 
+            simple_dags = self.collect_results()
 
 Review comment:
   Cool, sounds good as you have it


[GitHub] [airflow] mik-laj merged pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
mik-laj merged pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597
 
 
   


[GitHub] [airflow] ashb commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r386340502
 
 

 ##########
 File path: airflow/utils/dag_processing.py
 ##########
 @@ -655,6 +693,7 @@ def start(self):
             # Update number of loop iteration.
             self._num_run += 1
 
+            simple_dags = self.collect_results()
 
 Review comment:
   Was there a reason you moved the position of this call?


[GitHub] [airflow] mik-laj commented on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593430117
 
 
    It is worth noting that this also solves one more problem: the modules are always reloaded.
    https://issues.apache.org/jira/projects/AIRFLOW/issues/AIRFLOW-6497?filter=allopenissues
    So when someone makes a change in an additional module, the new version is executed rather than a stale, previously imported one. Stale code can be a real problem because the handler is often stored in helper functions that are shared among many DAGs.


[GitHub] [airflow] mik-laj commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r386390242
 
 

 ##########
 File path: airflow/utils/dag_processing.py
 ##########
 @@ -655,6 +693,7 @@ def start(self):
             # Update number of loop iteration.
             self._num_run += 1
 
+            simple_dags = self.collect_results()
 
 Review comment:
    Yes. First we need to create the processes, and only then can we read the results. Otherwise, we would never get a value on the first iteration of the loop. Another solution: we could increase the loop iteration number: https://github.com/apache/airflow/blob/cb455dc81162680f90edcd78400e1ef46c09766d/tests/utils/test_dag_processing.py#L291
