Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/10/28 18:58:15 UTC

[GitHub] [airflow] eladkal commented on a diff in pull request #27276: Redshift Pause and Resume Operator Refactor

eladkal commented on code in PR #27276:
URL: https://github.com/apache/airflow/pull/27276#discussion_r1008350886


##########
airflow/providers/amazon/aws/hooks/redshift_cluster.py:
##########
@@ -157,16 +158,24 @@ def create_cluster_snapshot(
         )
         return response["Snapshot"] if response["Snapshot"] else None
 
-    def get_cluster_snapshot_status(self, snapshot_identifier: str, cluster_identifier: str):
+    def get_cluster_snapshot_status(self, snapshot_identifier: str, cluster_identifier: str | None = None):
         """
         Return Redshift cluster snapshot status. If cluster snapshot not found return ``None``
 
         :param snapshot_identifier: A unique identifier for the snapshot that you are requesting
-        :param cluster_identifier: The unique identifier of the cluster the snapshot was created from
+        :param cluster_identifier: (deprecated) The unique identifier of the cluster
+            the snapshot was created from

Review Comment:
   Can you please share some context?
   The PR description says the motivation is an issue with the system tests; is there more to it?
   Why are we deprecating this parameter? If boto3 accepts it as valid input, why should we prevent users from using it?
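For reference, boto3's Redshift client method `describe_cluster_snapshots` accepts both `SnapshotIdentifier` and `ClusterIdentifier`, so the parameter could stay optional and simply be forwarded when the caller supplies it. The following is a minimal sketch, assuming the hook builds its request for that method; `build_describe_snapshot_kwargs` is a hypothetical helper, not part of the provider:

```python
from typing import Optional


def build_describe_snapshot_kwargs(
    snapshot_identifier: str, cluster_identifier: Optional[str] = None
) -> dict:
    """Build keyword arguments for boto3's Redshift describe_cluster_snapshots.

    Hypothetical helper: cluster_identifier stays optional, but it is still
    forwarded when given instead of being deprecated.
    """
    kwargs = {"SnapshotIdentifier": snapshot_identifier}
    if cluster_identifier is not None:
        # boto3 accepts ClusterIdentifier alongside SnapshotIdentifier.
        kwargs["ClusterIdentifier"] = cluster_identifier
    return kwargs


if __name__ == "__main__":
    print(build_describe_snapshot_kwargs("my-snapshot"))
    print(build_describe_snapshot_kwargs("my-snapshot", "my-cluster"))
```

With a real client this would be used as `boto3.client("redshift").describe_cluster_snapshots(**build_describe_snapshot_kwargs("my-snapshot", "my-cluster"))` (identifiers are illustrative); omitting the cluster identifier keeps the snapshot-only lookup.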
   
   



##########
airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -392,20 +393,31 @@ def __init__(
         *,
         cluster_identifier: str,
         aws_conn_id: str = "aws_default",
+        attempts: int = 1,
+        attempt_interval: int = 30,
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.cluster_identifier = cluster_identifier
         self.aws_conn_id = aws_conn_id
+        self.attempts = attempts
+        self.attempt_interval = attempt_interval
 
     def execute(self, context: Context):
         redshift_hook = RedshiftHook(aws_conn_id=self.aws_conn_id)
-        cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier)
-        if cluster_state == "paused":
-            self.log.info("Starting Redshift cluster %s", self.cluster_identifier)
-            redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)
-        else:
-            raise Exception(f"Unable to resume cluster - cluster state is {cluster_state}")
+
+        while self.attempts >= 1:
+            try:
+                redshift_hook.get_conn().resume_cluster(ClusterIdentifier=self.cluster_identifier)
+                return
+            except redshift_hook.get_conn().exceptions.InvalidClusterStateFault as error:
+                self.attempts = self.attempts - 1
+
+                if self.attempts > 0:
+                    self.log.error("Unable to resume cluster. %d attempts remaining.", self.attempts)
+                    time.sleep(self.attempt_interval)
+                else:
+                    raise error

Review Comment:
   This is essentially the behavior of the `retries` and `retry_delay` parameters of BaseOperator.
   
   Why do we need another layer of "internal retries"?
   I could argue that if we have such a need, it likely goes beyond this single operator and may apply to others.
   If we must have it, let's not expose the parameters to users. I'm not sure we want each individual operator to define its own user-facing retry logic.
   
   I think this should be discussed in more depth. I'd like to hear from others on this one.
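To make the overlap concrete, here is a minimal, self-contained sketch of the proposed loop next to its BaseOperator equivalent. `resume_with_attempts`, `equivalent_task_retries` and the stand-in `InvalidClusterStateFault` class are hypothetical names; the diff itself catches `redshift_hook.get_conn().exceptions.InvalidClusterStateFault`. The mapping assumes Airflow's semantics that `retries` counts retries after the first try and that `retry_delay` is a `timedelta`.

```python
import time
from datetime import timedelta
from typing import Callable


class InvalidClusterStateFault(Exception):
    """Hypothetical stand-in for boto3's Redshift InvalidClusterStateFault."""


def resume_with_attempts(
    resume: Callable[[], None],
    attempts: int = 1,
    attempt_interval: float = 30,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call ``resume`` up to ``attempts`` times (at least once), sleeping between failures.

    Same control flow as the loop in the diff, but it counts down a local
    variable instead of mutating operator state. Returns the number of calls made.
    """
    remaining = attempts
    calls = 0
    while True:
        calls += 1
        try:
            resume()
            return calls
        except InvalidClusterStateFault:
            remaining -= 1
            if remaining <= 0:
                raise  # out of attempts: re-raise the last fault
            sleep(attempt_interval)


def equivalent_task_retries(attempts: int, attempt_interval: float) -> dict:
    """Map the proposed parameters onto BaseOperator's retries/retry_delay.

    N attempts in total correspond to N - 1 retries after the first try.
    """
    return {"retries": attempts - 1, "retry_delay": timedelta(seconds=attempt_interval)}
```

One design difference worth weighing: task-level retries re-run the whole `execute` method and release the worker slot while the task waits in the up-for-retry state, whereas the internal loop repeats only the `resume_cluster` call and keeps the worker busy while it sleeps.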
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
