Posted to commits@airflow.apache.org by "pierrejeambrun (via GitHub)" <gi...@apache.org> on 2023/02/13 22:24:34 UTC
[GitHub] [airflow] pierrejeambrun opened a new pull request, #29516: AIP-44 Migrate BaseJob.run to Internal API
pierrejeambrun opened a new pull request, #29516:
URL: https://github.com/apache/airflow/pull/29516
closes: https://github.com/apache/airflow/issues/29316
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [airflow] potiuk commented on a diff in pull request #29516: AIP-44 Migrate BaseJob.run to Internal API
Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #29516:
URL: https://github.com/apache/airflow/pull/29516#discussion_r1106524576
##########
airflow/jobs/base_job.py:
##########
@@ -245,32 +246,47 @@ def heartbeat(self, only_if_necessary: bool = False):
# We didn't manage to heartbeat, so make sure that the timestamp isn't updated
self.latest_heartbeat = previous_heartbeat
+ @staticmethod
+ @internal_api_call
+ @provide_session
+ def pre_execute(job: BaseJob, *, session: Session = NEW_SESSION) -> None:
+ """Update the database job entry before running the _execute() function."""
+ job.state = State.RUNNING
+ session.add(job)
+ session.commit()
+ make_transient(job)
Review Comment:
(BTW, this is the first case I've seen where we try to do that, so it's not a general issue so far.)
[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #29516: AIP-44 Migrate BaseJob.run to Internal API
Posted by "pierrejeambrun (via GitHub)" <gi...@apache.org>.
pierrejeambrun commented on code in PR #29516:
URL: https://github.com/apache/airflow/pull/29516#discussion_r1109204792
##########
airflow/jobs/base_job.py:
##########
@@ -245,32 +246,47 @@ def heartbeat(self, only_if_necessary: bool = False):
# We didn't manage to heartbeat, so make sure that the timestamp isn't updated
self.latest_heartbeat = previous_heartbeat
+ @staticmethod
+ @internal_api_call
+ @provide_session
+ def pre_execute(job: BaseJob, *, session: Session = NEW_SESSION) -> None:
+ """Update the database job entry before running the _execute() function."""
+ job.state = State.RUNNING
+ session.add(job)
+ session.commit()
+ make_transient(job)
Review Comment:
Yep, much bigger problem :joy:.
Converting to draft as I will not be able to update before going on holidays.
[GitHub] [airflow] pierrejeambrun commented on pull request #29516: AIP-44 Migrate BaseJob.run to Internal API
Posted by "pierrejeambrun (via GitHub)" <gi...@apache.org>.
pierrejeambrun commented on PR #29516:
URL: https://github.com/apache/airflow/pull/29516#issuecomment-1429633373
My bad, I didn’t pay enough attention and assumed it was like other issues.
I will update the PR, thanks.
[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #29516: AIP-44 Migrate BaseJob.run to Internal API
Posted by "pierrejeambrun (via GitHub)" <gi...@apache.org>.
pierrejeambrun commented on code in PR #29516:
URL: https://github.com/apache/airflow/pull/29516#discussion_r1109204792
##########
airflow/jobs/base_job.py:
##########
@@ -245,32 +246,47 @@ def heartbeat(self, only_if_necessary: bool = False):
# We didn't manage to heartbeat, so make sure that the timestamp isn't updated
self.latest_heartbeat = previous_heartbeat
+ @staticmethod
+ @internal_api_call
+ @provide_session
+ def pre_execute(job: BaseJob, *, session: Session = NEW_SESSION) -> None:
+ """Update the database job entry before running the _execute() function."""
+ job.state = State.RUNNING
+ session.add(job)
+ session.commit()
+ make_transient(job)
Review Comment:
Yep, much bigger problem :joy:.
Putting this one in draft as I will not be able to update before going on holidays.
[GitHub] [airflow] pierrejeambrun commented on pull request #29516: AIP-44 Migrate BaseJob.run to Internal API
Posted by "pierrejeambrun (via GitHub)" <gi...@apache.org>.
pierrejeambrun commented on PR #29516:
URL: https://github.com/apache/airflow/pull/29516#issuecomment-1430224325
Do we have a way of actually testing the code with `AIRFLOW__WEBSERVER__RUN_INTERNAL_API`? The last time I tried, it wasn't ready, but it would be great to be able to run this locally with the internal API.
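For context, Airflow reads configuration overrides from environment variables of the form `AIRFLOW__{SECTION}__{KEY}`, so the variable above should correspond to the `run_internal_api` option of the `[webserver]` section. The helper below is a minimal, hypothetical sketch of that naming convention only. `parse_airflow_env_var` and `overrides_from_env` are invented names. This is not Airflow's real configuration parser, which handles more cases.

```python
from __future__ import annotations

from collections.abc import Mapping

PREFIX = "AIRFLOW__"


def parse_airflow_env_var(name: str) -> tuple[str, str] | None:
    """Split an AIRFLOW__{SECTION}__{KEY} variable name into (section, key).

    Hypothetical helper illustrating the naming convention only; it is not
    Airflow's real configuration parser. Returns None for other names.
    """
    if not name.startswith(PREFIX):
        return None
    # The first double underscore after the prefix separates section from key.
    section, sep, key = name[len(PREFIX):].partition("__")
    if not sep or not section or not key:
        return None
    # Sections and keys are written in lowercase in airflow.cfg.
    return section.lower(), key.lower()


def overrides_from_env(environ: Mapping[str, str]) -> dict[tuple[str, str], str]:
    """Collect every (section, key) -> value override found in `environ`."""
    overrides: dict[tuple[str, str], str] = {}
    for name, value in environ.items():
        parsed = parse_airflow_env_var(name)
        if parsed is not None:
            overrides[parsed] = value
    return overrides
```

For example, `parse_airflow_env_var("AIRFLOW__WEBSERVER__RUN_INTERNAL_API")` returns `("webserver", "run_internal_api")`. Unrelated variables such as `PATH` are ignored.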
[GitHub] [airflow] github-actions[bot] commented on pull request #29516: AIP-44 Migrate BaseJob.run to Internal API
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #29516:
URL: https://github.com/apache/airflow/pull/29516#issuecomment-1565092749
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
[GitHub] [airflow] github-actions[bot] closed pull request #29516: AIP-44 Migrate BaseJob.run to Internal API
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #29516: AIP-44 Migrate BaseJob.run to Internal API
URL: https://github.com/apache/airflow/pull/29516
[GitHub] [airflow] pierrejeambrun commented on a diff in pull request #29516: AIP-44 Migrate BaseJob.run to Internal API
Posted by "pierrejeambrun (via GitHub)" <gi...@apache.org>.
pierrejeambrun commented on code in PR #29516:
URL: https://github.com/apache/airflow/pull/29516#discussion_r1106489444
##########
airflow/jobs/base_job.py:
##########
@@ -245,32 +246,47 @@ def heartbeat(self, only_if_necessary: bool = False):
# We didn't manage to heartbeat, so make sure that the timestamp isn't updated
self.latest_heartbeat = previous_heartbeat
+ @staticmethod
+ @internal_api_call
+ @provide_session
+ def pre_execute(job: BaseJob, *, session: Session = NEW_SESSION) -> None:
+ """Update the database job entry before running the _execute() function."""
+ job.state = State.RUNNING
+ session.add(job)
+ session.commit()
+ make_transient(job)
Review Comment:
I'm wondering whether this `make_transient` call is still necessary when the function executes on the internal API.
[GitHub] [airflow] mhenc commented on pull request #29516: AIP-44 Migrate BaseJob.run to Internal API
Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on PR #29516:
URL: https://github.com/apache/airflow/pull/29516#issuecomment-1434500007
Regarding testing it: you may check
https://github.com/apache/airflow/pull/28900#issuecomment-1431391228
[GitHub] [airflow] mhenc commented on pull request #29516: AIP-44 Migrate BaseJob.run to Internal API
Posted by "mhenc (via GitHub)" <gi...@apache.org>.
mhenc commented on PR #29516:
URL: https://github.com/apache/airflow/pull/29516#issuecomment-1429518246
I don't think this is what we need to do here (sorry I wasn't more explicit in the bug description).
The `self.execute()` call needs to run locally (e.g. on the worker), because this is the part that touches customer code. Your change would make it execute in the Internal API server.
What we need to do is migrate everything that touches the "session". That means the pre-execute part (changing the state, etc.) and the post-execute part (updating state and end_date).
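The split described above can be sketched as a toy model. It is not the PR's code and not Airflow's API, and it rests on these assumptions:

- `remote` is a hypothetical stand-in for `internal_api_call`. It forces arguments and return values through a JSON round trip, as an HTTP call would.
- `FAKE_DB` replaces the metadata database.
- The state strings are illustrative.

Only the two database-touching steps cross the boundary. The user-supplied `execute` callable stays in the local process.

```python
from __future__ import annotations

import functools
import json
from datetime import datetime, timezone
from typing import Any, Callable

# Hypothetical stand-in for the metadata database: job_id -> row.
FAKE_DB: dict[int, dict[str, Any]] = {}


def remote(func: Callable[..., Any]) -> Callable[..., Any]:
    """Toy stand-in for an internal-API call.

    Arguments and the return value are forced through a JSON round trip,
    as they would be when sent over HTTP to a separate API server.
    """
    @functools.wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        request = json.loads(json.dumps({"args": args, "kwargs": kwargs}))
        result = func(*request["args"], **request["kwargs"])
        return json.loads(json.dumps(result))
    return wrapper


@remote
def pre_execute(job_id: int) -> None:
    # "Server side": database access lives here, not on the worker.
    FAKE_DB[job_id]["state"] = "running"


@remote
def post_execute(job_id: int, state: str, end_date: str) -> None:
    # "Server side": record the final state and end date.
    row = FAKE_DB[job_id]
    row["state"] = state
    row["end_date"] = end_date


def run(job_id: int, execute: Callable[[], None]) -> str:
    """Runs on the worker; user code never leaves this process."""
    pre_execute(job_id)
    state = "failed"  # assume failure unless execute() returns normally
    try:
        execute()
        state = "success"
    finally:
        # Always report the outcome, even if execute() raised.
        post_execute(job_id, state, datetime.now(timezone.utc).isoformat())
    return state
```

Each remote call here takes a plain `job_id` rather than the job object, which keeps it trivially serializable. Exceptions from user code still propagate to the caller after the final state has been recorded.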
[GitHub] [airflow] potiuk commented on a diff in pull request #29516: AIP-44 Migrate BaseJob.run to Internal API
Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on code in PR #29516:
URL: https://github.com/apache/airflow/pull/29516#discussion_r1106522839
##########
airflow/jobs/base_job.py:
##########
@@ -245,32 +246,47 @@ def heartbeat(self, only_if_necessary: bool = False):
# We didn't manage to heartbeat, so make sure that the timestamp isn't updated
self.latest_heartbeat = previous_heartbeat
+ @staticmethod
+ @internal_api_call
+ @provide_session
+ def pre_execute(job: BaseJob, *, session: Session = NEW_SESSION) -> None:
+ """Update the database job entry before running the _execute() function."""
+ job.state = State.RUNNING
+ session.add(job)
+ session.commit()
+ make_transient(job)
Review Comment:
I think there is a bigger problem here. BaseJob is an SQLAlchemy model, and we are effectively trying to serialize it through the internal_api call. This is not going to end well.
I think this is one of those cases where we need to do it differently and refactor the code much more deeply.
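The serialization problem can be illustrated without SQLAlchemy. An RPC layer has to turn every argument into bytes, but a mapped object carries process-local state (such as its link to a session) alongside its column values. The sketch below is a hypothetical toy, not Airflow's internal API. It uses JSON encoding, and `JobModel`, `JobRef` and `encode_call` are all invented for illustration. The encoder rejects the model object but accepts a plain data view holding only the fields the remote side needs.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass


class FakeSession:
    """Hypothetical stand-in for a live database session."""


class JobModel:
    """Hypothetical stand-in for an ORM-mapped class such as BaseJob.

    It holds process-local state (here, a reference to the session it was
    loaded from) next to its column values.
    """

    def __init__(self, id: int, state: str, session: FakeSession) -> None:
        self.id = id
        self.state = state
        self.session = session


@dataclass(frozen=True)
class JobRef:
    """Plain data view holding only what the remote side needs."""

    id: int
    state: str

    @classmethod
    def from_model(cls, job: JobModel) -> JobRef:
        return cls(id=job.id, state=job.state)


def encode_call(method: str, **params: object) -> str:
    """Toy RPC encoder: every parameter must be JSON-serializable."""
    return json.dumps({"method": method, "params": params})
```

Consider `job = JobModel(7, "queued", FakeSession())`. Calling `encode_call("pre_execute", job=job)` raises `TypeError` because the object is not JSON serializable. `encode_call("pre_execute", job=asdict(JobRef.from_model(job)))` succeeds. An explicit, minimal representation like this also makes the contract of each remote call visible, rather than depending on whatever attributes the ORM object happens to have loaded.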
[GitHub] [airflow] github-actions[bot] commented on pull request #29516: AIP-44 Migrate BaseJob.run to Internal API
Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #29516:
URL: https://github.com/apache/airflow/pull/29516#issuecomment-1495157072
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
[GitHub] [airflow] potiuk commented on pull request #29516: AIP-44 Migrate BaseJob.run to Internal API
Posted by "potiuk (via GitHub)" <gi...@apache.org>.
potiuk commented on PR #29516:
URL: https://github.com/apache/airflow/pull/29516#issuecomment-1495824342
Not stale, I guess.