You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/08/03 09:02:13 UTC

[GitHub] [airflow] potiuk commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

potiuk commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r464284946



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,90 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        raise AirflowException(
+            f"Cluster {self.cluster_name} deleted due to ERROR"
+        )
+
+    def _get_cluster(self, hook):
+        return hook.get_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster_name=self.cluster_name,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+    def _handle_error_state(self, hook):
+        self.log.info("Cluster is in ERROR state")
+        gcs_uri = hook.diagnose_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info(
+            'Diagnostic information for cluster %s available at: %s',
+            self.cluster_name, gcs_uri
+        )
+        if self.delete_on_error:
+            self._delete_cluster(hook)
+
+    def _wait_for_cluster_in_deleting_state(self, hook):
+        time_left = 60 * 5
+        for time_to_sleep in exponential_sleep_generator(initial=10, maximum=120):
+            if time_left < 0:
+                raise AirflowException(
+                    f"Cluster {self.cluster_name} is still DELETING state, aborting"
+                )
+            time.sleep(time_to_sleep)
+            time_left = time_left - time_to_sleep
+            try:
+                self._get_cluster(hook)
+            except NotFound:
+                break
 
     def execute(self, context):
         self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
         try:
-            operation = hook.create_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster=self.cluster,
-                request_id=self.request_id,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
-            cluster = operation.result()
-            self.log.info("Cluster created.")
+            cluster = self._create_cluster(hook)
         except AlreadyExists:
-            cluster = hook.get_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster_name=self.cluster_name,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
             self.log.info("Cluster already exists.")
+            cluster = self._get_cluster(hook)

Review comment:
      Yeah. See my "terraform" comment above. I think we are pretty good with manual deletion of the cluster in case we want to change its configuration; I don't think we should handle all the potential complexity of computing the difference between the expected and actual cluster configuration.

##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -437,6 +437,9 @@ class DataprocCreateClusterOperator(BaseOperator):
     :type project_id: str
     :param region: leave as 'global', might become relevant in the future. (templated)
     :type region: str
+    :param delete_on_error: If true, the cluster will be deleted if created with ERROR state. Default
+        value is true.
+    :type delete_on_error: bool

Review comment:
      I think this is not really something that should be handled by the operator itself. I'd argue that if you really want to change the configuration of the cluster, you can simply delete it manually and let it be re-created. I think working in a "terraformy" or "kubectly" "apply" fashion in this case should be left to Terraform. I.e., if you really want to use this kind of approach, why not write a Terraform script and run Terraform?
   
  BTW. Off-topic — but shouldn't we think about adding a Terraform/Terragrunt operator to Airflow? I'd say it might be a good idea to have such an operator with some pre-defined ways of getting terraform/terragrunt scripts in and of integrating with Airflow's JINJA templating. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org