Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/02/29 20:59:51 UTC
[GitHub] [airflow] mik-laj opened a new pull request #7597: [AIRFLOW-6497]
Avoid loading DAGs in the main loop of the scheduler
mik-laj opened a new pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main loop of the scheduler
URL: https://github.com/apache/airflow/pull/7597
---
Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg)
Make sure to mark the boxes below before creating PR: [x]
- [X] Description above provides context of the change
- [X] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN = JIRA ID<sup>*</sup>
- [X] Unit tests coverage for changes (not needed for documentation changes)
- [X] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
- [X] Relevant documentation is updated including usage instructions.
- [X] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
<sup>*</sup> For document-only changes commit message can start with `[AIRFLOW-XXXX]`.
---
In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
[GitHub] [airflow] mik-laj commented on a change in pull request #7597:
[AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r386425807
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1616,26 +1633,6 @@ def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag) -> bool
self._process_executor_events(simple_dag_bag)
return True
- def _process_and_execute_tasks(self, simple_dag_bag):
Review comment:
I will revert this change and propose it in a separate PR.
----------------------------------------------------------------
[GitHub] [airflow] kaxil commented on a change in pull request #7597:
[AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
kaxil commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r387810889
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -752,37 +758,35 @@ def _find_dags_to_process(self, dags: List[DAG], paused_dag_ids: Set[str]) -> Li
return dags
@provide_session
- def kill_zombies(self, dagbag, zombies, session=None):
+ def execute_on_failure_callbacks(self, dagbag, failure_callback_requests, session=None):
"""
- Fail given zombie tasks, which are tasks that haven't
- had a heartbeat for too long, in the current DagBag.
+ Execute on failure callbacks. These objects can come from SchedulerJob or from
+ DagFileProcessorManager.
- :param zombies: zombie task instances to kill.
- :type zombies: List[airflow.models.taskinstance.SimpleTaskInstance]
+ :param failure_callback_requests: failure callbacks to execute
+ :type failure_callback_requests: List[airflow.utils.dag_processing.FailureCallbackRequest]
:param session: DB session.
"""
TI = models.TaskInstance
- for zombie in zombies:
- if zombie.dag_id in dagbag.dags:
- dag = dagbag.dags[zombie.dag_id]
- if zombie.task_id in dag.task_ids:
- task = dag.get_task(zombie.task_id)
- ti = TI(task, zombie.execution_date)
+ for request in failure_callback_requests:
+ if request.simple_task_instance.dag_id in dagbag.dags:
+ dag = dagbag.dags[request.simple_task_instance.dag_id]
+ if request.simple_task_instance.task_id in dag.task_ids:
+ task = dag.get_task(request.simple_task_instance.task_id)
+ ti = TI(task, request.simple_task_instance.execution_date)
# Get properties needed for failure handling from SimpleTaskInstance.
- ti.start_date = zombie.start_date
- ti.end_date = zombie.end_date
- ti.try_number = zombie.try_number
- ti.state = zombie.state
+ ti.start_date = request.simple_task_instance.start_date
+ ti.end_date = request.simple_task_instance.end_date
+ ti.try_number = request.simple_task_instance.try_number
+ ti.state = request.simple_task_instance.state
ti.test_mode = self.UNIT_TEST_MODE
- ti.handle_failure("{} detected as zombie".format(ti),
- ti.test_mode, ti.get_template_context())
- self.log.info('Marked zombie job %s as %s', ti, ti.state)
- Stats.incr('zombies_killed')
Review comment:
Needs mention in API and we need to remove `zombies_killed` from the metrics list
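For readers following the diff, here is a dependency-free sketch of the control flow in `execute_on_failure_callbacks`. The stub dataclasses are hypothetical stand-ins that carry only the fields the diff and the PR's tests use (`full_filepath`, `simple_task_instance`, `msg`). They are not the real Airflow classes. The real method builds a `TaskInstance` and calls `handle_failure` at the point where this sketch only records the message.

```python
from dataclasses import dataclass
from typing import Dict, List, Set, Tuple


@dataclass
class SimpleTaskInstanceStub:
    # Hypothetical stand-in for SimpleTaskInstance; only the fields the loop reads.
    dag_id: str
    task_id: str


@dataclass
class FailureCallbackRequestStub:
    # Hypothetical mirror of the three fields passed to FailureCallbackRequest in the tests.
    full_filepath: str
    simple_task_instance: SimpleTaskInstanceStub
    msg: str


def execute_on_failure_callbacks_sketch(
    known_tasks: Dict[str, Set[str]],
    requests: List[FailureCallbackRequestStub],
) -> List[Tuple[str, str, str]]:
    """Return (dag_id, task_id, msg) for every request whose DAG and task are loaded.

    `known_tasks` plays the role of dagbag.dags / dag.task_ids: requests for
    DAGs or tasks that are not in the bag are skipped silently, as in the diff.
    """
    handled = []
    for request in requests:
        sti = request.simple_task_instance
        if sti.dag_id in known_tasks and sti.task_id in known_tasks[sti.dag_id]:
            # The real code calls ti.handle_failure(...) here; the PR's test
            # expects the request's msg ("Message") to be passed through.
            handled.append((sti.dag_id, sti.task_id, request.msg))
    return handled


if __name__ == "__main__":
    bag = {"example_dag": {"task_a"}}
    reqs = [
        FailureCallbackRequestStub("dags/example.py", SimpleTaskInstanceStub("example_dag", "task_a"), "Message"),
        FailureCallbackRequestStub("dags/other.py", SimpleTaskInstanceStub("missing_dag", "task_b"), "Ignored"),
    ]
    print(execute_on_failure_callbacks_sketch(bag, reqs))
    # [('example_dag', 'task_a', 'Message')]
```

The sketch shows that the message now travels with each request instead of being the fixed "detected as zombie" text.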
----------------------------------------------------------------
[GitHub] [airflow] mik-laj edited a comment on issue #7597: [AIRFLOW-6497]
Avoid loading DAGs in the main scheduler loop
mik-laj edited a comment on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593430117
It is worth noting that this also solves one more problem: the modules are always reloaded (https://issues.apache.org/jira/projects/AIRFLOW/issues/AIRFLOW-6497?filter=allopenissues), so when someone changes a helper module, the new version is executed rather than the old one. This matters because the failure handler is often stored in a helper module that is shared among many DAGs.
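Here is a minimal, self-contained sketch of the effect described above. It relies only on the standard Python import system and does not use Airflow code. A long-lived process keeps the module object it imported first in `sys.modules`, while a freshly started interpreter reads the edited file from disk. The module name `callback_helpers` and its `VALUE` attribute are hypothetical.

```python
import importlib
import subprocess
import sys
import tempfile
from pathlib import Path

# Avoid .pyc files so a rewritten module is never served from stale bytecode.
sys.dont_write_bytecode = True


def read_in_fresh_process(module_dir: str, module_name: str) -> str:
    """Import the module in a brand-new interpreter and return its VALUE."""
    result = subprocess.run(
        [sys.executable, "-B", "-c", f"import {module_name}; print({module_name}.VALUE)"],
        cwd=module_dir,  # with -c, the current directory is on sys.path
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


def demo() -> tuple:
    with tempfile.TemporaryDirectory() as module_dir:
        helper = Path(module_dir) / "callback_helpers.py"
        helper.write_text("VALUE = 'v1'\n")

        sys.path.insert(0, module_dir)
        try:
            importlib.invalidate_caches()
            first = importlib.import_module("callback_helpers").VALUE

            # Simulate a user editing the shared helper module.
            helper.write_text("VALUE = 'version-2'\n")

            # The long-lived process still returns the cached module object...
            cached = importlib.import_module("callback_helpers").VALUE
            # ...while a fresh child process imports the edited file.
            fresh = read_in_fresh_process(module_dir, "callback_helpers")
        finally:
            sys.path.remove(module_dir)
            sys.modules.pop("callback_helpers", None)
    return first, cached, fresh


if __name__ == "__main__":
    print(demo())  # ('v1', 'v1', 'version-2')
```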
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on issue #7597: [AIRFLOW-6497] Avoid
loading DAGs in the main scheduler loop
mik-laj commented on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-595933565
Travis is green.
----------------------------------------------------------------
[GitHub] [airflow] tooptoop4 commented on a change in pull request #7597:
[AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop
tooptoop4 commented on a change in pull request #7597: [AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r386057085
##########
File path: tests/dags/test_on_failure_callback.py
##########
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.dummy_operator import DummyOperator
+
+DEFAULT_DATE = datetime(2016, 1, 1)
+
+args = {
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE,
+}
+
+dag = DAG(dag_id='test_om_failure_callback_dag', default_args=args)
+
+
+def write_data_to_callback(*arg, **kwargs): # pylint: disable=unused-argument
+ with open(os.environ.get('AIRFLOW_CALLBACK_FILE'), "w+") as f:
+ f.write("Callbacck fired")
Review comment:
Typo: "Callbacck" should be "Callback".
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7597:
[AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
mik-laj commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r387844631
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -752,37 +758,35 @@ def _find_dags_to_process(self, dags: List[DAG], paused_dag_ids: Set[str]) -> Li
return dags
@provide_session
- def kill_zombies(self, dagbag, zombies, session=None):
+ def execute_on_failure_callbacks(self, dagbag, failure_callback_requests, session=None):
"""
- Fail given zombie tasks, which are tasks that haven't
- had a heartbeat for too long, in the current DagBag.
+ Execute on failure callbacks. These objects can come from SchedulerJob or from
+ DagFileProcessorManager.
- :param zombies: zombie task instances to kill.
- :type zombies: List[airflow.models.taskinstance.SimpleTaskInstance]
+ :param failure_callback_requests: failure callbacks to execute
+ :type failure_callback_requests: List[airflow.utils.dag_processing.FailureCallbackRequest]
:param session: DB session.
"""
TI = models.TaskInstance
- for zombie in zombies:
- if zombie.dag_id in dagbag.dags:
- dag = dagbag.dags[zombie.dag_id]
- if zombie.task_id in dag.task_ids:
- task = dag.get_task(zombie.task_id)
- ti = TI(task, zombie.execution_date)
+ for request in failure_callback_requests:
+ if request.simple_task_instance.dag_id in dagbag.dags:
Review comment:
Done
----------------------------------------------------------------
[GitHub] [airflow] tooptoop4 commented on a change in pull request #7597:
[AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop
tooptoop4 commented on a change in pull request #7597: [AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r386057105
##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1015,14 +1012,52 @@ def test_kill_zombies(self, mock_ti_handle_failure):
session.add(ti)
session.commit()
- zombies = [SimpleTaskInstance(ti)]
- dag_file_processor.kill_zombies(dagbag, zombies)
+ requests = [
+ FailureCallbackRequest(
+ full_filepath="A",
+ simple_task_instance=SimpleTaskInstance(ti),
+ msg="Message"
+ )
+ ]
+ dag_file_processor.execute_on_failure_callbacks(dagbag, requests)
mock_ti_handle_failure.assert_called_once_with(
- mock.ANY,
+ "Message",
conf.getboolean('core', 'unit_test_mode'),
mock.ANY
)
+ def test_process_file_should_failure_callback(self):
+ dag_file = os.path.join(
+ os.path.dirname(os.path.realpath(__file__)), '../dags/test_on_failure_callback.py'
+ )
+ dagbag = DagBag(dag_folder=dag_file, include_examples=False)
+ dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+ with create_session() as session, NamedTemporaryFile(delete=False) as callback_file:
+ session.query(TaskInstance).delete()
+ dag = dagbag.get_dag('test_om_failure_callback_dag')
+ task = dag.get_task(task_id='test_om_failure_callback_task')
+
+ ti = TaskInstance(task, DEFAULT_DATE, State.RUNNING)
+
+ session.add(ti)
+ session.commit()
+
+ requests = [
+ FailureCallbackRequest(
+ full_filepath=dag.full_filepath,
+ simple_task_instance=SimpleTaskInstance(ti),
+ msg="Message"
+ )
+ ]
+ callback_file.close()
+
+ with mock.patch.dict("os.environ", {"AIRFLOW_CALLBACK_FILE": callback_file.name}):
+ dag_file_processor.process_file(dag_file, requests)
+ with open(callback_file.name) as callback_file2:
+ content = callback_file2.read()
+ self.assertEqual("Callbacck fired", content)
Review comment:
Typo: "Callbacck" should be "Callback".
----------------------------------------------------------------
[GitHub] [airflow] codecov-io commented on issue #7597: [AIRFLOW-6497][WIP]
Avoid loading DAGs in the main scheduler loop
codecov-io commented on issue #7597: [AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593006305
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=h1) Report
> Merging [#7597](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/063a35f58823d53eccc723b44d02c356e81ea6f9?src=pr&el=desc) will **decrease** coverage by `0.23%`.
> The diff coverage is `83.07%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7597/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #7597 +/- ##
==========================================
- Coverage 86.76% 86.52% -0.24%
==========================================
Files 896 896
Lines 42653 42701 +48
==========================================
- Hits 37006 36946 -60
- Misses 5647 5755 +108
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `85.5% <73.52%> (-1.16%)` | :arrow_down: |
| [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `90.61% <93.54%> (+0.42%)` | :arrow_up: |
| [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
| [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
| [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
| [...viders/cncf/kubernetes/operators/kubernetes\_pod.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZC5weQ==) | `69.69% <0%> (-25.26%)` | :arrow_down: |
| [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
| [airflow/providers/google/cloud/operators/dlp.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL29wZXJhdG9ycy9kbHAucHk=) | `96.25% <0%> (-3.75%)` | :arrow_down: |
| [airflow/providers/amazon/aws/hooks/s3.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYW1hem9uL2F3cy9ob29rcy9zMy5weQ==) | `96.33% <0%> (-0.28%)` | :arrow_down: |
| [airflow/providers/google/cloud/hooks/dlp.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL2hvb2tzL2RscC5weQ==) | `98.55% <0%> (-0.15%)` | :arrow_down: |
| ... and [6 more](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=footer). Last update [063a35f...733de5c](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
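The headline says coverage drops by `0.23%`, while the table shows `-0.24%`. Recomputing coverage as Hits / Lines from the table reproduces both figures. This suggests that the headline truncates the drop and the table rounds it, though that is an inference from the numbers, not documented Codecov behavior. The function name `summarize` is illustrative. The second call checks the edited report that follows.

```python
from decimal import ROUND_DOWN, ROUND_HALF_UP, Decimal
from typing import Tuple

CENT = Decimal("0.01")


def coverage_pct(hits: int, lines: int) -> Decimal:
    """Line coverage as a percentage: hits / lines * 100."""
    return Decimal(hits) * 100 / Decimal(lines)


def summarize(base_hits: int, base_lines: int, head_hits: int, head_lines: int) -> Tuple[str, str, str, str]:
    base = coverage_pct(base_hits, base_lines)
    head = coverage_pct(head_hits, head_lines)
    drop = base - head
    return (
        str(base.quantize(CENT, rounding=ROUND_HALF_UP)),
        str(head.quantize(CENT, rounding=ROUND_HALF_UP)),
        str(drop.quantize(CENT, rounding=ROUND_DOWN)),     # truncated drop
        str(drop.quantize(CENT, rounding=ROUND_HALF_UP)),  # rounded drop
    )


print(summarize(37006, 42653, 36946, 42701))  # ('86.76', '86.52', '0.23', '0.24')
print(summarize(37006, 42653, 37345, 43132))  # ('86.76', '86.58', '0.17', '0.18')
```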
----------------------------------------------------------------
[GitHub] [airflow] codecov-io edited a comment on issue #7597:
[AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop
codecov-io edited a comment on issue #7597: [AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593006305
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=h1) Report
> Merging [#7597](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/063a35f58823d53eccc723b44d02c356e81ea6f9?src=pr&el=desc) will **decrease** coverage by `0.17%`.
> The diff coverage is `83.07%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7597/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #7597 +/- ##
==========================================
- Coverage 86.76% 86.58% -0.18%
==========================================
Files 896 896
Lines 42653 43132 +479
==========================================
+ Hits 37006 37345 +339
- Misses 5647 5787 +140
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `85.5% <73.52%> (-1.16%)` | :arrow_down: |
| [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `90.61% <93.54%> (+0.42%)` | :arrow_up: |
| [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
| [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
| [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
| [airflow/utils/process\_utils.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9wcm9jZXNzX3V0aWxzLnB5) | `72.36% <0%> (-27.64%)` | :arrow_down: |
| [...viders/cncf/kubernetes/operators/kubernetes\_pod.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZC5weQ==) | `69.69% <0%> (-25.26%)` | :arrow_down: |
| [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
| [airflow/providers/google/cloud/operators/dlp.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL29wZXJhdG9ycy9kbHAucHk=) | `96.25% <0%> (-3.75%)` | :arrow_down: |
| [airflow/providers/amazon/aws/hooks/s3.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYW1hem9uL2F3cy9ob29rcy9zMy5weQ==) | `96.33% <0%> (-0.28%)` | :arrow_down: |
| ... and [10 more](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=footer). Last update [063a35f...cb455dc](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on a change in pull request #7597:
[AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop
mik-laj commented on a change in pull request #7597: [AIRFLOW-6497][WIP] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r386065568
##########
File path: tests/jobs/test_scheduler_job.py
##########
@@ -1015,14 +1012,52 @@ def test_kill_zombies(self, mock_ti_handle_failure):
session.add(ti)
session.commit()
- zombies = [SimpleTaskInstance(ti)]
- dag_file_processor.kill_zombies(dagbag, zombies)
+ requests = [
+ FailureCallbackRequest(
+ full_filepath="A",
+ simple_task_instance=SimpleTaskInstance(ti),
+ msg="Message"
+ )
+ ]
+ dag_file_processor.execute_on_failure_callbacks(dagbag, requests)
mock_ti_handle_failure.assert_called_once_with(
- mock.ANY,
+ "Message",
conf.getboolean('core', 'unit_test_mode'),
mock.ANY
)
+ def test_process_file_should_failure_callback(self):
+ dag_file = os.path.join(
+ os.path.dirname(os.path.realpath(__file__)), '../dags/test_on_failure_callback.py'
+ )
+ dagbag = DagBag(dag_folder=dag_file, include_examples=False)
+ dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
+ with create_session() as session, NamedTemporaryFile(delete=False) as callback_file:
+ session.query(TaskInstance).delete()
+ dag = dagbag.get_dag('test_om_failure_callback_dag')
+ task = dag.get_task(task_id='test_om_failure_callback_task')
+
+ ti = TaskInstance(task, DEFAULT_DATE, State.RUNNING)
+
+ session.add(ti)
+ session.commit()
+
+ requests = [
+ FailureCallbackRequest(
+ full_filepath=dag.full_filepath,
+ simple_task_instance=SimpleTaskInstance(ti),
+ msg="Message"
+ )
+ ]
+ callback_file.close()
+
+ with mock.patch.dict("os.environ", {"AIRFLOW_CALLBACK_FILE": callback_file.name}):
+ dag_file_processor.process_file(dag_file, requests)
+ with open(callback_file.name) as callback_file2:
+ content = callback_file2.read()
+ self.assertEqual("Callbacck fired", content)
Review comment:
Done
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on a change in pull request #7597:
[AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
ashb commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r386339683
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -1616,26 +1633,6 @@ def _validate_and_run_task_instances(self, simple_dag_bag: SimpleDagBag) -> bool
self._process_executor_events(simple_dag_bag)
return True
- def _process_and_execute_tasks(self, simple_dag_bag):
Review comment:
Moving code around like this makes PRs harder to review -- as you are mixing two different things here and making more work for reviewers -- and this change is unrelated to "not loading DAGs in main scheduler loop".
----------------------------------------------------------------
[GitHub] [airflow] ashb commented on issue #7597: [AIRFLOW-6497] Avoid
loading DAGs in the main scheduler loop
ashb commented on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593356649
> This change should speed up the main scheduler loop
Do you have any numbers for this please?
----------------------------------------------------------------
[GitHub] [airflow] mik-laj commented on issue #7597: [AIRFLOW-6497] Avoid
loading DAGs in the main scheduler loop
mik-laj commented on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593431538
This also protects the scheduler from being killed by faulty error-handling functions. If `sys.exit(1)` appears in the handler code, the scheduler will not be stopped; it will only affect one DAG.
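Here is a small sketch of the isolation argument. It uses plain `subprocess` rather than Airflow's DAG file processor. The hypothetical callback below calls `sys.exit(1)`, which only ends the child interpreter. The calling loop keeps going and simply observes exit code 1. The names `BAD_CALLBACK_SOURCE`, `run_callback_isolated` and `scheduler_like_loop` are illustrative, not Airflow APIs.

```python
import subprocess
import sys

# Hypothetical user-supplied failure callback that (wrongly) exits the interpreter.
BAD_CALLBACK_SOURCE = """
import sys

def on_failure_callback(context):
    sys.exit(1)

on_failure_callback({"task_id": "example"})
"""


def run_callback_isolated(source: str) -> int:
    """Run callback code in a child interpreter and return its exit code.

    sys.exit() inside the callback terminates only the child process.
    """
    completed = subprocess.run([sys.executable, "-c", source])
    return completed.returncode


def scheduler_like_loop(iterations: int = 3) -> list:
    exit_codes = []
    for _ in range(iterations):
        # Each failing callback is contained; the loop itself survives.
        exit_codes.append(run_callback_isolated(BAD_CALLBACK_SOURCE))
    return exit_codes


if __name__ == "__main__":
    print(scheduler_like_loop())  # [1, 1, 1]
```

Had the same callback run in-process, the `SystemExit` it raises would propagate up through the loop unless it was explicitly caught.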
----------------------------------------------------------------
[GitHub] [airflow] mik-laj edited a comment on issue #7597: [AIRFLOW-6497]
Avoid loading DAGs in the main scheduler loop
mik-laj edited a comment on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593427244
> Do you have any numbers for this please?
It is very difficult to measure because it depends on the specific DAG file. Some DAG files take 30 seconds or more to load, and during this time the scheduler loop is stopped and does not start any new tasks. I can measure how long it takes to load `example_dags`, but they are only a subset of cases and don't reflect real-world values. Still, I created a spreadsheet:
When I ran the following script:
```python
import os
import sys
import time
from contextlib import contextmanager

import psutil

from airflow.models import DagBag


@contextmanager
def timing_ctx():
    time1 = time.time()
    try:
        yield
    finally:
        time2 = time.time()
        diff = (time2 - time1) * 1000.0
        print('Time: %0.3f ms' % diff)


def get_process_memory():
    process = psutil.Process(os.getpid())
    return process.memory_info().rss


@contextmanager
def memory_ctx():
    before = get_process_memory()
    try:
        yield
    finally:
        after = get_process_memory()
        diff = after - before
        print('Memory: %d bytes' % diff)


filename = sys.argv[1]
with timing_ctx(), memory_ctx():
    print("Filename:", filename)
    DagBag(dag_folder=filename, include_examples=False, store_serialized_dags=False)
```
```shell
find airflow/providers/google/cloud/example_dags/ -type f | sort | grep -v "__init__.py" | xargs -n 1 readlink -e | xargs -t -n 1 python /files/performance/load_dag_perf_test.py
```
I got the following values:
https://docs.google.com/spreadsheets/d/1T0kLEQLSU5ujxU-W_PoxddbkEgWx70EQkpwjRLNWaic/edit?usp=sharing
In this case, the IPC overhead should be very small; I suspect it accounts for less than 3% of the total DAG loading time. So it can be assumed that the main loop becomes faster by roughly the time it takes to load the module.
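The idea of moving the expensive load out of the main loop and paying only a small IPC cost can be sketched with a `multiprocessing.Pipe`. This is a simplified illustration under stated assumptions, not the PR's implementation: `load_dag_file` and `main_loop` are hypothetical, the `time.sleep(0.5)` stands in for a slow DAG-file import, and the result is a small dict rather than Airflow's serialized DAG objects.

```python
import multiprocessing
import time


def load_dag_file(file_path, conn):
    """Child process: simulate a slow DAG-file import, then send back a small result."""
    time.sleep(0.5)  # stand-in for an expensive module import / DAG parse
    conn.send({"file_path": file_path, "dag_ids": ["example_dag"]})
    conn.close()


def main_loop(file_path, max_iterations=200):
    """Parent 'scheduler' loop that never blocks on parsing; returns (result, iterations)."""
    recv_conn, send_conn = multiprocessing.Pipe(duplex=False)
    proc = multiprocessing.Process(target=load_dag_file, args=(file_path, send_conn))
    proc.start()
    send_conn.close()  # the parent only needs the receiving end

    result = None
    iterations = 0
    while result is None and iterations < max_iterations:
        iterations += 1
        # ... other scheduling work would happen here on every iteration ...
        # poll() waits at most 50 ms, so the loop keeps turning while the child parses.
        if recv_conn.poll(0.05):
            result = recv_conn.recv()  # the only IPC cost: unpickling a small payload
    proc.join()
    return result, iterations


if __name__ == "__main__":
    print(main_loop("dags/example_dag.py"))
```

The loop completes many iterations while the child is still "parsing", which is the behaviour the change aims for; the only work done in the parent per file is receiving the small result.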
[GitHub] [airflow] mik-laj commented on issue #7597: [AIRFLOW-6497] Avoid
loading DAGs in the main scheduler loop
Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593427244
> Do you have any numbers for this please?
It is very difficult to measure, because it depends on the specific DAG file. Some DAG files take 30 seconds or more to load, and during that time the scheduler loop is blocked and does not start any new tasks. I can measure how long it takes to load the example DAGs, but that covers only a subset of cases and doesn't reflect real-world values. Still, I created a spreadsheet.
I ran the following script:
```python
import os
import sys
import time
from contextlib import contextmanager

import psutil

from airflow.models import DagBag


@contextmanager
def timing_ctx():
    time1 = time.time()
    try:
        yield
    finally:
        time2 = time.time()
        diff = (time2 - time1) * 1000.0
        print('Time: %0.3f ms' % diff)


def get_process_memory():
    process = psutil.Process(os.getpid())
    return process.memory_info().rss


@contextmanager
def memory_ctx():
    before = get_process_memory()
    try:
        yield
    finally:
        after = get_process_memory()
        diff = after - before
        print('Memory: %d bytes' % diff)


filename = sys.argv[1]
with timing_ctx(), memory_ctx():
    print("Filename:", filename)
    DagBag(dag_folder=filename, include_examples=False, store_serialized_dags=False)
```
```shell
find airflow/providers/google/cloud/example_dags/ -type f | sort | grep -v "__init__.py" | xargs -n 1 readlink -e | xargs -t -n 1 python /files/performance/load_dag_perf_test.py
```
I got the following values:
https://docs.google.com/spreadsheets/d/1T0kLEQLSU5ujxU-W_PoxddbkEgWx70EQkpwjRLNWaic/edit?usp=sharing
[GitHub] [airflow] mik-laj edited a comment on issue #7597: [AIRFLOW-6497]
Avoid loading DAGs in the main scheduler loop
Posted by GitBox <gi...@apache.org>.
mik-laj edited a comment on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593431538
This also protects the scheduler from being killed by faulty error-handling functions. If `sys.exit(1)` appears in the handler code, the scheduler will not be stopped; it will only affect one DAG file.
[GitHub] [airflow] mik-laj commented on a change in pull request #7597:
[AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r386390242
##########
File path: airflow/utils/dag_processing.py
##########
@@ -655,6 +693,7 @@ def start(self):
# Update number of loop iteration.
self._num_run += 1
+ simple_dags = self.collect_results()
Review comment:
Yes. First we need to create the processes, and only then can we read the results. Otherwise, we would not get any results on the first iteration of the loop. An alternative solution would be to increase the number of loop iterations: https://github.com/apache/airflow/blob/cb455dc81162680f90edcd78400e1ef46c09766d/tests/utils/test_dag_processing.py#L291
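Why the call order matters on the first iteration can be shown with a toy model. This is a hypothetical sketch, not Airflow's `DagFileProcessorManager`: `ToyProcessorManager` and `run_first_iteration` are invented names, and each "processor" is assumed to finish as soon as it is started, so its results are available to the next `collect_results()` call.

```python
class ToyProcessorManager:
    """Hypothetical stand-in for a DAG file processor manager.

    A "processor" is just a file path; it is treated as finished as soon as
    it has been started, so its results are available to the next collect.
    """

    def __init__(self, file_paths):
        self._queue = list(file_paths)
        self._started = []

    def start_new_processes(self):
        while self._queue:
            self._started.append(self._queue.pop(0))

    def collect_results(self):
        finished, self._started = self._started, []
        return ["simple_dags from " + path for path in finished]


def run_first_iteration(manager, collect_before_start):
    if collect_before_start:
        results = manager.collect_results()  # nothing has been started yet
        manager.start_new_processes()
    else:
        manager.start_new_processes()
        results = manager.collect_results()
    return results


if __name__ == "__main__":
    print(run_first_iteration(ToyProcessorManager(["a.py"]), collect_before_start=True))
    print(run_first_iteration(ToyProcessorManager(["a.py"]), collect_before_start=False))
```

Collecting before starting returns an empty list on the first pass, which is why a test would otherwise need an extra loop iteration to see any results.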
[GitHub] [airflow] codecov-io edited a comment on issue #7597:
[AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593006305
# [Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=h1) Report
> Merging [#7597](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/063a35f58823d53eccc723b44d02c356e81ea6f9?src=pr&el=desc) will **decrease** coverage by `0.2%`.
> The diff coverage is `82.45%`.
[![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7597/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=tree)
```diff
@@ Coverage Diff @@
## master #7597 +/- ##
==========================================
- Coverage 86.76% 86.55% -0.21%
==========================================
Files 896 897 +1
Lines 42653 42763 +110
==========================================
+ Hits 37006 37014 +8
- Misses 5647 5749 +102
```
| [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=tree) | Coverage Δ | |
|---|---|---|
| [airflow/utils/dag\_processing.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9kYWdfcHJvY2Vzc2luZy5weQ==) | `88.26% <75%> (+1.61%)` | :arrow_up: |
| [airflow/jobs/scheduler\_job.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9qb2JzL3NjaGVkdWxlcl9qb2IucHk=) | `90.64% <92%> (+0.45%)` | :arrow_up: |
| [airflow/kubernetes/volume\_mount.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZV9tb3VudC5weQ==) | `44.44% <0%> (-55.56%)` | :arrow_down: |
| [airflow/kubernetes/volume.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3ZvbHVtZS5weQ==) | `52.94% <0%> (-47.06%)` | :arrow_down: |
| [airflow/kubernetes/pod\_launcher.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3BvZF9sYXVuY2hlci5weQ==) | `47.18% <0%> (-45.08%)` | :arrow_down: |
| [airflow/utils/process\_utils.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9wcm9jZXNzX3V0aWxzLnB5) | `73.25% <0%> (-26.75%)` | :arrow_down: |
| [...viders/cncf/kubernetes/operators/kubernetes\_pod.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvY25jZi9rdWJlcm5ldGVzL29wZXJhdG9ycy9rdWJlcm5ldGVzX3BvZC5weQ==) | `69.69% <0%> (-25.26%)` | :arrow_down: |
| [airflow/kubernetes/refresh\_config.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9rdWJlcm5ldGVzL3JlZnJlc2hfY29uZmlnLnB5) | `50.98% <0%> (-23.53%)` | :arrow_down: |
| [airflow/providers/google/cloud/operators/dlp.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL29wZXJhdG9ycy9kbHAucHk=) | `96.25% <0%> (-3.75%)` | :arrow_down: |
| [airflow/config\_templates/airflow\_local\_settings.py](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree#diff-YWlyZmxvdy9jb25maWdfdGVtcGxhdGVzL2FpcmZsb3dfbG9jYWxfc2V0dGluZ3MucHk=) | `64.28% <0%> (-1.76%)` | :arrow_down: |
| ... and [17 more](https://codecov.io/gh/apache/airflow/pull/7597/diff?src=pr&el=tree-more) | |
------
[Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=footer). Last update [063a35f...fbb647c](https://codecov.io/gh/apache/airflow/pull/7597?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] [airflow] kaxil commented on a change in pull request #7597:
[AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r387811869
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -752,37 +758,35 @@ def _find_dags_to_process(self, dags: List[DAG], paused_dag_ids: Set[str]) -> Li
return dags
@provide_session
- def kill_zombies(self, dagbag, zombies, session=None):
+ def execute_on_failure_callbacks(self, dagbag, failure_callback_requests, session=None):
"""
- Fail given zombie tasks, which are tasks that haven't
- had a heartbeat for too long, in the current DagBag.
+ Execute on failure callbacks. These objects can come from SchedulerJob or from
+ DagFileProcessorManager.
- :param zombies: zombie task instances to kill.
- :type zombies: List[airflow.models.taskinstance.SimpleTaskInstance]
+ :param failure_callback_requests: failure callbacks to execute
+ :type failure_callback_requests: List[airflow.utils.dag_processing.FailureCallbackRequest]
:param session: DB session.
"""
TI = models.TaskInstance
- for zombie in zombies:
- if zombie.dag_id in dagbag.dags:
- dag = dagbag.dags[zombie.dag_id]
- if zombie.task_id in dag.task_ids:
- task = dag.get_task(zombie.task_id)
- ti = TI(task, zombie.execution_date)
+ for request in failure_callback_requests:
+ if request.simple_task_instance.dag_id in dagbag.dags:
Review comment:
We could create a local variable `simple_ti` to avoid repeating `request.simple_task_instance` in this loop.
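The suggested refactoring might look like the following self-contained sketch. The dataclasses are hypothetical minimal stand-ins (the real `SimpleTaskInstance` and `FailureCallbackRequest` carry more), and a plain `dag_id -> task_ids` mapping replaces the `DagBag`; only the loop shape mirrors the diff above.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Set, Tuple


@dataclass
class SimpleTaskInstance:
    """Hypothetical minimal stand-in; the real class carries more fields."""
    dag_id: str
    task_id: str
    execution_date: datetime


@dataclass
class FailureCallbackRequest:
    """Hypothetical minimal stand-in holding only the field used in the review."""
    simple_task_instance: SimpleTaskInstance


def find_callback_targets(
    dag_task_ids: Dict[str, Set[str]],
    failure_callback_requests: List[FailureCallbackRequest],
) -> List[Tuple[str, str]]:
    """Return (dag_id, task_id) pairs that exist in the loaded DAGs."""
    targets = []
    for request in failure_callback_requests:
        simple_ti = request.simple_task_instance  # bound once, reused below
        if simple_ti.dag_id in dag_task_ids:
            if simple_ti.task_id in dag_task_ids[simple_ti.dag_id]:
                targets.append((simple_ti.dag_id, simple_ti.task_id))
    return targets
```

Binding the attribute once keeps each condition short and makes it obvious that every check refers to the same task instance.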
[GitHub] [airflow] ashb commented on a change in pull request #7597:
[AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r387294958
##########
File path: airflow/utils/dag_processing.py
##########
@@ -655,6 +693,7 @@ def start(self):
# Update number of loop iteration.
self._num_run += 1
+ simple_dags = self.collect_results()
Review comment:
Cool, sounds good as you have it
[GitHub] [airflow] mik-laj merged pull request #7597: [AIRFLOW-6497] Avoid
loading DAGs in the main scheduler loop
Posted by GitBox <gi...@apache.org>.
mik-laj merged pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597
[GitHub] [airflow] ashb commented on a change in pull request #7597:
[AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
Posted by GitBox <gi...@apache.org>.
ashb commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r386340502
##########
File path: airflow/utils/dag_processing.py
##########
@@ -655,6 +693,7 @@ def start(self):
# Update number of loop iteration.
self._num_run += 1
+ simple_dags = self.collect_results()
Review comment:
Was there a reason you moved the position of this call?
[GitHub] [airflow] mik-laj commented on issue #7597: [AIRFLOW-6497] Avoid
loading DAGs in the main scheduler loop
Posted by GitBox <gi...@apache.org>.
mik-laj commented on issue #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#issuecomment-593430117
It is worth noting that this also solves one more problem: the modules are always reloaded.
https://issues.apache.org/jira/projects/AIRFLOW/issues/AIRFLOW-6497?filter=allopenissues
So when someone changes a helper module, the new version of the code is executed rather than the old one. Running a stale version can be a problem, because the handler is often defined in a helper function that is shared among many DAGs.
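The reload effect can be demonstrated outside Airflow. This sketch assumes a hypothetical shared module `helper` exposing a constant `ON_FAILURE_MESSAGE`; it imports the module once in the current long-lived process, edits the file, and compares that cached copy with what a brand-new interpreter sees, which approximates processing each DAG file in a fresh child process.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def read_helper_in_fresh_process(module_dir):
    """Import the hypothetical `helper` module in a brand-new interpreter."""
    completed = subprocess.run(
        [sys.executable, "-c", "import helper; print(helper.ON_FAILURE_MESSAGE)"],
        cwd=str(module_dir),  # with -c, the current directory is on sys.path
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout.strip()


def demo():
    """Return (fresh before edit, fresh after edit, cached in long-lived process)."""
    with tempfile.TemporaryDirectory() as tmp:
        helper_path = Path(tmp) / "helper.py"
        helper_path.write_text('ON_FAILURE_MESSAGE = "v1"\n')

        sys.path.insert(0, tmp)
        try:
            import helper as cached_helper  # imported once by this long-lived process
            before_edit = read_helper_in_fresh_process(tmp)

            # Someone edits the shared helper. The new text has a different length,
            # so any cached .pyc (keyed on source mtime and size) is invalidated.
            helper_path.write_text('ON_FAILURE_MESSAGE = "v2 (edited)"\n')

            after_edit = read_helper_in_fresh_process(tmp)
            stale = cached_helper.ON_FAILURE_MESSAGE  # module object cached in sys.modules
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("helper", None)
    return before_edit, after_edit, stale


if __name__ == "__main__":
    print(demo())
```

The fresh interpreter picks up the edited value, while the long-lived process keeps the module object it imported first; this mirrors why handlers executed in per-file processes see the current helper code.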
[GitHub] [airflow] mik-laj commented on a change in pull request #7597:
[AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #7597: [AIRFLOW-6497] Avoid loading DAGs in the main scheduler loop
URL: https://github.com/apache/airflow/pull/7597#discussion_r386390242
##########
File path: airflow/utils/dag_processing.py
##########
@@ -655,6 +693,7 @@ def start(self):
# Update number of loop iteration.
self._num_run += 1
+ simple_dags = self.collect_results()
Review comment:
Yes. First we need to create the processes, and only then can we read the results. Otherwise, we would not get any results on the first iteration of the loop. An alternative solution would be to increase the number of loop iterations: https://github.com/apache/airflow/blob/cb455dc81162680f90edcd78400e1ef46c09766d/tests/utils/test_dag_processing.py#L291