Posted to commits@airflow.apache.org by "sunank200 (via GitHub)" <gi...@apache.org> on 2024/04/19 11:23:39 UTC

[PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

sunank200 opened a new pull request, #39130:
URL: https://github.com/apache/airflow/pull/39130

   This PR improves the `DataprocCreateClusterOperator`, specifically its handling of clusters that are created in an ERROR state when the operator runs in deferrable mode.
   
   Previously, when the `DataprocCreateClusterOperator` ran in deferrable mode, it did not correctly handle clusters that were created in an ERROR state. Because there was no error handling or resource cleanup, subsequent retries encountered the errored cluster, attempted to delete it, and the pipeline failed.
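
The behavior described above can be sketched in a few lines. This is a minimal illustration rather than the PR's code: `State`, `FakeHook`, and `wait_for_cluster` are hypothetical stand-ins for the Dataproc cluster state enum, the async hook, and the trigger's polling loop. It assumes the trigger should stop polling on either RUNNING or ERROR and should delete the cluster on ERROR only when `delete_on_error` is set.

```python
import asyncio
from enum import Enum


class State(Enum):
    """Hypothetical stand-in for the Dataproc cluster states discussed here."""

    CREATING = "CREATING"
    RUNNING = "RUNNING"
    ERROR = "ERROR"


class FakeHook:
    """Hypothetical in-memory hook; not the real Dataproc async hook."""

    def __init__(self, states):
        self._states = list(states)
        self.deleted = False

    async def get_state(self):
        # Return the next scripted state; repeat the last one once exhausted.
        return self._states.pop(0) if len(self._states) > 1 else self._states[0]

    async def delete_cluster(self):
        self.deleted = True


async def wait_for_cluster(hook, delete_on_error=True, poll_seconds=0.0):
    """Poll until RUNNING or ERROR; on ERROR optionally delete, then report."""
    while True:
        state = await hook.get_state()
        if state is State.ERROR:
            if delete_on_error:
                await hook.delete_cluster()
            return {"cluster_state": state, "deleted": hook.deleted}
        if state is State.RUNNING:
            return {"cluster_state": state, "deleted": False}
        await asyncio.sleep(poll_seconds)
```

With cleanup done at the point where the ERROR state is first seen, a retry would no longer find a leftover errored cluster, which is the failure mode this description refers to.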
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575668317


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
             )
-            state = cluster.status.state
-            self.log.info("Dataproc cluster: %s is in state: %s", self.cluster_name, state)
-            if state in (
-                ClusterStatus.State.ERROR,
-                ClusterStatus.State.RUNNING,
-            ):
-                break
-            self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
-            await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"cluster_name": self.cluster_name, "cluster_state": state, "cluster": cluster})
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,
+                    "cluster": None,
+                    "action": "deleted",
+                }
+            )
+        else:
+            return TriggerEvent(
+                {"cluster_name": self.cluster_name, "cluster_state": cluster.status.state, "cluster": cluster}
+            )
+
+    async def handle_cancellation(self) -> None:
+        """Handle the cancellation of the trigger, cleaning up resources if necessary."""
+        self.log.info("Cancellation requested. Deleting the cluster if created.")
+        try:
+            if self.delete_on_error:
+                cluster = await self.fetch_cluster_status()
+                if cluster.status.state == ClusterStatus.State.ERROR:
+                    await self.get_async_hook().async_delete_cluster(

Review Comment:
   I think this was a typo. It should be `delete_cluster` from the async hook, as in `gather_diagnostics_and_maybe_delete`.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "pankajastro (via GitHub)" <gi...@apache.org>.
pankajastro commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1579015319


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)

Review Comment:
   Can we log the cluster state too?



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The synchronous hook is utilized to delete the cluster when a task is cancelled.
+                    # This is because the asynchronous hook deletion is not awaited when the trigger task
+                    # is cancelled. The call for deleting the cluster through the sync hook is not a blocking
+                    # call, which means it does not wait until the cluster is deleted.
+                    self.get_sync_hook().delete_cluster(

Review Comment:
   I think we should make an async call here. Perhaps we could periodically poll the status to verify its existence. https://github.com/googleapis/google-cloud-python/blob/main/packages/google-cloud-dataproc/google/cloud/dataproc_v1/services/cluster_controller/async_client.py#L1046 
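
As an illustration of the polling idea in this comment (not code from the PR), the sketch below requests deletion asynchronously and then polls until the cluster can no longer be fetched. `FakeAsyncClient`, `ClusterNotFound`, and `delete_and_wait` are hypothetical names; a real client would raise its own "not found" error, and the parameters here are assumptions.

```python
import asyncio


class ClusterNotFound(Exception):
    """Hypothetical stand-in for the 'not found' error a real client raises."""


class FakeAsyncClient:
    """Hypothetical in-memory client; the cluster vanishes after a few polls."""

    def __init__(self, polls_until_gone):
        self._remaining = polls_until_gone
        self.delete_requested = False

    async def delete_cluster(self, cluster_name):
        # Only records the request; deletion completes later.
        self.delete_requested = True

    async def get_cluster(self, cluster_name):
        if self.delete_requested:
            if self._remaining <= 0:
                raise ClusterNotFound(cluster_name)
            self._remaining -= 1
        return {"name": cluster_name}


async def delete_and_wait(client, cluster_name, poll_seconds=0.0, max_polls=10):
    """Request deletion, then poll until the cluster no longer exists.

    Returns the number of polls that were needed.
    """
    await client.delete_cluster(cluster_name)
    for attempt in range(1, max_polls + 1):
        try:
            await client.get_cluster(cluster_name)
        except ClusterNotFound:
            return attempt
        await asyncio.sleep(poll_seconds)
    raise TimeoutError(f"{cluster_name} still exists after {max_polls} polls")
```

The `max_polls` bound is a design choice of this sketch so that the wait cannot run forever if the deletion never completes.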



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The synchronous hook is utilized to delete the cluster when a task is cancelled.
+                    # This is because the asynchronous hook deletion is not awaited when the trigger task
+                    # is cancelled. The call for deleting the cluster through the sync hook is not a blocking
+                    # call, which means it does not wait until the cluster is deleted.
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+            except Exception as e:
+                self.log.error("Error during cancellation handling: %s", e)
+                raise AirflowException("Error during cancellation handling: %s", e)
+
+    async def fetch_cluster(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )

Review Comment:
   Do we need this additional method?
   I think we should make this call directly in the `run` method:
   ```python
   await self.get_async_hook().get_cluster(
               project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
           )
   ``` 



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The synchronous hook is utilized to delete the cluster when a task is cancelled.
+                    # This is because the asynchronous hook deletion is not awaited when the trigger task
+                    # is cancelled. The call for deleting the cluster through the sync hook is not a blocking
+                    # call, which means it does not wait until the cluster is deleted.
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+            except Exception as e:
+                self.log.error("Error during cancellation handling: %s", e)
+                raise AirflowException("Error during cancellation handling: %s", e)
+
+    async def fetch_cluster(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    async def delete_when_error_occurred(self, cluster: Cluster):
+        """
+        Delete the cluster on error.
+
+        :param cluster: The cluster to delete.
+        """
+        if self.delete_on_error:
+            self.log.info("Deleting cluster %s.", self.cluster_name)
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            self.log.info("Cluster %s has been deleted.", self.cluster_name)
+        else:
+            self.log.info(
+                "Cluster %s is not be deleted as delete_on_error is set to False.", self.cluster_name

Review Comment:
   Considering that we're invoking this function only once, and that too when we're already aware of an error occurring, I believe it's more efficient to directly execute the hook delete method. Therefore, I think the method shouldn't be necessary.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577302020


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+                    self.log.info("Cluster deletion initiated.")
+                    async for event in self.wait_until_cluster_deleted():
+                        if event["status"] == "success":
+                            self.log.info("Cluster deletion confirmed.")
+                        elif event["status"] == "error":
+                            self.log.error("Cluster deletion failed with message: %s", event["message"])

Review Comment:
   Removed this method.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577939404


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state.ERROR,

Review Comment:
   I think a better idea would be to return the state exactly as it is fetched from the cluster. If you look at the code above, it is `state = cluster.status.state`.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "pankajkoti (via GitHub)" <gi...@apache.org>.
pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577833328


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +150,72 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state.DELETING,

Review Comment:
   Can we re-check that `state.DELETING` is the right usage here? After entering this `if` block, `state` is `ClusterStatus.State.ERROR`.



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +150,72 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The sync hook is used to delete the cluster in case of cancellation of task.

Review Comment:
   Let's also elaborate on _why_ we're using the sync hook instead of the async hook, and mention that the sync hook's delete-cluster call is not blocking, meaning that it does not wait until the cluster is deleted.
   
   Would be nice to explain this in detail so that someone looking at this code later would understand _why_ we did not use the async hook's method.
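
A small sketch may help a later reader see the pattern being asked about. It is an assumption-laden illustration, not the trigger's code: `FakeSyncHook`, `poll_forever`, and `main` are hypothetical, and the hook's `delete_cluster` merely records the request to mimic a call that submits the deletion and returns without waiting. The cleanup inside the `except asyncio.CancelledError` block is an ordinary synchronous call, so nothing needs to be awaited once cancellation has started.

```python
import asyncio


class FakeSyncHook:
    """Hypothetical hook whose delete call submits the request and returns."""

    def __init__(self):
        self.delete_requests = []

    def delete_cluster(self, cluster_name):
        # Records the request and returns immediately; nothing is awaited.
        self.delete_requests.append(cluster_name)


async def poll_forever(hook, cluster_name, delete_on_error=True):
    """Poll until cancelled; on cancellation, request deletion synchronously."""
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        if delete_on_error:
            hook.delete_cluster(cluster_name)
        # Re-raise so the caller still observes the cancellation.
        raise


async def main():
    hook = FakeSyncHook()
    task = asyncio.create_task(poll_forever(hook, "demo-cluster"))
    await asyncio.sleep(0.05)  # let the polling loop start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return hook.delete_requests
```

Re-raising `CancelledError` after the cleanup is a choice made in this sketch; the diff under review does not re-raise it.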





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575661892


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:

Review Comment:
   It checks two cases: whether the cluster is running or whether it is in an error state.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577256381


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -43,20 +44,28 @@ def __init__(
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
         polling_interval_seconds: int = 30,
+        delete_on_error: bool = True,
     ):
         super().__init__()
         self.region = region
         self.project_id = project_id
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
         self.polling_interval_seconds = polling_interval_seconds
+        self.delete_on_error = delete_on_error
 
     def get_async_hook(self):
         return DataprocAsyncHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
         )
 
+    def get_sync_hook(self):

Review Comment:
   This is needed to delete the cluster when `CancelledError` is raised.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577761858


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:

Review Comment:
   Removed this method





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1578038004


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state.ERROR,

Review Comment:
   ClusterStatus.State.DELETING is better





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "pankajkoti (via GitHub)" <gi...@apache.org>.
pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577878165


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""

Review Comment:
   ```suggestion
   ```



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state.ERROR,

Review Comment:
   Since `state == ClusterStatus.State.ERROR` inside the `if` block, `state.ERROR` translates to `ClusterStatus.State.ERROR.ERROR` (and `state.DELETING` to `ClusterStatus.State.ERROR.DELETING`), which I don't think is the right usage. Can you check the type of the `state` variable and whether `state.DELETING` or `state.ERROR` is available on that type?
   
   It might be, but I suggest checking the type to ensure that we have the right usage and that the right trigger event value is yielded to the worker.
   ```suggestion
                               "cluster_state": ClusterStatus.State.DELETING,
   ```
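
One way to run the check asked for above is sketched below. It uses a plain `enum.IntEnum` named `State` as a hypothetical stand-in for `ClusterStatus.State` (the member values are illustrative, and the real type may behave differently). Whether a sibling member resolves on a member instance can depend on the enum implementation and Python version, so the sketch reports it rather than assuming an answer.

```python
import enum


class State(enum.IntEnum):
    """Hypothetical stand-in for ClusterStatus.State (values are illustrative)."""

    RUNNING = 1
    ERROR = 2
    DELETING = 3


def describe(state: State) -> dict:
    """Report the runtime type of `state` and whether sibling names resolve on it."""
    return {
        "type": type(state).__name__,
        # May be True or False depending on the enum implementation in use.
        "has_deleting": hasattr(state, "DELETING"),
        # Going through the class is unambiguous in either case.
        "via_class": type(state).DELETING,
    }


info = describe(State.ERROR)
print(info)
```

Referencing the member through the class, as in the suggestion, avoids relying on member-to-member attribute access at all.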



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""

Review Comment:
   I don't think that docstring adds much value unless we need it for some pre-commit. WDYT?



##########
tests/providers/google/cloud/triggers/test_dataproc.py:
##########
@@ -215,9 +228,48 @@ async def test_cluster_run_loop_is_still_running(
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert f"Current state is: {ClusterStatus.State.CREATING}"
+        assert f"Current state is: {ClusterStatus.State.CREATING}."
         assert f"Sleeping for {TEST_POLL_INTERVAL} seconds."
 
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook.get_cluster")
+    async def test_fetch_cluster_status(self, mock_get_cluster, cluster_trigger, async_get_cluster):
+        mock_get_cluster.return_value = async_get_cluster(
+            status=ClusterStatus(state=ClusterStatus.State.RUNNING)
+        )
+        cluster = await cluster_trigger.fetch_cluster()
+
+        assert cluster.status.state == ClusterStatus.State.RUNNING, "The cluster state should be RUNNING"

Review Comment:
   What does the message "The cluster state should be RUNNING" mean here in the assert?
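
For context, the string after the comma in a Python `assert` statement is the message attached to the `AssertionError` when the condition is false. A minimal sketch, using a hypothetical `check_state` helper and plain strings instead of `ClusterStatus.State`:

```python
def check_state(state: str) -> None:
    # The second operand of `assert` becomes the AssertionError message.
    assert state == "RUNNING", "The cluster state should be RUNNING"


try:
    check_state("ERROR")
except AssertionError as exc:
    message = str(exc)

print(message)
```

Under pytest the failing comparison is already reported in detail, so such a message is optional.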



##########
tests/providers/google/cloud/triggers/test_dataproc.py:
##########
@@ -215,9 +228,48 @@ async def test_cluster_run_loop_is_still_running(
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert f"Current state is: {ClusterStatus.State.CREATING}"
+        assert f"Current state is: {ClusterStatus.State.CREATING}."
         assert f"Sleeping for {TEST_POLL_INTERVAL} seconds."
 
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook.get_cluster")
+    async def test_fetch_cluster_status(self, mock_get_cluster, cluster_trigger, async_get_cluster):
+        mock_get_cluster.return_value = async_get_cluster(
+            status=ClusterStatus(state=ClusterStatus.State.RUNNING)
+        )
+        cluster = await cluster_trigger.fetch_cluster()
+
+        assert cluster.status.state == ClusterStatus.State.RUNNING, "The cluster state should be RUNNING"
+
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook.delete_cluster")
+    async def test_delete_when_error_occurred(self, mock_delete_cluster, cluster_trigger):
+        mock_cluster = mock.MagicMock(spec=Cluster)
+        type(mock_cluster).status = mock.PropertyMock(
+            return_value=mock.MagicMock(state=ClusterStatus.State.ERROR)
+        )
+
+        mock_delete_future = asyncio.Future()
+        mock_delete_future.set_result(None)
+        mock_delete_cluster.return_value = mock_delete_future
+
+        cluster_trigger.delete_on_error = True
+
+        await cluster_trigger.delete_when_error_occurred(mock_cluster)
+
+        mock_delete_cluster.assert_called_once_with(
+            region=cluster_trigger.region,
+            cluster_name=cluster_trigger.cluster_name,
+            project_id=cluster_trigger.project_id,
+        )
+
+        mock_delete_cluster.reset_mock()
+        cluster_trigger.delete_on_error = False
+
+        await cluster_trigger.delete_when_error_occurred(mock_cluster)
+
+        mock_delete_cluster.assert_not_called()
+

Review Comment:
   Can we also add a test case for the CancelledError like mentioned earlier?
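
A rough sketch of what such a test could exercise is shown below. It does not use the real trigger: `FakeClusterTrigger`, `first_event`, and `run_and_cancel` are hypothetical stand-ins that only mimic the `except asyncio.CancelledError` branch from the diff, with a `mock.Mock` in place of the hook's `delete_cluster`.

```python
import asyncio
from unittest import mock


class FakeClusterTrigger:
    """Hypothetical stand-in that mirrors only the cancellation branch of run()."""

    def __init__(self, delete_cluster, delete_on_error: bool = True):
        self.delete_cluster = delete_cluster
        self.delete_on_error = delete_on_error
        self.state = "CREATING"  # never becomes RUNNING in this sketch

    async def run(self):
        try:
            while True:
                if self.state == "RUNNING":
                    yield {"cluster_state": self.state}
                    return
                await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            if self.delete_on_error:
                self.delete_cluster()
            raise


async def first_event(trigger):
    """Consume the trigger until its first event, like the triggerer would."""
    async for event in trigger.run():
        return event


async def run_and_cancel(delete_on_error: bool) -> mock.Mock:
    delete_cluster = mock.Mock()
    trigger = FakeClusterTrigger(delete_cluster, delete_on_error)
    task = asyncio.create_task(first_event(trigger))
    await asyncio.sleep(0.03)  # let the polling loop start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return delete_cluster


deleted = asyncio.run(run_and_cancel(delete_on_error=True))
kept = asyncio.run(run_and_cancel(delete_on_error=False))
deleted.assert_called_once_with()
kept.assert_not_called()
```

A real test would patch the hook method on the actual trigger class instead of using a stand-in.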





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "pankajkoti (via GitHub)" <gi...@apache.org>.
pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1576203500


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:

Review Comment:
   I would like to understand exactly when the CancelledError is raised.
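
In plain asyncio, `CancelledError` is raised inside a coroutine at its current `await` point once something calls `cancel()` on the task running it. The sketch below shows only that mechanism; `poll_forever` is a hypothetical stand-in for the polling loop, and when the Airflow triggerer itself cancels a trigger is not covered here.

```python
import asyncio

cleanup_log: list[str] = []


async def poll_forever() -> None:
    """Stand-in for a polling loop that never finishes on its own."""
    try:
        while True:
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        # Raised at the pending `await` once the task is cancelled.
        cleanup_log.append("cleanup ran")
        raise  # re-raise so the task is actually marked as cancelled


async def main() -> bool:
    task = asyncio.create_task(poll_forever())
    await asyncio.sleep(0.05)  # let the loop run for a few iterations
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


cancelled = asyncio.run(main())
print(cancelled, cleanup_log)
```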



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    self.get_sync_hook().delete_cluster(

Review Comment:
   Does the sync hook method wait until the cluster is deleted? If so, it would block the triggerer thread. Would it be possible to use an async hook and method here?
   
   If deleting asynchronously is not possible, we could send the action to the worker in `execute_complete` and delete synchronously from the worker.
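
The blocking concern can be illustrated with plain asyncio. In the sketch below, `blocking_delete` is a hypothetical stand-in for a synchronous call that takes a while, and `count_ticks` stands in for other coroutines sharing the same event loop. Calling the blocking function directly stalls them, while `asyncio.to_thread` (Python 3.9+) lets them keep running. This is not a statement about how the Dataproc hook behaves.

```python
import asyncio
import time


def blocking_delete() -> str:
    """Hypothetical synchronous call that blocks its caller for a while."""
    time.sleep(0.2)
    return "delete requested"


async def count_ticks(stop: asyncio.Event) -> int:
    """Stand-in for other work on the loop; counts how often it gets to run."""
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(0.01)
        ticks += 1
    return ticks


async def run_case(offload: bool) -> int:
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # give the ticker a chance to start
    if offload:
        await asyncio.to_thread(blocking_delete)  # loop stays responsive
    else:
        blocking_delete()  # loop is stalled for the whole call
    stop.set()
    return await ticker


blocked_ticks = asyncio.run(run_case(offload=False))
offloaded_ticks = asyncio.run(run_case(offload=True))
print(blocked_ticks, offloaded_ticks)
```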



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -43,20 +44,28 @@ def __init__(
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
         polling_interval_seconds: int = 30,
+        delete_on_error: bool = True,
     ):
         super().__init__()
         self.region = region
         self.project_id = project_id
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
         self.polling_interval_seconds = polling_interval_seconds
+        self.delete_on_error = delete_on_error
 
     def get_async_hook(self):
         return DataprocAsyncHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
         )
 
+    def get_sync_hook(self):

Review Comment:
   We should avoid using sync hook methods in the Triggerer as it would block the triggerer thread.



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+                    self.log.info("Cluster deletion initiated.")
+                    async for event in self.wait_until_cluster_deleted():
+                        if event["status"] == "success":
+                            self.log.info("Cluster deletion confirmed.")
+                        elif event["status"] == "error":
+                            self.log.error("Cluster deletion failed with message: %s", event["message"])

Review Comment:
   Why do we have `async for` here? `self.wait_until_cluster_deleted` does not seem to return an async iterable.
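
The distinction can be sketched as follows: a plain `async def` that returns a value must be awaited, while `async for` needs an object with `__aiter__`, such as an async generator. Both function names below are hypothetical and only illustrate the two shapes.

```python
import asyncio


async def wait_as_coroutine() -> dict:
    """Plain coroutine: the caller must `await` it."""
    return {"status": "success"}


async def wait_as_async_generator():
    """Async generator: the caller may use `async for`."""
    yield {"status": "success"}


async def main():
    result = await wait_as_coroutine()
    events = [event async for event in wait_as_async_generator()]

    error = None
    coro = wait_as_coroutine()
    try:
        async for _ in coro:  # a coroutine object has no __aiter__
            pass
    except TypeError as exc:
        error = type(exc).__name__
    finally:
        coro.close()  # avoid a "never awaited" warning
    return result, events, error


result, events, error = asyncio.run(main())
print(result, events, error)
```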



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+                    self.log.info("Cluster deletion initiated.")

Review Comment:
   These log lines are confusing: we say the cluster is deleted and then say that cluster deletion was initiated.



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:

Review Comment:
   Could this block be moved into a common cleanup method, as Wei suggested, and reused in the `gather_diagnostics_and_delete_on_error` method too?





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577939404


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state.ERROR,

Review Comment:
   I think a better idea would be to return the state as is, exactly as it is fetched from the cluster.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1578042750


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +150,72 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state.DELETING,

Review Comment:
   I have changed it to `ClusterStatus.State.DELETING`



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +150,72 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state.DELETING,

Review Comment:
   I have changed it to `ClusterStatus.State.DELETING`, which is the correct state.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "Lee-W (via GitHub)" <gi...@apache.org>.
Lee-W commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1579130087


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The synchronous hook is utilized to delete the cluster when a task is cancelled.
+                    # This is because the asynchronous hook deletion is not awaited when the trigger task
+                    # is cancelled. The call for deleting the cluster through the sync hook is not a blocking
+                    # call, which means it does not wait until the cluster is deleted.
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+            except Exception as e:
+                self.log.error("Error during cancellation handling: %s", e)
+                raise AirflowException("Error during cancellation handling: %s", e)
+
+    async def fetch_cluster(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    async def delete_when_error_occurred(self, cluster: Cluster):

Review Comment:
   ```suggestion
       async def delete_when_error_occurred(self, cluster: Cluster) -> None:
   ```





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1579277263


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The synchronous hook is utilized to delete the cluster when a task is cancelled.
+                    # This is because the asynchronous hook deletion is not awaited when the trigger task
+                    # is cancelled. The call for deleting the cluster through the sync hook is not a blocking
+                    # call, which means it does not wait until the cluster is deleted.
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+            except Exception as e:
+                self.log.error("Error during cancellation handling: %s", e)
+                raise AirflowException("Error during cancellation handling: %s", e)
+
+    async def fetch_cluster(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    async def delete_when_error_occurred(self, cluster: Cluster):
+        """
+        Delete the cluster on error.
+
+        :param cluster: The cluster to delete.
+        """
+        if self.delete_on_error:
+            self.log.info("Deleting cluster %s.", self.cluster_name)
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            self.log.info("Cluster %s has been deleted.", self.cluster_name)
+        else:
+            self.log.info(
+                "Cluster %s is not be deleted as delete_on_error is set to False.", self.cluster_name

Review Comment:
   Changed it.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1579263128


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)

Review Comment:
   Added





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575675716


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:

Review Comment:
   Removed this method completely.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "pankajkoti (via GitHub)" <gi...@apache.org>.
pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575959819


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
             )
-            state = cluster.status.state
-            self.log.info("Dataproc cluster: %s is in state: %s", self.cluster_name, state)
-            if state in (
-                ClusterStatus.State.ERROR,
-                ClusterStatus.State.RUNNING,
-            ):
-                break
-            self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
-            await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"cluster_name": self.cluster_name, "cluster_state": state, "cluster": cluster})
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,

Review Comment:
   If the current state is ERROR then, judging by the operator's `execute_complete` code, returning ERROR from here will raise an `AirflowException`.
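
The concern in the comment above can be illustrated with a minimal sketch. The operator's real `execute_complete` is not shown in this thread, so the handler below is a hypothetical stand-in, and the local `State` enum and `AirflowException` class only mimic the Dataproc and Airflow types. It assumes nothing beyond what the comment states: an ERROR state in the event leads to an exception.

```python
from enum import Enum


class State(Enum):
    """Stand-in for ClusterStatus.State (assumption: only the members used here)."""

    RUNNING = "RUNNING"
    ERROR = "ERROR"


class AirflowException(Exception):
    """Local stand-in so the sketch runs without Airflow installed."""


def execute_complete(event: dict) -> str:
    """Hypothetical handler: fail the task when the trigger reports ERROR."""
    if event["cluster_state"] == State.ERROR:
        raise AirflowException(f"Cluster {event['cluster_name']} is in ERROR state")
    return event["cluster_name"]


def outcome(state: State) -> str:
    """Run the handler for a given state and report whether it raised."""
    try:
        execute_complete({"cluster_name": "demo-cluster", "cluster_state": state})
    except AirflowException:
        return "raised"
    return "ok"
```

Under these assumptions, whatever state the trigger places in the event decides whether the resumed task fails, which is why the value returned after a delete matters.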





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577301673


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:

Review Comment:
   The purpose of `gather_diagnostics_and_delete_on_error` is different from that of the cleanup: the cleanup runs when the task is cancelled.
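
A minimal sketch of the cancellation path described above, kept separate from the error path. It uses only `asyncio`; `delete_cluster_sync` and `poll` are hypothetical names, and the in-memory `deleted` list stands in for a real delete request. As the code comment in the diff explains, the cleanup is synchronous because an awaited call is not guaranteed to complete once the trigger task is cancelled. Re-raising `CancelledError` after the cleanup is a choice made for this sketch, not something taken from the PR.

```python
import asyncio

deleted: list[str] = []


def delete_cluster_sync(name: str) -> None:
    """Hypothetical stand-in for a synchronous, non-blocking delete request."""
    deleted.append(name)


async def poll(name: str, delete_on_error: bool) -> None:
    """Poll forever; clean up only if the surrounding task is cancelled."""
    try:
        while True:
            await asyncio.sleep(0.01)  # stands in for the polling interval
    except asyncio.CancelledError:
        if delete_on_error:
            # Synchronous on purpose: nothing is awaited during cancellation.
            delete_cluster_sync(name)
        raise  # let the event loop see that the task was cancelled


async def main() -> None:
    task = asyncio.create_task(poll("demo-cluster", delete_on_error=True))
    await asyncio.sleep(0.03)  # let the poll loop start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(main())
```

The error path, by contrast, runs inside the normal polling loop, where awaiting an asynchronous delete is safe.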





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "pankajkoti (via GitHub)" <gi...@apache.org>.
pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577943509


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state.ERROR,

Review Comment:
   But wouldn't that be an inaccurate state, since we triggered the delete action after observing it?
   It might be better to fetch the latest state, or to use `ClusterStatus.State.DELETING`, since we know that will be the cluster's state.
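
The first option mentioned above, fetching the latest state after the delete request, could look roughly like this. `FakeHook`, `get_state`, and `event_after_delete` are hypothetical names used only for illustration; a real hook would call the Dataproc API, and the sketch assumes the service reports DELETING immediately after the request.

```python
import asyncio
from enum import Enum


class State(Enum):
    """Stand-in for ClusterStatus.State (assumption)."""

    ERROR = "ERROR"
    DELETING = "DELETING"


class FakeHook:
    """Hypothetical in-memory hook used instead of the real async hook."""

    def __init__(self) -> None:
        self.state = State.ERROR

    async def get_state(self) -> State:
        return self.state

    async def delete_cluster(self) -> None:
        # Assumption: the cluster is reported as DELETING right after the request.
        self.state = State.DELETING


async def event_after_delete(hook: FakeHook) -> dict:
    """Delete an errored cluster, then report the state observed afterwards."""
    observed = await hook.get_state()
    if observed == State.ERROR:
        await hook.delete_cluster()
        observed = await hook.get_state()  # re-fetch instead of reusing ERROR
    return {"cluster_name": "demo-cluster", "cluster_state": observed}


event = asyncio.run(event_after_delete(FakeHook()))
```

Re-fetching costs one extra API call but avoids hard-coding a state that the service might not actually report.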





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575662562


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:

Review Comment:
   I think the idea here is that if the cluster is in the ERROR state, `gather_diagnostics_and_maybe_delete()` should be awaited; otherwise the following event should be yielded:
   ```
   yield TriggerEvent(
       {
           "cluster_name": self.cluster_name,
           "cluster_state": cluster.status.state,
           "cluster": cluster,
       }
   )
   ```





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "pankajkoti (via GitHub)" <gi...@apache.org>.
pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577910767


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +150,72 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state.DELETING,

Review Comment:
   No, I meant that it should not be `state.DELETING` or `state.ERROR`, because `state` is already `ClusterStatus.State.ERROR` once we enter the if block; `state.ERROR` would then mean `ClusterStatus.State.ERROR.ERROR`.
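
A small sketch of the naming point above, using a plain `enum.Enum` as a stand-in for `ClusterStatus.State` (the real type comes from the Dataproc client library and is not reproduced here). `build_event` is a hypothetical helper. Inside the branch, `state` is itself a member, so the sketch names the reported state through the enum class instead of through `state`.

```python
from enum import Enum


class State(Enum):
    """Stand-in for ClusterStatus.State (assumption)."""

    RUNNING = "RUNNING"
    ERROR = "ERROR"
    DELETING = "DELETING"


def build_event(cluster_name: str, state: State) -> dict:
    """Build the event payload for a cluster observed in `state`."""
    if state == State.ERROR:
        # `state` is the ERROR member here; qualify the target with the class.
        reported = State.DELETING
    else:
        reported = state
    return {"cluster_name": cluster_name, "cluster_state": reported}
```

Qualifying with the class keeps the intent readable and does not depend on how a member resolves attributes named after its siblings.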





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575669669


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
             )
-            state = cluster.status.state
-            self.log.info("Dataproc cluster: %s is in state: %s", self.cluster_name, state)
-            if state in (
-                ClusterStatus.State.ERROR,
-                ClusterStatus.State.RUNNING,
-            ):
-                break
-            self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
-            await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"cluster_name": self.cluster_name, "cluster_state": state, "cluster": cluster})
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,
+                    "cluster": None,
+                    "action": "deleted",
+                }
+            )
+        else:
+            return TriggerEvent(
+                {"cluster_name": self.cluster_name, "cluster_state": cluster.status.state, "cluster": cluster}
+            )
+
+    async def handle_cancellation(self) -> None:
+        """Handle the cancellation of the trigger, cleaning up resources if necessary."""
+        self.log.info("Cancellation requested. Deleting the cluster if created.")

Review Comment:
   That's right. Fixed it.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575953494


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
             )
-            state = cluster.status.state
-            self.log.info("Dataproc cluster: %s is in state: %s", self.cluster_name, state)
-            if state in (
-                ClusterStatus.State.ERROR,
-                ClusterStatus.State.RUNNING,
-            ):
-                break
-            self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
-            await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"cluster_name": self.cluster_name, "cluster_state": state, "cluster": cluster})
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,
+                    "cluster": None,
+                    "action": "deleted",
+                }
+            )
+        else:
+            return TriggerEvent(
+                {"cluster_name": self.cluster_name, "cluster_state": cluster.status.state, "cluster": cluster}
+            )
+
+    async def handle_cancellation(self) -> None:

Review Comment:
   Changed it.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "pankajkoti (via GitHub)" <gi...@apache.org>.
pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575956404


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
             )
-            state = cluster.status.state
-            self.log.info("Dataproc cluster: %s is in state: %s", self.cluster_name, state)
-            if state in (
-                ClusterStatus.State.ERROR,
-                ClusterStatus.State.RUNNING,
-            ):
-                break
-            self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
-            await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"cluster_name": self.cluster_name, "cluster_state": state, "cluster": cluster})
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,

Review Comment:
   Won't the cluster already be deleted at this point? `cluster.status.state` would still hold the value from before the deletion.
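
A minimal sketch of one way to keep the event from reporting a stale state, using stand-in types rather than the real Dataproc classes. `State`, `build_error_event`, and the `last_observed_state` key are hypothetical names for illustration and are not part of this PR.

```python
from __future__ import annotations

from enum import Enum
from typing import Any


class State(Enum):
    """Stand-in for ClusterStatus.State (hypothetical, illustration only)."""

    RUNNING = "RUNNING"
    ERROR = "ERROR"
    DELETING = "DELETING"


def build_error_event(cluster_name: str, observed: State, deleted: bool) -> dict[str, Any]:
    """Build the event payload for a cluster that ended up in ERROR.

    The state seen before any delete request is always kept under
    `last_observed_state`. If a delete was requested, `cluster_state`
    says so instead of repeating the pre-deletion value.
    """
    payload: dict[str, Any] = {
        "cluster_name": cluster_name,
        "last_observed_state": observed.name,
    }
    if deleted:
        payload["cluster_state"] = State.DELETING.name
        payload["cluster"] = None
        payload["action"] = "deleted"
    else:
        payload["cluster_state"] = observed.name
    return payload
```

With this shape the consumer of the event can tell "the cluster errored and is still there" apart from "the cluster errored and a delete was issued" without inspecting `action`.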





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "pankajkoti (via GitHub)" <gi...@apache.org>.
pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575308741


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
             )
-            state = cluster.status.state
-            self.log.info("Dataproc cluster: %s is in state: %s", self.cluster_name, state)
-            if state in (
-                ClusterStatus.State.ERROR,
-                ClusterStatus.State.RUNNING,
-            ):
-                break
-            self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
-            await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"cluster_name": self.cluster_name, "cluster_state": state, "cluster": cluster})
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,
+                    "cluster": None,
+                    "action": "deleted",
+                }
+            )
+        else:
+            return TriggerEvent(
+                {"cluster_name": self.cluster_name, "cluster_state": cluster.status.state, "cluster": cluster}
+            )
+
+    async def handle_cancellation(self) -> None:
+        """Handle the cancellation of the trigger, cleaning up resources if necessary."""

Review Comment:
   When exactly do we deem cleanup necessary? Does it depend on whether the resource was created, on `self.delete_on_error`, or on both? Can we elaborate on this here?
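
As a sketch of how the condition could be spelled out, the rule that `handle_cancellation` appears to apply in this diff (delete only when `delete_on_error` is set and the cluster is in ERROR) can be written as a small predicate. `State` and `should_delete_on_cancel` are hypothetical stand-ins, not names from the PR.

```python
from enum import Enum


class State(Enum):
    """Stand-in for ClusterStatus.State (hypothetical, illustration only)."""

    CREATING = "CREATING"
    RUNNING = "RUNNING"
    ERROR = "ERROR"


def should_delete_on_cancel(delete_on_error: bool, state: State) -> bool:
    """Return True only when cleanup is both requested and needed.

    Cleanup is requested when `delete_on_error` is set, and needed when
    the cluster is currently in the ERROR state.
    """
    return delete_on_error and state is State.ERROR
```

A docstring on `handle_cancellation` stating the same two conditions would answer the question without the reader having to trace the body.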



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
             )
-            state = cluster.status.state
-            self.log.info("Dataproc cluster: %s is in state: %s", self.cluster_name, state)
-            if state in (
-                ClusterStatus.State.ERROR,
-                ClusterStatus.State.RUNNING,
-            ):
-                break
-            self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
-            await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"cluster_name": self.cluster_name, "cluster_state": state, "cluster": cluster})
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,
+                    "cluster": None,
+                    "action": "deleted",
+                }
+            )
+        else:
+            return TriggerEvent(
+                {"cluster_name": self.cluster_name, "cluster_state": cluster.status.state, "cluster": cluster}
+            )
+
+    async def handle_cancellation(self) -> None:
+        """Handle the cancellation of the trigger, cleaning up resources if necessary."""
+        self.log.info("Cancellation requested. Deleting the cluster if created.")
+        try:
+            if self.delete_on_error:
+                cluster = await self.fetch_cluster_status()
+                if cluster.status.state == ClusterStatus.State.ERROR:
+                    await self.get_async_hook().async_delete_cluster(

Review Comment:
   Here we call `async_delete_cluster` on the hook, but in `gather_diagnostics_and_maybe_delete` we call a synchronous `delete_cluster`?



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:

Review Comment:
   `check_cluster_state` above already checks whether the state is ERROR. Can we avoid repeating the ERROR check in this conditional?
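
One possible shape for this, sketched with stand-in types and a simulated sequence of states instead of real `get_cluster()` calls: branch on each state exactly once inside the loop. `State`, `poll`, `collect`, and the `handled_as` key are hypothetical and only illustrate the control flow.

```python
import asyncio
from enum import Enum
from typing import AsyncIterator, Dict, List


class State(Enum):
    """Stand-in for ClusterStatus.State (hypothetical, illustration only)."""

    CREATING = "CREATING"
    RUNNING = "RUNNING"
    ERROR = "ERROR"


async def poll(states: List[State], interval: float = 0.0) -> AsyncIterator[Dict[str, str]]:
    """Yield a single event once the cluster reaches RUNNING or ERROR.

    `states` simulates the results of successive status fetches.
    """
    for state in states:
        if state is State.ERROR:
            # The real trigger would gather diagnostics and maybe delete here.
            yield {"cluster_state": state.name, "handled_as": "error"}
            return
        if state is State.RUNNING:
            yield {"cluster_state": state.name, "handled_as": "success"}
            return
        # Any other state: wait and poll again.
        await asyncio.sleep(interval)


async def collect(states: List[State]) -> List[Dict[str, str]]:
    """Drain the async generator into a list for inspection."""
    return [event async for event in poll(states)]
```

For example, `asyncio.run(collect([State.CREATING, State.ERROR]))` produces one event for the ERROR state, and each state is inspected only once per iteration.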



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
             )
-            state = cluster.status.state
-            self.log.info("Dataproc cluster: %s is in state: %s", self.cluster_name, state)
-            if state in (
-                ClusterStatus.State.ERROR,
-                ClusterStatus.State.RUNNING,
-            ):
-                break
-            self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
-            await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"cluster_name": self.cluster_name, "cluster_state": state, "cluster": cluster})
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,
+                    "cluster": None,
+                    "action": "deleted",
+                }
+            )
+        else:
+            return TriggerEvent(
+                {"cluster_name": self.cluster_name, "cluster_state": cluster.status.state, "cluster": cluster}
+            )
+
+    async def handle_cancellation(self) -> None:
+        """Handle the cancellation of the trigger, cleaning up resources if necessary."""
+        self.log.info("Cancellation requested. Deleting the cluster if created.")

Review Comment:
   Looking at the code below, if `self.delete_on_error` is not set, the cluster won't be deleted, no? So this log message could be misleading.



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
             )
-            state = cluster.status.state
-            self.log.info("Dataproc cluster: %s is in state: %s", self.cluster_name, state)
-            if state in (
-                ClusterStatus.State.ERROR,
-                ClusterStatus.State.RUNNING,
-            ):
-                break
-            self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
-            await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"cluster_name": self.cluster_name, "cluster_state": state, "cluster": cluster})
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,

Review Comment:
   Won't this be a stale cluster state as we triggered a delete of the cluster above?



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:

Review Comment:
   ```suggestion
       def is_terminal_cluster_state(self, state: ClusterStatus.State) -> bool:
   ```
   I think `is_terminal` in the name would clarify what the method does for the operator better than `check`, which forces the reader to look at the implementation to understand what exactly it is _checking_. WDYT?
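
To illustrate how the rename reads at the call site, here is a small sketch with a stand-in enum. `State`, `TERMINAL_STATES`, and `first_terminal` are hypothetical; only the name `is_terminal_cluster_state` comes from the suggestion above.

```python
from enum import Enum
from typing import Iterable, Optional


class State(Enum):
    """Stand-in for ClusterStatus.State (hypothetical, illustration only)."""

    CREATING = "CREATING"
    RUNNING = "RUNNING"
    ERROR = "ERROR"


# States at which the trigger stops polling.
TERMINAL_STATES = frozenset({State.RUNNING, State.ERROR})


def is_terminal_cluster_state(state: State) -> bool:
    """Return True if polling should stop for this state."""
    return state in TERMINAL_STATES


def first_terminal(states: Iterable[State]) -> Optional[State]:
    """Return the first terminal state in a simulated polling sequence."""
    for state in states:
        if is_terminal_cluster_state(state):
            return state
    return None
```

The condition `if is_terminal_cluster_state(state)` states its intent directly, whereas `if check_cluster_state(state)` leaves open what is being checked.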



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
             )
-            state = cluster.status.state
-            self.log.info("Dataproc cluster: %s is in state: %s", self.cluster_name, state)
-            if state in (
-                ClusterStatus.State.ERROR,
-                ClusterStatus.State.RUNNING,
-            ):
-                break
-            self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
-            await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"cluster_name": self.cluster_name, "cluster_state": state, "cluster": cluster})
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,
+                    "cluster": None,
+                    "action": "deleted",
+                }
+            )
+        else:
+            return TriggerEvent(
+                {"cluster_name": self.cluster_name, "cluster_state": cluster.status.state, "cluster": cluster}
+            )
+
+    async def handle_cancellation(self) -> None:
+        """Handle the cancellation of the trigger, cleaning up resources if necessary."""
+        self.log.info("Cancellation requested. Deleting the cluster if created.")
+        try:
+            if self.delete_on_error:
+                cluster = await self.fetch_cluster_status()
+                if cluster.status.state == ClusterStatus.State.ERROR:
+                    await self.get_async_hook().async_delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster due to ERROR state during cancellation.")
+                else:
+                    self.log.info("Cancellation did not require cluster deletion.")
+        except Exception as e:
+            self.log.error("Error during cancellation handling: %s", e)

Review Comment:
   Do we need to re-raise the exception here? Otherwise the failure would be swallowed silently.
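
   A minimal sketch of the log-and-re-raise pattern being asked about, written synchronously for brevity. `CleanupError` and `delete_cluster` are hypothetical names used only for illustration; they are not part of the provider.

```python
import logging

log = logging.getLogger(__name__)


class CleanupError(RuntimeError):
    """Hypothetical error type used only in this sketch."""


def delete_cluster() -> None:
    """Stand-in for the hook call; always fails here to exercise the error path."""
    raise ConnectionError("delete request failed")


def handle_cancellation() -> None:
    try:
        delete_cluster()
    except Exception as e:
        # Log for the trigger logs, then re-raise so the failure is not swallowed.
        log.error("Error during cancellation handling: %s", e)
        raise CleanupError(f"Error during cancellation handling: {e}") from e
```

   Chaining with `from e` keeps the original exception available as `__cause__`, so the traceback still shows what actually went wrong.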



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575666529


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
             )
-            state = cluster.status.state
-            self.log.info("Dataproc cluster: %s is in state: %s", self.cluster_name, state)
-            if state in (
-                ClusterStatus.State.ERROR,
-                ClusterStatus.State.RUNNING,
-            ):
-                break
-            self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
-            await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"cluster_name": self.cluster_name, "cluster_state": state, "cluster": cluster})
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,

Review Comment:
   Yes, that's right, but the idea is to show the user the current state and cluster details once the cluster is deleted.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575672477


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
             )
-            state = cluster.status.state
-            self.log.info("Dataproc cluster: %s is in state: %s", self.cluster_name, state)
-            if state in (
-                ClusterStatus.State.ERROR,
-                ClusterStatus.State.RUNNING,
-            ):
-                break
-            self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
-            await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"cluster_name": self.cluster_name, "cluster_state": state, "cluster": cluster})
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,
+                    "cluster": None,
+                    "action": "deleted",
+                }
+            )
+        else:
+            return TriggerEvent(
+                {"cluster_name": self.cluster_name, "cluster_state": cluster.status.state, "cluster": cluster}
+            )
+
+    async def handle_cancellation(self) -> None:
+        """Handle the cancellation of the trigger, cleaning up resources if necessary."""

Review Comment:
   Changed the docstring.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577865132


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +150,72 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state.DELETING,

Review Comment:
   That's correct. It should be `ERROR`. Changed it.



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +150,72 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The sync hook is used to delete the cluster in case of cancellation of task.

Review Comment:
   Changed it.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577256381


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -43,20 +44,28 @@ def __init__(
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
         polling_interval_seconds: int = 30,
+        delete_on_error: bool = True,
     ):
         super().__init__()
         self.region = region
         self.project_id = project_id
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
         self.polling_interval_seconds = polling_interval_seconds
+        self.delete_on_error = delete_on_error
 
     def get_async_hook(self):
         return DataprocAsyncHook(
             gcp_conn_id=self.gcp_conn_id,
             impersonation_chain=self.impersonation_chain,
         )
 
+    def get_sync_hook(self):

Review Comment:
   This is needed to delete the cluster when `CancelledError` is raised, because the async deletion does not seem to be awaited once the task is cancelled.
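
   The pattern described here can be sketched in plain `asyncio`, assuming the cleanup is an ordinary synchronous function. `delete_cluster_sync` and `poll_cluster` are hypothetical stand-ins; the sketch does not reproduce the triggerer's behaviour and only shows that a synchronous call placed in the `CancelledError` handler runs to completion before the task finishes unwinding.

```python
import asyncio

calls: list[str] = []


def delete_cluster_sync(cluster_name: str) -> None:
    """Hypothetical stand-in for a synchronous hook call that only starts the deletion."""
    calls.append(cluster_name)


async def poll_cluster(cluster_name: str) -> None:
    try:
        while True:
            await asyncio.sleep(3600)  # stands in for the polling loop
    except asyncio.CancelledError:
        # No await here: the synchronous call completes before the handler exits.
        delete_cluster_sync(cluster_name)
        raise


async def main() -> None:
    task = asyncio.create_task(poll_cluster("demo-cluster"))
    await asyncio.sleep(0)  # let the task start and reach its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(main())
```

   Re-raising `CancelledError` after the cleanup lets the event loop still mark the task as cancelled.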





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1579265244


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The synchronous hook is utilized to delete the cluster when a task is cancelled.
+                    # This is because the asynchronous hook deletion is not awaited when the trigger task
+                    # is cancelled. The call for deleting the cluster through the sync hook is not a blocking
+                    # call, which means it does not wait until the cluster is deleted.
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+            except Exception as e:
+                self.log.error("Error during cancellation handling: %s", e)
+                raise AirflowException("Error during cancellation handling: %s", e)
+
+    async def fetch_cluster(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    async def delete_when_error_occurred(self, cluster: Cluster):
+        """
+        Delete the cluster on error.
+
+        :param cluster: The cluster to delete.
+        """
+        if self.delete_on_error:
+            self.log.info("Deleting cluster %s.", self.cluster_name)
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            self.log.info("Cluster %s has been deleted.", self.cluster_name)
+        else:
+            self.log.info(
+                "Cluster %s is not be deleted as delete_on_error is set to False.", self.cluster_name

Review Comment:
   This method was extracted to improve readability and reduce complexity, since the `run` function is already large.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "pankajkoti (via GitHub)" <gi...@apache.org>.
pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575960260


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +148,116 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+                    self.log.info("Cluster deletion initiated, awaiting completion...")
+                    async for event in self.wait_until_cluster_deleted():
+                        if event["status"] == "success":
+                            self.log.info("Cluster deletion confirmed.")
+                        elif event["status"] == "error":
+                            self.log.error("Cluster deletion failed with message: %s", event["message"])
+                    self.log.info("Finished handling cluster deletion.")
+            except Exception as e:
+                self.log.error("Error during cancellation handling: %s", e)
+
+    async def wait_until_cluster_deleted(self):
+        """Wait until the cluster is confirmed as deleted."""
+        end_time = time.time() + self.polling_interval_seconds * 10  # Set end time for loop
+        try:
+            while time.time() < end_time:
+                try:
+                    await self.get_async_hook().get_cluster(
+                        region=self.region,
+                        cluster_name=self.cluster_name,
+                        project_id=self.project_id,
+                    )
+                    self.log.info(
+                        "Cluster still exists. Sleeping for %s seconds.", self.polling_interval_seconds
+                    )
+                    await asyncio.sleep(self.polling_interval_seconds)
+                except NotFound:
+                    self.log.info("Cluster successfully deleted.")
+                    yield TriggerEvent({"status": "success", "message": "Cluster deleted successfully."})
+                    return
+        except Exception as e:
+            self.log.error("Error while checking for cluster deletion: %s", e)
+            yield TriggerEvent({"status": "error", "message": str(e)})
+        yield TriggerEvent(
+            {"status": "error", "message": "Timeout - cluster deletion not confirmed within expected time."}
+        )
+
+    async def fetch_cluster(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    async def gather_diagnostics_and_delete_on_error(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,
+                    "cluster": None,
+                    "action": "deleted",

Review Comment:
   What is the significance of returning this `action`?
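
   A related point, going only by the hunk as quoted: `run` awaits the helper and then breaks, so the returned event appears to be discarded. The sketch below uses plain dictionaries and hypothetical function names rather than `TriggerEvent` to show that a value returned by an awaited helper reaches the consumer of an async generator only if it is yielded.

```python
import asyncio
from typing import AsyncIterator


async def helper() -> dict:
    """Hypothetical helper that returns an event payload instead of yielding it."""
    return {"cluster_state": "ERROR", "action": "deleted"}


async def run_dropping_event() -> AsyncIterator[dict]:
    await helper()  # return value is discarded; nothing reaches the consumer
    return
    yield  # unreachable; only makes this function an async generator


async def run_yielding_event() -> AsyncIterator[dict]:
    yield await helper()


async def collect(gen: AsyncIterator[dict]) -> list[dict]:
    return [event async for event in gen]


dropped = asyncio.run(collect(run_dropping_event()))
emitted = asyncio.run(collect(run_yielding_event()))
```

   If the `action` key is meant to be visible to the operator, the event would have to be yielded from `run` rather than only returned by the helper.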





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577301673


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:

Review Comment:
   `gather_diagnostics_and_delete_on_error` serves a different purpose from the cleanup: the cleanup runs only when the task is cancelled.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "pankajkoti (via GitHub)" <gi...@apache.org>.
pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1578308432


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The synchronous hook is utilized to delete the cluster when a task is cancelled.
+                    # This is because the asynchronous hook deletion is not awaited when the trigger task
+                    # is cancelled. The call for deleting the cluster through the sync hook is not a blocking
+                    # call, which means it does not wait until the cluster is deleted.
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+            except Exception as e:
+                self.log.error("Error during cancellation handling: %s", e)
+                raise AirflowException("Error during cancellation handling: %s", e)
+
+    async def fetch_cluster(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    async def delete_when_error_occurred(self, cluster: Cluster):
+        """
+        Delete the cluster on error.
+
+        :param cluster: The cluster to delete.
+        """
+        if self.delete_on_error:
+            self.log.info("Deleting cluster %s.", self.cluster_name)
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            self.log.info("Cluster %s has been deleted.", self.cluster_name)
+        else:
+            self.log.info(
+                "Cluster %s is not be deleted as delete_on_error is set to False.", self.cluster_name

Review Comment:
   ```suggestion
                   "Cluster %s is not deleted as delete_on_error is set to False.", self.cluster_name
   ```





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575662562


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:

Review Comment:
   I have merged the two checks.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "pankajkoti (via GitHub)" <gi...@apache.org>.
pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575731814


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):

Review Comment:
   +1



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
             )
-            state = cluster.status.state
-            self.log.info("Dataproc cluster: %s is in state: %s", self.cluster_name, state)
-            if state in (
-                ClusterStatus.State.ERROR,
-                ClusterStatus.State.RUNNING,
-            ):
-                break
-            self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
-            await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"cluster_name": self.cluster_name, "cluster_state": state, "cluster": cluster})
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,
+                    "cluster": None,
+                    "action": "deleted",
+                }
+            )
+        else:
+            return TriggerEvent(
+                {"cluster_name": self.cluster_name, "cluster_state": cluster.status.state, "cluster": cluster}
+            )
+
+    async def handle_cancellation(self) -> None:

Review Comment:
   +1





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1579226470


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The synchronous hook is utilized to delete the cluster when a task is cancelled.
+                    # This is because the asynchronous hook deletion is not awaited when the trigger task
+                    # is cancelled. The call for deleting the cluster through the sync hook is not a blocking
+                    # call, which means it does not wait until the cluster is deleted.
+                    self.get_sync_hook().delete_cluster(

Review Comment:
   As mentioned in the code comment, the async deletion is not awaited when the task is cancelled by the user, so this would not work.
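
A minimal sketch of the pattern described in this comment, using plain asyncio rather than the Airflow classes. `FakeSyncHook`, `wait_for_cluster` and `main` are hypothetical stand-ins introduced for illustration. The sketch assumes, as the code comment in the diff states, that the synchronous delete call only submits the request and returns without waiting for the cluster to disappear.

```python
import asyncio


class FakeSyncHook:
    """Hypothetical stand-in for a synchronous hook; records calls instead of calling GCP."""

    def __init__(self) -> None:
        self.deleted: list[str] = []

    def delete_cluster(self, cluster_name: str) -> None:
        # Assumption: the real call submits the delete request and returns immediately.
        self.deleted.append(cluster_name)


async def wait_for_cluster(hook: FakeSyncHook, cluster_name: str, delete_on_error: bool) -> None:
    """Poll until cancelled; on cancellation, clean up with a plain (non-awaited) call."""
    try:
        while True:
            await asyncio.sleep(0.01)  # stands in for polling the cluster state
    except asyncio.CancelledError:
        if delete_on_error:
            # A plain function call: nothing is left pending on the event loop
            # after the handler finishes.
            hook.delete_cluster(cluster_name)
        raise  # re-raise so the task still ends up cancelled


async def main() -> FakeSyncHook:
    hook = FakeSyncHook()
    task = asyncio.create_task(wait_for_cluster(hook, "demo-cluster", True))
    await asyncio.sleep(0.02)  # let the loop start polling
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return hook


hook = asyncio.run(main())
```

The trade-off is that a synchronous call blocks the event loop for its duration, which is only acceptable here under the assumption that the call returns quickly.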





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575675452


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
             )
-            state = cluster.status.state
-            self.log.info("Dataproc cluster: %s is in state: %s", self.cluster_name, state)
-            if state in (
-                ClusterStatus.State.ERROR,
-                ClusterStatus.State.RUNNING,
-            ):
-                break
-            self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
-            await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"cluster_name": self.cluster_name, "cluster_state": state, "cluster": cluster})
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,
+                    "cluster": None,
+                    "action": "deleted",
+                }
+            )
+        else:
+            return TriggerEvent(
+                {"cluster_name": self.cluster_name, "cluster_state": cluster.status.state, "cluster": cluster}
+            )
+
+    async def handle_cancellation(self) -> None:
+        """Handle the cancellation of the trigger, cleaning up resources if necessary."""
+        self.log.info("Cancellation requested. Deleting the cluster if created.")
+        try:
+            if self.delete_on_error:
+                cluster = await self.fetch_cluster_status()
+                if cluster.status.state == ClusterStatus.State.ERROR:
+                    await self.get_async_hook().async_delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster due to ERROR state during cancellation.")
+                else:
+                    self.log.info("Cancellation did not require cluster deletion.")
+        except Exception as e:
+            self.log.error("Error during cancellation handling: %s", e)

Review Comment:
   I have added an exception





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "pankajkoti (via GitHub)" <gi...@apache.org>.
pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1590582325


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:

Review Comment:
   It looks like CancelledError is raised in other cases too, e.g. when the trigger restarts, as mentioned in https://github.com/apache/airflow/issues/36090#issuecomment-2094972855
   
   We might need some additional measures here.
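
One possible shape for such a measure, sketched with plain asyncio. This is an illustration only, not what the PR implements: `run_with_guarded_cleanup`, `cancel_once` and the `cancelled_by_user` callable are hypothetical names, and how the trigger would actually tell a user cancellation apart from a trigger restart is left open here.

```python
import asyncio
from typing import Callable


async def run_with_guarded_cleanup(
    cleanup: Callable[[], None],
    cancelled_by_user: Callable[[], bool],
) -> None:
    """Poll until cancelled; clean up only if the guard says the user cancelled."""
    try:
        while True:
            await asyncio.sleep(0.01)  # stands in for polling the cluster state
    except asyncio.CancelledError:
        # CancelledError alone does not say why the task was cancelled,
        # so an extra (hypothetical) check decides whether to delete.
        if cancelled_by_user():
            cleanup()
        raise


async def cancel_once(user_cancelled: bool) -> list[str]:
    calls: list[str] = []
    task = asyncio.create_task(
        run_with_guarded_cleanup(lambda: calls.append("delete"), lambda: user_cancelled)
    )
    await asyncio.sleep(0.02)  # let the loop start polling
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return calls


on_user_cancel = asyncio.run(cancel_once(True))
on_restart = asyncio.run(cancel_once(False))
```

With the guard in place, the cluster would be deleted only in the first case; a restart-style cancellation leaves it untouched.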





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575954174


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:

Review Comment:
   renamed





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575954433


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )

Review Comment:
   not relevant





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577255499


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:

Review Comment:
   This is raised when the user marks the task as failed.
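
For context, a minimal sketch of how a cancellation reaches a polling loop like the one in `run`, using only plain asyncio. `poll_forever` and `main` are hypothetical names, and the sketch assumes that marking the task as failed results in the trigger's asyncio task being cancelled.

```python
import asyncio


async def poll_forever(events: list[str]) -> None:
    """Stand-in for a trigger's polling loop (hypothetical, not Airflow code)."""
    try:
        while True:
            events.append("poll")
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        # The cancellation surfaces here, at the pending await.
        events.append("cancelled")
        raise  # re-raise so the task is actually marked as cancelled


async def main() -> list[str]:
    events: list[str] = []
    task = asyncio.create_task(poll_forever(events))
    await asyncio.sleep(0.03)  # let a few polls happen
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return events


events = asyncio.run(main())
```

The handler runs inside the coroutine itself, which is why cleanup logic for the cancellation case can live in the `except asyncio.CancelledError` branch.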





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1578035001


##########
tests/providers/google/cloud/triggers/test_dataproc.py:
##########
@@ -215,9 +228,48 @@ async def test_cluster_run_loop_is_still_running(
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert f"Current state is: {ClusterStatus.State.CREATING}"
+        assert f"Current state is: {ClusterStatus.State.CREATING}."
         assert f"Sleeping for {TEST_POLL_INTERVAL} seconds."
 
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook.get_cluster")
+    async def test_fetch_cluster_status(self, mock_get_cluster, cluster_trigger, async_get_cluster):
+        mock_get_cluster.return_value = async_get_cluster(
+            status=ClusterStatus(state=ClusterStatus.State.RUNNING)
+        )
+        cluster = await cluster_trigger.fetch_cluster()
+
+        assert cluster.status.state == ClusterStatus.State.RUNNING, "The cluster state should be RUNNING"
+
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook.delete_cluster")
+    async def test_delete_when_error_occurred(self, mock_delete_cluster, cluster_trigger):
+        mock_cluster = mock.MagicMock(spec=Cluster)
+        type(mock_cluster).status = mock.PropertyMock(
+            return_value=mock.MagicMock(state=ClusterStatus.State.ERROR)
+        )
+
+        mock_delete_future = asyncio.Future()
+        mock_delete_future.set_result(None)
+        mock_delete_cluster.return_value = mock_delete_future
+
+        cluster_trigger.delete_on_error = True
+
+        await cluster_trigger.delete_when_error_occurred(mock_cluster)
+
+        mock_delete_cluster.assert_called_once_with(
+            region=cluster_trigger.region,
+            cluster_name=cluster_trigger.cluster_name,
+            project_id=cluster_trigger.project_id,
+        )
+
+        mock_delete_cluster.reset_mock()
+        cluster_trigger.delete_on_error = False
+
+        await cluster_trigger.delete_when_error_occurred(mock_cluster)
+
+        mock_delete_cluster.assert_not_called()
+

Review Comment:
   Added





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577275902


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+                    self.log.info("Cluster deletion initiated.")

Review Comment:
   Removed it. I missed it while I was debugging.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577928454


##########
tests/providers/google/cloud/triggers/test_dataproc.py:
##########
@@ -215,9 +228,48 @@ async def test_cluster_run_loop_is_still_running(
         await asyncio.sleep(0.5)
 
         assert not task.done()
-        assert f"Current state is: {ClusterStatus.State.CREATING}"
+        assert f"Current state is: {ClusterStatus.State.CREATING}."
         assert f"Sleeping for {TEST_POLL_INTERVAL} seconds."
 
+    @pytest.mark.asyncio
+    @mock.patch("airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook.get_cluster")
+    async def test_fetch_cluster_status(self, mock_get_cluster, cluster_trigger, async_get_cluster):
+        mock_get_cluster.return_value = async_get_cluster(
+            status=ClusterStatus(state=ClusterStatus.State.RUNNING)
+        )
+        cluster = await cluster_trigger.fetch_cluster()
+
+        assert cluster.status.state == ClusterStatus.State.RUNNING, "The cluster state should be RUNNING"

Review Comment:
   This message is what gets reported in the `AssertionError` when the test fails.
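
As a small illustration of how the message form of `assert` behaves in plain Python (the helper `check_state` is hypothetical and not part of the PR), the expression after the comma becomes the text of the `AssertionError`:

```python
def check_state(actual: str, expected: str = "RUNNING") -> None:
    # The string after the comma becomes the AssertionError message.
    assert actual == expected, f"The cluster state should be {expected}"


try:
    check_state("ERROR")
except AssertionError as exc:
    message = str(exc)
else:
    message = ""
```

By contrast, an `assert` whose only operand is a non-empty string always passes, because a non-empty string is truthy.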





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1576082062


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +148,116 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+                    self.log.info("Cluster deletion initiated, awaiting completion...")
+                    async for event in self.wait_until_cluster_deleted():
+                        if event["status"] == "success":
+                            self.log.info("Cluster deletion confirmed.")
+                        elif event["status"] == "error":
+                            self.log.error("Cluster deletion failed with message: %s", event["message"])
+                    self.log.info("Finished handling cluster deletion.")
+            except Exception as e:
+                self.log.error("Error during cancellation handling: %s", e)
+
+    async def wait_until_cluster_deleted(self):
+        """Wait until the cluster is confirmed as deleted."""
+        end_time = time.time() + self.polling_interval_seconds * 10  # Set end time for loop
+        try:
+            while time.time() < end_time:
+                try:
+                    await self.get_async_hook().get_cluster(
+                        region=self.region,
+                        cluster_name=self.cluster_name,
+                        project_id=self.project_id,
+                    )
+                    self.log.info(
+                        "Cluster still exists. Sleeping for %s seconds.", self.polling_interval_seconds
+                    )
+                    await asyncio.sleep(self.polling_interval_seconds)
+                except NotFound:
+                    self.log.info("Cluster successfully deleted.")
+                    yield TriggerEvent({"status": "success", "message": "Cluster deleted successfully."})
+                    return
+        except Exception as e:
+            self.log.error("Error while checking for cluster deletion: %s", e)
+            yield TriggerEvent({"status": "error", "message": str(e)})
+        yield TriggerEvent(
+            {"status": "error", "message": "Timeout - cluster deletion not confirmed within expected time."}
+        )
+
+    async def fetch_cluster(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    async def gather_diagnostics_and_delete_on_error(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,
+                    "cluster": None,
+                    "action": "deleted",

Review Comment:
   Removed it; I now log the action instead.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1576081383


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
             )
-            state = cluster.status.state
-            self.log.info("Dataproc cluster: %s is in state: %s", self.cluster_name, state)
-            if state in (
-                ClusterStatus.State.ERROR,
-                ClusterStatus.State.RUNNING,
-            ):
-                break
-            self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
-            await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"cluster_name": self.cluster_name, "cluster_state": state, "cluster": cluster})
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,

Review Comment:
   Removed it. The action is now logged here instead.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577274329


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    self.get_sync_hook().delete_cluster(

Review Comment:
   I checked: when the task is cancelled manually, `execute_complete` is not called at all.
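
The general mechanism behind that observation can be sketched with the standard library alone. Everything here is hypothetical: `run` stands in for a trigger, `drive` for whatever consumes its events, and appending to `callback_calls` for the `execute_complete` callback. This is not Airflow's triggerer implementation. If the consumer is cancelled before the generator yields, no event exists to hand over, so the callback is never reached and any cleanup has to happen where the cancellation is observed.

```python
import asyncio
from collections.abc import AsyncIterator

callback_calls: list[dict] = []  # records calls to the hypothetical callback


async def run(delay: float) -> AsyncIterator[dict]:
    """Hypothetical trigger: emits one event after `delay` seconds."""
    await asyncio.sleep(delay)
    yield {"cluster_state": "RUNNING"}


async def drive(delay: float) -> None:
    """Hypothetical runner: forwards the first event to the callback."""
    async for event in run(delay):
        callback_calls.append(event)  # stand-in for execute_complete
        return


async def main() -> None:
    # Normal completion: the event arrives and the callback runs once.
    await drive(0.01)
    # Cancellation before any event: the callback is never reached.
    task = asyncio.create_task(drive(10))
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(main())
```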





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1579264417


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The synchronous hook is utilized to delete the cluster when a task is cancelled.
+                    # This is because the asynchronous hook deletion is not awaited when the trigger task
+                    # is cancelled. The call for deleting the cluster through the sync hook is not a blocking
+                    # call, which means it does not wait until the cluster is deleted.
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+            except Exception as e:
+                self.log.error("Error during cancellation handling: %s", e)
+                raise AirflowException("Error during cancellation handling: %s", e)
+
+    async def fetch_cluster(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )

Review Comment:
   This method was added to improve readability, since the `run` function is large.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "pankajkoti (via GitHub)" <gi...@apache.org>.
pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577943509


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state.ERROR,

Review Comment:
   But that would be an inaccurate state, no? We triggered the delete action after observing that state.
   It might be better to fetch the latest state or use `ClusterStatus.State.DELETING`.
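
One way to read the second option, sketched with a hypothetical `State` enum and a hypothetical `build_event` helper rather than the real `ClusterStatus.State` and `TriggerEvent`: once deletion has been requested for an errored cluster, the payload reports `DELETING` instead of the stale `ERROR` value.

```python
from enum import Enum


class State(Enum):
    """Hypothetical stand-in for ClusterStatus.State."""

    RUNNING = "RUNNING"
    ERROR = "ERROR"
    DELETING = "DELETING"


def build_event(cluster_name: str, observed: State, delete_requested: bool) -> dict:
    """Build the event payload, reporting DELETING once deletion was requested."""
    if observed is State.ERROR and delete_requested:
        state = State.DELETING
    else:
        state = observed
    return {"cluster_name": cluster_name, "cluster_state": state.name}


event = build_event("demo-cluster", State.ERROR, delete_requested=True)
```

The other option, fetching the cluster again after the delete call, costs one more API round trip but reports whatever the service actually says.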





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577936986


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,75 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""

Review Comment:
   Removed it





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "Lee-W (via GitHub)" <gi...@apache.org>.
Lee-W commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575713841


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )

Review Comment:
   ```suggestion
                           )
                           return
   ```



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:

Review Comment:
   It looks like we're fetching the cluster rather than the cluster status. Or did I miss something?



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:

Review Comment:
   Looks like we can make it a staticmethod
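
A minimal sketch of what the static version might look like. `State` and `DemoTrigger` are hypothetical stand-ins for `ClusterStatus.State` and the trigger class, not the real Airflow or Dataproc objects; the point is only that the check never touches `self`.

```python
from enum import Enum


class State(Enum):
    # Hypothetical stand-in for ClusterStatus.State; only the members used here.
    CREATING = 1
    RUNNING = 2
    ERROR = 3


class DemoTrigger:
    """Hypothetical trigger class used only to show the decorator."""

    @staticmethod
    def check_cluster_state(state: State) -> bool:
        """Return True once the cluster is in ERROR or RUNNING."""
        return state in (State.ERROR, State.RUNNING)
```

Because the method takes no `self`, it can be called on the class or on an instance without any trigger state being set up.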



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):

Review Comment:
   ```suggestion
       async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster) -> TriggerEvent:
   ```
   
   Could we rename the `maybe` part? Perhaps `delete_if_...`?



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):
+        """
+        Gather diagnostics and maybe delete the cluster.
+
+        :param cluster: The cluster to gather diagnostics for.
+        """
+        self.log.info("Cluster is in ERROR state. Gathering diagnostic information.")
+        try:
+            operation = await self.get_async_hook().diagnose_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
             )
-            state = cluster.status.state
-            self.log.info("Dataproc cluster: %s is in state: %s", self.cluster_name, state)
-            if state in (
-                ClusterStatus.State.ERROR,
-                ClusterStatus.State.RUNNING,
-            ):
-                break
-            self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
-            await asyncio.sleep(self.polling_interval_seconds)
-        yield TriggerEvent({"cluster_name": self.cluster_name, "cluster_state": state, "cluster": cluster})
+            result = await operation.result()
+            gcs_uri = str(result.response.value)
+            self.log.info(
+                "Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri
+            )
+        except Exception as e:
+            self.log.error("Failed to diagnose cluster: %s", e)
+
+        if self.delete_on_error:
+            await self.get_async_hook().delete_cluster(
+                region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+            )
+            return TriggerEvent(
+                {
+                    "cluster_name": self.cluster_name,
+                    "cluster_state": cluster.status.state,
+                    "cluster": None,
+                    "action": "deleted",
+                }
+            )
+        else:
+            return TriggerEvent(
+                {"cluster_name": self.cluster_name, "cluster_state": cluster.status.state, "cluster": cluster}
+            )
+
+    async def handle_cancellation(self) -> None:

Review Comment:
   `handle_cancellation` seems too broad a name, and it's not easy to tell what is being handled. I'm thinking of something like the following in the `run` method. WDYT?
   
   ```python
           except asyncio.CancelledError:
               
               try:
                   if self.delete_on_error:
                       await cleanup_cluster()
               except Exception:
                   ......
   ```
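
To make the suggestion above concrete, here is a self-contained sketch of inline cleanup on cancellation inside an async generator. `DemoTrigger`, `cleanup_cluster`, and the `ready` flag are hypothetical; the real trigger would poll the Dataproc hook instead of sleeping. The sketch only shows the control flow and does not demonstrate how an awaited delete behaves inside a real triggerer.

```python
import asyncio


class DemoTrigger:
    """Hypothetical stand-in for the trigger; not the Airflow class."""

    def __init__(self, delete_on_error: bool) -> None:
        self.delete_on_error = delete_on_error
        self.ready = False       # flipped by the caller when the "cluster" is up
        self.cleaned_up = False  # records whether cleanup ran

    async def cleanup_cluster(self) -> None:
        # Placeholder for the real delete call.
        self.cleaned_up = True

    async def run(self):
        try:
            while True:
                if self.ready:
                    yield {"cluster_state": "RUNNING"}
                    return
                await asyncio.sleep(0.01)  # stands in for the polling interval
        except asyncio.CancelledError:
            try:
                if self.delete_on_error:
                    await self.cleanup_cluster()
            except Exception as exc:
                print(f"cleanup failed: {exc}")
            raise  # re-raise so the cancellation still propagates
```

Keeping the cleanup in the `except` branch makes it obvious that it runs only on cancellation and only when `delete_on_error` is set.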
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "pankajkoti (via GitHub)" <gi...@apache.org>.
pankajkoti commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575733443


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:

Review Comment:
   Yes, makes sense. I think this method is redundant.





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575662562


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:

Review Comment:
   I think the idea here is that if the cluster is in the ERROR state, `gather_diagnostics_and_maybe_delete()` should be awaited; otherwise, for the RUNNING state, the following should be yielded:
   ```python
   yield TriggerEvent(
       {
           "cluster_name": self.cluster_name,
           "cluster_state": cluster.status.state,
           "cluster": cluster,
       }
   )
   ```
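
One thing worth checking with this shape: awaiting a helper inside an async generator does not emit whatever the helper returns. The sketch below uses plain dicts and hypothetical names (`handle_error`, `run_discarding`, `run_yielding`) rather than `TriggerEvent` and the real trigger, and contrasts a branch that drops the helper's return value with one that yields it.

```python
import asyncio
from typing import Any, AsyncIterator


async def handle_error(cluster_name: str) -> dict[str, Any]:
    # Hypothetical helper: builds the event for an errored cluster.
    return {"cluster_name": cluster_name, "cluster_state": "ERROR"}


async def run_discarding(state: str, name: str) -> AsyncIterator[dict[str, Any]]:
    if state == "ERROR":
        await handle_error(name)  # return value is dropped, nothing is emitted
    else:
        yield {"cluster_name": name, "cluster_state": state}


async def run_yielding(state: str, name: str) -> AsyncIterator[dict[str, Any]]:
    if state == "ERROR":
        yield await handle_error(name)  # the event reaches the consumer
    else:
        yield {"cluster_name": name, "cluster_state": state}


async def collect(agen: AsyncIterator[dict[str, Any]]) -> list[dict[str, Any]]:
    # Drain an async generator into a list.
    return [event async for event in agen]
```

With `state == "ERROR"`, the first generator finishes without producing an event, while the second produces exactly one.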





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1575953705


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:
+        """
+        Check if the state is error or running.
+
+        :param state: The state of the cluster.
+        """
+        return state in (ClusterStatus.State.ERROR, ClusterStatus.State.RUNNING)
+
+    async def gather_diagnostics_and_maybe_delete(self, cluster: Cluster):

Review Comment:
   changed it



##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +142,97 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster_status()
+                if self.check_cluster_state(cluster.status.state):
+                    if cluster.status.state == ClusterStatus.State.ERROR:
+                        await self.gather_diagnostics_and_maybe_delete(cluster)
+                    else:
+                        yield TriggerEvent(
+                            {
+                                "cluster_name": self.cluster_name,
+                                "cluster_state": cluster.status.state,
+                                "cluster": cluster,
+                            }
+                        )
+                    break
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            await self.handle_cancellation()
+
+    async def fetch_cluster_status(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    def check_cluster_state(self, state: ClusterStatus.State) -> bool:

Review Comment:
   removed it completely





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1577254990


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +149,110 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        """Run the trigger."""
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.gather_diagnostics_and_delete_on_error(cluster)
+                    break
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    break
+
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    self.get_sync_hook().delete_cluster(

Review Comment:
   No, it doesn't wait until the cluster is deleted. It only triggers the cluster deletion. The async method here doesn't actually delete the cluster.
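
As a generic analogy for "trigger the deletion but don't wait for it", the sketch below starts a slow job and gets a handle back immediately. It uses only the standard library; `start_delete` is a hypothetical function and does not model the Dataproc hook or its return type.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=1)


def start_delete(cluster_name: str) -> Future:
    """Hypothetical: kick off a slow delete and return a handle right away."""

    def _delete() -> str:
        time.sleep(0.2)  # stands in for the time the deletion takes
        return f"{cluster_name} deleted"

    return _pool.submit(_delete)


handle = start_delete("demo-cluster")
finished_immediately = handle.done()  # the caller gets control back before the job ends
outcome = handle.result()             # only this call blocks until the job finishes
```

A caller that never invokes `result()` has requested the deletion without waiting for it to complete, which is the behavior described in the comment above.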





Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "phanikumv (via GitHub)" <gi...@apache.org>.
phanikumv merged PR #39130:
URL: https://github.com/apache/airflow/pull/39130




Re: [PR] Improve DataprocCreateClusterOperator in Triggers for Enhanced Error Handling and Resource Cleanup [airflow]

Posted by "sunank200 (via GitHub)" <gi...@apache.org>.
sunank200 commented on code in PR #39130:
URL: https://github.com/apache/airflow/pull/39130#discussion_r1579266460


##########
airflow/providers/google/cloud/triggers/dataproc.py:
##########
@@ -140,24 +153,74 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "gcp_conn_id": self.gcp_conn_id,
                 "impersonation_chain": self.impersonation_chain,
                 "polling_interval_seconds": self.polling_interval_seconds,
+                "delete_on_error": self.delete_on_error,
             },
         )
 
     async def run(self) -> AsyncIterator[TriggerEvent]:
-        while True:
-            cluster = await self.get_async_hook().get_cluster(
-                project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        try:
+            while True:
+                cluster = await self.fetch_cluster()
+                state = cluster.status.state
+                if state == ClusterStatus.State.ERROR:
+                    await self.delete_when_error_occurred(cluster)
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": ClusterStatus.State.DELETING,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                elif state == ClusterStatus.State.RUNNING:
+                    yield TriggerEvent(
+                        {
+                            "cluster_name": self.cluster_name,
+                            "cluster_state": state,
+                            "cluster": cluster,
+                        }
+                    )
+                    return
+                self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds)
+                await asyncio.sleep(self.polling_interval_seconds)
+        except asyncio.CancelledError:
+            try:
+                if self.delete_on_error:
+                    self.log.info("Deleting cluster %s.", self.cluster_name)
+                    # The synchronous hook is utilized to delete the cluster when a task is cancelled.
+                    # This is because the asynchronous hook deletion is not awaited when the trigger task
+                    # is cancelled. The call for deleting the cluster through the sync hook is not a blocking
+                    # call, which means it does not wait until the cluster is deleted.
+                    self.get_sync_hook().delete_cluster(
+                        region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
+                    )
+                    self.log.info("Deleted cluster %s during cancellation.", self.cluster_name)
+            except Exception as e:
+                self.log.error("Error during cancellation handling: %s", e)
+                raise AirflowException(f"Error during cancellation handling: {e}")
+
+    async def fetch_cluster(self) -> Cluster:
+        """Fetch the cluster status."""
+        return await self.get_async_hook().get_cluster(
+            project_id=self.project_id, region=self.region, cluster_name=self.cluster_name
+        )
+
+    async def delete_when_error_occurred(self, cluster: Cluster):

Review Comment:
   Added


