Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/08/05 08:27:14 UTC

[GitHub] [airflow] turbaszek opened a new pull request #9593: Improve handling Dataproc cluster creation with ERROR state

turbaszek opened a new pull request #9593:
URL: https://github.com/apache/airflow/pull/9593


   This PR brings back some features added in https://github.com/apache/airflow/pull/4064 that were lost during a refactor.
   Now, if the cluster is created but its state is ERROR, we run a diagnosis on it and then delete it.
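A minimal sketch of that flow, assuming a hook-like object that exposes `create_cluster`, `diagnose_cluster`, and `delete_cluster`. `FakeHook`, `ClusterInErrorState`, and `create_or_handle_error` are hypothetical names, and the method signatures are simplified; this is not the Airflow provider's actual API. The sketch fails the task whether or not the cluster is deleted. That is a design choice for illustration, not a description of the merged code.

```python
from dataclasses import dataclass, field
from typing import List


class ClusterInErrorState(Exception):
    """Hypothetical error raised when a new cluster comes up in ERROR state."""


@dataclass
class FakeHook:
    """Hypothetical in-memory stand-in for a Dataproc hook (not the real API)."""

    state_after_create: str = "ERROR"
    calls: List[str] = field(default_factory=list)

    def create_cluster(self, name: str) -> dict:
        self.calls.append("create")
        return {"cluster_name": name, "status": {"state": self.state_after_create}}

    def diagnose_cluster(self, name: str) -> str:
        self.calls.append("diagnose")
        return f"gs://example-bucket/diagnostics/{name}"

    def delete_cluster(self, name: str) -> None:
        self.calls.append("delete")


def create_or_handle_error(hook, name: str, delete_on_error: bool = True) -> dict:
    """Create a cluster; if it lands in ERROR, diagnose, optionally delete, then fail."""
    cluster = hook.create_cluster(name)
    if cluster["status"]["state"] != "ERROR":
        return cluster
    # Collect diagnostics first, while the broken cluster still exists.
    uri = hook.diagnose_cluster(name)
    print(f"Diagnostic information for cluster {name} available at: {uri}")
    if delete_on_error:
        # Free the name so that a task retry does not hit AlreadyExists.
        hook.delete_cluster(name)
    # Fail the task in both cases: no usable cluster was produced.
    raise ClusterInErrorState(f"Cluster {name} was created in ERROR state")


if __name__ == "__main__":
    hook = FakeHook()
    try:
        create_or_handle_error(hook, "etl-cluster")
    except ClusterInErrorState as exc:
        print(exc)
    print(hook.calls)  # ['create', 'diagnose', 'delete']
```

Diagnosing before deleting matters: once the cluster is gone, there is nothing left to diagnose.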
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dossett commented on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
dossett commented on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-652583992


   @turbaszek Thanks!  One other small behavior that I added in AIRFLOW-3149 was that if the cluster already existed in the DELETING state, the operator would wait for the DELETE to finish and then create a new cluster.  (A rare condition, but one we hit in production more than once.)
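A sketch of the wait-for-deletion behaviour described here. It assumes a callable that reports whether the cluster still exists (for example, by treating `NotFound` from `get_cluster` as "gone"). `exponential_delays` is a simplified, jitter-free stand-in for the `exponential_sleep_generator` used in the PR's diff, and `wait_until_deleted` is a hypothetical helper. `sleep` is injectable so the loop can be exercised without actually waiting.

```python
import time
from typing import Callable, Iterator


def exponential_delays(initial: float, maximum: float, multiplier: float = 2.0) -> Iterator[float]:
    """Yield initial, initial*multiplier, ... capped at maximum (no jitter)."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * multiplier, maximum)


def wait_until_deleted(
    cluster_exists: Callable[[], bool],
    timeout: float = 300.0,
    initial: float = 10.0,
    maximum: float = 120.0,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Block until cluster_exists() returns False, or raise TimeoutError."""
    waited = 0.0
    for delay in exponential_delays(initial, maximum):
        if not cluster_exists():
            return
        if waited >= timeout:
            raise TimeoutError(f"Cluster still DELETING after {waited:.0f}s")
        # Cap the last sleep so the total wait never exceeds the timeout.
        delay = min(delay, timeout - waited)
        sleep(delay)
        waited += delay


if __name__ == "__main__":
    responses = iter([True, True, False])  # exists, exists, gone
    slept = []
    wait_until_deleted(lambda: next(responses), sleep=slept.append)
    print(slept)  # [10.0, 20.0]
```

Once this returns, the operator can go on to create the cluster as usual.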
   
   Thank you very much!





[GitHub] [airflow] turbaszek commented on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-653744181


   @dossett @olchas I've added your suggestions





[GitHub] [airflow] turbaszek commented on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-651847946


   @dossett happy to hear your opinion!





[GitHub] [airflow] potiuk commented on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-669063279


   @jaketf - the very fact that a Terraform operator is tricky makes me even more convinced that we should add it :). All the words about "careful design" and how to bring in the scripts etc. show that it's far from trivial to use Terraform as a step in Airflow, but IMHO conceptually it makes perfect sense :). I can immediately start thinking about, for example, having the operator fetch the terraform scripts from a Git repo. But you are completely right: it is a big, separate discussion, and the devlist might be the right place for it :). I do not want to open another stream of discussion now, but once we have Airflow 2.0 planning under full control and scheduled, I will for sure open one :).





[GitHub] [airflow] turbaszek commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r449416095



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,79 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info("Cluster %s deleted", self.cluster_name)

Review comment:
       Definitely agree! I've added an exception 👌 







[GitHub] [airflow] jaketf commented on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
jaketf commented on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-669405211


   @turbaszek yes this PR is good to go!
   @potiuk indeed it would be good to make all the right decisions once :) (never seen so many smileys in a GH comment) 
   cc: @brandonjbjelland who's "seen it all" from the Terraform perspective (might help us in designing the right integration)





[GitHub] [airflow] turbaszek commented on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-667904915


   > BTW, off-topic, but shouldn't we think about adding a Terraform/Terragrunt operator to Airflow? I'd say it might be a good idea to have such an operator with some predefined ways to get terraform/terragrunt scripts in and to integrate with Airflow's Jinja templating.
   
   I think that's a good idea. Terraform on its own will be able to handle updates and the like. However, I'm not sure whether Terraform allows users to use all config options.





[GitHub] [airflow] turbaszek merged pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek merged pull request #9593:
URL: https://github.com/apache/airflow/pull/9593


   





[GitHub] [airflow] olchas commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
olchas commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r449062072



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,79 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info("Cluster %s deleted", self.cluster_name)

Review comment:
       I wonder whether it would be a good idea to raise an exception here. It just seems odd to me that an operator that is supposed to create a cluster ends up deleting one instead, and even returns a reference to a no-longer-existing cluster. Raising an exception would also allow a second attempt to create the cluster on retry (if retries apply, of course).







[GitHub] [airflow] jaketf commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r452554626



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,90 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        raise AirflowException(
+            f"Cluster {self.cluster_name} deleted due to ERROR"
+        )
+
+    def _get_cluster(self, hook):
+        return hook.get_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster_name=self.cluster_name,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+    def _handle_error_state(self, hook):
+        self.log.info("Cluster is in ERROR state")
+        gcs_uri = hook.diagnose_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info(
+            'Diagnostic information for cluster %s available at: %s',
+            self.cluster_name, gcs_uri
+        )
+        if self.delete_on_error:
+            self._delete_cluster(hook)
+
+    def _wait_for_cluster_in_deleting_state(self, hook):
+        time_left = 60 * 5
+        for time_to_sleep in exponential_sleep_generator(initial=10, maximum=120):
+            if time_left < 0:
+                raise AirflowException(
+                    f"Cluster {self.cluster_name} is still DELETING state, aborting"
+                )
+            time.sleep(time_to_sleep)
+            time_left = time_left - time_to_sleep
+            try:
+                self._get_cluster(hook)
+            except NotFound:
+                break
 
     def execute(self, context):
         self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
         try:
-            operation = hook.create_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster=self.cluster,
-                request_id=self.request_id,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
-            cluster = operation.result()
-            self.log.info("Cluster created.")
+            cluster = self._create_cluster(hook)
         except AlreadyExists:
-            cluster = hook.get_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster_name=self.cluster_name,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
             self.log.info("Cluster already exists.")
+            cluster = self._get_cluster(hook)

Review comment:
       If the cluster already exists (I assume this is checked by cluster ID/name), then you should assert that it matches any configuration explicitly specified in this operator by the user (e.g. a cluster could exist with this ID but have a different Dataproc version, missing init actions, etc.). You would not want to consider this a successful run of this operator, as it did not meet its contract of creating a cluster with the explicit XYZ config provided by the user.
   
   IMO this should result in a task failure, as it is not clear what the operator should do in this scenario: delete the existing cluster? Add a UUID suffix to avoid the name clash and create a new cluster?

##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -437,6 +437,9 @@ class DataprocCreateClusterOperator(BaseOperator):
     :type project_id: str
     :param region: leave as 'global', might become relevant in the future. (templated)
     :type region: str
+    :parm delete_on_error: If true the claster will be deleted if created with ERROR state. Default

Review comment:
       spelling
   ```suggestion
       :parm delete_on_error: If true the cluster will be deleted if created with ERROR state. Default
   ```

##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,90 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        raise AirflowException(
+            f"Cluster {self.cluster_name} deleted due to ERROR"
+        )
+
+    def _get_cluster(self, hook):
+        return hook.get_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster_name=self.cluster_name,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+    def _handle_error_state(self, hook):
+        self.log.info("Cluster is in ERROR state")
+        gcs_uri = hook.diagnose_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info(
+            'Diagnostic information for cluster %s available at: %s',
+            self.cluster_name, gcs_uri
+        )
+        if self.delete_on_error:
+            self._delete_cluster(hook)
+
+    def _wait_for_cluster_in_deleting_state(self, hook):
+        time_left = 60 * 5
+        for time_to_sleep in exponential_sleep_generator(initial=10, maximum=120):
+            if time_left < 0:
+                raise AirflowException(
+                    f"Cluster {self.cluster_name} is still DELETING state, aborting"
+                )
+            time.sleep(time_to_sleep)
+            time_left = time_left - time_to_sleep
+            try:
+                self._get_cluster(hook)
+            except NotFound:
+                break
 
     def execute(self, context):
         self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
         try:
-            operation = hook.create_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster=self.cluster,
-                request_id=self.request_id,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
-            cluster = operation.result()
-            self.log.info("Cluster created.")
+            cluster = self._create_cluster(hook)
         except AlreadyExists:
-            cluster = hook.get_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster_name=self.cluster_name,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
             self.log.info("Cluster already exists.")
+            cluster = self._get_cluster(hook)

Review comment:
       If the cluster is in the CREATING state, should you block until it reaches RUNNING?
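One possible shape for that blocking step, sketched under the assumption that the operator can read the cluster's state as a string such as `"CREATING"` or `"RUNNING"`. `wait_while_creating` is a hypothetical helper, not part of the provider, and the timeout and poll interval are arbitrary defaults.

```python
import time
from typing import Callable


def wait_while_creating(
    get_state: Callable[[], str],
    timeout: float = 600.0,
    poll_interval: float = 15.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the cluster leaves CREATING; return the state it settled in."""
    waited = 0.0
    state = get_state()
    while state == "CREATING":
        if waited >= timeout:
            raise TimeoutError(f"Cluster still CREATING after {waited:.0f}s")
        step = min(poll_interval, timeout - waited)
        sleep(step)
        waited += step
        state = get_state()
    return state


if __name__ == "__main__":
    states = iter(["CREATING", "CREATING", "RUNNING"])
    slept = []
    print(wait_while_creating(lambda: next(states), sleep=slept.append))  # RUNNING
    print(slept)  # [15.0, 15.0]
```

Returning the final state, rather than asserting RUNNING inside the helper, leaves the caller free to route ERROR into the existing diagnose/delete path.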







[GitHub] [airflow] jaketf commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r452559941



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -437,6 +437,9 @@ class DataprocCreateClusterOperator(BaseOperator):
     :type project_id: str
     :param region: leave as 'global', might become relevant in the future. (templated)
     :type region: str
+    :parm delete_on_error: If true the claster will be deleted if created with ERROR state. Default
+        value is true.
+    :type delete_on_error: bool

Review comment:
       should there be a similar configurable parameter like "delete_on_configuration_mismatch" that defaults to False?







[GitHub] [airflow] jaketf commented on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
jaketf commented on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-668844747


   > BTW, off-topic, but shouldn't we think about adding a Terraform/Terragrunt operator to Airflow? I'd say it might be a good idea to have such an operator with some predefined ways to get terraform/terragrunt scripts in and to integrate with Airflow's Jinja templating.
   
   Having two levels of rendering (Jinja and HCL string interpolation) sounds like a great way to create hard-to-debug situations of "which level of this rendering is going wrong?". And what, from an infra perspective, would really be dynamic between task runs?
   
   I think a Terraform hook might be a nice feature, but it would take some careful design. I vaguely remember this being brought up on Slack or the dev list but can't seem to find it. I found myself wanting it to bring up / tear down a CI Composer environment during off-hours as a cost-cutting measure.
   
   For OSS Terraform / Terragrunt this could be really tricky:
   1. The execution environment must have the terraform / terragrunt binaries (plus downloaded remote modules, or the source for local modules). Would we provide an Airflow extra for this, or would it be up to the user? The latter seems sad and not easy to do with Composer specifically (it could be easier if you build your own Airflow images with terraform included and run on Kubernetes).
   2. You need an easy way to sync Terraform source from a repo (unless you imagine single-resource-type things that might be embedded in DAG code).
   
   I think it would mostly end up as a Kubernetes Pod Operator that happens to run terraform.
   
   However, for Terraform Enterprise a hook might be much simpler (as the execution environment, source syncing, and state management would not be Airflow's problem).





[GitHub] [airflow] turbaszek commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r460761418



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,90 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        raise AirflowException(
+            f"Cluster {self.cluster_name} deleted due to ERROR"
+        )
+
+    def _get_cluster(self, hook):
+        return hook.get_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster_name=self.cluster_name,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+    def _handle_error_state(self, hook):
+        self.log.info("Cluster is in ERROR state")
+        gcs_uri = hook.diagnose_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info(
+            'Diagnostic information for cluster %s available at: %s',
+            self.cluster_name, gcs_uri
+        )
+        if self.delete_on_error:
+            self._delete_cluster(hook)
+
+    def _wait_for_cluster_in_deleting_state(self, hook):
+        time_left = 60 * 5
+        for time_to_sleep in exponential_sleep_generator(initial=10, maximum=120):
+            if time_left < 0:
+                raise AirflowException(
+                    f"Cluster {self.cluster_name} is still DELETING state, aborting"
+                )
+            time.sleep(time_to_sleep)
+            time_left = time_left - time_to_sleep
+            try:
+                self._get_cluster(hook)
+            except NotFound:
+                break
 
     def execute(self, context):
         self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
         try:
-            operation = hook.create_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster=self.cluster,
-                request_id=self.request_id,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
-            cluster = operation.result()
-            self.log.info("Cluster created.")
+            cluster = self._create_cluster(hook)
         except AlreadyExists:
-            cluster = hook.get_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster_name=self.cluster_name,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
             self.log.info("Cluster already exists.")
+            cluster = self._get_cluster(hook)

Review comment:
       @jaketf is there any bulletproof, simple way to compare cluster configurations? Comparing dicts doesn't sound like the way to go, because the created cluster includes more information than the user-provided config.
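One non-bulletproof option is a one-directional "subset" comparison. It checks only the keys the user actually set and ignores everything the service added. The sketch below assumes both configs are available as plain nested dicts/lists. `config_mismatches` is a hypothetical helper, and the sample field names are illustrative. The service may normalize some values it returns (for example, expanding short names into full resource URIs), so strict equality like this can report false mismatches. Treat it as a starting point rather than a complete answer.

```python
from typing import Any, List


def config_mismatches(expected: Any, actual: Any, path: str = "") -> List[str]:
    """List the paths where `actual` disagrees with what the user set in `expected`.

    Only keys present in `expected` are checked, so extra server-populated
    fields in `actual` (defaults, generated names, status) are ignored.
    Lists are compared element by element and must have the same length.
    """
    where = path or "<root>"
    if isinstance(expected, dict):
        if not isinstance(actual, dict):
            return [where]
        problems: List[str] = []
        for key, value in expected.items():
            child = f"{path}.{key}" if path else key
            if key not in actual:
                problems.append(child)
            else:
                problems.extend(config_mismatches(value, actual[key], child))
        return problems
    if isinstance(expected, list):
        if not isinstance(actual, list) or len(expected) != len(actual):
            return [where]
        mismatched: List[str] = []
        for i, (exp_item, act_item) in enumerate(zip(expected, actual)):
            mismatched.extend(config_mismatches(exp_item, act_item, f"{path}[{i}]"))
        return mismatched
    # Leaf values: plain equality.
    return [] if expected == actual else [where]


if __name__ == "__main__":
    user_config = {"config": {"master_config": {"num_instances": 1},
                              "software_config": {"image_version": "1.5"}}}
    existing = {"config": {"master_config": {"num_instances": 1,
                                             "disk_config": {"boot_disk_size_gb": 500}},
                           "software_config": {"image_version": "1.4"}},
                "status": {"state": "RUNNING"}}
    print(config_mismatches(user_config, existing))
    # ['config.software_config.image_version']
```

A non-empty result could then fail the task, or drive something like the `delete_on_configuration_mismatch` flag floated in this review.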







[GitHub] [airflow] turbaszek commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r454193518



##########
File path: airflow/providers/google/cloud/hooks/dataproc.py
##########
@@ -376,15 +376,25 @@ def diagnose_cluster(
         :type metadata: Sequence[Tuple[str, str]]
         """
         client = self.get_cluster_client(location=region)
-        result = client.diagnose_cluster(
+        operation = client.diagnose_cluster(
             project_id=project_id,
             region=region,
             cluster_name=cluster_name,
             retry=retry,
             timeout=timeout,
             metadata=metadata,
         )
-        return result
+        done = False
+        while not done:
+            time.sleep(3)
+            # The dataproc python client has some problems
+            # TypeError: Could not convert Any to Empty
+            try:
+                done = operation.done()
+            except TypeError:
+                pass

Review comment:
       ETA is the end of this week:
   https://github.com/googleapis/python-dataproc/issues/51#issuecomment-657888007
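The polling loop quoted above has no upper bound on how long it waits for `done()`. The sketch below adds a deadline and keeps the same `TypeError` workaround. `wait_for_operation` is a hypothetical helper, and `is_done` stands in for `operation.done`. The timeout value is an arbitrary assumption.

```python
import time
from typing import Callable


def wait_for_operation(
    is_done: Callable[[], bool],
    timeout: float = 600.0,
    poll_interval: float = 3.0,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Poll is_done() until it returns True, tolerating transient TypeErrors."""
    waited = 0.0
    while True:
        try:
            if is_done():
                return
        except TypeError:
            # Same workaround as the diff: the client could fail to decode
            # the operation ("Could not convert Any to Empty"); keep polling.
            pass
        if waited >= timeout:
            raise TimeoutError(f"Operation not done after {waited:.0f}s")
        step = min(poll_interval, timeout - waited)
        sleep(step)
        waited += step


if __name__ == "__main__":
    answers = [TypeError, False, True]

    def fake_done() -> bool:
        value = answers.pop(0)
        if value is TypeError:
            raise TypeError("Could not convert Any to Empty")
        return value

    slept = []
    wait_for_operation(fake_done, sleep=slept.append)
    print(slept)  # [3.0, 3.0]
```

Once the upstream client fix lands, the `except TypeError` branch could simply be dropped.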







[GitHub] [airflow] dossett commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
dossett commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r449064584



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,79 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info("Cluster %s deleted", self.cluster_name)

Review comment:
       If the cluster isn't deleted, then the retries will fail because the cluster already exists (even if it exists in an ERROR state and is not usable).
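A toy simulation that makes the retry argument concrete. All names are hypothetical. `AlreadyExists` here is a local stand-in, not the `google.api_core` class. The fake service puts the first cluster into ERROR and lets later creates succeed. `attempt` mirrors the quoted diff's behaviour of reusing an existing cluster on `AlreadyExists`.

```python
from typing import Dict


class AlreadyExists(Exception):
    """Local stand-in for google.api_core.exceptions.AlreadyExists."""


class ClusterError(Exception):
    """Hypothetical failure raised when the cluster is in ERROR state."""


class FakeDataproc:
    """Hypothetical registry: the first create lands in ERROR, later ones succeed."""

    def __init__(self) -> None:
        self.clusters: Dict[str, str] = {}
        self.creates = 0

    def create(self, name: str) -> str:
        if name in self.clusters:
            raise AlreadyExists(name)
        self.creates += 1
        state = "ERROR" if self.creates == 1 else "RUNNING"
        self.clusters[name] = state
        return state

    def delete(self, name: str) -> None:
        self.clusters.pop(name, None)


def attempt(dp: FakeDataproc, name: str, delete_on_error: bool) -> str:
    """One task try: create, or reuse an existing cluster on AlreadyExists."""
    try:
        state = dp.create(name)
    except AlreadyExists:
        state = dp.clusters[name]
    if state == "ERROR":
        if delete_on_error:
            dp.delete(name)
        raise ClusterError(f"Cluster {name} is in ERROR state")
    return state


def run_with_retries(name: str, delete_on_error: bool, retries: int = 2) -> str:
    """Simulate a task with `retries` retries against a fresh fake service."""
    dp = FakeDataproc()
    for _ in range(retries + 1):
        try:
            return attempt(dp, name, delete_on_error)
        except ClusterError:
            continue
    return "FAILED"


if __name__ == "__main__":
    print(run_with_retries("etl", delete_on_error=True))   # RUNNING
    print(run_with_retries("etl", delete_on_error=False))  # FAILED
```

Without the deletion, every retry finds the same unusable ERROR cluster and fails again.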







[GitHub] [airflow] jaketf commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r462643349



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,90 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        raise AirflowException(
+            f"Cluster {self.cluster_name} deleted due to ERROR"
+        )
+
+    def _get_cluster(self, hook):
+        return hook.get_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster_name=self.cluster_name,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+    def _handle_error_state(self, hook):
+        self.log.info("Cluster is in ERROR state")
+        gcs_uri = hook.diagnose_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info(
+            'Diagnostic information for cluster %s available at: %s',
+            self.cluster_name, gcs_uri
+        )
+        if self.delete_on_error:
+            self._delete_cluster(hook)
+
+    def _wait_for_cluster_in_deleting_state(self, hook):
+        time_left = 60 * 5
+        for time_to_sleep in exponential_sleep_generator(initial=10, maximum=120):
+            if time_left < 0:
+                raise AirflowException(
+                    f"Cluster {self.cluster_name} is still DELETING state, aborting"
+                )
+            time.sleep(time_to_sleep)
+            time_left = time_left - time_to_sleep
+            try:
+                self._get_cluster(hook)
+            except NotFound:
+                break
 
     def execute(self, context):
         self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
         try:
-            operation = hook.create_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster=self.cluster,
-                request_id=self.request_id,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
-            cluster = operation.result()
-            self.log.info("Cluster created.")
+            cluster = self._create_cluster(hook)
         except AlreadyExists:
-            cluster = hook.get_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster_name=self.cluster_name,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
             self.log.info("Cluster already exists.")
+            cluster = self._get_cluster(hook)

Review comment:
       I think you would compare only the keys of the dict that the user specified.
   This might be easier said than done.
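A minimal sketch of that idea, assuming the requested cluster spec and the existing cluster have both already been converted to plain nested dicts (that conversion is not shown). The function name `differs_on_user_keys` and the sample values are hypothetical. As mik-laj notes elsewhere in this thread, not every difference makes an existing cluster unusable, so a real check would probably also need a list of keys that actually matter.

```python
from typing import Any, List, Mapping


def differs_on_user_keys(requested: Mapping[str, Any], actual: Mapping[str, Any]) -> List[str]:
    """Return dotted paths of user-specified keys whose values differ in `actual`.

    Keys present only in `actual` (e.g. defaults filled in by the service) are ignored.
    """
    diffs: List[str] = []

    def walk(req: Mapping[str, Any], act: Any, prefix: str) -> None:
        for key, want in req.items():
            path = f"{prefix}.{key}" if prefix else key
            if not isinstance(act, Mapping) or key not in act:
                diffs.append(path)  # the user asked for a key the cluster does not have
            elif isinstance(want, Mapping):
                walk(want, act[key], path)  # recurse into nested config sections
            elif act[key] != want:
                diffs.append(path)  # scalar or list value differs


    walk(requested, actual, "")
    return diffs


# Hypothetical example: only worker_config.num_instances was set by the user.
requested = {"config": {"worker_config": {"num_instances": 2}}}
actual = {
    "cluster_name": "example",
    "config": {
        "worker_config": {"num_instances": 3, "machine_type_uri": "n1-standard-4"},
        "software_config": {"image_version": "1.5"},
    },
}
print(differs_on_user_keys(requested, actual))  # ['config.worker_config.num_instances']
```

Extra keys such as `software_config` are not reported because the user never specified them.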







[GitHub] [airflow] olchas commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
olchas commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r449056748



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,79 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info("Cluster %s deleted", self.cluster_name)
+
+    def _get_cluster(self, hook):
+        return hook.get_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster_name=self.cluster_name,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+    def _handle_error_state(self, hook):
+        self.log.info("Cluster is in ERROR state")
+        gcs_uri = hook.diagnose_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info(
+            'Diagnostic information for cluster %s available at: %s',
+            self.cluster_name, gcs_uri
+        )
+        if self.delete_on_error:
+            self._delete_cluster(hook)
 
     def execute(self, context):
         self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
         try:
-            operation = hook.create_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster=self.cluster,
-                request_id=self.request_id,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
-            cluster = operation.result()
-            self.log.info("Cluster created.")
+            cluster = self._create_cluster(hook)
         except AlreadyExists:
-            cluster = hook.get_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster_name=self.cluster_name,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
             self.log.info("Cluster already exists.")
+            cluster = self._get_cluster(hook)
+
+        if cluster.status.state == cluster.status.ERROR:
+            self._handle_error_state(hook)
+        elif cluster.status.state == cluster.status.DELETING:
+            # Wait for cluster to delete
+            for time_to_sleep in exponential_sleep_generator(initial=10, maximum=120):

Review comment:
       I know that we are waiting here for an operation that is supposed to finish within a finite amount of time, but what do you think about adding a maximum total sleep time before raising an exception?
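A minimal sketch of a bounded wait, assuming a caller-supplied `condition` callable (for example, one that returns True once `get_cluster` raises `NotFound`). The names `backoff` and `wait_until`, the 300-second default cap, and the injectable `sleep` are assumptions for illustration; `backoff` is a deterministic stand-in for `exponential_sleep_generator`, whose exact delays may differ.

```python
import time
from typing import Callable, Iterator, Optional


def backoff(initial: float, maximum: float, multiplier: float = 2.0) -> Iterator[float]:
    """Yield sleep intervals that grow geometrically and are capped at `maximum`."""
    delay = initial
    while True:
        yield min(delay, maximum)
        delay *= multiplier


def wait_until(
    condition: Callable[[], bool],
    max_total: Optional[float] = 300,
    initial: float = 10,
    maximum: float = 120,
    sleep: Callable[[float], None] = time.sleep,
) -> float:
    """Poll `condition` with backoff and return the total time slept.

    Raises TimeoutError when the next sleep would push the total past `max_total`.
    Passing max_total=None disables the cap and waits indefinitely.
    """
    slept = 0.0
    for delay in backoff(initial, maximum):
        if condition():
            return slept
        if max_total is not None and slept + delay > max_total:
            raise TimeoutError(f"condition not met after {slept:.0f}s of waiting")
        sleep(delay)
        slept += delay
```

Checking the condition before each sleep means an already-deleted cluster costs no waiting at all; the cap is evaluated before sleeping so the total never exceeds `max_total`.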







[GitHub] [airflow] dossett commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
dossett commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r449103732



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,79 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info("Cluster %s deleted", self.cluster_name)

Review comment:
       Ah, yes, thank you!  +1







[GitHub] [airflow] potiuk commented on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-667906041


   > > BTW, off-topic, but shouldn't we think about adding a Terraform/Terragrunt operator to Airflow? I'd say it might be a good idea to have such an operator with some predefined ways of getting Terraform/Terragrunt scripts in and integrating with Airflow's Jinja templating.
   > 
   > I think that's a good idea. Terraform on its own will be able to handle updates and similar concerns. However, I'm not sure Terraform allows users to use all config options.
   
   Agree. We should have both. It's great to have dedicated operators with explicit configuration options, but a generic Terraform operator would be a great addition for users who are familiar with Terraform and use it elsewhere.





[GitHub] [airflow] potiuk commented on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-669407496


   > @turbaszek yes this PR is good to go!
   > @potiuk indeed it would be good to make all the right decisions once :) (never seen so many smileys in a GH comment)
   
   I was smiling the whole time I was writing it.





[GitHub] [airflow] mik-laj commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r462194406



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -437,6 +437,9 @@ class DataprocCreateClusterOperator(BaseOperator):
     :type project_id: str
     :param region: leave as 'global', might become relevant in the future. (templated)
     :type region: str
+    :parm delete_on_error: If true the claster will be deleted if created with ERROR state. Default
+        value is true.
+    :type delete_on_error: bool

Review comment:
       I think this will be difficult to handle correctly in all cases. You will not always need to create a new cluster when the configuration differs; for example, additional installed components do not affect the usability of the cluster.







[GitHub] [airflow] olchas commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
olchas commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r449062517



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -437,6 +437,9 @@ class DataprocCreateClusterOperator(BaseOperator):
     :type project_id: str
     :param region: leave as 'global', might become relevant in the future. (templated)
     :type region: str
+    :parm delete_on_error: If true the claster will be deleted if created with ERROR state. Default

Review comment:
       ```suggestion
       :param delete_on_error: If true the cluster will be deleted if created with ERROR state. Default
   ```







[GitHub] [airflow] olchas commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
olchas commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r449082822



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,79 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info("Cluster %s deleted", self.cluster_name)

Review comment:
       But I meant to raise an exception **after** the cluster is deleted. Then, as far as I understand, on retry we would follow the code path for a cluster that exists but is in the 'DELETING' state, so we would get another chance to create it.
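A minimal sketch of the flow described here, using a hypothetical in-memory `FakeHook` in place of `DataprocHook` (method names mirror the diff, but signatures are simplified) and `ClusterDeletedError` as a stand-in for `AirflowException`. It assumes the DELETING branch re-creates the cluster once deletion finishes; `finish_deletes` stands in for polling until `get_cluster` raises `NotFound`.

```python
class AlreadyExistsError(Exception):
    """Stand-in for google.api_core.exceptions.AlreadyExists."""


class ClusterDeletedError(Exception):
    """Stand-in for the AirflowException raised after deleting an ERROR cluster."""


class FakeHook:
    """Hypothetical in-memory stand-in for DataprocHook."""

    def __init__(self, first_state: str = "ERROR") -> None:
        self.clusters = {}  # cluster name -> "RUNNING" | "ERROR" | "DELETING"
        self._next_state = first_state

    def create_cluster(self, name: str) -> str:
        if name in self.clusters:
            raise AlreadyExistsError(name)
        self.clusters[name] = self._next_state
        self._next_state = "RUNNING"  # later creations succeed
        return self.clusters[name]

    def get_cluster(self, name: str) -> str:
        return self.clusters[name]

    def delete_cluster(self, name: str) -> None:
        self.clusters[name] = "DELETING"

    def finish_deletes(self) -> None:
        """Simulate Dataproc completing all pending deletions."""
        self.clusters = {n: s for n, s in self.clusters.items() if s != "DELETING"}


def execute(hook: FakeHook, name: str) -> str:
    try:
        state = hook.create_cluster(name)
    except AlreadyExistsError:
        state = hook.get_cluster(name)
    if state == "ERROR":
        hook.delete_cluster(name)
        # Fail *after* deleting, so the task's next retry takes the DELETING path.
        raise ClusterDeletedError(f"Cluster {name} deleted due to ERROR")
    if state == "DELETING":
        hook.finish_deletes()
        state = hook.create_cluster(name)
    return state


hook = FakeHook()
try:
    execute(hook, "example")  # first attempt: cluster comes up in ERROR
except ClusterDeletedError as err:
    print(err)  # Cluster example deleted due to ERROR
print(execute(hook, "example"))  # retry: RUNNING
```

The point of raising after the delete call is that the task is marked failed while the cluster is already on its way out, so Airflow's normal retry mechanism does the re-creation.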







[GitHub] [airflow] turbaszek commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r448189393



##########
File path: airflow/providers/google/cloud/hooks/dataproc.py
##########
@@ -376,15 +376,25 @@ def diagnose_cluster(
         :type metadata: Sequence[Tuple[str, str]]
         """
         client = self.get_cluster_client(location=region)
-        result = client.diagnose_cluster(
+        operation = client.diagnose_cluster(
             project_id=project_id,
             region=region,
             cluster_name=cluster_name,
             retry=retry,
             timeout=timeout,
             metadata=metadata,
         )
-        return result
+        done = False
+        while not done:
+            time.sleep(3)
+            # The dataproc python client has some problems
+            # TypeError: Could not convert Any to Empty
+            try:
+                done = operation.done()
+            except TypeError:
+                pass

Review comment:
       This is due to the error reported in https://github.com/googleapis/python-dataproc/issues/51
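A minimal sketch of that polling workaround as a standalone helper. `wait_for_done`, the optional `max_polls` cap, and the injectable `sleep` are assumptions added for illustration (the diff itself loops with no upper bound); `done` stands for something like `operation.done`.

```python
import time
from typing import Callable, Optional


def wait_for_done(
    done: Callable[[], bool],
    poll_interval: float = 3.0,
    max_polls: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll `done()` until it returns True and return the number of polls.

    A TypeError from `done()` is treated as "not finished yet", mirroring the
    workaround for "Could not convert Any to Empty" in the dataproc client.
    """
    polls = 0
    while max_polls is None or polls < max_polls:
        sleep(poll_interval)
        polls += 1
        try:
            if done():
                return polls
        except TypeError:
            pass  # client failed to decode the operation; poll again
    raise TimeoutError(f"operation not done after {polls} polls")
```

Swallowing only `TypeError` keeps other failures (for example, API errors) visible instead of silently retrying them.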







[GitHub] [airflow] turbaszek commented on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-669046724


   > Would we provide an Airflow extra for this? Would this be up to the user?
   
   Correct me if I'm wrong but already docker, singularity, 





[GitHub] [airflow] dossett commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
dossett commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r449067489



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,79 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info("Cluster %s deleted", self.cluster_name)

Review comment:
       The ERROR cluster will eventually be cleaned up by Dataproc, but in my experience that often takes longer (20-30 minutes) than typical retry counts and retry delays cover.
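To make the arithmetic concrete: with a fixed delay between attempts, the time covered by retries is roughly `retries * retry_delay`, ignoring task run time. The values below (3 retries, 5-minute delay) are illustrative assumptions, not settings taken from this PR.

```python
from datetime import timedelta


def retry_window(retries: int, retry_delay: timedelta) -> timedelta:
    """Total waiting time between attempts with a fixed retry delay (run time excluded)."""
    return retries * retry_delay


window = retry_window(3, timedelta(minutes=5))
print(window)  # 0:15:00
print(window < timedelta(minutes=20))  # True: shorter than a 20-30 minute cleanup
```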







[GitHub] [airflow] potiuk commented on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-669392106


   Sure, we are good. It was off-topic :)





[GitHub] [airflow] dossett commented on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
dossett commented on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-665178224


   LGTM, thank you!





[GitHub] [airflow] turbaszek commented on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-669084592


   @potiuk @jaketf are we good with this PR? In my opinion Terraform sounds like the way to go (isn't it already used for system tests?), as it would provide a unified way of managing infrastructure from the Airflow level.





[GitHub] [airflow] dossett commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
dossett commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r450344785



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,79 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info("Cluster %s deleted", self.cluster_name)
+
+    def _get_cluster(self, hook):
+        return hook.get_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster_name=self.cluster_name,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+    def _handle_error_state(self, hook):
+        self.log.info("Cluster is in ERROR state")
+        gcs_uri = hook.diagnose_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info(
+            'Diagnostic information for cluster %s available at: %s',
+            self.cluster_name, gcs_uri
+        )
+        if self.delete_on_error:
+            self._delete_cluster(hook)
 
     def execute(self, context):
         self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
         try:
-            operation = hook.create_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster=self.cluster,
-                request_id=self.request_id,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
-            cluster = operation.result()
-            self.log.info("Cluster created.")
+            cluster = self._create_cluster(hook)
         except AlreadyExists:
-            cluster = hook.get_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster_name=self.cluster_name,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
             self.log.info("Cluster already exists.")
+            cluster = self._get_cluster(hook)
+
+        if cluster.status.state == cluster.status.ERROR:
+            self._handle_error_state(hook)
+        elif cluster.status.state == cluster.status.DELETING:
+            # Wait for cluster to delete
+            for time_to_sleep in exponential_sleep_generator(initial=10, maximum=120):

Review comment:
       Honestly, as an Airflow user and admin, I would rather there be no timeout (or just rely on whatever global timeouts exist). I've seen DELETE operations take 45-60 minutes, and when they take that long there's nothing to do except wait for them to complete.







[GitHub] [airflow] turbaszek commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r454198867



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,79 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info("Cluster %s deleted", self.cluster_name)
+
+    def _get_cluster(self, hook):
+        return hook.get_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster_name=self.cluster_name,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+    def _handle_error_state(self, hook):
+        self.log.info("Cluster is in ERROR state")
+        gcs_uri = hook.diagnose_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info(
+            'Diagnostic information for cluster %s available at: %s',
+            self.cluster_name, gcs_uri
+        )
+        if self.delete_on_error:
+            self._delete_cluster(hook)
 
     def execute(self, context):
         self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
         try:
-            operation = hook.create_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster=self.cluster,
-                request_id=self.request_id,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
-            cluster = operation.result()
-            self.log.info("Cluster created.")
+            cluster = self._create_cluster(hook)
         except AlreadyExists:
-            cluster = hook.get_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster_name=self.cluster_name,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
             self.log.info("Cluster already exists.")
+            cluster = self._get_cluster(hook)
+
+        if cluster.status.state == cluster.status.ERROR:
+            self._handle_error_state(hook)
+        elif cluster.status.state == cluster.status.DELETING:
+            # Wait for cluster to delete
+            for time_to_sleep in exponential_sleep_generator(initial=10, maximum=120):

Review comment:
       Hm, I'm not sure this is the right approach for a "create" operator. We even started a discussion about this on the dev list:
   https://lists.apache.org/thread.html/r9a6833ebafa3f00f79f86d9688f77a958a73ab7b6d9eccd1f0998fe2%40%3Cdev.airflow.apache.org%3E







[GitHub] [airflow] turbaszek commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r449415934



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,79 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info("Cluster %s deleted", self.cluster_name)
+
+    def _get_cluster(self, hook):
+        return hook.get_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster_name=self.cluster_name,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+    def _handle_error_state(self, hook):
+        self.log.info("Cluster is in ERROR state")
+        gcs_uri = hook.diagnose_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info(
+            'Diagnostic information for cluster %s available at: %s',
+            self.cluster_name, gcs_uri
+        )
+        if self.delete_on_error:
+            self._delete_cluster(hook)
 
     def execute(self, context):
         self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
         try:
-            operation = hook.create_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster=self.cluster,
-                request_id=self.request_id,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
-            cluster = operation.result()
-            self.log.info("Cluster created.")
+            cluster = self._create_cluster(hook)
         except AlreadyExists:
-            cluster = hook.get_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster_name=self.cluster_name,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
             self.log.info("Cluster already exists.")
+            cluster = self._get_cluster(hook)
+
+        if cluster.status.state == cluster.status.ERROR:
+            self._handle_error_state(hook)
+        elif cluster.status.state == cluster.status.DELETING:
+            # Wait for cluster to delete
+            for time_to_sleep in exponential_sleep_generator(initial=10, maximum=120):

Review comment:
       I've added a 5-minute timeout. @dossett, do you think that will be enough in most cases?
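The 5-minute bound discussed here (implemented as `_wait_for_cluster_in_deleting_state` in the later revision of the hunk) is a poll-with-backoff loop under a total deadline. A minimal, library-free sketch follows. `exponential_sleeps` is a hypothetical, deterministic stand-in for `google.api_core.retry.exponential_sleep_generator`, and `is_gone` is a hypothetical callable standing in for a `get_cluster` call that raises `NotFound`. `sleep` and `clock` are injectable so the loop can be exercised without real waiting.

```python
import time
from typing import Callable, Iterator


def exponential_sleeps(initial: float, maximum: float, multiplier: float = 2.0) -> Iterator[float]:
    """Yield sleep durations that grow geometrically and are capped at ``maximum`` (no jitter)."""
    delay = min(initial, maximum)
    while True:
        yield delay
        delay = min(delay * multiplier, maximum)


def wait_until_gone(
    is_gone: Callable[[], bool],
    timeout: float = 5 * 60,
    initial: float = 10,
    maximum: float = 120,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Poll ``is_gone`` with exponential backoff until it returns True.

    Raises TimeoutError if the resource is still present after ``timeout`` seconds.
    """
    deadline = clock() + timeout
    for delay in exponential_sleeps(initial, maximum):
        if is_gone():
            return
        remaining = deadline - clock()
        if remaining <= 0:
            raise TimeoutError(f"resource still present after {timeout} seconds")
        # Never sleep past the deadline; the next iteration does one final check.
        sleep(min(delay, remaining))
```

Capping each sleep at the remaining budget keeps the total wait close to the timeout. The loop in the later hunk checks `time_left` before sleeping, so it can overshoot the 5 minutes by up to one sleep interval.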







[GitHub] [airflow] turbaszek commented on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-652980847


   @olchas would you mind taking a look?





[GitHub] [airflow] turbaszek commented on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-664228344


   Related #10014





[GitHub] [airflow] turbaszek removed a comment on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek removed a comment on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-669046724


   > would we provide an airflow extra for this? would this be up to the user? 
   
   Correct me if I'm wrong but already docker, singularity, 





[GitHub] [airflow] turbaszek commented on pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#issuecomment-652954399


   @dossett, I've added handling for this case.





[GitHub] [airflow] potiuk commented on a change in pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
potiuk commented on a change in pull request #9593:
URL: https://github.com/apache/airflow/pull/9593#discussion_r464284946



##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -502,32 +506,90 @@ def __init__(self,
         self.timeout = timeout
         self.metadata = metadata
         self.gcp_conn_id = gcp_conn_id
+        self.delete_on_error = delete_on_error
+
+    def _create_cluster(self, hook):
+        operation = hook.create_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster=self.cluster,
+            request_id=self.request_id,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+        cluster = operation.result()
+        self.log.info("Cluster created.")
+        return cluster
+
+    def _delete_cluster(self, hook):
+        self.log.info("Deleting the cluster")
+        hook.delete_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        raise AirflowException(
+            f"Cluster {self.cluster_name} deleted due to ERROR"
+        )
+
+    def _get_cluster(self, hook):
+        return hook.get_cluster(
+            project_id=self.project_id,
+            region=self.region,
+            cluster_name=self.cluster_name,
+            retry=self.retry,
+            timeout=self.timeout,
+            metadata=self.metadata,
+        )
+
+    def _handle_error_state(self, hook):
+        self.log.info("Cluster is in ERROR state")
+        gcs_uri = hook.diagnose_cluster(
+            region=self.region,
+            cluster_name=self.cluster_name,
+            project_id=self.project_id,
+        )
+        self.log.info(
+            'Diagnostic information for cluster %s available at: %s',
+            self.cluster_name, gcs_uri
+        )
+        if self.delete_on_error:
+            self._delete_cluster(hook)
+
+    def _wait_for_cluster_in_deleting_state(self, hook):
+        time_left = 60 * 5
+        for time_to_sleep in exponential_sleep_generator(initial=10, maximum=120):
+            if time_left < 0:
+                raise AirflowException(
+                    f"Cluster {self.cluster_name} is still DELETING state, aborting"
+                )
+            time.sleep(time_to_sleep)
+            time_left = time_left - time_to_sleep
+            try:
+                self._get_cluster(hook)
+            except NotFound:
+                break
 
     def execute(self, context):
         self.log.info('Creating cluster: %s', self.cluster_name)
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
         try:
-            operation = hook.create_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster=self.cluster,
-                request_id=self.request_id,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
-            cluster = operation.result()
-            self.log.info("Cluster created.")
+            cluster = self._create_cluster(hook)
         except AlreadyExists:
-            cluster = hook.get_cluster(
-                project_id=self.project_id,
-                region=self.region,
-                cluster_name=self.cluster_name,
-                retry=self.retry,
-                timeout=self.timeout,
-                metadata=self.metadata,
-            )
             self.log.info("Cluster already exists.")
+            cluster = self._get_cluster(hook)

Review comment:
       Yeah. See my "terraform" comment above. I think manually deleting the cluster is good enough when we want to change its configuration. I don't think we should take on the complexity of computing the difference between the expected and actual cluster configuration.

##########
File path: airflow/providers/google/cloud/operators/dataproc.py
##########
@@ -437,6 +437,9 @@ class DataprocCreateClusterOperator(BaseOperator):
     :type project_id: str
     :param region: leave as 'global', might become relevant in the future. (templated)
     :type region: str
+    :param delete_on_error: If true, the cluster will be deleted if it is created in ERROR state. Default
+        value is true.
+    :type delete_on_error: bool

Review comment:
       I think this is not really something that should be handled by the operator itself. I'd argue that if you really want to change the configuration of a cluster, you can simply delete it manually and let it be re-created. I think working in a "terraformy" or "kubectly" "apply" fashion should be left to Terraform. That is, if you really want this kind of approach, why not write a Terraform script and run Terraform?
   
   BTW, off-topic, but shouldn't we think about adding a Terraform/Terragrunt operator to Airflow? I'd say it might be a good idea to have such an operator with some predefined ways to get Terraform/Terragrunt scripts in and to integrate with Airflow's Jinja templating.







[GitHub] [airflow] turbaszek closed pull request #9593: Improve handling Dataproc cluster creation with ERROR state

Posted by GitBox <gi...@apache.org>.
turbaszek closed pull request #9593:
URL: https://github.com/apache/airflow/pull/9593


   

