Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/04/13 12:51:30 UTC

[GitHub] [airflow] nclaeys opened a new pull request, #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

nclaeys opened a new pull request, #22976:
URL: https://github.com/apache/airflow/pull/22976

   closes: #20982 
   
   ---
   Updated the kubernetes_executor code to take into account manually launched tasks (through the web-ui).
   This way they are not reset as orphaned tasks. 
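
A minimal sketch of the behaviour change described above, based only on the diff lines quoted in the review comments of this thread. `FakeTI` and both function names are hypothetical stand-ins introduced for illustration; they are not part of Airflow.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, Tuple, Union


@dataclass(frozen=True)
class FakeTI:
    """Hypothetical stand-in for TaskInstance with only the fields used here."""
    key: str
    queued_by_job_id: Optional[int]


def partition_before(tis: List[FakeTI]) -> Tuple[List[FakeTI], Dict[str, FakeTI]]:
    """Old behaviour, per the removed diff lines: TIs without a job id are
    flushed (reset) up front and never considered for adoption."""
    tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
    pod_ids = {ti.key: ti for ti in tis if ti.queued_by_job_id}
    return tis_to_flush, pod_ids


def job_ids_after(tis: List[FakeTI]) -> Set[Union[int, str]]:
    """New behaviour, per the added diff lines: skip empty job ids and also
    search for pods labelled 'manual' when any TI has no queued_by_job_id."""
    scheduler_job_ids: Set[Union[int, str]] = {
        ti.queued_by_job_id for ti in tis if ti.queued_by_job_id
    }
    if any(not ti.queued_by_job_id for ti in tis):
        scheduler_job_ids.add("manual")
    return scheduler_job_ids


tis = [FakeTI("scheduled", 7), FakeTI("ui_run", None)]
flushed, adoptable = partition_before(tis)   # old code resets "ui_run"
new_job_ids = job_ids_after(tis)             # new code also looks for 'manual'
```

With the old partitioning, the task started from the web UI lands in the flush list; with the new job-id set, its pod becomes a candidate for adoption instead.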


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] github-actions[bot] commented on pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22976:
URL: https://github.com/apache/airflow/pull/22976#issuecomment-1374969395

   This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.




[GitHub] [airflow] nclaeys commented on a diff in pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
nclaeys commented on code in PR #22976:
URL: https://github.com/apache/airflow/pull/22976#discussion_r852991803


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -696,8 +701,7 @@ def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance
             for pod in pod_list.items:
                 self.adopt_launched_task(kube_client, pod, pod_ids)
         self._adopt_completed_pods(kube_client)
-        tis_to_flush.extend(pod_ids.values())
-        return tis_to_flush
+        return list(pod_ids.values())

Review Comment:
   I will look at that this week and create a new PR for it.





[GitHub] [airflow] nclaeys commented on pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
nclaeys commented on PR #22976:
URL: https://github.com/apache/airflow/pull/22976#issuecomment-1149924030

   @potiuk is there anything blocking the PR?




[GitHub] [airflow] tanelk commented on a diff in pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
tanelk commented on code in PR #22976:
URL: https://github.com/apache/airflow/pull/22976#discussion_r991862739


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -700,9 +700,14 @@ def _change_state(self, key: TaskInstanceKey, state: Optional[str], pod_id: str,
         self.event_buffer[key] = state, None
 
     def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
-        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
-        scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
-        pod_ids = {ti.key: ti for ti in tis if ti.queued_by_job_id}
+        scheduler_job_ids = {ti.queued_by_job_id for ti in tis if ti.queued_by_job_id}
+
+        # Tasks triggered through API will have no ti.queued_by_job_id
+        # and their pod will have label 'airflow-worker=manual'

Review Comment:
   I believe they mean this https://github.com/apache/airflow/blob/cb8c671eeaae44fae0f5b0135097ce4476f4141f/airflow/www/views.py#L1760-L1767
   
   This is not related to manually triggering a dagrun, but running a single task in an existing dagrun - perhaps that caused some confusion.
   
   
   





[GitHub] [airflow] nclaeys commented on a diff in pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
nclaeys commented on code in PR #22976:
URL: https://github.com/apache/airflow/pull/22976#discussion_r906860054


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -689,9 +689,14 @@ def _change_state(self, key: TaskInstanceKey, state: Optional[str], pod_id: str,
         self.event_buffer[key] = state, None
 
     def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
-        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
-        scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
-        pod_ids = {ti.key: ti for ti in tis if ti.queued_by_job_id}
+        scheduler_job_ids = {ti.queued_by_job_id for ti in tis if ti.queued_by_job_id}
+
+        # Tasks triggered through API will have no ti.queued_by_job_id
+        # and their pod will have label 'airflow-worker=manual'
+        if any(ti for ti in tis if not ti.queued_by_job_id):
+            scheduler_job_ids.add('manual')

Review Comment:
   You are right, not sure how to handle that within Airflow. 





[GitHub] [airflow] tanelk commented on pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
tanelk commented on PR #22976:
URL: https://github.com/apache/airflow/pull/22976#issuecomment-1119286278

   @ashb can you take a look at this bugfix? It has flown under the radar.




[GitHub] [airflow] nclaeys commented on a diff in pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
nclaeys commented on code in PR #22976:
URL: https://github.com/apache/airflow/pull/22976#discussion_r871428746


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -689,9 +689,14 @@ def _change_state(self, key: TaskInstanceKey, state: Optional[str], pod_id: str,
         self.event_buffer[key] = state, None
 
     def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
-        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
-        scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
-        pod_ids = {ti.key: ti for ti in tis if ti.queued_by_job_id}
+        scheduler_job_ids = {ti.queued_by_job_id for ti in tis if ti.queued_by_job_id}
+
+        # Tasks triggered through API will have no ti.queued_by_job_id
+        # and their pod will have label 'airflow-worker=manual'
+        if any(ti for ti in tis if not ti.queued_by_job_id):

Review Comment:
   I am not sure which issues you are talking about, which makes it difficult for me to implement it. Also, if you are speaking in general about queued_by_job_id not being filled in, then that is an issue for any executor, so should that logic not reside in the scheduler code itself instead of in the kubernetes_executor? At this moment I would feel uncomfortable adding that and I believe it is a better fit in a separate ticket...





[GitHub] [airflow] ashb commented on a diff in pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22976:
URL: https://github.com/apache/airflow/pull/22976#discussion_r892422290


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -689,9 +689,14 @@ def _change_state(self, key: TaskInstanceKey, state: Optional[str], pod_id: str,
         self.event_buffer[key] = state, None
 
     def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
-        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
-        scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
-        pod_ids = {ti.key: ti for ti in tis if ti.queued_by_job_id}
+        scheduler_job_ids = {ti.queued_by_job_id for ti in tis if ti.queued_by_job_id}
+
+        # Tasks triggered through API will have no ti.queued_by_job_id
+        # and their pod will have label 'airflow-worker=manual'
+        if any(ti for ti in tis if not ti.queued_by_job_id):
+            scheduler_job_ids.add('manual')

Review Comment:
   This introduces a new race condition with multiple schedulers:
   
   If there is a manually launched task, and two schedulers both try to adopt this task, one of them will win (as in the label will be changed) but both schedulers will end up adding the pod to their `running` list.
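
The interleaving described in this comment can be illustrated with a toy model. This is only a sketch under stated assumptions: `FakeScheduler`, the `pods` dict, and the method names are hypothetical, and nothing here calls the Kubernetes or Airflow APIs. Whether the interleaving can actually occur depends on the locking discussed elsewhere in this thread.

```python
from typing import Dict, List, Set


class FakeScheduler:
    """Hypothetical scheduler; `pods` maps pod name -> 'airflow-worker' label value."""

    def __init__(self, job_id: int) -> None:
        self.job_id = job_id
        self.running: Set[str] = set()

    def find_manual_pods(self, pods: Dict[str, str]) -> List[str]:
        # Stands in for listing pods with the label selector 'airflow-worker=manual'.
        return [name for name, label in pods.items() if label == "manual"]

    def adopt(self, pods: Dict[str, str], names: List[str]) -> None:
        for name in names:
            pods[name] = str(self.job_id)  # relabel the pod; the last writer wins
            self.running.add(name)         # but every adopter tracks the pod


pods = {"pod-1": "manual"}
first, second = FakeScheduler(1), FakeScheduler(2)

# Both schedulers list the pods before either has relabelled anything.
seen_by_first = first.find_manual_pods(pods)
seen_by_second = second.find_manual_pods(pods)

first.adopt(pods, seen_by_first)
second.adopt(pods, seen_by_second)
```

In this model the label ends up naming only the second scheduler, while both `running` sets contain `pod-1`.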





[GitHub] [airflow] ashb commented on pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by "ashb (via GitHub)" <gi...@apache.org>.
ashb commented on PR #22976:
URL: https://github.com/apache/airflow/pull/22976#issuecomment-1538976341

   Removing this PR from the milestone as it appears the OP has no time to finish it off.




[GitHub] [airflow] boring-cyborg[bot] commented on pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on PR #22976:
URL: https://github.com/apache/airflow/pull/22976#issuecomment-1098013169

   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst) Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally, it’s a heavy docker but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   




[GitHub] [airflow] github-actions[bot] commented on pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #22976:
URL: https://github.com/apache/airflow/pull/22976#issuecomment-1121228695

   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.




[GitHub] [airflow] tanelk commented on a diff in pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
tanelk commented on code in PR #22976:
URL: https://github.com/apache/airflow/pull/22976#discussion_r907466635


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -689,9 +689,14 @@ def _change_state(self, key: TaskInstanceKey, state: Optional[str], pod_id: str,
         self.event_buffer[key] = state, None
 
     def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
-        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
-        scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
-        pod_ids = {ti.key: ti for ti in tis if ti.queued_by_job_id}
+        scheduler_job_ids = {ti.queued_by_job_id for ti in tis if ti.queued_by_job_id}
+
+        # Tasks triggered through API will have no ti.queued_by_job_id
+        # and their pod will have label 'airflow-worker=manual'
+        if any(ti for ti in tis if not ti.queued_by_job_id):
+            scheduler_job_ids.add('manual')

Review Comment:
   I think this check already covers this: https://github.com/apache/airflow/blob/c95081406fa47ee737fa7937be4f13e47b2befca/airflow/executors/kubernetes_executor.py#L717-L734
   
   It would be nice to have a UT for it





[GitHub] [airflow] potiuk commented on pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #22976:
URL: https://github.com/apache/airflow/pull/22976#issuecomment-1174415526

   can you please rebase @nclaeys ? there is a conflict.




[GitHub] [airflow] potiuk commented on pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #22976:
URL: https://github.com/apache/airflow/pull/22976#issuecomment-1149939262

   > @potiuk is there anything blocking the PR?
   
   Why ask me? If I see correctly, there are some changes requested by @ashb.




[GitHub] [airflow] nclaeys commented on a diff in pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
nclaeys commented on code in PR #22976:
URL: https://github.com/apache/airflow/pull/22976#discussion_r868253532


##########
tests/executors/test_kubernetes_executor.py:
##########
@@ -504,6 +505,39 @@ def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_la
         mock_adopt_completed_pods.assert_called_once()
         assert reset_tis == []  # This time our return is empty - no TIs to reset
 
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task')
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods')
+    def test_try_adopt_task_instances_manual_runs(self, mock_adopt_completed_pods, mock_adopt_launched_task):

Review Comment:
   No, indeed it is not a manual dag run but a task triggered by a user through a run action. I see the confusion and will rename the test.





[GitHub] [airflow] uranusjr commented on a diff in pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
uranusjr commented on code in PR #22976:
URL: https://github.com/apache/airflow/pull/22976#discussion_r850155201


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -696,8 +701,7 @@ def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance
             for pod in pod_list.items:
                 self.adopt_launched_task(kube_client, pod, pod_ids)
         self._adopt_completed_pods(kube_client)
-        tis_to_flush.extend(pod_ids.values())
-        return tis_to_flush
+        return list(pod_ids.values())

Review Comment:
   I took a look at the call sites of this; it seems that none of the actual usages strictly rely on this being a list, so we should change the annotation to `Sequence[TaskInstance]` and return `pod_ids.values()` instead.
   
   This would need to change a bunch more files so it’s OK to do this in a separate PR.
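
As a side note for that follow-up, the sketch below uses only the standard library to compare the two return values. The `pod_ids` contents are placeholder strings, not real task instances. At runtime a dict values view is a `Collection` but not a `Sequence`, and it stays tied to the dict, whereas `list(...)` takes a snapshot.

```python
from collections.abc import Collection, Sequence

# Placeholder mapping standing in for {ti.key: ti}; the values are illustrative.
pod_ids = {"key-a": "ti-a", "key-b": "ti-b"}

view = pod_ids.values()          # live view over the dict
snapshot = list(pod_ids.values())  # independent copy

view_is_sequence = isinstance(view, Sequence)
view_is_collection = isinstance(view, Collection)
snapshot_is_sequence = isinstance(snapshot, Sequence)

# Mutating the dict afterwards changes the view but not the snapshot.
pod_ids["key-c"] = "ti-c"
view_len, snapshot_len = len(view), len(snapshot)
```

Callers that only iterate over the result work with either form; callers that index into it, or that expect the result to stay fixed after the dict changes, need the list.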





[GitHub] [airflow] github-actions[bot] closed pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.
URL: https://github.com/apache/airflow/pull/22976




[GitHub] [airflow] tanelk commented on a diff in pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
tanelk commented on code in PR #22976:
URL: https://github.com/apache/airflow/pull/22976#discussion_r907468255


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -689,9 +689,14 @@ def _change_state(self, key: TaskInstanceKey, state: Optional[str], pod_id: str,
         self.event_buffer[key] = state, None
 
     def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
-        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
-        scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
-        pod_ids = {ti.key: ti for ti in tis if ti.queued_by_job_id}
+        scheduler_job_ids = {ti.queued_by_job_id for ti in tis if ti.queued_by_job_id}
+
+        # Tasks triggered through API will have no ti.queued_by_job_id
+        # and their pod will have label 'airflow-worker=manual'
+        if any(ti for ti in tis if not ti.queued_by_job_id):
+            scheduler_job_ids.add('manual')

Review Comment:
   The `pod_ids` is built from the `tis` and that is constructed with row locks.





[GitHub] [airflow] potiuk commented on pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
potiuk commented on PR #22976:
URL: https://github.com/apache/airflow/pull/22976#issuecomment-1121226659

   I guess that one should go to 2.3.1. It looks good but I likely need a second pair of eyes, @ashb @dstandish?




[GitHub] [airflow] ashb commented on a diff in pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
ashb commented on code in PR #22976:
URL: https://github.com/apache/airflow/pull/22976#discussion_r868196898


##########
tests/executors/test_kubernetes_executor.py:
##########
@@ -504,6 +505,39 @@ def test_try_adopt_task_instances(self, mock_adopt_completed_pods, mock_adopt_la
         mock_adopt_completed_pods.assert_called_once()
         assert reset_tis == []  # This time our return is empty - no TIs to reset
 
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor.adopt_launched_task')
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesExecutor._adopt_completed_pods')
+    def test_try_adopt_task_instances_manual_runs(self, mock_adopt_completed_pods, mock_adopt_launched_task):

Review Comment:
   "manual" is the wrong name here - elsewhere in the code base that commonly refers to a manual dag run, but the scheduler still handles the queuing etc.
   
   But here you are not (I think) talking about a manual dag run, but a run action performed on a single TaskInstance, correct?





[GitHub] [airflow] hterik commented on a diff in pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
hterik commented on code in PR #22976:
URL: https://github.com/apache/airflow/pull/22976#discussion_r991854664


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -700,9 +700,14 @@ def _change_state(self, key: TaskInstanceKey, state: Optional[str], pod_id: str,
         self.event_buffer[key] = state, None
 
     def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
-        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
-        scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
-        pod_ids = {ti.key: ti for ti in tis if ti.queued_by_job_id}
+        scheduler_job_ids = {ti.queued_by_job_id for ti in tis if ti.queued_by_job_id}
+
+        # Tasks triggered through API will have no ti.queued_by_job_id
+        # and their pod will have label 'airflow-worker=manual'

Review Comment:
   Are you sure about this? We use the REST API a lot to trigger dags and I've never seen any pod with `airflow-worker=manual`. Do you have any pointers to where the _manual_ value is created?





[GitHub] [airflow] nclaeys commented on a diff in pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
nclaeys commented on code in PR #22976:
URL: https://github.com/apache/airflow/pull/22976#discussion_r871428746


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -689,9 +689,14 @@ def _change_state(self, key: TaskInstanceKey, state: Optional[str], pod_id: str,
         self.event_buffer[key] = state, None
 
     def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
-        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
-        scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
-        pod_ids = {ti.key: ti for ti in tis if ti.queued_by_job_id}
+        scheduler_job_ids = {ti.queued_by_job_id for ti in tis if ti.queued_by_job_id}
+
+        # Tasks triggered through API will have no ti.queued_by_job_id
+        # and their pod will have label 'airflow-worker=manual'
+        if any(ti for ti in tis if not ti.queued_by_job_id):

Review Comment:
   I am not sure which issues you are talking about, which makes it difficult for me to implement it. Also, if you are speaking in general about queued_by_job_id not being filled in, then that is an issue for any executor, so should that logic not reside in the scheduler code itself instead of in the kubernetes_executor?





[GitHub] [airflow] nclaeys commented on a diff in pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
nclaeys commented on code in PR #22976:
URL: https://github.com/apache/airflow/pull/22976#discussion_r871428746


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -689,9 +689,14 @@ def _change_state(self, key: TaskInstanceKey, state: Optional[str], pod_id: str,
         self.event_buffer[key] = state, None
 
     def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
-        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
-        scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
-        pod_ids = {ti.key: ti for ti in tis if ti.queued_by_job_id}
+        scheduler_job_ids = {ti.queued_by_job_id for ti in tis if ti.queued_by_job_id}
+
+        # Tasks triggered through API will have no ti.queued_by_job_id
+        # and their pod will have label 'airflow-worker=manual'
+        if any(ti for ti in tis if not ti.queued_by_job_id):

Review Comment:
   I am not sure which issues you are talking about, which makes it difficult for me to implement it. Also, if you are speaking in general about queued_by_job_id not being filled in, then that is an issue for any executor, so should that logic not reside in the scheduler code itself instead of in the kubernetes_executor? At this moment I would feel uncomfortable adding that.





[GitHub] [airflow] SamWheating commented on a diff in pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
SamWheating commented on code in PR #22976:
URL: https://github.com/apache/airflow/pull/22976#discussion_r869782418


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -689,9 +689,14 @@ def _change_state(self, key: TaskInstanceKey, state: Optional[str], pod_id: str,
         self.event_buffer[key] = state, None
 
     def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
-        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
-        scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
-        pod_ids = {ti.key: ti for ti in tis if ti.queued_by_job_id}
+        scheduler_job_ids = {ti.queued_by_job_id for ti in tis if ti.queued_by_job_id}
+
+        # Tasks triggered through API will have no ti.queued_by_job_id
+        # and their pod will have label 'airflow-worker=manual'
+        if any(ti for ti in tis if not ti.queued_by_job_id):

Review Comment:
   This change will fix the issue of tasks being incorrectly reset, but I think that in some cases we could still end up with empty fields in the database, which might cause issues elsewhere or later on.
   
   Would it be possible instead to just set the `queued_by_job_id`  field to `manual` when the taskInstance is initially queued? We already have to manually write the `queued_dttm` for similar reasons. 
   
   https://github.com/apache/airflow/blob/cfa95af7e83b067787d8d6596caa3bc97f4b25bd/airflow/www/views.py#L1811-L1819





[GitHub] [airflow] nclaeys commented on pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
nclaeys commented on PR #22976:
URL: https://github.com/apache/airflow/pull/22976#issuecomment-1174705499

   > can you please rebase @nclaeys ? there is a conflict.
   
   Resolved conflict




[GitHub] [airflow] tanelk commented on a diff in pull request #22976: Make sure jobs triggered by airflow web are not identified as orphaned.

Posted by GitBox <gi...@apache.org>.
tanelk commented on code in PR #22976:
URL: https://github.com/apache/airflow/pull/22976#discussion_r907468553


##########
airflow/executors/kubernetes_executor.py:
##########
@@ -689,9 +689,14 @@ def _change_state(self, key: TaskInstanceKey, state: Optional[str], pod_id: str,
         self.event_buffer[key] = state, None
 
     def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance]:
-        tis_to_flush = [ti for ti in tis if not ti.queued_by_job_id]
-        scheduler_job_ids = {ti.queued_by_job_id for ti in tis}
-        pod_ids = {ti.key: ti for ti in tis if ti.queued_by_job_id}
+        scheduler_job_ids = {ti.queued_by_job_id for ti in tis if ti.queued_by_job_id}
+
+        # Tasks triggered through API will have no ti.queued_by_job_id
+        # and their pod will have label 'airflow-worker=manual'
+        if any(ti for ti in tis if not ti.queued_by_job_id):
+            scheduler_job_ids.add('manual')

Review Comment:
   The `pod_ids` is built from the `tis` and that is constructed with row locks.


