Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/03/08 16:12:40 UTC

[GitHub] [airflow] michaelmicheal opened a new pull request #22092: Find Pod Before Cleanup In KubernetesPodOperator Execution

michaelmicheal opened a new pull request #22092:
URL: https://github.com/apache/airflow/pull/22092


   <!--
   
   closes: #21169
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   
   As outlined in [this issue](https://github.com/apache/airflow/issues/21169), running multiple KubernetesPodOperators with `random_name_suffix=False` and `is_delete_operator_pod=True` leads to the following sequence:
   1. The first task creates a pod (named `'my_pod'`, for example).
   2. The second task attempts to create a pod with the same name and fails because a pod named `'my_pod'` already exists.
   3. The second task deletes the pod named `'my_pod'`, which is the pod from the first task.
   
   Ideally the second task shouldn't delete the pod from the first task, so I added a check that uses the `find_pod` method to make sure a task's own pod exists before calling the `cleanup` function (which handles the deletion of the pod).
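
The failure sequence and the guard can be illustrated with a minimal, self-contained sketch. It uses neither Airflow nor the Kubernetes client: `Pod`, `FakeCluster`, `cleanup`, `simulate`, and the `task_run` label are hypothetical stand-ins, and only the control flow (look up this task's own pod before deleting anything) is meant to mirror the change.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class Pod:
    name: str
    labels: Dict[str, str] = field(default_factory=dict)


class FakeCluster:
    """In-memory stand-in for a namespace; pod names must be unique."""

    def __init__(self) -> None:
        self.pods: Dict[str, Pod] = {}

    def create(self, pod: Pod) -> None:
        if pod.name in self.pods:
            raise RuntimeError(f"pod {pod.name!r} already exists")
        self.pods[pod.name] = pod

    def find(self, labels: Dict[str, str]) -> Optional[Pod]:
        # Match on labels, not on name, so a pod is tied to one task run.
        for pod in self.pods.values():
            if all(pod.labels.get(k) == v for k, v in labels.items()):
                return pod
        return None

    def delete(self, name: str) -> None:
        self.pods.pop(name, None)


def cleanup(cluster: FakeCluster, request: Pod, guarded: bool) -> bool:
    """Delete the pod by name; if guarded, only when this run's pod exists."""
    if guarded and cluster.find(request.labels) is None:
        return False  # nothing belongs to this run, so leave other pods alone
    cluster.delete(request.name)
    return True


def simulate(guarded: bool) -> bool:
    """Return True if the first task's pod survives the second task's failure."""
    cluster = FakeCluster()
    # First task: its pod is created and is still running.
    cluster.create(Pod("my_pod", {"task_run": "run_1"}))
    # Second task: same pod name, different task run.
    second = Pod("my_pod", {"task_run": "run_2"})
    try:
        cluster.create(second)
    except RuntimeError:
        pass  # creation fails because the name is taken
    finally:
        cleanup(cluster, second, guarded)
    return "my_pod" in cluster.pods
```

Calling `simulate(guarded=False)` reproduces step 3 above (the first task's pod is gone), while `simulate(guarded=True)` leaves it in place.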
   
   ### Validation
   To reproduce the issue and validate this change, I ran two DAG runs of the following DAG at the same time.
   ```python
   from datetime import timedelta
   from airflow import models
   from airflow import utils
   from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
   
   dag = models.DAG(
       'kubernetes_change_validation',
       start_date=utils.dates.days_ago(2),
       max_active_runs=3,
       dagrun_timeout=timedelta(minutes=5),
       schedule_interval='@daily'
   )
   
   test_kubernetes_pod = KubernetesPodOperator(
       namespace='my_namespace',
       image="busybox",
       cmds=['sh', '-c', 'sleep 600'],
       name="test_kubernetes_pod",
       in_cluster=True,
       task_id="test_kubernetes_pod",
       get_logs=True,
       random_name_suffix=False,
       dag=dag,
       is_delete_operator_pod=True
   )
   ```
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] SamWheating commented on a change in pull request #22092: Find Pod Before Cleanup In KubernetesPodOperator Execution

Posted by GitBox <gi...@apache.org>.
SamWheating commented on a change in pull request #22092:
URL: https://github.com/apache/airflow/pull/22092#discussion_r840959284



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -385,19 +384,23 @@ def execute(self, context: 'Context'):
 
             if self.do_xcom_push:
                 result = self.extract_xcom(pod=self.pod)
-            remote_pod = self.pod_manager.await_pod_completion(self.pod)
+            self.pod_manager.await_pod_completion(self.pod)
         finally:
             self.cleanup(
                 pod=self.pod or self.pod_request_obj,
-                remote_pod=remote_pod,
+                context=context,
             )
         ti = context['ti']
         ti.xcom_push(key='pod_name', value=self.pod.metadata.name)
         ti.xcom_push(key='pod_namespace', value=self.pod.metadata.namespace)
         if self.do_xcom_push:
             return result
 
-    def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
+    def cleanup(self, pod: k8s.V1Pod, context):

Review comment:
       ```suggestion
       def cleanup(self, pod: k8s.V1Pod, context: 'Context'):
   ```
   
   Just to maintain the type-hint from the above function







[GitHub] [airflow] boring-cyborg[bot] commented on pull request #22092: Find Pod Before Cleanup In KubernetesPodOperator Execution

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #22092:
URL: https://github.com/apache/airflow/pull/22092#issuecomment-1061948223


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





[GitHub] [airflow] jedcunningham commented on a change in pull request #22092: Find Pod Before Cleanup In KubernetesPodOperator Execution

Posted by GitBox <gi...@apache.org>.
jedcunningham commented on a change in pull request #22092:
URL: https://github.com/apache/airflow/pull/22092#discussion_r822067701



##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -386,10 +386,14 @@ def execute(self, context: 'Context'):
                 result = self.extract_xcom(pod=self.pod)
             remote_pod = self.pod_manager.await_pod_completion(self.pod)
         finally:
-            self.cleanup(
-                pod=self.pod or self.pod_request_obj,
-                remote_pod=remote_pod,
-            )
+            pod = self.pod or self.pod_request_obj
+            if self.find_pod(pod.metadata.namespace, context=context):

Review comment:
       I wonder if we should move this into `cleanup` instead. If we are going to grab the pod from the k8s api, we might as well use that status in `cleanup` as it could be more up to date.

##########
File path: airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
##########
@@ -386,10 +386,14 @@ def execute(self, context: 'Context'):
                 result = self.extract_xcom(pod=self.pod)
             remote_pod = self.pod_manager.await_pod_completion(self.pod)
         finally:
-            self.cleanup(
-                pod=self.pod or self.pod_request_obj,
-                remote_pod=remote_pod,
-            )
+            pod = self.pod or self.pod_request_obj
+            if self.find_pod(pod.metadata.namespace, context=context):
+                self.cleanup(
+                    pod=pod,
+                    remote_pod=remote_pod,
+                )
+            else:
+                self.log.error("Could not find pod with name: %s and namespace: %s", pod.metadata.name, pod.metadata.namespace)

Review comment:
       We should probably say what we didn't do because we couldn't find the pod. The message also can't be based on the name alone, since the lookup is for this TI's pod. Maybe something along these lines (we could probably be more verbose if we want)?
   
   `Could not find pod for this TI to delete`
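
Taken together, the two review comments above could be read as: let `cleanup` do the lookup itself, act on the freshly fetched pod, and log what was skipped when no pod is found. The following is a minimal sketch under that reading. It is self-contained and does not call Airflow or Kubernetes; `RemotePod`, the `lookup` and `delete` callables, the list used as a log, and the returned phase are all illustrative assumptions, not the operator's actual code.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class RemotePod:
    name: str
    phase: str  # for example "Running", "Succeeded" or "Failed"


def cleanup(
    lookup: Callable[[], Optional[RemotePod]],
    delete: Callable[[str], None],
    log: List[str],
) -> Optional[str]:
    """Fetch this task instance's pod, delete it, and return its last phase."""
    remote_pod = lookup()  # hypothetical stand-in for a lookup such as find_pod
    if remote_pod is None:
        # Say what was skipped, not only that the lookup came back empty.
        log.append("Could not find pod for this TI to delete")
        return None
    delete(remote_pod.name)
    # The phase comes from the fresh lookup rather than a stale local copy.
    return remote_pod.phase
```

Passing the lookup in as a callable keeps the sketch independent of any client library; in the operator the lookup would presumably be scoped to the current task instance rather than to a pod name.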



