Posted to commits@airflow.apache.org by "apilaskowski (via GitHub)" <gi...@apache.org> on 2023/02/28 17:04:26 UTC

[GitHub] [airflow] apilaskowski opened a new pull request, #29815: Making AF more resistant to DB hiccups

apilaskowski opened a new pull request, #29815:
URL: https://github.com/apache/airflow/pull/29815

   I used @sternr's https://github.com/apache/airflow/pull/28128 as a base for this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] apilaskowski commented on pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "apilaskowski (via GitHub)" <gi...@apache.org>.
apilaskowski commented on PR #29815:
URL: https://github.com/apache/airflow/pull/29815#issuecomment-1509389217

   Ok. I will.




[GitHub] [airflow] apilaskowski commented on pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "apilaskowski (via GitHub)" <gi...@apache.org>.
apilaskowski commented on PR #29815:
URL: https://github.com/apache/airflow/pull/29815#issuecomment-1479843337

   @potiuk @uranusjr 
   What do you think about using @retry_db_transaction, which is already used extensively across Airflow?
   I have proposed a solution which is, I believe, much better than the previous one.
   I am no longer retrying whole blocks of code, which was a bad approach.
   
   I think, though I am not sure, that we might want to introduce @retry_db_transaction for every create_session used (as proposed in the base_job.run function).
   
   I hope that this is acceptable.
   My tests suggest that it is enough to protect against a temporarily unavailable DB.
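
A minimal sketch of the retry-on-transient-error idea discussed in this comment, under stated assumptions. `TransientDBError`, `retry_on_transient_error`, `FakeSession` and `write_heartbeat` are hypothetical names used only for illustration; this is not Airflow's `retry_db_transaction` implementation, and a real version would probably also wait between attempts.

```python
import functools


class TransientDBError(Exception):
    """Hypothetical stand-in for a transient error such as a lost connection."""


def retry_on_transient_error(retries=3):
    """Retry the wrapped function when it raises TransientDBError."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(session, *args, **kwargs):
            for attempt in range(1, retries + 1):
                try:
                    return func(session, *args, **kwargs)
                except TransientDBError:
                    # Discard any half-finished work before trying again.
                    session.rollback()
                    if attempt == retries:
                        raise

        return wrapper

    return decorator


class FakeSession:
    """Hypothetical session whose commit fails a fixed number of times."""

    def __init__(self, failures):
        self.failures = failures
        self.commits = 0
        self.rollbacks = 0

    def commit(self):
        if self.failures > 0:
            self.failures -= 1
            raise TransientDBError("connection lost")
        self.commits += 1

    def rollback(self):
        self.rollbacks += 1


@retry_on_transient_error(retries=3)
def write_heartbeat(session):
    # A single, small unit of work: safe to repeat as a whole.
    session.commit()
    return "ok"
```

With `FakeSession(failures=2)` the call succeeds on the third attempt after two rollbacks; with more failures than retries the last error propagates to the caller.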




[GitHub] [airflow] apilaskowski commented on pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "apilaskowski (via GitHub)" <gi...@apache.org>.
apilaskowski commented on PR #29815:
URL: https://github.com/apache/airflow/pull/29815#issuecomment-1453217790

   If the connection to the DB is broken, then those changes to the DB (at this point) would also take place, right?
   Is there some rollback that I missed?
   Is it possible that there is some problem unrelated to a broken connection which could be negatively influenced by those changes?
   
   For us it is critical that an expensive job does not get completely restarted when it does not need to be (due to some minor DB hiccup).




[GitHub] [airflow] uranusjr commented on a diff in pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29815:
URL: https://github.com/apache/airflow/pull/29815#discussion_r1146001016


##########
airflow/jobs/base_job.py:
##########
@@ -175,6 +176,17 @@ def kill(self, session=None):
     def on_kill(self):
         """Will be called when an external kill command is received."""
 
+    @retry_db_transaction
+    def handle_db_transaction_with_session(self, task_function, session):
+        return task_function(session)
+  
+    def handle_db_task(self, task_function):
+        try:
+            with create_session() as session:
+                return self.handle_db_transaction_with_session(task_function, session)
+        except OperationalError:
+            raise

Review Comment:
   Why? This does nothing.
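
A short sketch of why the `try`/`except OperationalError: raise` wrapper has no effect: catching an exception only to re-raise it unchanged behaves the same as not catching it. The function names below are hypothetical, and `ValueError` stands in for `OperationalError` so that the example needs no third-party imports.

```python
def with_wrapper(func):
    """Call func inside a try/except that only re-raises."""
    try:
        return func()
    except ValueError:
        raise  # re-raises the same exception object, unchanged


def without_wrapper(func):
    """Call func directly."""
    return func()


def succeed():
    return 42


def fail():
    raise ValueError("boom")


def outcome(runner, func):
    """Return ('ok', value) or ('error', message) for comparison."""
    try:
        return ("ok", runner(func))
    except ValueError as exc:
        return ("error", str(exc))
```

Both runners give the same outcome for a succeeding and a failing callable, so the wrapper can be dropped.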





[GitHub] [airflow] uranusjr commented on pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on PR #29815:
URL: https://github.com/apache/airflow/pull/29815#issuecomment-1453013130

   There are multiple db commits in the block and I wonder if the internal state would be messed up if the db fails midway through and the entire block got rerun.




[GitHub] [airflow] bjankie1 commented on pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "bjankie1 (via GitHub)" <gi...@apache.org>.
bjankie1 commented on PR #29815:
URL: https://github.com/apache/airflow/pull/29815#issuecomment-1457711870

   > There are multiple db commits in the block and I wonder if the internal state would be messed up if the db fails midway through and the entire block got rerun.
   
   Which multiple commits do you mean? As far as I can see, the changes relate to the heartbeat alone. Even in case of a retry, the changes executed in the heartbeat block should be idempotent.




[GitHub] [airflow] apilaskowski commented on a diff in pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "apilaskowski (via GitHub)" <gi...@apache.org>.
apilaskowski commented on code in PR #29815:
URL: https://github.com/apache/airflow/pull/29815#discussion_r1150760871


##########
airflow/jobs/base_job.py:
##########
@@ -210,54 +222,63 @@ def heartbeat(self, only_if_necessary: bool = False):
 
         previous_heartbeat = self.latest_heartbeat
 
+        def session_merge(session):
+            session.merge(self)
+
         try:
-            with create_session() as session:
-                # This will cause it to load from the db
-                session.merge(self)
-                previous_heartbeat = self.latest_heartbeat
+            self.handle_db_task(task_function = session_merge)
+        except OperationalError:
+            # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
+            self.latest_heartbeat = previous_heartbeat
+
+        previous_heartbeat = self.latest_heartbeat
 
-            if self.state in State.terminating_states:
-                self.kill()
+        if self.state in State.terminating_states:
+            self.kill()
 
-            # Figure out how long to sleep for
-            sleep_for = 0
-            if self.latest_heartbeat:
-                seconds_remaining = (
-                    self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
-                )
-                sleep_for = max(0, seconds_remaining)
-            sleep(sleep_for)
+        # Figure out how long to sleep for
+        sleep_for = 0
+        if self.latest_heartbeat:
+            seconds_remaining = (
+                self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
+            )
+            sleep_for = max(0, seconds_remaining)
+        sleep(sleep_for)
 
-            # Update last heartbeat time
-            with create_session() as session:
-                # Make the session aware of this object
-                session.merge(self)
-                self.latest_heartbeat = timezone.utcnow()
-                session.commit()
-                # At this point, the DB has updated.
-                previous_heartbeat = self.latest_heartbeat
+        def session_merge_and_commit(session):
+            # Make the session aware of this object
+            session.merge(self)
+            self.latest_heartbeat = timezone.utcnow()
+            session.commit()

Review Comment:
   @potiuk, @uranusjr I cannot locate the second commit in heartbeat, unless it is implicit.
   What do you think about moving this commit to the end of this block? I thought that there was a second commit there (which I must have imagined).
   Can there be a commit within the heartbeat_callback function? If so, should we lean towards excluding this callback from this transaction?
   





[GitHub] [airflow] notatallshaw commented on pull request #29815: Making AF more resistant to DB hiccups

Posted by "notatallshaw (via GitHub)" <gi...@apache.org>.
notatallshaw commented on PR #29815:
URL: https://github.com/apache/airflow/pull/29815#issuecomment-1448699221

   Small thing, and I'm not sure if Airflow devs already have a policy about this, but time.time() is not guaranteed to be monotonic, so you could report a negative value when you do time.time() - start_time.
   
   You can use time.monotonic() or time.perf_counter() to get monotonic time.
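
A minimal sketch of measuring elapsed time with a monotonic clock, as suggested above. The helper name `timed` is hypothetical; `time.monotonic()` is part of the standard library and cannot go backwards, so the difference is never negative.

```python
import time


def timed(func, *args, **kwargs):
    """Run func and return (result, elapsed_seconds) using a monotonic clock."""
    start = time.monotonic()
    result = func(*args, **kwargs)
    elapsed = time.monotonic() - start
    return result, elapsed


result, elapsed = timed(sum, range(1000))
```

`time.perf_counter()` can be swapped in the same way when higher resolution is wanted for short intervals.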




[GitHub] [airflow] potiuk commented on a diff in pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #29815:
URL: https://github.com/apache/airflow/pull/29815#discussion_r1147342288


##########
airflow/jobs/base_job.py:
##########
@@ -210,54 +222,63 @@ def heartbeat(self, only_if_necessary: bool = False):
 
         previous_heartbeat = self.latest_heartbeat
 
+        def session_merge(session):
+            session.merge(self)
+
         try:
-            with create_session() as session:
-                # This will cause it to load from the db
-                session.merge(self)
-                previous_heartbeat = self.latest_heartbeat
+            self.handle_db_task(task_function = session_merge)
+        except OperationalError:
+            # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
+            self.latest_heartbeat = previous_heartbeat
+
+        previous_heartbeat = self.latest_heartbeat
 
-            if self.state in State.terminating_states:
-                self.kill()
+        if self.state in State.terminating_states:
+            self.kill()
 
-            # Figure out how long to sleep for
-            sleep_for = 0
-            if self.latest_heartbeat:
-                seconds_remaining = (
-                    self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
-                )
-                sleep_for = max(0, seconds_remaining)
-            sleep(sleep_for)
+        # Figure out how long to sleep for
+        sleep_for = 0
+        if self.latest_heartbeat:
+            seconds_remaining = (
+                self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
+            )
+            sleep_for = max(0, seconds_remaining)
+        sleep(sleep_for)
 
-            # Update last heartbeat time
-            with create_session() as session:
-                # Make the session aware of this object
-                session.merge(self)
-                self.latest_heartbeat = timezone.utcnow()
-                session.commit()
-                # At this point, the DB has updated.
-                previous_heartbeat = self.latest_heartbeat
+        def session_merge_and_commit(session):
+            # Make the session aware of this object
+            session.merge(self)
+            self.latest_heartbeat = timezone.utcnow()
+            session.commit()

Review Comment:
   Yeah. I share very similar concerns. I think we should be very deliberate about retrying full transactions; there are likely subtle issues that we don't realise we introduce by allowing retries mid-transaction.





[GitHub] [airflow] potiuk commented on a diff in pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #29815:
URL: https://github.com/apache/airflow/pull/29815#discussion_r1151889757


##########
airflow/jobs/base_job.py:
##########
@@ -210,54 +222,63 @@ def heartbeat(self, only_if_necessary: bool = False):
 
         previous_heartbeat = self.latest_heartbeat
 
+        def session_merge(session):
+            session.merge(self)
+
         try:
-            with create_session() as session:
-                # This will cause it to load from the db
-                session.merge(self)
-                previous_heartbeat = self.latest_heartbeat
+            self.handle_db_task(task_function = session_merge)
+        except OperationalError:
+            # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
+            self.latest_heartbeat = previous_heartbeat
+
+        previous_heartbeat = self.latest_heartbeat
 
-            if self.state in State.terminating_states:
-                self.kill()
+        if self.state in State.terminating_states:
+            self.kill()
 
-            # Figure out how long to sleep for
-            sleep_for = 0
-            if self.latest_heartbeat:
-                seconds_remaining = (
-                    self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
-                )
-                sleep_for = max(0, seconds_remaining)
-            sleep(sleep_for)
+        # Figure out how long to sleep for
+        sleep_for = 0
+        if self.latest_heartbeat:
+            seconds_remaining = (
+                self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
+            )
+            sleep_for = max(0, seconds_remaining)
+        sleep(sleep_for)
 
-            # Update last heartbeat time
-            with create_session() as session:
-                # Make the session aware of this object
-                session.merge(self)
-                self.latest_heartbeat = timezone.utcnow()
-                session.commit()
-                # At this point, the DB has updated.
-                previous_heartbeat = self.latest_heartbeat
+        def session_merge_and_commit(session):
+            # Make the session aware of this object
+            session.merge(self)
+            self.latest_heartbeat = timezone.utcnow()
+            session.commit()

Review Comment:
   I am actually now working on refactoring this part of the code in #30255, #30302 and #30308 (and one more to follow) in order to handle AIP-44 ( https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44 ). There is already a part there that splits the method into separate steps (#30308), and I think the discussion should happen around the time I do it there, as it will slightly change the behaviour of this particular transaction. That change is not yet reviewed, so we might have more discussions there; for now I am waiting for #30255 and #30302 to be reviewed and merged, as they are prerequisites for that change.





[GitHub] [airflow] bjankie1 commented on pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "bjankie1 (via GitHub)" <gi...@apache.org>.
bjankie1 commented on PR #29815:
URL: https://github.com/apache/airflow/pull/29815#issuecomment-1457710161

   @potiuk DB connection failures happen. Airflow runs in a distributed environment. As much as we want reliable infrastructure, it is not possible all the time. Heartbeat is the most frequent operation in the Airflow DB and its success has a significant impact on task success or failure. What problems do you anticipate with making it more resilient to disruptions?
   
   I'm looking forward to AIP-44, and at the same time I find this change non-disruptive and likely to reduce the chance of task failures.




[GitHub] [airflow] apilaskowski commented on a diff in pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "apilaskowski (via GitHub)" <gi...@apache.org>.
apilaskowski commented on code in PR #29815:
URL: https://github.com/apache/airflow/pull/29815#discussion_r1151778620


##########
airflow/jobs/base_job.py:
##########
@@ -210,54 +222,63 @@ def heartbeat(self, only_if_necessary: bool = False):
 
         previous_heartbeat = self.latest_heartbeat
 
+        def session_merge(session):
+            session.merge(self)
+
         try:
-            with create_session() as session:
-                # This will cause it to load from the db
-                session.merge(self)
-                previous_heartbeat = self.latest_heartbeat
+            self.handle_db_task(task_function = session_merge)
+        except OperationalError:
+            # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
+            self.latest_heartbeat = previous_heartbeat
+
+        previous_heartbeat = self.latest_heartbeat
 
-            if self.state in State.terminating_states:
-                self.kill()
+        if self.state in State.terminating_states:
+            self.kill()
 
-            # Figure out how long to sleep for
-            sleep_for = 0
-            if self.latest_heartbeat:
-                seconds_remaining = (
-                    self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
-                )
-                sleep_for = max(0, seconds_remaining)
-            sleep(sleep_for)
+        # Figure out how long to sleep for
+        sleep_for = 0
+        if self.latest_heartbeat:
+            seconds_remaining = (
+                self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
+            )
+            sleep_for = max(0, seconds_remaining)
+        sleep(sleep_for)
 
-            # Update last heartbeat time
-            with create_session() as session:
-                # Make the session aware of this object
-                session.merge(self)
-                self.latest_heartbeat = timezone.utcnow()
-                session.commit()
-                # At this point, the DB has updated.
-                previous_heartbeat = self.latest_heartbeat
+        def session_merge_and_commit(session):
+            # Make the session aware of this object
+            session.merge(self)
+            self.latest_heartbeat = timezone.utcnow()
+            session.commit()

Review Comment:
   Another approach that I was considering is to add a commit as the last operation in ```handle_db_transaction_with_session``` and never to have another write to the DB from ```handle_db_transaction```. 
   
   This way we have only a single update to the DB, which happens as the last operation. If there were any problems, the DB should be clean at any time. Do you think it would be feasible to have the commit as the last command and forbid committing mid-transaction?





[GitHub] [airflow] uranusjr commented on a diff in pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29815:
URL: https://github.com/apache/airflow/pull/29815#discussion_r1147129731


##########
airflow/jobs/base_job.py:
##########
@@ -210,54 +222,63 @@ def heartbeat(self, only_if_necessary: bool = False):
 
         previous_heartbeat = self.latest_heartbeat
 
+        def session_merge(session):
+            session.merge(self)
+
         try:
-            with create_session() as session:
-                # This will cause it to load from the db
-                session.merge(self)
-                previous_heartbeat = self.latest_heartbeat
+            self.handle_db_task(task_function = session_merge)
+        except OperationalError:
+            # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
+            self.latest_heartbeat = previous_heartbeat
+
+        previous_heartbeat = self.latest_heartbeat
 
-            if self.state in State.terminating_states:
-                self.kill()
+        if self.state in State.terminating_states:
+            self.kill()
 
-            # Figure out how long to sleep for
-            sleep_for = 0
-            if self.latest_heartbeat:
-                seconds_remaining = (
-                    self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
-                )
-                sleep_for = max(0, seconds_remaining)
-            sleep(sleep_for)
+        # Figure out how long to sleep for
+        sleep_for = 0
+        if self.latest_heartbeat:
+            seconds_remaining = (
+                self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
+            )
+            sleep_for = max(0, seconds_remaining)
+        sleep(sleep_for)
 
-            # Update last heartbeat time
-            with create_session() as session:
-                # Make the session aware of this object
-                session.merge(self)
-                self.latest_heartbeat = timezone.utcnow()
-                session.commit()
-                # At this point, the DB has updated.
-                previous_heartbeat = self.latest_heartbeat
+        def session_merge_and_commit(session):
+            # Make the session aware of this object
+            session.merge(self)
+            self.latest_heartbeat = timezone.utcnow()
+            session.commit()

Review Comment:
   Hmm, the more I think, the more problematic this commit seems (I think there’s at least one other below). Say a database executes this commit, and then hiccups during `heartbeat_callback`. This entire block would be retried again with an extra update. That doesn’t feel right to me. We should perhaps either
   
   a. remove this commit so the entire block shares one single transaction and gets rolled back and restarted when something bad happens, or
   b. split this into two blocks (by the `commit` call), and retry them separately
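
A rough sketch of option (b), contrasted with retrying the whole block. All names (`TransientDBError`, `run_with_retry`, `make_steps`) are hypothetical, and the event log stands in for real database writes; this is only meant to illustrate the extra update described above, not the PR's code.

```python
class TransientDBError(Exception):
    """Hypothetical stand-in for a transient database error."""


def run_with_retry(step, retries=3):
    """Call step(), repeating it if it raises TransientDBError."""
    for attempt in range(1, retries + 1):
        try:
            return step()
        except TransientDBError:
            if attempt == retries:
                raise


def make_steps(callback_failures):
    """Return (log, update_heartbeat, run_callback) sharing one event log."""
    log = []
    remaining = [callback_failures]

    def update_heartbeat():
        # Stands in for merge + commit of the heartbeat timestamp.
        log.append("heartbeat written")

    def run_callback():
        # Fails the first `callback_failures` times, then succeeds.
        if remaining[0] > 0:
            remaining[0] -= 1
            raise TransientDBError("connection lost")
        log.append("callback done")

    return log, update_heartbeat, run_callback


# Retrying the whole block repeats the write that was already committed.
whole_log, update, callback = make_steps(callback_failures=1)
run_with_retry(lambda: (update(), callback()))

# Retrying each step separately repeats only the step that failed.
split_log, update, callback = make_steps(callback_failures=1)
run_with_retry(update)
run_with_retry(callback)
```

In this sketch `whole_log` records the heartbeat write twice, while `split_log` records it once.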





[GitHub] [airflow] potiuk commented on pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29815:
URL: https://github.com/apache/airflow/pull/29815#issuecomment-1458999232

   > When each `create_session` block exits, it calls `session.commit()` and writes to the database. So say if a database failure happens during lines 226–232, the entire block would be restarted and you end up writing the database twice without calling `heartbeat_callback` correctly. I don’t know if it would be problematic, but it is awkward. Ideally I would imagine the two `create_session` blocks should be retried separately instead.
   
   @uranusjr Yep. This is very awkward. And this awkwardness is precisely what I am going to change during the AIP-44 implementation. There is a (very unfinished) draft commit: https://github.com/apache/airflow/commit/f11f5afbddfe39a9f0e31bc1fc1ba3cc1dfa5394 that depends on merging https://github.com/apache/airflow/pull/29776 and will solve this issue by splitting the "before", "during", and "after" task into three steps (and three DB sessions when the internal DB API is not used; when the internal API is used there will be no "during" session at all).
   
   > @potiuk DB connection failures happen. Airflow runs in a distributed environment. As much as we want reliable infrastructure, it is not possible all the time. Heartbeat is the most frequent operation in the Airflow DB and its success has a significant impact on task success or failure. What problems do you anticipate with making it more resilient to disruptions?
   
   @bjankie1 Yes. I understand that and sympathise with such a statement. But the solution is not to retry a failed transaction without looking at the consequences (nicely pointed out by @uranusjr). Your proposal is a "band-aid" which might create more problems. The right approach to making the system resilient to DB problems is to understand every single DB transaction that happens in the system, deliberately design what happens if the transaction fails, and act appropriately to recover. Retrying a failed transaction without understanding the consequences is a recipe for disaster. And yes, in this case the whole transaction is ... awkward, as also nicely pointed out by @uranusjr. The right approach is to fix the behaviour so that it makes more sense (and so you can reason about it), and only after that to implement a recovery scenario. That scenario might actually not be needed: the way I think it will work when I complete the refactor is that those transactions will be split into three (or two in case of the internal DB API), so that there will be no need to recover because the problem you observe will not exist.




[GitHub] [airflow] uranusjr commented on pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on PR #29815:
URL: https://github.com/apache/airflow/pull/29815#issuecomment-1457729988

   When each `create_session` block exits, it calls `session.commit()` and writes to the database. So say if a database failure happens during lines 226–232, the entire block would be restarted and you end up writing the database twice without calling `heartbeat_callback` correctly. I don’t know if it would be problematic, but it is awkward. Ideally I would imagine the two `create_session` blocks should be retried separately instead.
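
A minimal sketch of the commit-on-exit behaviour described above, assuming a context manager shaped like `create_session`. `create_session_sketch` and `RecordingSession` are hypothetical names; the real helper creates its own session, whereas this one takes a stand-in object so the order of calls can be observed.

```python
from contextlib import contextmanager


class RecordingSession:
    """Hypothetical session that records which methods were called."""

    def __init__(self):
        self.events = []

    def commit(self):
        self.events.append("commit")

    def rollback(self):
        self.events.append("rollback")

    def close(self):
        self.events.append("close")


@contextmanager
def create_session_sketch(session):
    """Commit when the block exits normally, roll back if it raises."""
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


# Normal exit: the block commits even though the body never calls commit().
ok = RecordingSession()
with create_session_sketch(ok):
    pass

# Failing body: the work is rolled back and the error propagates.
failed = RecordingSession()
try:
    with create_session_sketch(failed):
        raise RuntimeError("boom")
except RuntimeError:
    pass
```

Because every normal exit commits, two such blocks inside one retried region mean two separate writes, which is the situation the comment above calls awkward.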




[GitHub] [airflow] apilaskowski commented on a diff in pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "apilaskowski (via GitHub)" <gi...@apache.org>.
apilaskowski commented on code in PR #29815:
URL: https://github.com/apache/airflow/pull/29815#discussion_r1150739380


##########
airflow/jobs/base_job.py:
##########
@@ -210,54 +222,63 @@ def heartbeat(self, only_if_necessary: bool = False):
 
         previous_heartbeat = self.latest_heartbeat
 
+        def session_merge(session):
+            session.merge(self)
+
         try:
-            with create_session() as session:
-                # This will cause it to load from the db
-                session.merge(self)
-                previous_heartbeat = self.latest_heartbeat
+            self.handle_db_task(task_function = session_merge)
+        except OperationalError:
+            # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
+            self.latest_heartbeat = previous_heartbeat
+
+        previous_heartbeat = self.latest_heartbeat
 
-            if self.state in State.terminating_states:
-                self.kill()
+        if self.state in State.terminating_states:
+            self.kill()
 
-            # Figure out how long to sleep for
-            sleep_for = 0
-            if self.latest_heartbeat:
-                seconds_remaining = (
-                    self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
-                )
-                sleep_for = max(0, seconds_remaining)
-            sleep(sleep_for)
+        # Figure out how long to sleep for
+        sleep_for = 0
+        if self.latest_heartbeat:
+            seconds_remaining = (
+                self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
+            )
+            sleep_for = max(0, seconds_remaining)
+        sleep(sleep_for)
 
-            # Update last heartbeat time
-            with create_session() as session:
-                # Make the session aware of this object
-                session.merge(self)
-                self.latest_heartbeat = timezone.utcnow()
-                session.commit()
-                # At this point, the DB has updated.
-                previous_heartbeat = self.latest_heartbeat
+        def session_merge_and_commit(session):
+            # Make the session aware of this object
+            session.merge(self)
+            self.latest_heartbeat = timezone.utcnow()
+            session.commit()

Review Comment:
   I thought I knew, but I went through the code and checked SQLAlchemy to confirm, and I was wrong.
   I have reverted this proposal.




[GitHub] [airflow] apilaskowski commented on a diff in pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "apilaskowski (via GitHub)" <gi...@apache.org>.
apilaskowski commented on code in PR #29815:
URL: https://github.com/apache/airflow/pull/29815#discussion_r1146094311


##########
airflow/jobs/base_job.py:
##########
@@ -175,6 +176,17 @@ def kill(self, session=None):
     def on_kill(self):
         """Will be called when an external kill command is received."""
 
+    @retry_db_transaction
+    def handle_db_transaction_with_session(self, task_function, session):
+        return task_function(session)
+  
+    def handle_db_task(self, task_function):
+        try:
+            with create_session() as session:
+                return self.handle_db_transaction_with_session(task_function, session)
+        except OperationalError:
+            raise

Review Comment:
   You are right. I thought I had seen a similar construct in the code, but I cannot find it.
   In any case, I will remove this part.
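
For readers following the thread, here is a minimal sketch of why the `except OperationalError: raise` block in the quoted hunk can be dropped. The `OperationalError` class and the `failing_task` helper below are stand-ins invented for this illustration (the PR uses SQLAlchemy's exception); the point is only that a handler containing nothing but a bare `raise` propagates the same exception the caller would have seen anyway.

```python
class OperationalError(Exception):
    """Stand-in for sqlalchemy.exc.OperationalError (assumption for this sketch)."""


def failing_task():
    # Hypothetical task that always fails, to exercise both code paths.
    raise OperationalError("connection lost")


def with_redundant_handler():
    try:
        return failing_task()
    except OperationalError:
        raise  # re-raises the same exception unchanged


def without_handler():
    # Same observable behaviour: the exception simply propagates.
    return failing_task()
```

Both functions surface the same exception type and message to the caller, so the shorter form is preferable.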




[GitHub] [airflow] uranusjr commented on a diff in pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29815:
URL: https://github.com/apache/airflow/pull/29815#discussion_r1146003470


##########
airflow/jobs/base_job.py:
##########
@@ -210,54 +222,63 @@ def heartbeat(self, only_if_necessary: bool = False):
 
         previous_heartbeat = self.latest_heartbeat
 
+        def session_merge(session):
+            session.merge(self)
+
         try:
-            with create_session() as session:
-                # This will cause it to load from the db
-                session.merge(self)
-                previous_heartbeat = self.latest_heartbeat
+            self.handle_db_task(task_function = session_merge)
+        except OperationalError:
+            # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
+            self.latest_heartbeat = previous_heartbeat
+
+        previous_heartbeat = self.latest_heartbeat
 
-            if self.state in State.terminating_states:
-                self.kill()
+        if self.state in State.terminating_states:
+            self.kill()
 
-            # Figure out how long to sleep for
-            sleep_for = 0
-            if self.latest_heartbeat:
-                seconds_remaining = (
-                    self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
-                )
-                sleep_for = max(0, seconds_remaining)
-            sleep(sleep_for)
+        # Figure out how long to sleep for
+        sleep_for = 0
+        if self.latest_heartbeat:
+            seconds_remaining = (
+                self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
+            )
+            sleep_for = max(0, seconds_remaining)
+        sleep(sleep_for)
 
-            # Update last heartbeat time
-            with create_session() as session:
-                # Make the session aware of this object
-                session.merge(self)
-                self.latest_heartbeat = timezone.utcnow()
-                session.commit()
-                # At this point, the DB has updated.
-                previous_heartbeat = self.latest_heartbeat
+        def session_merge_and_commit(session):
+            # Make the session aware of this object
+            session.merge(self)
+            self.latest_heartbeat = timezone.utcnow()
+            session.commit()

Review Comment:
   This existed in the old code but I don’t get why it’s needed.




[GitHub] [airflow] apilaskowski commented on a diff in pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "apilaskowski (via GitHub)" <gi...@apache.org>.
apilaskowski commented on code in PR #29815:
URL: https://github.com/apache/airflow/pull/29815#discussion_r1150305746


##########
airflow/jobs/base_job.py:
##########
@@ -210,54 +222,63 @@ def heartbeat(self, only_if_necessary: bool = False):
 
         previous_heartbeat = self.latest_heartbeat
 
+        def session_merge(session):
+            session.merge(self)
+
         try:
-            with create_session() as session:
-                # This will cause it to load from the db
-                session.merge(self)
-                previous_heartbeat = self.latest_heartbeat
+            self.handle_db_task(task_function = session_merge)
+        except OperationalError:
+            # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
+            self.latest_heartbeat = previous_heartbeat
+
+        previous_heartbeat = self.latest_heartbeat
 
-            if self.state in State.terminating_states:
-                self.kill()
+        if self.state in State.terminating_states:
+            self.kill()
 
-            # Figure out how long to sleep for
-            sleep_for = 0
-            if self.latest_heartbeat:
-                seconds_remaining = (
-                    self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
-                )
-                sleep_for = max(0, seconds_remaining)
-            sleep(sleep_for)
+        # Figure out how long to sleep for
+        sleep_for = 0
+        if self.latest_heartbeat:
+            seconds_remaining = (
+                self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
+            )
+            sleep_for = max(0, seconds_remaining)
+        sleep(sleep_for)
 
-            # Update last heartbeat time
-            with create_session() as session:
-                # Make the session aware of this object
-                session.merge(self)
-                self.latest_heartbeat = timezone.utcnow()
-                session.commit()
-                # At this point, the DB has updated.
-                previous_heartbeat = self.latest_heartbeat
+        def session_merge_and_commit(session):
+            # Make the session aware of this object
+            session.merge(self)
+            self.latest_heartbeat = timezone.utcnow()
+            session.commit()

Review Comment:
   I propose removing `session.commit()`.




[GitHub] [airflow] potiuk commented on pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29815:
URL: https://github.com/apache/airflow/pull/29815#issuecomment-1509278687

   It would be good to take a look at this one after the refactoring is complete. Closing for now. @apilaskowski, please reopen with conflicts resolved if you would like to pursue it; the changes are going to be released in 2.6 and you should base your changes on those.



[GitHub] [airflow] apilaskowski commented on pull request #29815: Making AF more resistant to DB hiccups

Posted by "apilaskowski (via GitHub)" <gi...@apache.org>.
apilaskowski commented on PR #29815:
URL: https://github.com/apache/airflow/pull/29815#issuecomment-1448539726

   @potiuk @kaxil @uranusjr 
   Please let me know if I should make any changes.



[GitHub] [airflow] apilaskowski commented on a diff in pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "apilaskowski (via GitHub)" <gi...@apache.org>.
apilaskowski commented on code in PR #29815:
URL: https://github.com/apache/airflow/pull/29815#discussion_r1146098598


##########
airflow/jobs/base_job.py:
##########
@@ -175,6 +176,17 @@ def kill(self, session=None):
     def on_kill(self):
         """Will be called when an external kill command is received."""
 
+    @retry_db_transaction
+    def handle_db_transaction_with_session(self, task_function, session):
+        return task_function(session)

Review Comment:
   I intended `handle_db_task` to be accessible from subclasses.
   I agree that `handle_db_transaction_with_session` could go into `handle_db_task`.
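
A rough sketch of what folding the retried step into `handle_db_task` as a nested local function might look like. This is not the PR's final code: `retry`, `TransientDBError`, and this `create_session` are hypothetical stand-ins written so the example runs without Airflow or SQLAlchemy installed, where the PR itself uses `retry_db_transaction`, `OperationalError`, and Airflow's own `create_session`.

```python
import contextlib
import functools


class TransientDBError(Exception):
    """Stand-in for sqlalchemy.exc.OperationalError (assumption for this sketch)."""


def retry(times):
    """Hypothetical stand-in for Airflow's retry_db_transaction decorator."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, times + 1):
                try:
                    return func(*args, **kwargs)
                except TransientDBError:
                    if attempt == times:
                        raise  # out of attempts, let the caller see the error
        return wrapper
    return decorator


@contextlib.contextmanager
def create_session():
    """Hypothetical stand-in for Airflow's create_session context manager."""
    yield object()


class Job:
    def handle_db_task(self, task_function):
        with create_session() as session:
            # The retried step is a nested local function, so only
            # handle_db_task itself is visible to subclasses.
            @retry(times=3)
            def run_with_retry():
                return task_function(session)

            return run_with_retry()
```

Subclasses would still call `self.handle_db_task(...)` as before; only the helper method disappears from the class namespace.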




[GitHub] [airflow] apilaskowski commented on a diff in pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "apilaskowski (via GitHub)" <gi...@apache.org>.
apilaskowski commented on code in PR #29815:
URL: https://github.com/apache/airflow/pull/29815#discussion_r1146082447


##########
airflow/jobs/base_job.py:
##########
@@ -175,6 +176,17 @@ def kill(self, session=None):
     def on_kill(self):
         """Will be called when an external kill command is received."""
 
+    @retry_db_transaction
+    def handle_db_transaction_with_session(self, task_function, session):
+        return task_function(session)

Review Comment:
   I intended it to be accessible from subclasses.




[GitHub] [airflow] potiuk commented on a diff in pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #29815:
URL: https://github.com/apache/airflow/pull/29815#discussion_r1150583845


##########
airflow/jobs/base_job.py:
##########
@@ -210,54 +222,63 @@ def heartbeat(self, only_if_necessary: bool = False):
 
         previous_heartbeat = self.latest_heartbeat
 
+        def session_merge(session):
+            session.merge(self)
+
         try:
-            with create_session() as session:
-                # This will cause it to load from the db
-                session.merge(self)
-                previous_heartbeat = self.latest_heartbeat
+            self.handle_db_task(task_function = session_merge)
+        except OperationalError:
+            # We didn't manage to heartbeat, so make sure that the timestamp isn't updated
+            self.latest_heartbeat = previous_heartbeat
+
+        previous_heartbeat = self.latest_heartbeat
 
-            if self.state in State.terminating_states:
-                self.kill()
+        if self.state in State.terminating_states:
+            self.kill()
 
-            # Figure out how long to sleep for
-            sleep_for = 0
-            if self.latest_heartbeat:
-                seconds_remaining = (
-                    self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
-                )
-                sleep_for = max(0, seconds_remaining)
-            sleep(sleep_for)
+        # Figure out how long to sleep for
+        sleep_for = 0
+        if self.latest_heartbeat:
+            seconds_remaining = (
+                self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
+            )
+            sleep_for = max(0, seconds_remaining)
+        sleep(sleep_for)
 
-            # Update last heartbeat time
-            with create_session() as session:
-                # Make the session aware of this object
-                session.merge(self)
-                self.latest_heartbeat = timezone.utcnow()
-                session.commit()
-                # At this point, the DB has updated.
-                previous_heartbeat = self.latest_heartbeat
+        def session_merge_and_commit(session):
+            # Make the session aware of this object
+            session.merge(self)
+            self.latest_heartbeat = timezone.utcnow()
+            session.commit()

Review Comment:
   What would the consequences be if you do? Do you know? On what grounds do you propose it?




[GitHub] [airflow] apilaskowski commented on pull request #29815: Making AF more resistant to DB hiccups

Posted by "apilaskowski (via GitHub)" <gi...@apache.org>.
apilaskowski commented on PR #29815:
URL: https://github.com/apache/airflow/pull/29815#issuecomment-1450320902

   > Small thing, and I'm not sure if Airflow devs already have a policy about this but time.time() is not guaranteed to be monotonic so you could report a negative value when you do time.time() - start_time.
   > 
   > You can use time.monotonic() or time.perf_counter() to get monotonic time.
   
    I made a change as @notatallshaw suggested.
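
For context, a minimal sketch of the suggestion quoted above: measuring elapsed time with `time.monotonic()` so the difference cannot go negative if the wall clock is adjusted. The `timed_call` helper is a hypothetical name used only for this illustration and is not taken from the PR.

```python
import time


def timed_call(func, *args, **kwargs):
    """Run func and return (result, elapsed_seconds) using a monotonic clock."""
    start = time.monotonic()
    result = func(*args, **kwargs)
    # monotonic() never goes backwards, so elapsed is always >= 0.
    elapsed = time.monotonic() - start
    return result, elapsed
```

`time.perf_counter()` would work the same way here and offers higher resolution on some platforms.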



[GitHub] [airflow] potiuk closed pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk closed pull request #29815: Add retry to the scheduler loop to protect against DB hiccups
URL: https://github.com/apache/airflow/pull/29815



[GitHub] [airflow] apilaskowski commented on pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "apilaskowski (via GitHub)" <gi...@apache.org>.
apilaskowski commented on PR #29815:
URL: https://github.com/apache/airflow/pull/29815#issuecomment-1457924946

   The second session creation was introduced here:
   https://github.com/apache/airflow/commit/4905a5563d47b45e38b91661ee5aa7f3765a129b



[GitHub] [airflow] uranusjr commented on a diff in pull request #29815: Add retry to the scheduler loop to protect against DB hiccups

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #29815:
URL: https://github.com/apache/airflow/pull/29815#discussion_r1146002102


##########
airflow/jobs/base_job.py:
##########
@@ -175,6 +176,17 @@ def kill(self, session=None):
     def on_kill(self):
         """Will be called when an external kill command is received."""
 
+    @retry_db_transaction
+    def handle_db_transaction_with_session(self, task_function, session):
+        return task_function(session)

Review Comment:
   Why does this need to be defined on the object, instead of a nested local function?



