Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/05 18:47:56 UTC

[GitHub] [airflow] sternr opened a new pull request, #28128: Add retry to the scheduler loop to protect against DB hiccups

sternr opened a new pull request, #28128:
URL: https://github.com/apache/airflow/pull/28128

   Add a retry loop for cases where a SQL query fails; this makes Airflow much more resilient to potential DB hiccups.
   
   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] apilaskowski commented on pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by "apilaskowski (via GitHub)" <gi...@apache.org>.
apilaskowski commented on PR #28128:
URL: https://github.com/apache/airflow/pull/28128#issuecomment-1443963659

   Why was this pull request abandoned?
   Is there some other method of handling DB hiccups?




[GitHub] [airflow] boring-cyborg[bot] commented on pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on PR #28128:
URL: https://github.com/apache/airflow/pull/28128#issuecomment-1337941153

   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using the [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   




[GitHub] [airflow] uranusjr commented on a diff in pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #28128:
URL: https://github.com/apache/airflow/pull/28128#discussion_r1040616353


##########
airflow/jobs/scheduler_job.py:
##########
@@ -880,13 +880,21 @@ def _run_scheduler_loop(self) -> None:
                     # is finished to avoid concurrent access to the DB.
                     self.log.debug("Waiting for processors to finish since we're using sqlite")
                     self.processor_agent.wait_until_finished()
-
-                with create_session() as session:
-                    num_queued_tis = self._do_scheduling(session)
-
-                    self.executor.heartbeat()
-                    session.expunge_all()
-                    num_finished_events = self._process_executor_events(session=session)
+                max_retry_count = conf.getint('scheduler', 'scheduler_loop_max_retries', fallback=5)

Review Comment:
   What is this configuration? If you’re adding a new key, you need to add it to the configuration declaration. See files in `airflow/config_templates`.
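
For context, declaring such a key means adding an entry to the files under `airflow/config_templates`. A rough sketch of what that declaration could look like follows; the exact schema fields vary between Airflow versions, and the key name is only the one proposed in this PR:

```yaml
# Illustrative sketch of a config_templates declaration for the proposed key.
- name: scheduler
  options:
    - name: scheduler_loop_max_retries
      description: |
        How many times the scheduler loop is retried when the database
        raises an OperationalError before the scheduler gives up.
      version_added: ~
      type: integer
      example: ~
      default: "5"
```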





[GitHub] [airflow] github-actions[bot] closed pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #28128: Add retry to the scheduler loop to protect against DB hiccups
URL: https://github.com/apache/airflow/pull/28128




[GitHub] [airflow] Taragolis commented on a diff in pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #28128:
URL: https://github.com/apache/airflow/pull/28128#discussion_r1045070732


##########
airflow/jobs/scheduler_job.py:
##########
@@ -880,13 +880,24 @@ def _run_scheduler_loop(self) -> None:
                     # is finished to avoid concurrent access to the DB.
                     self.log.debug("Waiting for processors to finish since we're using sqlite")
                     self.processor_agent.wait_until_finished()
-
-                with create_session() as session:
-                    num_queued_tis = self._do_scheduling(session)
-
-                    self.executor.heartbeat()
-                    session.expunge_all()
-                    num_finished_events = self._process_executor_events(session=session)
+                
+                for attempt in run_with_db_retries():
+                    with attempt:
+                        start_time = time.time()
+                        try:
+                            with create_session() as session:
+                                num_queued_tis = self._do_scheduling(session)
+
+                                self.executor.heartbeat()
+                                session.expunge_all()
+                                num_finished_events = self._process_executor_events(session=session)
+                        except OperationalError as e:
+                            end_time = time.time()
+                            self.log.error("got a MySql exception (retry:" + str(
+                                attempt.retry_state.attempt_number) + ", total time in seconds: " + str(
+                                end_time - start_time) + "), details: " + str(e))

Review Comment:
   Do not use concatenations in logging
   
   ```suggestion
                               self.log.error(
                                   "got a MySql exception (retry: %s total time in seconds: %.3f), details: %s",
                                   attempt.retry_state.attempt_number,
                                   end_time - start_time,
                                   e
                               )
   ```





[GitHub] [airflow] sternr commented on a diff in pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
sternr commented on code in PR #28128:
URL: https://github.com/apache/airflow/pull/28128#discussion_r1045124170


##########
airflow/jobs/scheduler_job.py:
##########
@@ -880,13 +880,24 @@ def _run_scheduler_loop(self) -> None:
                     # is finished to avoid concurrent access to the DB.
                     self.log.debug("Waiting for processors to finish since we're using sqlite")
                     self.processor_agent.wait_until_finished()
-
-                with create_session() as session:
-                    num_queued_tis = self._do_scheduling(session)
-
-                    self.executor.heartbeat()
-                    session.expunge_all()
-                    num_finished_events = self._process_executor_events(session=session)
+                
+                for attempt in run_with_db_retries():
+                    with attempt:
+                        start_time = time.time()
+                        try:
+                            with create_session() as session:
+                                num_queued_tis = self._do_scheduling(session)
+
+                                self.executor.heartbeat()
+                                session.expunge_all()
+                                num_finished_events = self._process_executor_events(session=session)
+                        except OperationalError as e:
+                            end_time = time.time()
+                            self.log.error("got a MySql exception (retry:" + str(
+                                attempt.retry_state.attempt_number) + ", total time in seconds: " + str(
+                                end_time - start_time) + "), details: " + str(e))

Review Comment:
   Thanks, fixed.





[GitHub] [airflow] sternr commented on pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
sternr commented on PR #28128:
URL: https://github.com/apache/airflow/pull/28128#issuecomment-1344216383

   Migrated the loop to use `airflow.utils.retries.run_with_db_retries`




[GitHub] [airflow] potiuk commented on pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #28128:
URL: https://github.com/apache/airflow/pull/28128#issuecomment-1345235142

   Static checks :( 




[GitHub] [airflow] sternr commented on a diff in pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
sternr commented on code in PR #28128:
URL: https://github.com/apache/airflow/pull/28128#discussion_r1041501668


##########
airflow/jobs/scheduler_job.py:
##########
@@ -880,13 +880,21 @@ def _run_scheduler_loop(self) -> None:
                     # is finished to avoid concurrent access to the DB.
                     self.log.debug("Waiting for processors to finish since we're using sqlite")
                     self.processor_agent.wait_until_finished()
-
-                with create_session() as session:
-                    num_queued_tis = self._do_scheduling(session)
-
-                    self.executor.heartbeat()
-                    session.expunge_all()
-                    num_finished_events = self._process_executor_events(session=session)
+                max_retry_count = conf.getint('scheduler', 'scheduler_loop_max_retries', fallback=5)

Review Comment:
   Since upgrading to 2.x (we've tried multiple versions up to 2.3.1) we've been seeing many sporadic "MySQL has gone away" exceptions that cause the scheduler to either hang or crash. Since adding this simple retry, our scheduler has had zero such limbo cases for weeks.





[GitHub] [airflow] github-actions[bot] commented on pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #28128:
URL: https://github.com/apache/airflow/pull/28128#issuecomment-1426904819

   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.




[GitHub] [airflow] uranusjr commented on a diff in pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #28128:
URL: https://github.com/apache/airflow/pull/28128#discussion_r1046700074


##########
airflow/jobs/scheduler_job.py:
##########
@@ -880,13 +880,22 @@ def _run_scheduler_loop(self) -> None:
                     # is finished to avoid concurrent access to the DB.
                     self.log.debug("Waiting for processors to finish since we're using sqlite")
                     self.processor_agent.wait_until_finished()
-
-                with create_session() as session:
-                    num_queued_tis = self._do_scheduling(session)
-
-                    self.executor.heartbeat()
-                    session.expunge_all()
-                    num_finished_events = self._process_executor_events(session=session)
+                
+                for attempt in run_with_db_retries():
+                    with attempt:
+                        start_time = time.time()
+                        try:
+                            with create_session() as session:
+                                num_queued_tis = self._do_scheduling(session)
+
+                                self.executor.heartbeat()
+                                session.expunge_all()
+                                num_finished_events = self._process_executor_events(session=session)
+                        except OperationalError as e:
+                            total_time = time.time() - start_time
+                            self.log.error("got a DB exception (retry:%d, total time in seconds: %d), details: %s", attempt.retry_state.attempt_number, total_time, e)

Review Comment:
   IIRC tenacity logs the exception automatically, so this is somewhat duplicative. We can likely remove `details: %s` from the message.
   
   Also it’d be best for a log message to be a complete sentence with proper capitalisation and punctuation.
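
A hedged sketch of what the reworded log call could look like once `details: %s` is dropped (wording is illustrative, not taken from the PR):

```python
# Illustrative only: tenacity already logs the exception itself, so the message
# just states what happened, on which attempt, and how long the attempt took.
self.log.error(
    "Scheduler loop hit a database error on attempt %d after %.2f seconds; retrying.",
    attempt.retry_state.attempt_number,
    time.time() - start_time,
)
```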





[GitHub] [airflow] uranusjr commented on a diff in pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #28128:
URL: https://github.com/apache/airflow/pull/28128#discussion_r1040685948


##########
airflow/jobs/scheduler_job.py:
##########
@@ -880,13 +880,21 @@ def _run_scheduler_loop(self) -> None:
                     # is finished to avoid concurrent access to the DB.
                     self.log.debug("Waiting for processors to finish since we're using sqlite")
                     self.processor_agent.wait_until_finished()
-
-                with create_session() as session:
-                    num_queued_tis = self._do_scheduling(session)
-
-                    self.executor.heartbeat()
-                    session.expunge_all()
-                    num_finished_events = self._process_executor_events(session=session)
+                max_retry_count = conf.getint('scheduler', 'scheduler_loop_max_retries', fallback=5)

Review Comment:
   Yes. Optional keys should be added as a commented-out value (there are several examples) so they can be discovered by users.
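
As an illustration, optional keys typically surface to users as commented-out lines in the generated default configuration, roughly like this (the key name and default are the ones proposed in this PR):

```ini
[scheduler]
# How many times the scheduler loop is retried on a transient database error.
# scheduler_loop_max_retries = 5
```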





[GitHub] [airflow] kaxil commented on a diff in pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28128:
URL: https://github.com/apache/airflow/pull/28128#discussion_r1041052669


##########
airflow/jobs/scheduler_job.py:
##########
@@ -880,13 +880,21 @@ def _run_scheduler_loop(self) -> None:
                     # is finished to avoid concurrent access to the DB.
                     self.log.debug("Waiting for processors to finish since we're using sqlite")
                     self.processor_agent.wait_until_finished()
-
-                with create_session() as session:
-                    num_queued_tis = self._do_scheduling(session)
-
-                    self.executor.heartbeat()
-                    session.expunge_all()
-                    num_finished_events = self._process_executor_events(session=session)
+                max_retry_count = conf.getint('scheduler', 'scheduler_loop_max_retries', fallback=5)

Review Comment:
   That gets the number of retries from `conf.getint('database', 'max_db_retries', fallback=3)`.





[GitHub] [airflow] uranusjr commented on a diff in pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #28128:
URL: https://github.com/apache/airflow/pull/28128#discussion_r1058099167


##########
airflow/jobs/scheduler_job.py:
##########
@@ -880,13 +880,22 @@ def _run_scheduler_loop(self) -> None:
                     # is finished to avoid concurrent access to the DB.
                     self.log.debug("Waiting for processors to finish since we're using sqlite")
                     self.processor_agent.wait_until_finished()
-
-                with create_session() as session:
-                    num_queued_tis = self._do_scheduling(session)
-
-                    self.executor.heartbeat()
-                    session.expunge_all()
-                    num_finished_events = self._process_executor_events(session=session)
+                
+                for attempt in run_with_db_retries():
+                    with attempt:
+                        start_time = time.time()
+                        try:
+                            with create_session() as session:
+                                num_queued_tis = self._do_scheduling(session)
+
+                                self.executor.heartbeat()
+                                session.expunge_all()
+                                num_finished_events = self._process_executor_events(session=session)
+                        except OperationalError as e:
+                            total_time = time.time() - start_time
+                            self.log.error("got a DB exception (retry:%d, total time in seconds: %d), details: %s", attempt.retry_state.attempt_number, total_time, e)

Review Comment:
   Or we could raise a different exception with a clearer message here for tenacity to log.





[GitHub] [airflow] sternr commented on a diff in pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
sternr commented on code in PR #28128:
URL: https://github.com/apache/airflow/pull/28128#discussion_r1047066039


##########
airflow/jobs/scheduler_job.py:
##########
@@ -880,13 +880,22 @@ def _run_scheduler_loop(self) -> None:
                     # is finished to avoid concurrent access to the DB.
                     self.log.debug("Waiting for processors to finish since we're using sqlite")
                     self.processor_agent.wait_until_finished()
-
-                with create_session() as session:
-                    num_queued_tis = self._do_scheduling(session)
-
-                    self.executor.heartbeat()
-                    session.expunge_all()
-                    num_finished_events = self._process_executor_events(session=session)
+                
+                for attempt in run_with_db_retries():
+                    with attempt:
+                        start_time = time.time()
+                        try:
+                            with create_session() as session:
+                                num_queued_tis = self._do_scheduling(session)
+
+                                self.executor.heartbeat()
+                                session.expunge_all()
+                                num_finished_events = self._process_executor_events(session=session)
+                        except OperationalError as e:
+                            total_time = time.time() - start_time
+                            self.log.error("got a DB exception (retry:%d, total time in seconds: %d), details: %s", attempt.retry_state.attempt_number, total_time, e)

Review Comment:
   Do you prefer I completely remove the try & catch and let tenacity handle it entirely (it does print the exception; it just might be a little harder to infer the context of the error)?
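
For reference, a minimal sketch of that simpler variant, reusing the names from the diff above (passing `self.log` so that tenacity logs each failed attempt is an assumption about the helper's `logger` parameter):

```python
# Sketch: let run_with_db_retries / tenacity handle logging and re-raising,
# with no explicit try/except around the session block.
for attempt in run_with_db_retries(logger=self.log):
    with attempt:
        with create_session() as session:
            num_queued_tis = self._do_scheduling(session)

            self.executor.heartbeat()
            session.expunge_all()
            num_finished_events = self._process_executor_events(session=session)
```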





[GitHub] [airflow] sternr commented on pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
sternr commented on PR #28128:
URL: https://github.com/apache/airflow/pull/28128#issuecomment-1347579450

   Anything missing for this to be merged?




[GitHub] [airflow] potiuk commented on pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #28128:
URL: https://github.com/apache/airflow/pull/28128#issuecomment-1444477621

   > Why was this pull request abandoned? Is there some other method of handling DB hiccups?
   
   Because the author abandoned it. If you wish to pick it up and continue, feel free.
   
   The best approach is not to have hiccups in the first place (i.e. improve the stability of your DB). Airflow's assumption is that the DB is generally stable. Recovering from "any" DB hiccup is not easy, and investing in it is not a goal for the Airflow community in general. Airflow is not a 99.999%-available system (building a system that robust is a significant investment and a few orders of magnitude more complexity). Occasionally failing and restarting components when such hiccups do happen is also a viable approach, and it is the one Airflow supports. Generally, when any Airflow component fails, you should restart it and it should recover. The assumption is that these kinds of problems happen rather rarely, so this is the **right** approach for your deployment.




[GitHub] [airflow] kaxil commented on a diff in pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28128:
URL: https://github.com/apache/airflow/pull/28128#discussion_r1041053680


##########
airflow/jobs/scheduler_job.py:
##########
@@ -880,13 +880,21 @@ def _run_scheduler_loop(self) -> None:
                     # is finished to avoid concurrent access to the DB.
                     self.log.debug("Waiting for processors to finish since we're using sqlite")
                     self.processor_agent.wait_until_finished()
-
-                with create_session() as session:
-                    num_queued_tis = self._do_scheduling(session)
-
-                    self.executor.heartbeat()
-                    session.expunge_all()
-                    num_finished_events = self._process_executor_events(session=session)
+                max_retry_count = conf.getint('scheduler', 'scheduler_loop_max_retries', fallback=5)

Review Comment:
   Also, why do we need this? Can you let us know what issue you faced and with which Airflow version?





[GitHub] [airflow] kaxil commented on a diff in pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28128:
URL: https://github.com/apache/airflow/pull/28128#discussion_r1041052216


##########
airflow/jobs/scheduler_job.py:
##########
@@ -880,13 +880,21 @@ def _run_scheduler_loop(self) -> None:
                     # is finished to avoid concurrent access to the DB.
                     self.log.debug("Waiting for processors to finish since we're using sqlite")
                     self.processor_agent.wait_until_finished()
-
-                with create_session() as session:
-                    num_queued_tis = self._do_scheduling(session)
-
-                    self.executor.heartbeat()
-                    session.expunge_all()
-                    num_finished_events = self._process_executor_events(session=session)
+                max_retry_count = conf.getint('scheduler', 'scheduler_loop_max_retries', fallback=5)

Review Comment:
   Why new config?
   
   We already have `airflow.utils.retries.run_with_db_retries`
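
An approximate sketch of what that helper provides (reconstructed from memory, not the exact Airflow source; see `airflow/utils/retries.py` for the real implementation):

```python
import logging

import tenacity
from sqlalchemy.exc import DBAPIError, OperationalError

from airflow.configuration import conf

MAX_DB_RETRIES = conf.getint("database", "max_db_retries", fallback=3)


def run_with_db_retries(max_retries=MAX_DB_RETRIES, logger=None, **kwargs):
    """Return a tenacity.Retrying object tuned for transient DB errors."""
    retry_kwargs = dict(
        retry=tenacity.retry_if_exception_type((OperationalError, DBAPIError)),
        wait=tenacity.wait_random_exponential(multiplier=0.5, max=5),
        stop=tenacity.stop_after_attempt(max_retries),
        reraise=True,
        **kwargs,
    )
    if logger:
        # Log each failed attempt before sleeping between retries.
        retry_kwargs["before_sleep"] = tenacity.before_sleep_log(logger, logging.DEBUG)
    return tenacity.Retrying(**retry_kwargs)
```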





[GitHub] [airflow] tanelk commented on pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
tanelk commented on PR #28128:
URL: https://github.com/apache/airflow/pull/28128#issuecomment-1339265853

   Shouldn't this be done using the `airflow.utils.retries.run_with_db_retries` helper method?




[GitHub] [airflow] sternr commented on a diff in pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
sternr commented on code in PR #28128:
URL: https://github.com/apache/airflow/pull/28128#discussion_r1041504109


##########
airflow/jobs/scheduler_job.py:
##########
@@ -880,13 +880,21 @@ def _run_scheduler_loop(self) -> None:
                     # is finished to avoid concurrent access to the DB.
                     self.log.debug("Waiting for processors to finish since we're using sqlite")
                     self.processor_agent.wait_until_finished()
-
-                with create_session() as session:
-                    num_queued_tis = self._do_scheduling(session)
-
-                    self.executor.heartbeat()
-                    session.expunge_all()
-                    num_finished_events = self._process_executor_events(session=session)
+                max_retry_count = conf.getint('scheduler', 'scheduler_loop_max_retries', fallback=5)

Review Comment:
   Do you prefer I reuse the same `conf.getint('database', 'max_db_retries', fallback=3)`?





[GitHub] [airflow] sternr commented on a diff in pull request #28128: Add retry to the scheduler loop to protect against DB hiccups

Posted by GitBox <gi...@apache.org>.
sternr commented on code in PR #28128:
URL: https://github.com/apache/airflow/pull/28128#discussion_r1040667989


##########
airflow/jobs/scheduler_job.py:
##########
@@ -880,13 +880,21 @@ def _run_scheduler_loop(self) -> None:
                     # is finished to avoid concurrent access to the DB.
                     self.log.debug("Waiting for processors to finish since we're using sqlite")
                     self.processor_agent.wait_until_finished()
-
-                with create_session() as session:
-                    num_queued_tis = self._do_scheduling(session)
-
-                    self.executor.heartbeat()
-                    session.expunge_all()
-                    num_finished_events = self._process_executor_events(session=session)
+                max_retry_count = conf.getint('scheduler', 'scheduler_loop_max_retries', fallback=5)

Review Comment:
   Since I use a fallback it's not mandatory; do you prefer I add it to the config template for documentation?



