Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/01/14 03:10:34 UTC
[GitHub] [airflow] kaxil opened a new pull request #13662: Stop creating duplicate Dag File Processors
kaxil opened a new pull request #13662:
URL: https://github.com/apache/airflow/pull/13662
When a dag file is being processed by a Dag File Processor and multiple callbacks
are created for it (via zombies or executor events), the dag file is added to
_file_path_queue and the manager launches a new process to process it, which it
should not, since the dag file is already being processed. This eventually
bypasses _parallelism, especially when some dag files take a long time to process.
This addresses the same issue as https://github.com/apache/airflow/pull/11875
but, unlike that PR, does not exclude filepaths that were recently processed or
that ran at the limit (which is only used in tests) when Callbacks are sent by
the Agent. This is by design, as execution of Callbacks is critical. There is
one caveat to avoid duplicate processors: if a processor already exists for a
filepath, the filepath is moved from the front of the queue to the back instead
of being removed. This means the processor with the filepath to run the callback
still runs before other filepaths are added. Tests are added to check this.
closes https://github.com/apache/airflow/issues/13047
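The queueing behaviour described above can be sketched with simplified stand-ins for the manager's internals. `MiniManager`, its attributes, and the plain `object()` placeholders for processors are all illustrative, not Airflow's actual classes; only the skip/re-queue logic mirrors the change:

```python
from collections import defaultdict

class MiniManager:
    """Simplified stand-in for DagFileProcessorManager (illustration only)."""

    def __init__(self, parallelism=2):
        self._parallelism = parallelism
        self._processors = {}               # filepath -> running processor
        self._file_path_queue = []          # filepaths waiting to be processed
        self._callback_to_execute = defaultdict(list)

    def start_new_processes(self):
        while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
            file_path = self._file_path_queue.pop(0)
            # Skip files that already have a running processor ...
            if file_path in self._processors:
                # ... but if a callback is pending, move the filepath to the
                # back of the queue so the callback still runs before newly
                # added filepaths.
                if file_path not in self._file_path_queue and self._callback_to_execute[file_path]:
                    self._file_path_queue.append(file_path)
                continue
            self._processors[file_path] = object()  # pretend to start a processor

manager = MiniManager(parallelism=2)
manager._file_path_queue = ['file_1.py', 'file_2.py', 'file_3.py']
manager._processors['file_1.py'] = object()  # file_1 is already being processed
manager.start_new_processes()
# file_1 is skipped (no duplicate processor), file_2 fills the one free slot,
# and file_3 stays queued because _parallelism is exhausted.
```

With no callback pending, the already-running filepath is simply dropped from the queue; the `_parallelism` budget is no longer spent on a duplicate.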
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] kaxil commented on a change in pull request #13662: Stop creating duplicate Dag File Processors
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #13662:
URL: https://github.com/apache/airflow/pull/13662#discussion_r557486424
##########
File path: tests/utils/test_dag_processing.py
##########
@@ -142,6 +142,72 @@ def test_max_runs_when_no_files(self):
child_pipe.close()
parent_pipe.close()
+ @pytest.mark.backend("mysql", "postgres")
+ def test_start_new_processes_with_same_filepath(self):
+ """
+ Test that when a processor already exists for a filepath, a new processor won't be created
+ for that filepath. The filepath will just be removed from the list.
+ """
+ manager = DagFileProcessorManager(
+ dag_directory='directory',
+ max_runs=1,
+ processor_factory=MagicMock().return_value,
+ processor_timeout=timedelta.max,
+ signal_conn=MagicMock(),
+ dag_ids=[],
+ pickle_dags=False,
+ async_mode=True,
+ )
+
+ file_1 = 'file_1.py'
+ file_2 = 'file_2.py'
+ file_3 = 'file_3.py'
+ manager._file_path_queue = [file_1, file_2, file_3]
+
+ manager._processors[file_1] = MagicMock()
+ manager.start_new_processes()
+
+ assert file_1 in manager._processors.keys()
+ assert file_2 in manager._processors.keys()
+ assert [file_3] == manager._file_path_queue
Review comment:
Check processor_factory is only called once
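The suggested check amounts to asserting on the mock's call count after `start_new_processes` has run. A minimal sketch of that pattern with `unittest.mock` (the loop below is a hypothetical stand-in for what the manager does, not the real implementation):

```python
from unittest.mock import MagicMock

processor_factory = MagicMock()

# Simulate the manager starting processors: file_1 is skipped because a
# processor already exists for it, so the factory fires only for file_2.
already_running = {'file_1.py'}
for file_path in ['file_1.py', 'file_2.py']:
    if file_path in already_running:
        continue
    processor_factory(file_path)

# The assertion the review asks for: the factory ran exactly once,
# and only for the file that did not already have a processor.
processor_factory.assert_called_once()
assert processor_factory.call_args[0][0] == 'file_2.py'
```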
[GitHub] [airflow] github-actions[bot] commented on pull request #13662: Stop creating duplicate Dag File Processors
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13662:
URL: https://github.com/apache/airflow/pull/13662#issuecomment-760482623
[The Workflow run](https://github.com/apache/airflow/actions/runs/486235244) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.
[GitHub] [airflow] kaxil commented on a change in pull request #13662: Stop creating duplicate Dag File Processors
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #13662:
URL: https://github.com/apache/airflow/pull/13662#discussion_r557436671
##########
File path: airflow/utils/dag_processing.py
##########
@@ -988,6 +993,15 @@ def start_new_processes(self):
"""Start more processors if we have enough slots and files to process"""
while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
file_path = self._file_path_queue.pop(0)
+ # Stop creating duplicate processors, i.e. processors with the same filepath
+ if file_path in self._processors.keys():
+ # If the filepath is no longer in the queue and a callback exists to execute,
+ # we add the filepath to the end of the queue so it still runs before
+ # new filepaths are added to the queue, as we should run callbacks asap
+ if file_path not in self._file_path_queue and self._callback_to_execute[file_path]:
+ self._file_path_queue.append(file_path)
Review comment:
I am in two minds about this. We can remove these lines -- and let it get executed when the file is added again to file_path_queue
[GitHub] [airflow] github-actions[bot] commented on pull request #13662: Stop creating duplicate Dag File Processors
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #13662:
URL: https://github.com/apache/airflow/pull/13662#issuecomment-761024998
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
[GitHub] [airflow] kaxil commented on a change in pull request #13662: Stop creating duplicate Dag File Processors
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #13662:
URL: https://github.com/apache/airflow/pull/13662#discussion_r557436671
##########
File path: airflow/utils/dag_processing.py
##########
@@ -988,6 +993,15 @@ def start_new_processes(self):
"""Start more processors if we have enough slots and files to process"""
while self._parallelism - len(self._processors) > 0 and self._file_path_queue:
file_path = self._file_path_queue.pop(0)
+ # Stop creating duplicate processors, i.e. processors with the same filepath
+ if file_path in self._processors.keys():
+ # If the filepath is no longer in the queue and a callback exists to execute,
+ # we add the filepath to the end of the queue so it still runs before
+ # new filepaths are added to the queue, as we should run callbacks asap
+ if file_path not in self._file_path_queue and self._callback_to_execute[file_path]:
+ self._file_path_queue.append(file_path)
Review comment:
I am in two minds about this. We can remove these lines -- and let it get executed when the file is added again to file_path_queue.
(The case is tested / explained in `test_start_new_processes_with_same_filepath_with_callback` test below)
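The behaviour these lines preserve (versus removing them) can be sketched with plain data structures. The names below mirror the diff, but the loop is a simplified stand-in and the 'on_failure' callback entry is hypothetical:

```python
from collections import defaultdict

# State before start_new_processes, with a callback pending for file_1,
# which is already being processed.
processors = {'file_1.py': object()}
callback_to_execute = defaultdict(list)
callback_to_execute['file_1.py'].append('on_failure')  # hypothetical callback
file_path_queue = ['file_1.py', 'file_2.py']
parallelism = 2

while parallelism - len(processors) > 0 and file_path_queue:
    file_path = file_path_queue.pop(0)
    if file_path in processors:
        # Re-queue at the back so the pending callback still runs
        # before any newly added filepaths.
        if file_path not in file_path_queue and callback_to_execute[file_path]:
            file_path_queue.append(file_path)
        continue
    processors[file_path] = object()

# file_1 was not dropped: it sits at the back of the queue, so its
# callback runs as soon as a slot frees up.
```

Without the re-queue, the callback would only run the next time the file is re-added to `file_path_queue` by the regular refresh, which is the alternative the comment above weighs.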
[GitHub] [airflow] kaxil merged pull request #13662: Stop creating duplicate Dag File Processors
Posted by GitBox <gi...@apache.org>.
kaxil merged pull request #13662:
URL: https://github.com/apache/airflow/pull/13662
[GitHub] [airflow] kaxil commented on a change in pull request #13662: Stop creating duplicate Dag File Processors
Posted by GitBox <gi...@apache.org>.
kaxil commented on a change in pull request #13662:
URL: https://github.com/apache/airflow/pull/13662#discussion_r557648539
##########
File path: tests/utils/test_dag_processing.py
##########
@@ -142,6 +142,72 @@ def test_max_runs_when_no_files(self):
child_pipe.close()
parent_pipe.close()
+ @pytest.mark.backend("mysql", "postgres")
+ def test_start_new_processes_with_same_filepath(self):
+ """
+ Test that when a processor already exists for a filepath, a new processor won't be created
+ for that filepath. The filepath will just be removed from the list.
+ """
+ manager = DagFileProcessorManager(
+ dag_directory='directory',
+ max_runs=1,
+ processor_factory=MagicMock().return_value,
+ processor_timeout=timedelta.max,
+ signal_conn=MagicMock(),
+ dag_ids=[],
+ pickle_dags=False,
+ async_mode=True,
+ )
+
+ file_1 = 'file_1.py'
+ file_2 = 'file_2.py'
+ file_3 = 'file_3.py'
+ manager._file_path_queue = [file_1, file_2, file_3]
+
+ manager._processors[file_1] = MagicMock()
+ manager.start_new_processes()
+
+ assert file_1 in manager._processors.keys()
+ assert file_2 in manager._processors.keys()
+ assert [file_3] == manager._file_path_queue
Review comment:
@ashb Updated in https://github.com/apache/airflow/pull/13662/commits/b7fd3749d2db2ff8edc12fecec2cb7f6534a0e5f