Posted to commits@airflow.apache.org by "mhenc (via GitHub)" <gi...@apache.org> on 2023/09/01 21:06:06 UTC

[GitHub] [airflow] mhenc opened a new pull request, #34026: AIP-44 Migrate Job to Internal API

mhenc opened a new pull request, #34026:
URL: https://github.com/apache/airflow/pull/34026

   This PR migrates all methods defined in Job to the Internal API. It also adds some methods to JobPydantic so that it can be used interchangeably with Job.
   However, not all methods are migrated - JobRunner still operates on the ORM Job object - and this is required, as Job contains some "callback" methods which can't easily be serialized and executed in the Internal API. Instead, they may simply make an additional Internal API call.
   
   closes: #29315
   related: #30298
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] uranusjr commented on a diff in pull request #34026: AIP-44 Migrate Job to Internal API

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #34026:
URL: https://github.com/apache/airflow/pull/34026#discussion_r1315449185


##########
airflow/jobs/base_job_runner.py:
##########
@@ -27,7 +27,7 @@
     from airflow.jobs.job import Job
     from airflow.serialization.pydantic.job import JobPydantic
 
-J = TypeVar("J", "Job", "JobPydantic", "Job | JobPydantic")
+J = TypeVar("J", "Job", "Job")

Review Comment:
   This type variable (and the runner being a generic) is not needed if you don’t need multiple classes.
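
A minimal sketch of the point made in this comment, using hypothetical stand-in classes rather than Airflow's real `Job` and `JobPydantic`. It also shows that `TypeVar` rejects a single constraint, which is presumably why the diff lists `"Job"` twice.

```python
from __future__ import annotations

from typing import Generic, TypeVar


class Job:
    """Stand-in for the ORM Job class (hypothetical, not Airflow's)."""

    def __init__(self, job_id: int) -> None:
        self.id = job_id


class JobPydantic:
    """Stand-in for the serialized Job class (hypothetical)."""

    def __init__(self, job_id: int) -> None:
        self.id = job_id


# A constrained TypeVar is only useful when the runner accepts several classes.
J = TypeVar("J", Job, JobPydantic)


class GenericRunner(Generic[J]):
    def __init__(self, job: J) -> None:
        self.job: J = job


# With one accepted class, a plain annotation carries the same information.
class PlainRunner:
    def __init__(self, job: Job) -> None:
        self.job = job


# typing refuses a TypeVar with exactly one constraint.
try:
    TypeVar("Single", Job)
except TypeError as exc:
    single_constraint_error = str(exc)

runner = PlainRunner(Job(job_id=1))
```

Once only `Job` remains, both the type variable and the `Generic` base can go, as the later suggestion `class BaseJobRunner:` in this thread indicates.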





[GitHub] [airflow] mhenc commented on a diff in pull request #34026: AIP-44 Migrate Job to Internal API

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on code in PR #34026:
URL: https://github.com/apache/airflow/pull/34026#discussion_r1321550940


##########
airflow/jobs/job.py:
##########
@@ -190,11 +182,10 @@ def heartbeat(
 
         try:
             # This will cause it to load from the db
-            session.merge(self)
+            self._merge_from(Job._fetch_from_db(self, session))

Review Comment:
   @potiuk any update on this? 





[GitHub] [airflow] mhenc commented on a diff in pull request #34026: AIP-44 Migrate Job to Internal API

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on code in PR #34026:
URL: https://github.com/apache/airflow/pull/34026#discussion_r1317132549


##########
airflow/jobs/job.py:
##########
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime

Review Comment:
   Ok. Done.
   Unfortunately, I can't move it under `TYPE_CHECKING` in pydantic/job.py, as Pydantic doesn't seem to like it.
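
A likely reason, sketched with the standard library only: tools that read annotations at runtime have to resolve the names they contain, and a name imported only under `TYPE_CHECKING` does not exist at runtime. The example below does not use Pydantic itself; `JobFields` and `can_resolve` are hypothetical names, and the two dictionaries simulate the module namespace with and without a regular import.

```python
from __future__ import annotations

import datetime as dt
from typing import get_type_hints


class JobFields:
    """Hypothetical stand-in for a model whose field types are read at runtime."""

    # Stored as the string "datetime" because of the __future__ import above.
    start_date: datetime


def can_resolve(namespace: dict) -> bool:
    """Try to turn the string annotations into real types using only `namespace`."""
    try:
        get_type_hints(JobFields, globalns=namespace)
    except NameError:
        return False
    return True


# Namespace as it looks when `datetime` is imported only under TYPE_CHECKING.
type_checking_only = can_resolve({})
# Namespace as it looks with a regular runtime import.
runtime_import = can_resolve({"datetime": dt.datetime})
```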





[GitHub] [airflow] potiuk merged pull request #34026: AIP-44 Migrate Job to Internal API

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk merged PR #34026:
URL: https://github.com/apache/airflow/pull/34026




[GitHub] [airflow] uranusjr commented on a diff in pull request #34026: AIP-44 Migrate Job to Internal API

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #34026:
URL: https://github.com/apache/airflow/pull/34026#discussion_r1316822714


##########
airflow/jobs/base_job_runner.py:
##########
@@ -29,20 +29,19 @@
 
 J = TypeVar("J", "Job", "JobPydantic", "Job | JobPydantic")
 
-
-class BaseJobRunner(Generic[J]):
+class BaseJobRunner(Job):

Review Comment:
   ```suggestion
   class BaseJobRunner:
   ```





[GitHub] [airflow] mhenc commented on a diff in pull request #34026: AIP-44 Migrate Job to Internal API

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on code in PR #34026:
URL: https://github.com/apache/airflow/pull/34026#discussion_r1315441122


##########
airflow/jobs/job.py:
##########
@@ -137,28 +134,23 @@ def is_alive(self, grace_multiplier=2.1):
         :param grace_multiplier: multiplier of heartrate to require heart beat
             within
         """
-        if self.job_type == "SchedulerJob":
-            health_check_threshold: int = conf.getint("scheduler", "scheduler_health_check_threshold")
-        elif self.job_type == "TriggererJob":
-            health_check_threshold: int = conf.getint("triggerer", "triggerer_health_check_threshold")
-        else:
-            health_check_threshold: int = self.heartrate * grace_multiplier
-        return (
-            self.state == JobState.RUNNING
-            and (timezone.utcnow() - self.latest_heartbeat).total_seconds() < health_check_threshold
+        return Job._is_alive(
+            job_type=self.job_type,
+            heartrate=self.heartrate,
+            state=self.state,
+            latest_heartbeat=self.latest_heartbeat,
+            grace_multiplier=grace_multiplier,
         )
 
     @provide_session
     def kill(self, session: Session = NEW_SESSION) -> NoReturn:
         """Handle on_kill callback and updates state in database."""
-        job = session.scalar(select(Job).where(Job.id == self.id).limit(1))
-        job.end_date = timezone.utcnow()
         try:
             self.on_kill()
         except Exception as e:
             self.log.error("on_kill() method failed: %s", e)
-        session.merge(job)
-        session.commit()
+
+        Job._kill(job_id=self.id, session=session)

Review Comment:
   Right, but please note that end_date was set not on `self` but on a newly fetched object, so `self.on_kill()` didn't have access to `job.end_date`.
   That's why I believe it's safe to move the `end_date` update after the callback.
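
The argument can be illustrated with a small in-memory sketch. `FakeJob` and its `_store` dictionary are hypothetical stand-ins for the ORM object and the database row; in both orderings the callback sees the same (unset) `end_date` on `self`.

```python
from __future__ import annotations

from datetime import datetime, timezone


class FakeJob:
    """Hypothetical in-memory job; `_store` plays the role of the database."""

    _store: dict[int, dict] = {}

    def __init__(self, job_id: int) -> None:
        self.id = job_id
        self.end_date: datetime | None = None
        self.seen_in_on_kill: datetime | None = None
        FakeJob._store[job_id] = {"end_date": None}

    def on_kill(self) -> None:
        # The callback can only look at `self`, not at the separately fetched row.
        self.seen_in_on_kill = self.end_date

    def kill_old(self) -> None:
        # Old order: update the freshly fetched row first, then run the callback.
        row = FakeJob._store[self.id]
        row["end_date"] = datetime.now(timezone.utc)
        self.on_kill()

    def kill_new(self) -> None:
        # New order: run the callback first, then update the row.
        self.on_kill()
        FakeJob._store[self.id]["end_date"] = datetime.now(timezone.utc)


old_job, new_job = FakeJob(1), FakeJob(2)
old_job.kill_old()
new_job.kill_new()
```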





[GitHub] [airflow] potiuk commented on a diff in pull request #34026: AIP-44 Migrate Job to Internal API

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34026:
URL: https://github.com/apache/airflow/pull/34026#discussion_r1315222653


##########
airflow/jobs/job.py:
##########
@@ -190,11 +182,10 @@ def heartbeat(
 
         try:
             # This will cause it to load from the db
-            session.merge(self)
+            self._merge_from(Job._fetch_from_db(self, session))

Review Comment:
   I think we need to meet for an interactive brainstorming session and discuss what's going on here. I believe there were some non-obvious behaviours of heartbeat that we should replicate (especially around edge cases).
   
   I recently tried to understand what's going on in this method and explained it here:  https://github.com/apache/airflow/discussions/33689#discussioncomment-6820251
   
   And I think it's not entirely obvious whether we have the same behaviour now.
   
   Let's discuss on slack @mhenc.





[GitHub] [airflow] mhenc commented on a diff in pull request #34026: AIP-44 Migrate Job to Internal API

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on code in PR #34026:
URL: https://github.com/apache/airflow/pull/34026#discussion_r1322759865


##########
airflow/jobs/job.py:
##########
@@ -309,8 +398,8 @@ def execute_job(job: Job | JobPydantic, execute_callable: Callable[[], int | Non
     database operations or over the Internal API call.
 
     :param job: Job to execute - it can be either DB job or it's Pydantic serialized version. It does
-       not really matter, because except of running the heartbeat and state setting,
-       the runner should not modify the job state.
+    not really matter, because except of running the heartbeat and state setting,
+    the runner should not modify the job state.

Review Comment:
   Not sure, probably some formatting tool. Reverted.





[GitHub] [airflow] uranusjr commented on a diff in pull request #34026: AIP-44 Migrate Job to Internal API

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #34026:
URL: https://github.com/apache/airflow/pull/34026#discussion_r1322483510


##########
airflow/jobs/job.py:
##########
@@ -309,8 +398,8 @@ def execute_job(job: Job | JobPydantic, execute_callable: Callable[[], int | Non
     database operations or over the Internal API call.
 
     :param job: Job to execute - it can be either DB job or it's Pydantic serialized version. It does
-       not really matter, because except of running the heartbeat and state setting,
-       the runner should not modify the job state.
+    not really matter, because except of running the heartbeat and state setting,
+    the runner should not modify the job state.

Review Comment:
   Why change this? Doesn’t really matter but the indentation is good for readability.





[GitHub] [airflow] potiuk commented on pull request #34026: AIP-44 Migrate Job to Internal API

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #34026:
URL: https://github.com/apache/airflow/pull/34026#issuecomment-1731483154

   @mhenc -> one thing to remember that we have to do as a follow-up for AIP-44 as well - we need to implement the heartbeat callback.
   
   In the current implementation of AIP-44 the callback is still called on the client side (i.e. in the local task job), and it performs some DB operations (refreshing the task instance and dag run from the DB). But it also uses some "local" process management: it terminates the running task if the state changed, and it handles the "impersonation" case, where an additional parent process is created while switching to the impersonated user. So this method should likely be split into "retrieving state from DB" (with an Internal API call) and "reacting to state change".
   
   Or maybe we could refactor the heartbeat_callback approach and make a "dedicated" local task job heartbeat that does it in a single Internal API call, both updating the heartbeat status AND retrieving the state, and returns the state so that processes can be killed in reaction to an external DB state change.
   
   cc: @bjankie1 - I think it would be great if you two think about it and propose an approach that would also work well for potential optimisation of the heartbeat in the future.
   
   Currently the callback is as follows.
   
   ```
       @provide_session
       def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
           """Self destruct task if state has been moved away from running externally."""
           if self.terminating:
               # ensure termination if processes are created later
               self.task_runner.terminate()
               return
   
           self.task_instance.refresh_from_db()
           ti = self.task_instance
   
           if ti.state == TaskInstanceState.RUNNING:
               fqdn = get_hostname()
               same_hostname = fqdn == ti.hostname
               if not same_hostname:
                   self.log.error(
                       "The recorded hostname %s does not match this instance's hostname %s",
                       ti.hostname,
                       fqdn,
                   )
                   raise AirflowException("Hostname of job runner does not match")
               current_pid = self.task_runner.get_process_pid()
               recorded_pid = ti.pid
               same_process = recorded_pid == current_pid
   
               if recorded_pid is not None and (ti.run_as_user or self.task_runner.run_as_user):
                   # when running as another user, compare the task runner pid to the parent of
                   # the recorded pid because user delegation becomes an extra process level.
                   # However, if recorded_pid is None, pass that through as it signals the task
                   # runner process has already completed and been cleared out. `psutil.Process`
                   # uses the current process if the parameter is None, which is not what is intended
                   # for comparison.
                   recorded_pid = psutil.Process(ti.pid).ppid()
                   same_process = recorded_pid == current_pid
   
               if recorded_pid is not None and not same_process and not IS_WINDOWS:
                   self.log.warning(
                       "Recorded pid %s does not match the current pid %s", recorded_pid, current_pid
                   )
                   raise AirflowException("PID of job runner does not match")
           elif self.task_runner.return_code() is None and hasattr(self.task_runner, "process"):
               if ti.state == TaskInstanceState.SKIPPED:
                   # A DagRun timeout will cause tasks to be externally marked as skipped.
                   dagrun = ti.get_dagrun(session=session)
                   execution_time = (dagrun.end_date or timezone.utcnow()) - dagrun.start_date
                   if ti.task.dag is not None:
                       dagrun_timeout = ti.task.dag.dagrun_timeout
                   else:
                       dagrun_timeout = None
                   if dagrun_timeout and execution_time > dagrun_timeout:
                       self.log.warning("DagRun timed out after %s.", execution_time)
   
               # potential race condition, the _run_raw_task commits `success` or other state
               # but task_runner does not exit right away due to slow process shutdown or any other reasons
               # let's do a throttle here, if the above case is true, the handle_task_exit will handle it
               if self._state_change_checks >= 1:  # defer to next round of heartbeat
                   self.log.warning(
                       "State of this instance has been externally set to %s. Terminating instance.", ti.state
                   )
                   self.terminating = True
               self._state_change_checks += 1
   ```
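
A rough sketch of the "single Internal API call" idea, under stated assumptions: `FakeInternalApi`, `HeartbeatResult`, `heartbeat_and_get_state` and `react_to_state` are hypothetical names, dictionaries replace the database, and the hostname/PID checks and the `_state_change_checks` throttle from the callback above are left out.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class HeartbeatResult:
    """Hypothetical response of a combined heartbeat call."""

    latest_heartbeat: datetime
    task_state: str


@dataclass
class FakeInternalApi:
    """Stand-in for the server side; dictionaries replace the database."""

    job_heartbeats: dict[int, datetime] = field(default_factory=dict)
    task_states: dict[str, str] = field(default_factory=dict)

    def heartbeat_and_get_state(self, job_id: int, ti_key: str) -> HeartbeatResult:
        # One round trip: record the heartbeat and read the task state.
        now = datetime.now(timezone.utc)
        self.job_heartbeats[job_id] = now
        return HeartbeatResult(now, self.task_states.get(ti_key, "running"))


def react_to_state(result: HeartbeatResult, already_terminating: bool) -> bool:
    """Client-side half: decide locally whether the task process should terminate."""
    return already_terminating or result.task_state != "running"


api = FakeInternalApi(task_states={"ti-1": "failed"})
externally_failed = react_to_state(api.heartbeat_and_get_state(1, "ti-1"), False)
still_running = react_to_state(api.heartbeat_and_get_state(1, "ti-2"), False)
```

The split keeps everything that touches the database behind one call, while process handling stays local to the task job.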




[GitHub] [airflow] uranusjr commented on a diff in pull request #34026: AIP-44 Migrate Job to Internal API

Posted by "uranusjr (via GitHub)" <gi...@apache.org>.
uranusjr commented on code in PR #34026:
URL: https://github.com/apache/airflow/pull/34026#discussion_r1316823578


##########
airflow/jobs/job.py:
##########
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import datetime

Review Comment:
   I think we have an effort to consolidate this to `import datetime` instead. And there’s another one to move this kind of import into the `if TYPE_CHECKING` block.





[GitHub] [airflow] potiuk commented on pull request #34026: AIP-44 Migrate Job to Internal API

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #34026:
URL: https://github.com/apache/airflow/pull/34026#issuecomment-1730779345

   LGTM after deeper review.




[GitHub] [airflow] mhenc commented on a diff in pull request #34026: AIP-44 Migrate Job to Internal API

Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on code in PR #34026:
URL: https://github.com/apache/airflow/pull/34026#discussion_r1316039440


##########
airflow/jobs/base_job_runner.py:
##########
@@ -27,7 +27,7 @@
     from airflow.jobs.job import Job
     from airflow.serialization.pydantic.job import JobPydantic
 
-J = TypeVar("J", "Job", "JobPydantic", "Job | JobPydantic")
+J = TypeVar("J", "Job", "Job")

Review Comment:
   Right, removed.






[GitHub] [airflow] potiuk commented on a diff in pull request #34026: AIP-44 Migrate Job to Internal API

Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #34026:
URL: https://github.com/apache/airflow/pull/34026#discussion_r1315220044


##########
airflow/jobs/job.py:
##########
@@ -137,28 +134,23 @@ def is_alive(self, grace_multiplier=2.1):
         :param grace_multiplier: multiplier of heartrate to require heart beat
             within
         """
-        if self.job_type == "SchedulerJob":
-            health_check_threshold: int = conf.getint("scheduler", "scheduler_health_check_threshold")
-        elif self.job_type == "TriggererJob":
-            health_check_threshold: int = conf.getint("triggerer", "triggerer_health_check_threshold")
-        else:
-            health_check_threshold: int = self.heartrate * grace_multiplier
-        return (
-            self.state == JobState.RUNNING
-            and (timezone.utcnow() - self.latest_heartbeat).total_seconds() < health_check_threshold
+        return Job._is_alive(
+            job_type=self.job_type,
+            heartrate=self.heartrate,
+            state=self.state,
+            latest_heartbeat=self.latest_heartbeat,
+            grace_multiplier=grace_multiplier,
         )
 
     @provide_session
     def kill(self, session: Session = NEW_SESSION) -> NoReturn:
         """Handle on_kill callback and updates state in database."""
-        job = session.scalar(select(Job).where(Job.id == self.id).limit(1))
-        job.end_date = timezone.utcnow()
         try:
             self.on_kill()
         except Exception as e:
             self.log.error("on_kill() method failed: %s", e)
-        session.merge(job)
-        session.commit()
+
+        Job._kill(job_id=self.id, session=session)

Review Comment:
   This is a slight change in semantics - the job end_date is now updated after on_kill, but looking at the implementations, that does not matter.
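
For the `is_alive` part of the hunk above, the removed logic can be restated as a function over plain values, which is what makes it callable with either an ORM or a serialized job. This is a sketch, not the PR's `Job._is_alive`: the `HEALTH_CHECK_THRESHOLDS` values are made-up placeholders for the `conf.getint(...)` lookups, the state is a plain string, and the `now` parameter is added only to make the example deterministic.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

# Placeholder values standing in for the scheduler/triggerer config lookups.
HEALTH_CHECK_THRESHOLDS = {"SchedulerJob": 30, "TriggererJob": 30}


def is_alive(
    job_type: str,
    heartrate: float,
    state: str,
    latest_heartbeat: datetime,
    grace_multiplier: float = 2.1,
    now: datetime | None = None,
) -> bool:
    """Return True if the job is running and its last heartbeat is recent enough."""
    threshold = HEALTH_CHECK_THRESHOLDS.get(job_type, heartrate * grace_multiplier)
    now = now or datetime.now(timezone.utc)
    return state == "running" and (now - latest_heartbeat).total_seconds() < threshold


fixed_now = datetime(2023, 9, 1, tzinfo=timezone.utc)
recent = is_alive("LocalTaskJob", 5, "running", fixed_now - timedelta(seconds=10), now=fixed_now)
stale = is_alive("LocalTaskJob", 5, "running", fixed_now - timedelta(seconds=11), now=fixed_now)
```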


