Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/12/08 17:09:52 UTC

[GitHub] [airflow] MrGeorgeOwl opened a new pull request, #28230: Add deferrable mode to Kubernetes and GKE operators

MrGeorgeOwl opened a new pull request, #28230:
URL: https://github.com/apache/airflow/pull/28230

   <!--
   Thank you for contributing! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   Feel free to ping committers for the review!
   
   In case of an existing issue, reference it using one of the following:
   
   closes: #ISSUE
   related: #ISSUE
   
   How to write a good git commit message:
   http://chris.beams.io/posts/git-commit/
   -->
   
   ---
   **^ Add meaningful description above**
   
   Read the **[Pull Request Guidelines](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#pull-request-guidelines)** for more information.
   In case of fundamental code changes, an Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvement+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in a newsfragment file, named `{pr_number}.significant.rst` or `{issue_number}.significant.rst`, in [newsfragments](https://github.com/apache/airflow/tree/main/newsfragments).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] kaxil commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054886383


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -484,15 +496,21 @@ def extract_xcom(self, pod: k8s.V1Pod):
             return json.loads(result)
 
     def execute(self, context: Context):
-        remote_pod = None
+        """Based on the deferrable parameter runs the pod asynchronously or synchronously"""
+        if self.deferrable:
+            self.asynchronously(context)
+        else:
+            return self.synchronously(context)

Review Comment:
   https://github.com/apache/airflow/pull/28523#issuecomment-1362176160 -- Let's get this PR merged first since it was created first. We will rebase that PR on top of this :) 





[GitHub] [airflow] VladaZakharova commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
VladaZakharova commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1055241805


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -437,6 +442,10 @@ def hook(self) -> KubernetesHook:
     def client(self) -> CoreV1Api:
         return self.hook.core_v1_client
 
+    def get_hook(self):
+        warnings.warn("get_hook is deprecated. Please use hook instead.", DeprecationWarning, stacklevel=2)
+        return self.hook
+

Review Comment:
   We moved this method so that it sits with the other methods, after all the cached_properties. We will avoid such changes in future PRs, thanks!
   
   





[GitHub] [airflow] kaxil commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054886383


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -484,15 +496,21 @@ def extract_xcom(self, pod: k8s.V1Pod):
             return json.loads(result)
 
     def execute(self, context: Context):
-        remote_pod = None
+        """Based on the deferrable parameter runs the pod asynchronously or synchronously"""
+        if self.deferrable:
+            self.asynchronously(context)
+        else:
+            return self.synchronously(context)

Review Comment:
   https://github.com/apache/airflow/pull/28523#issuecomment-1362176160 -- Let's get this PR merged first since it was created first. We will rebase that PR on main 





[GitHub] [airflow] VladaZakharova commented on pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
VladaZakharova commented on PR #28230:
URL: https://github.com/apache/airflow/pull/28230#issuecomment-1380332167

   @eladkal @potiuk @kaxil 
   Hi Team! :)
   Could you please review the changes in this PR? 
   




[GitHub] [airflow] VladaZakharova commented on pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
VladaZakharova commented on PR #28230:
URL: https://github.com/apache/airflow/pull/28230#issuecomment-1362609277

   > 
   
   As we are adding a new feature to the existing operator instead of creating a new operator for deferrable mode, we decided to also use KubernetesPodOperator as the base class for GKEStartPodOperator to get the needed behavior. So splitting this into 2 separate PRs is not an option here, as we would need to wait until the KubernetesPodOperator change is merged to main and only then open a new PR for GKEStartPodOperator. 




[GitHub] [airflow] kaxil commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054838589


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -484,15 +496,21 @@ def extract_xcom(self, pod: k8s.V1Pod):
             return json.loads(result)
 
     def execute(self, context: Context):
-        remote_pod = None
+        """Based on the deferrable parameter runs the pod asynchronously or synchronously"""
+        if self.deferrable:
+            self.asynchronously(context)
+        else:
+            return self.synchronously(context)

Review Comment:
   Can you rename this too?
   
   @VladaZakharova Apologies for the delay in our contribution. Could you rebase once https://github.com/apache/airflow/pull/28523 is merged? That PR also has an additional ping-pong feature to show periodic logs, which our customers found very useful.
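   The "ping-pong" mentioned above can be pictured with a minimal asyncio sketch, assuming the trigger returns early once a logging interval elapses so the worker can resume, emit the logs gathered so far, and defer again. `wait_for_pod`, `run_with_ping_pong`, the event dicts, and both interval values are hypothetical and not taken from either PR; real Airflow triggers yield `TriggerEvent` objects and run inside the triggerer.

```python
import asyncio
import time


async def wait_for_pod(get_phase, poll_interval, logging_interval):
    """Hypothetical trigger body: poll until the pod finishes, but return early
    once `logging_interval` seconds have passed so the worker can emit logs."""
    started = time.monotonic()
    while True:
        phase = get_phase()
        if phase in ("Succeeded", "Failed"):
            return {"status": "done", "phase": phase}
        if time.monotonic() - started >= logging_interval:
            # "Ping": hand control back to the operator while the pod still runs.
            return {"status": "running", "phase": phase}
        await asyncio.sleep(poll_interval)


def run_with_ping_pong(get_phase, poll_interval=0.01, logging_interval=0.03):
    """Hypothetical operator side: each "running" event is one round trip of
    resume on a worker, emit logs, then defer again ("pong")."""
    log_rounds = 0
    while True:
        # Each asyncio.run() call stands in for one deferral served by the triggerer.
        event = asyncio.run(wait_for_pod(get_phase, poll_interval, logging_interval))
        if event["status"] == "done":
            return event["phase"], log_rounds
        log_rounds += 1  # a real operator would fetch and print pod logs here


if __name__ == "__main__":
    phases = iter(["Pending"] + ["Running"] * 9 + ["Succeeded"])
    print(run_with_ping_pong(lambda: next(phases)))
```

   The trade-off is one extra worker round trip per logging interval in exchange for logs that appear while the pod is still running, rather than only after it completes.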





[GitHub] [airflow] kaxil commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1070375327


##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -305,11 +303,9 @@ def __init__(
         *,
         location: str,
         cluster_name: str,
-        use_internal_ip: bool = False,

Review Comment:
   This is going to break backwards compat!



##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -305,11 +303,9 @@ def __init__(
         *,
         location: str,
         cluster_name: str,
-        use_internal_ip: bool = False,
         project_id: str | None = None,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        regional: bool = False,

Review Comment:
   This is going to break backwards compat!
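   The compatibility concern is that existing DAGs passing `use_internal_ip=` or `regional=` would raise a `TypeError` as soon as those keywords disappear from `__init__`. A minimal sketch of the difference, with hypothetical classes `StartPodBreaking` and `StartPodCompatible` standing in for the GKE operator; the `DeprecationWarning` on `regional` is purely illustrative of one possible retirement path and is an assumption, not something the PR or reviewers specified.

```python
import warnings


class StartPodBreaking:
    """Hypothetical signature after the two keywords were deleted."""

    def __init__(self, *, location, cluster_name, project_id=None):
        self.location = location
        self.cluster_name = cluster_name
        self.project_id = project_id


class StartPodCompatible:
    """Hypothetical signature that keeps accepting the old keywords."""

    def __init__(self, *, location, cluster_name, use_internal_ip=False,
                 project_id=None, regional=False):
        self.location = location
        self.cluster_name = cluster_name
        self.use_internal_ip = use_internal_ip
        self.project_id = project_id
        self.regional = regional
        # Assumed policy: if a keyword is ever retired, warn for a deprecation
        # period instead of deleting it outright.
        if regional:
            warnings.warn(
                "'regional' is deprecated and will be removed in a future release",
                DeprecationWarning,
                stacklevel=2,
            )


if __name__ == "__main__":
    # An existing DAG that passes the old keyword now fails when the file is parsed.
    try:
        StartPodBreaking(location="europe-west1", cluster_name="demo", use_internal_ip=True)
    except TypeError as err:
        print("breaking:", err)
    compatible = StartPodCompatible(location="europe-west1", cluster_name="demo", use_internal_ip=True)
    print("compatible:", compatible.use_internal_ip)
```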





[GitHub] [airflow] kaxil commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054791833


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -437,6 +442,10 @@ def hook(self) -> KubernetesHook:
     def client(self) -> CoreV1Api:
         return self.hook.core_v1_client
 
+    def get_hook(self):
+        warnings.warn("get_hook is deprecated. Please use hook instead.", DeprecationWarning, stacklevel=2)
+        return self.hook
+

Review Comment:
   Let's avoid these unrelated changes, please; they make it difficult to check the git history.





[GitHub] [airflow] VladaZakharova commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
VladaZakharova commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1055253964


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -484,15 +496,21 @@ def extract_xcom(self, pod: k8s.V1Pod):
             return json.loads(result)
 
     def execute(self, context: Context):
-        remote_pod = None
+        """Based on the deferrable parameter runs the pod asynchronously or synchronously"""
+        if self.deferrable:
+            self.asynchronously(context)
+        else:
+            return self.synchronously(context)

Review Comment:
   Sure, that would be nice of you. We have added some changes to prevent a memory leak and are now working on other changes to improve the operator's performance. We will let you know when they are ready :)





[GitHub] [airflow] kaxil commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054884426


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -484,15 +496,21 @@ def extract_xcom(self, pod: k8s.V1Pod):
             return json.loads(result)
 
     def execute(self, context: Context):
-        remote_pod = None
+        """Based on the deferrable parameter runs the pod asynchronously or synchronously"""
+        if self.deferrable:
+            self.asynchronously(context)
+        else:
+            return self.synchronously(context)

Review Comment:
   If you prefer to contribute first and @phanikumv rebases his PR, that is fine too.





[GitHub] [airflow] VladaZakharova commented on pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
VladaZakharova commented on PR #28230:
URL: https://github.com/apache/airflow/pull/28230#issuecomment-1387151905

   @kaxil @MrGeorgeOwl @lwyszomi 
   Hi Team! 
   Please, check separate PR for the KubernetesPodOperator: https://github.com/apache/airflow/pull/29017
   Hopefully this speeds up the review process a little :) 




[GitHub] [airflow] eladkal commented on pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on PR #28230:
URL: https://github.com/apache/airflow/pull/28230#issuecomment-1454875500

   @VladaZakharova can you rebase and fix conflicts?




[GitHub] [airflow] kaxil commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054232688


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -437,6 +442,10 @@ def hook(self) -> KubernetesHook:
     def client(self) -> CoreV1Api:
         return self.hook.core_v1_client
 
+    def get_hook(self):
+        warnings.warn("get_hook is deprecated. Please use hook instead.", DeprecationWarning, stacklevel=2)
+        return self.hook
+

Review Comment:
   Why this unrelated change of moving the hook position?





[GitHub] [airflow] kaxil commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054839109


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -509,48 +527,141 @@ def execute(self, context: Context):
             if self.do_xcom_push:
                 self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod)
                 result = self.extract_xcom(pod=self.pod)
-            remote_pod = self.pod_manager.await_pod_completion(self.pod)
+            self.remote_pod = self.pod_manager.await_pod_completion(self.pod)
         finally:
             self.cleanup(
                 pod=self.pod or self.pod_request_obj,
-                remote_pod=remote_pod,
+                remote_pod=self.remote_pod,
             )
         ti = context["ti"]
         ti.xcom_push(key="pod_name", value=self.pod.metadata.name)
         ti.xcom_push(key="pod_namespace", value=self.pod.metadata.namespace)
         if self.do_xcom_push:
             return result
 
-    def _read_pod_log_events(self, pod, *, reraise=True):
-        """Will fetch and emit events from pod"""
-        with _optionally_suppress(reraise=reraise):
-            for event in self.pod_manager.read_pod_events(pod).items:
-                self.log.error("Pod Event: %s - %s", event.reason, event.message)
+    def asynchronously(self, context: Context):

Review Comment:
   `execute_async` would be a better name. 
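   The dispatch in the diff, with the suggested `execute_async` name and an assumed `execute_sync` counterpart, can be sketched without Airflow as follows. `DispatchSketch` and `TaskDeferredSketch` are hypothetical stand-ins; in Airflow, `BaseOperator.defer()` raises `TaskDeferred`, and the task later resumes in the method named by `method_name` (`execute_complete` here is an assumption).

```python
class TaskDeferredSketch(Exception):
    """Hypothetical stand-in for Airflow's TaskDeferred exception."""

    def __init__(self, trigger, method_name):
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name


class DispatchSketch:
    def __init__(self, deferrable=False, poll_interval=2.0):
        self.deferrable = deferrable
        self.poll_interval = poll_interval

    def execute(self, context):
        """Run the pod asynchronously or synchronously, based on `deferrable`."""
        if self.deferrable:
            # No return value: deferring raises, so control never comes back here.
            self.execute_async(context)
        else:
            return self.execute_sync(context)

    def execute_sync(self, context):
        # Placeholder for: create the pod, wait for completion, clean up.
        return {"mode": "sync"}

    def execute_async(self, context):
        # Placeholder for: create the pod, then hand polling to a trigger.
        # In Airflow this would be self.defer(...), which raises TaskDeferred.
        raise TaskDeferredSketch(
            trigger={"poll_interval": self.poll_interval},
            method_name="execute_complete",
        )
```

   Naming the pair `execute_sync`/`execute_async` keeps both entry points visibly tied to `execute`, which is the point of the rename.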





[GitHub] [airflow] kaxil commented on pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on PR #28230:
URL: https://github.com/apache/airflow/pull/28230#issuecomment-1362175763

   https://github.com/apache/airflow/pull/28523 was created first, let's rebase on top of it and add any features if needed once it is merged.




[GitHub] [airflow] VladaZakharova commented on pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by "VladaZakharova (via GitHub)" <gi...@apache.org>.
VladaZakharova commented on PR #28230:
URL: https://github.com/apache/airflow/pull/28230#issuecomment-1455010371

   @eladkal 
   Hi!
   This PR was split into 2 separate PRs. Please check the link in the comment above :)
   I think this PR can be closed in this case, but I am not the owner, so I can't.




[GitHub] [airflow] kaxil commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1070372823


##########
generated/provider_dependencies.json:
##########
@@ -209,8 +209,10 @@
   "cncf.kubernetes": {
     "deps": [
       "apache-airflow>=2.3.0",
+      "asgiref>=3.5.2",
       "cryptography>=2.0.0",
-      "kubernetes>=21.7.0,<24"
+      "kubernetes>=21.7.0,<24",
+      "kubernetes_asyncio==24.2.2"

Review Comment:
   Let's not pin to a specific version; Airflow is used both as a library and as an application. Pinning to a specific version restricts which deps can be used with Airflow.





[GitHub] [airflow] eladkal closed pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal closed pull request #28230: Add deferrable mode to Kubernetes and GKE operators
URL: https://github.com/apache/airflow/pull/28230




[GitHub] [airflow] VladaZakharova commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
VladaZakharova commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054858418


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -484,15 +496,21 @@ def extract_xcom(self, pod: k8s.V1Pod):
             return json.loads(result)
 
     def execute(self, context: Context):
-        remote_pod = None
+        """Based on the deferrable parameter runs the pod asynchronously or synchronously"""
+        if self.deferrable:
+            self.asynchronously(context)
+        else:
+            return self.synchronously(context)

Review Comment:
   We have also implemented some features that were, and still are, very urgent for our customer. That is why it was important to reach out to you and your team earlier to discuss these changes. So maybe after our current PR is merged, you can rebase and discuss your approach to the improvement with us :)





[GitHub] [airflow] MrGeorgeOwl commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
MrGeorgeOwl commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054551442


##########
docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst:
##########
@@ -131,6 +131,14 @@ And then use it in other operators:
     :start-after: [START howto_operator_gke_xcom_result]
     :end-before: [END howto_operator_gke_xcom_result]
 
+You can use deferrable mode for this action in order to run the operator asynchronously:

Review Comment:
   Could you be more descriptive about the idea you propose? I am not sure what you meant. That sentence is there to inform docs readers that deferrable mode is implemented for that usage pattern.





[GitHub] [airflow] MrGeorgeOwl commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
MrGeorgeOwl commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054546446


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -437,6 +442,10 @@ def hook(self) -> KubernetesHook:
     def client(self) -> CoreV1Api:
         return self.hook.core_v1_client
 
+    def get_hook(self):
+        warnings.warn("get_hook is deprecated. Please use hook instead.", DeprecationWarning, stacklevel=2)
+        return self.hook
+

Review Comment:
   It is a small refactoring to improve code readability, so that all cached methods and hook properties can be found in the same place.
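   The layout being described, cached properties grouped first and the deprecated `get_hook()` alias from the diff placed after them, can be sketched with a simplified operator. `HookLayoutSketch` and `FakeHook` are hypothetical; only the warning text is copied from the diff.

```python
import warnings
from functools import cached_property


class FakeHook:
    """Hypothetical stand-in for KubernetesHook."""


class HookLayoutSketch:
    # --- cached properties grouped together ---
    @cached_property
    def hook(self):
        # Built on first access, then reused for the lifetime of the instance.
        return FakeHook()

    # --- regular methods follow ---
    def get_hook(self):
        # Deprecated alias kept so existing callers keep working.
        warnings.warn("get_hook is deprecated. Please use hook instead.", DeprecationWarning, stacklevel=2)
        return self.hook
```

   Because `hook` is a `cached_property`, the alias returns the same object on every call rather than building a new hook.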





[GitHub] [airflow] kaxil commented on pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on PR #28230:
URL: https://github.com/apache/airflow/pull/28230#issuecomment-1362785141

   > > 
   > 
   > As we are adding a new feature to the existing operator instead of creating a new operator for deferrable mode, we decided to also use KubernetesPodOperator as the base class for GKEStartPodOperator to get the needed behavior. So splitting this into 2 separate PRs is not an option here, as we would also need to wait until the KubernetesPodOperator change is merged to main and only then open a new PR for GKEStartPodOperator.
   
   Yup, that is exactly what I suggest, though. If the PRs are big, it takes committers like myself more time to review and merge them. If the PRs are broken down into smaller chunks, they are easier to review and can be merged faster. What I would suggest:
   
   1) Convert this PR to `Add "deferrable" mode to KPO`, and we will get this merged 
   2) Once (1) is merged, create a separate PR for adding deferrable mode to GKE 
   
   
   




[GitHub] [airflow] VladaZakharova commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
VladaZakharova commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054858418


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -484,15 +496,21 @@ def extract_xcom(self, pod: k8s.V1Pod):
             return json.loads(result)
 
     def execute(self, context: Context):
-        remote_pod = None
+        """Based on the deferrable parameter runs the pod asynchronously or synchronously"""
+        if self.deferrable:
+            self.asynchronously(context)
+        else:
+            return self.synchronously(context)

Review Comment:
   We have also implemented some features that were, and still are, very urgent for our customer. That is why it was important to reach out to you and your team earlier to discuss these changes. So maybe after our current PR is merged, you can rebase and add the additional logic.





[GitHub] [airflow] kaxil commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054790943


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -272,6 +276,8 @@ def __init__(
         pod_runtime_info_envs: list[k8s.V1EnvVar] | None = None,
         termination_grace_period: int | None = None,
         configmaps: list[str] | None = None,
+        deferrable: bool = False,
+        poll_interval: float = 2,

Review Comment:
   Please update the docstring on L212 accordingly to say it is only used in async mode.
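   Docstring wording that scopes `poll_interval` to deferrable mode could look like the minimal sketch below. The defaults match the diff; the class name and the positive-value check are assumptions, not part of the PR.

```python
class PollIntervalSketch:
    """Hypothetical docstring wording for the two new parameters.

    :param deferrable: Run the pod in deferrable mode, releasing the worker
        slot while the triggerer polls the pod.
    :param poll_interval: Seconds between pod status checks.
        Only used when ``deferrable=True``; the synchronous path ignores it.
    """

    def __init__(self, *, deferrable: bool = False, poll_interval: float = 2):
        # Assumed validation, not part of the diff.
        if poll_interval <= 0:
            raise ValueError("poll_interval must be a positive number of seconds")
        self.deferrable = deferrable
        self.poll_interval = poll_interval
```

   Stating the scope in the docstring also answers the related question of what `poll_interval` means for the non-async operator: nothing.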





[GitHub] [airflow] eladkal commented on pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by "eladkal (via GitHub)" <gi...@apache.org>.
eladkal commented on PR #28230:
URL: https://github.com/apache/airflow/pull/28230#issuecomment-1455011791

   > @eladkal Hi! This PR was split into 2 separate PRs. Please check the link in the comment above :) I think this PR can be closed in this case, but I am not the owner, so I can't.
   
   Thanks. I thought that by splitting, some parts were extracted and some parts were left here as a follow-up. Thanks for the clarification.




[GitHub] [airflow] kaxil commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054890030


##########
airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -406,3 +418,83 @@ def _get_bool(val) -> bool | None:
         elif val.strip().lower() == "false":
             return False
     return None
+
+
+class AsyncKubernetesHook(KubernetesHook):
+    """Hook to use Kubernetes SDK asynchronously."""
+
+    def __init__(self, config_dict: dict | None = None, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.config_dict = config_dict
+
+    async def _load_config(self):
+        """Returns Kubernetes API session for use with requests"""
+        in_cluster = self._coalesce_param(self.in_cluster, self._get_field("in_cluster"))
+        cluster_context = self._coalesce_param(self.cluster_context, self._get_field("cluster_context"))

Review Comment:
   `self._get_field` calls `self.conn_extras`, which makes a DB call.
   
   Can you override it, please, so that we can make it an async call?
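   Since `_get_field` reaches `conn_extras`, which per the review makes a DB call, calling it from an `async` method blocks the event loop. A minimal sketch of one override, assuming the blocking lookup can simply run in the default thread pool via `loop.run_in_executor`; `asgiref.sync.sync_to_async` (the PR already adds `asgiref>=3.5.2` as a dependency) is a common alternative. `SyncHookSketch`, `AsyncHookSketch`, and the simplified `_load_config` are hypothetical, and `time.sleep` only simulates the DB round trip.

```python
import asyncio
import time


class SyncHookSketch:
    """Hypothetical stand-in for the synchronous hook."""

    @property
    def conn_extras(self):
        time.sleep(0.05)  # simulates the blocking metadata-DB lookup
        return {"in_cluster": False, "cluster_context": "my-context"}

    def _get_field(self, field_name):
        return self.conn_extras.get(field_name)


class AsyncHookSketch(SyncHookSketch):
    async def _get_field(self, field_name):
        # Run the blocking lookup in the default thread pool so the event loop
        # (the triggerer, in Airflow's case) can keep serving other coroutines.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, super()._get_field, field_name)

    async def _load_config(self):
        # Simplified: the diff also coalesces these with constructor arguments.
        in_cluster = await self._get_field("in_cluster")
        cluster_context = await self._get_field("cluster_context")
        return in_cluster, cluster_context


if __name__ == "__main__":
    print(asyncio.run(AsyncHookSketch()._load_config()))
```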





[GitHub] [airflow] kaxil commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054234582


##########
docs/apache-airflow-providers-google/operators/cloud/kubernetes_engine.rst:
##########
@@ -131,6 +131,14 @@ And then use it in other operators:
     :start-after: [START howto_operator_gke_xcom_result]
     :end-before: [END howto_operator_gke_xcom_result]
 
+You can use deferrable mode for this action in order to run the operator asynchronously:

Review Comment:
   There are some limitations to using the async KPO in its current state, right?
   
   What are they? Let's document them.
   
   



[GitHub] [airflow] kaxil commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054232181


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -272,6 +276,8 @@ def __init__(
         pod_runtime_info_envs: list[k8s.V1EnvVar] | None = None,
         termination_grace_period: int | None = None,
         configmaps: list[str] | None = None,
+        deferrable: bool = False,
+        poll_interval: float = 2,

Review Comment:
   What does `poll_interval` mean for the non-async operator?



[GitHub] [airflow] kaxil commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054790943


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -272,6 +276,8 @@ def __init__(
         pod_runtime_info_envs: list[k8s.V1EnvVar] | None = None,
         termination_grace_period: int | None = None,
         configmaps: list[str] | None = None,
+        deferrable: bool = False,
+        poll_interval: float = 2,

Review Comment:
   Please update the docstring on L212 accordingly.

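For reference, here is a sketch of how the two new parameters might be documented in the operator's reST-style docstring. The wording is only a suggestion, based on the author's reply about `poll_interval` elsewhere in this thread. `PodOperatorDocSketch` is a hypothetical class.

```python
class PodOperatorDocSketch:
    """
    Hypothetical excerpt of an operator docstring.

    :param deferrable: Run the operator in deferrable mode: after the pod is started,
        the task defers to a trigger that waits for the pod to finish, freeing the
        worker slot in the meantime.
    :param poll_interval: Polling period in seconds used by the trigger to check the
        pod status. Only used when ``deferrable=True``.
    """

    def __init__(self, deferrable: bool = False, poll_interval: float = 2) -> None:
        self.deferrable = deferrable
        self.poll_interval = poll_interval
```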


[GitHub] [airflow] dgouju commented on pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
dgouju commented on PR #28230:
URL: https://github.com/apache/airflow/pull/28230#issuecomment-1353242270

   Hello, I'm trying to test this code in my Airflow 2.3.4 environment (GCP Cloud Composer).
   In google.cloud.utils.kubernetes_engine_config.py, `write_permanent_gke_config_file` tries to write into /files/, which doesn't exist in my environment:
   `[2022-12-15, 14:18:35 UTC] {taskinstance.py:1904} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/opt/python3.8/lib/python3.8/site-packages/adasdgouju/google/cloud/operators/kubernetes_engine.py", line 358, in execute
       config_file = write_permanent_gke_config_file(**config_args)  # type: ignore[arg-type]
     File "/opt/python3.8/lib/python3.8/site-packages/adasdgouju/google/cloud/utils/kubernetes_engine_config.py", line 100, in write_permanent_gke_config_file
       with open(filename, "w") as conf_file, patch_environ(
   FileNotFoundError: [Errno 2] No such file or directory: '/files/kube_config_948b954c'`
   
   I tried to use /home/airflow/gcs/data/ instead of /files/. The pods are scheduled, but almost all of them are terminated immediately. Some continue to run (as expected), yet Airflow fails all the tasks:
   `[2022-12-15, 14:51:12 UTC] {taskinstance.py:1904} ERROR - Task failed with exception
   Traceback (most recent call last):
     File "/opt/python3.8/lib/python3.8/site-packages/adasdgouju/google/cloud/operators/kubernetes_engine.py", line 372, in execute_complete
       return super().execute_complete(context, event)
     File "/opt/python3.8/lib/python3.8/site-packages/adasdgouju/cncf/kubernetes/operators/kubernetes_pod.py", line 575, in execute_complete
       self.cleanup(
     File "/opt/python3.8/lib/python3.8/site-packages/adasdgouju/cncf/kubernetes/operators/kubernetes_pod.py", line 644, in cleanup
       raise AirflowException(
   airflow.exceptions.AirflowException: Pod run-zf-pod-whesa52w returned a failure:
   
   remote_pod: {'api_version': 'v1',
   ...`
   
   What would you suggest to fix this?
   Thanks!


[GitHub] [airflow] kaxil commented on pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on PR #28230:
URL: https://github.com/apache/airflow/pull/28230#issuecomment-1382881501

   > @eladkal @potiuk @kaxil Hi Team! :) Could we please review the changes in this PR?
   
   As I mentioned in https://github.com/apache/airflow/pull/28230#issuecomment-1362785141, a big PR takes time to review and makes life hard for us as reviewers. So please expect the full review of this PR to take some time.


[GitHub] [airflow] dgouju commented on pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
dgouju commented on PR #28230:
URL: https://github.com/apache/airflow/pull/28230#issuecomment-1346957181

   Hello @MrGeorgeOwl, a quick comment on GKEStartPodOperator `__init__()`:
   isn't `deferrable: bool = False` missing from the `__init__()` signature,
   and `self.deferrable = deferrable` missing from its body?

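Here is a minimal sketch of the suggested change. It assumes that the base operator stores `deferrable` itself. `BasePodOperatorSketch` and `GKEPodOperatorSketch` are hypothetical stand-ins for `KubernetesPodOperator` and `GKEStartPodOperator`. Forwarding the flag to `super().__init__()` keeps a single place where `self.deferrable` is set.

```python
from __future__ import annotations

from typing import Any


class BasePodOperatorSketch:
    """Hypothetical stand-in for KubernetesPodOperator."""

    def __init__(self, *, deferrable: bool = False, poll_interval: float = 2, **kwargs: Any) -> None:
        self.deferrable = deferrable
        self.poll_interval = poll_interval
        self.extra_kwargs = kwargs


class GKEPodOperatorSketch(BasePodOperatorSketch):
    """Hypothetical stand-in for GKEStartPodOperator."""

    def __init__(
        self,
        *,
        location: str,
        cluster_name: str,
        deferrable: bool = False,
        **kwargs: Any,
    ) -> None:
        # Forward the flag so the base class is the one that sets self.deferrable.
        super().__init__(deferrable=deferrable, **kwargs)
        self.location = location
        self.cluster_name = cluster_name
```

In this sketch, passing `deferrable` through `**kwargs` alone would also reach the base class. Declaring it explicitly mainly documents it in the subclass signature.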

[GitHub] [airflow] MrGeorgeOwl commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
MrGeorgeOwl commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054544244


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -272,6 +276,8 @@ def __init__(
         pod_runtime_info_envs: list[k8s.V1EnvVar] | None = None,
         termination_grace_period: int | None = None,
         configmaps: list[str] | None = None,
+        deferrable: bool = False,
+        poll_interval: float = 2,

Review Comment:
   For the non-async operator it means nothing, but it gives the user a way to configure how often the trigger checks the pod status when deferrable mode is on.

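To illustrate what `poll_interval` controls in deferrable mode, here is a minimal sketch of a polling loop that a trigger could run. `wait_for_pod_phase` and the `get_phase` callable are hypothetical. A real trigger would presumably read the pod through an async Kubernetes client and yield a `TriggerEvent` instead of returning. `Succeeded` and `Failed` are the standard terminal Kubernetes pod phases.

```python
import asyncio
from typing import Awaitable, Callable

# Kubernetes pod phases after which the pod will not run any further.
TERMINAL_PHASES = {"Succeeded", "Failed"}


async def wait_for_pod_phase(
    get_phase: Callable[[], Awaitable[str]],
    poll_interval: float = 2,
) -> str:
    """Poll the pod phase until it is terminal, sleeping poll_interval seconds between checks."""
    while True:
        phase = await get_phase()
        if phase in TERMINAL_PHASES:
            return phase
        # asyncio.sleep yields to the event loop, so other triggers keep running meanwhile.
        await asyncio.sleep(poll_interval)
```

A smaller `poll_interval` reports completion sooner but sends more API requests per running pod.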


[GitHub] [airflow] kaxil commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
kaxil commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1054891585


##########
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:
##########
@@ -484,15 +498,21 @@ def extract_xcom(self, pod: k8s.V1Pod):
             return json.loads(result)
 
     def execute(self, context: Context):
-        remote_pod = None
+        """Based on the deferrable parameter runs the pod asynchronously or synchronously"""
+        if self.deferrable:
+            self.asynchronously(context)
+        else:
+            return self.synchronously(context)
+
+    def synchronously(self, context: Context):

Review Comment:
   Same as https://github.com/apache/airflow/pull/28230/files#r1054839109:
   
   `synchronously` doesn't make much sense as a method name on its own.

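Here is a sketch of the same dispatch with more descriptive method names. `execute_sync` and `execute_async` are suggested names only. The local `TaskDeferred` class is a stand-in for Airflow's exception of the same name, which `self.defer()` raises. For that reason, the deferrable branch does not need to return anything.

```python
from __future__ import annotations

from typing import Any


class TaskDeferred(Exception):
    """Stand-in for airflow.exceptions.TaskDeferred."""


class PodOperatorDispatchSketch:
    """Hypothetical operator showing only the execute() dispatch."""

    def __init__(self, deferrable: bool = False) -> None:
        self.deferrable = deferrable

    def execute(self, context: dict[str, Any]) -> Any:
        """Run the pod asynchronously or synchronously, depending on ``deferrable``."""
        if self.deferrable:
            # Deferring raises, so this branch never produces a return value.
            self.execute_async(context)
        else:
            return self.execute_sync(context)

    def execute_sync(self, context: dict[str, Any]) -> Any:
        # Placeholder for: create the pod, wait for it in the worker, fetch XCom, clean up.
        return {"mode": "sync"}

    def execute_async(self, context: dict[str, Any]) -> None:
        # Placeholder for: create the pod, then hand off to a trigger via self.defer(...).
        raise TaskDeferred("handed off to trigger")
```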


[GitHub] [airflow] MrGeorgeOwl commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
MrGeorgeOwl commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1070991597


##########
generated/provider_dependencies.json:
##########
@@ -209,8 +209,10 @@
   "cncf.kubernetes": {
     "deps": [
       "apache-airflow>=2.3.0",
+      "asgiref>=3.5.2",
       "cryptography>=2.0.0",
-      "kubernetes>=21.7.0,<24"
+      "kubernetes>=21.7.0,<24",
+      "kubernetes_asyncio==24.2.2"

Review Comment:
   Change it to a version range.



[GitHub] [airflow] MrGeorgeOwl commented on a diff in pull request #28230: Add deferrable mode to Kubernetes and GKE operators

Posted by GitBox <gi...@apache.org>.
MrGeorgeOwl commented on code in PR #28230:
URL: https://github.com/apache/airflow/pull/28230#discussion_r1070988973


##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -305,11 +303,9 @@ def __init__(
         *,
         location: str,
         cluster_name: str,
-        use_internal_ip: bool = False,

Review Comment:
   Replace it with an optional parameter and add a deprecation message.



##########
airflow/providers/google/cloud/operators/kubernetes_engine.py:
##########
@@ -305,11 +303,9 @@ def __init__(
         *,
         location: str,
         cluster_name: str,
-        use_internal_ip: bool = False,
         project_id: str | None = None,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
-        regional: bool = False,

Review Comment:
   Replace it with an optional parameter and add a deprecation message.

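Here is a minimal sketch of the planned change for `use_internal_ip` and `regional`. Each stays as an optional parameter defaulting to `None`, and a warning is emitted only when a caller still passes it. The class name and warning text are hypothetical. The stdlib `DeprecationWarning` is used for illustration; the provider may prefer its own warning class.

```python
from __future__ import annotations

import warnings


class GKEStartPodOperatorSketch:
    """Hypothetical stand-in for GKEStartPodOperator's __init__."""

    def __init__(
        self,
        *,
        location: str,
        cluster_name: str,
        use_internal_ip: bool | None = None,
        regional: bool | None = None,
    ) -> None:
        self.location = location
        self.cluster_name = cluster_name
        # Warn only for deprecated arguments the caller actually passed.
        for name, value in (("use_internal_ip", use_internal_ip), ("regional", regional)):
            if value is not None:
                warnings.warn(
                    f"The `{name}` parameter is deprecated and will be removed in a future release.",
                    DeprecationWarning,
                    stacklevel=2,
                )
        self.use_internal_ip = use_internal_ip
        self.regional = regional
```

Defaulting to `None` rather than `False` lets the operator distinguish "not passed" from an explicit `False`. Existing DAGs therefore keep working and only see the warning when they use the old argument.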

