Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2023/01/12 00:44:58 UTC

[GitHub] [airflow] vandonr-amz opened a new pull request, #28869: rewrite polling code for appflow hook

vandonr-amz opened a new pull request, #28869:
URL: https://github.com/apache/airflow/pull/28869

   The existing code had several problems, IMHO, including:
    * using 3 different sleep times, only one being configurable
    * sleeping before doing anything
    * relying on the last run being the one we care about, which is not necessarily true
   
   I also refactored some duplicated code and added a `wait_for_completion` parameter to skip the wait entirely if needed.
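A minimal sketch of the polling approach described above. This is not the code merged in this PR. It assumes a boto3-style AppFlow client exposing `start_flow` and `describe_flow_execution_records`, whose responses carry `flowExecutions`, `executionId`, `executionStatus` and `nextToken`. The helper names and the injectable `sleep` parameter are hypothetical, added for illustration and testing. The sketch uses one configurable `poll_interval` and checks status before sleeping. It follows the run by the `executionId` returned from `start_flow` instead of trusting the latest run, and it returns immediately when `wait_for_completion` is false.

```python
import time
from typing import Any, Callable, Optional


def find_execution(client: Any, flow_name: str, execution_id: str) -> Optional[dict]:
    """Return the execution record for execution_id, or None if it is not visible yet."""
    kwargs = {"flowName": flow_name}
    while True:
        page = client.describe_flow_execution_records(**kwargs)
        for record in page.get("flowExecutions", []):
            if record.get("executionId") == execution_id:
                return record
        token = page.get("nextToken")
        if not token:
            return None  # AppFlow is eventually consistent: the record may appear later
        kwargs["nextToken"] = token


def run_flow(
    client: Any,
    flow_name: str,
    poll_interval: int = 20,
    wait_for_completion: bool = True,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Start a flow run and, optionally, wait until that specific run ends."""
    execution_id = client.start_flow(flowName=flow_name)["executionId"]
    if not wait_for_completion:
        return execution_id
    while True:
        # Check first, then sleep: no blind wait before the first status call.
        record = find_execution(client, flow_name, execution_id)
        status = record.get("executionStatus") if record else None
        if status == "Successful":
            return execution_id
        if status not in (None, "InProgress"):
            # "Error" (or any other non-running status) ends the wait with a failure.
            raise RuntimeError(f"Flow {flow_name!r} run {execution_id} ended with {status}")
        sleep(poll_interval)  # the single, configurable wait between checks
```

Any status other than `InProgress` or `Successful` is deliberately treated as a failure, so an unexpected state ends the wait instead of looping forever. There is no overall timeout in this sketch. A real hook would probably want one.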


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [airflow] ferruzzi commented on a diff in pull request #28869: rewrite polling code for appflow hook

Posted by "ferruzzi (via GitHub)" <gi...@apache.org>.
ferruzzi commented on code in PR #28869:
URL: https://github.com/apache/airflow/pull/28869#discussion_r1123575711


##########
airflow/providers/amazon/aws/hooks/appflow.py:
##########
@@ -54,49 +50,35 @@ def conn(self) -> AppflowClient:
         """Get the underlying boto3 Appflow client (cached)"""
         return super().conn
 
-    def run_flow(self, flow_name: str, poll_interval: int = 20) -> str:
+    def run_flow(self, flow_name: str, poll_interval: int = 20, wait_for_completion: bool = True) -> str:
         """
         Execute an AppFlow run.
 
         :param flow_name: The flow name
         :param poll_interval: Time (seconds) to wait between two consecutive calls to check the run status
+        :param wait_for_completion: whether to wait for the run to end to return
         :return: The run execution ID
         """
-        ts_before: datetime = datetime.now(timezone.utc)
-        sleep(self.EVENTUAL_CONSISTENCY_OFFSET)
         response_start = self.conn.start_flow(flowName=flow_name)
         execution_id = response_start["executionId"]
         self.log.info("executionId: %s", execution_id)
 
-        response_desc = self.conn.describe_flow(flowName=flow_name)
-        last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        # Wait Appflow eventual consistence
-        self.log.info("Waiting for Appflow eventual consistence...")
-        while (
-            response_desc.get("lastRunExecutionDetails", {}).get(
-                "mostRecentExecutionTime", datetime(1970, 1, 1, tzinfo=timezone.utc)
-            )
-            < ts_before
-        ):
-            sleep(self.EVENTUAL_CONSISTENCY_POLLING)
-            response_desc = self.conn.describe_flow(flowName=flow_name)
-            last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        # Wait flow stops
-        self.log.info("Waiting for flow run...")
-        while (
-            "mostRecentExecutionStatus" not in last_exec_details
-            or last_exec_details["mostRecentExecutionStatus"] == "InProgress"
-        ):
-            sleep(poll_interval)
-            response_desc = self.conn.describe_flow(flowName=flow_name)
-            last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        self.log.info("lastRunExecutionDetails: %s", last_exec_details)
-
-        if last_exec_details["mostRecentExecutionStatus"] == "Error":
-            raise Exception(f"Flow error:\n{json.dumps(response_desc, default=str)}")
+        if wait_for_completion:

Review Comment:
   I definitely agree that this should be a waiter, and that it may as well be done now.





[GitHub] [airflow] vandonr-amz commented on a diff in pull request #28869: rewrite polling code for appflow hook

Posted by "vandonr-amz (via GitHub)" <gi...@apache.org>.
vandonr-amz commented on code in PR #28869:
URL: https://github.com/apache/airflow/pull/28869#discussion_r1123512646


##########
airflow/providers/amazon/aws/hooks/appflow.py:
##########
@@ -54,49 +50,35 @@ def conn(self) -> AppflowClient:
         """Get the underlying boto3 Appflow client (cached)"""
         return super().conn
 
-    def run_flow(self, flow_name: str, poll_interval: int = 20) -> str:
+    def run_flow(self, flow_name: str, poll_interval: int = 20, wait_for_completion: bool = True) -> str:
         """
         Execute an AppFlow run.
 
         :param flow_name: The flow name
         :param poll_interval: Time (seconds) to wait between two consecutive calls to check the run status
+        :param wait_for_completion: whether to wait for the run to end to return
         :return: The run execution ID
         """
-        ts_before: datetime = datetime.now(timezone.utc)
-        sleep(self.EVENTUAL_CONSISTENCY_OFFSET)
         response_start = self.conn.start_flow(flowName=flow_name)
         execution_id = response_start["executionId"]
         self.log.info("executionId: %s", execution_id)
 
-        response_desc = self.conn.describe_flow(flowName=flow_name)
-        last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        # Wait Appflow eventual consistence
-        self.log.info("Waiting for Appflow eventual consistence...")
-        while (
-            response_desc.get("lastRunExecutionDetails", {}).get(
-                "mostRecentExecutionTime", datetime(1970, 1, 1, tzinfo=timezone.utc)
-            )
-            < ts_before
-        ):
-            sleep(self.EVENTUAL_CONSISTENCY_POLLING)
-            response_desc = self.conn.describe_flow(flowName=flow_name)
-            last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        # Wait flow stops
-        self.log.info("Waiting for flow run...")
-        while (
-            "mostRecentExecutionStatus" not in last_exec_details
-            or last_exec_details["mostRecentExecutionStatus"] == "InProgress"
-        ):
-            sleep(poll_interval)
-            response_desc = self.conn.describe_flow(flowName=flow_name)
-            last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        self.log.info("lastRunExecutionDetails: %s", last_exec_details)
-
-        if last_exec_details["mostRecentExecutionStatus"] == "Error":
-            raise Exception(f"Flow error:\n{json.dumps(response_desc, default=str)}")
+        if wait_for_completion:

Review Comment:
   OK, but do you think it should be done as part of this PR? How about I push the conversion to a waiter as a new PR?





[GitHub] [airflow] vandonr-amz commented on a diff in pull request #28869: rewrite polling code for appflow hook

Posted by "vandonr-amz (via GitHub)" <gi...@apache.org>.
vandonr-amz commented on code in PR #28869:
URL: https://github.com/apache/airflow/pull/28869#discussion_r1086049943


##########
airflow/providers/amazon/aws/operators/appflow.py:
##########
@@ -93,6 +98,12 @@ def execute(self, context: Context) -> None:
         self.connector_type = self._get_connector_type()
         if self.flow_update:
             self._update_flow()
+            # previous code had a wait between update and run without explaining why.
+            # since I don't have a way to actually test this behavior,
+            # I'm reproducing it out of fear of breaking workflows.
+            # It might be unnecessary.

Review Comment:
   Checked with the team, and it's true for on-demand flows (see https://docs.aws.amazon.com/appflow/latest/userguide/flow-triggers.html for a description of the different flow types).
   I updated the comment accordingly.
   Unfortunately, we don't have the infra set up to run the AppFlow system test (it depends on external sources), so it's a bit hard for me to test this.





[GitHub] [airflow] igorborgest commented on a diff in pull request #28869: rewrite polling code for appflow hook

Posted by GitBox <gi...@apache.org>.
igorborgest commented on code in PR #28869:
URL: https://github.com/apache/airflow/pull/28869#discussion_r1070118130


##########
airflow/providers/amazon/aws/operators/appflow.py:
##########
@@ -93,6 +98,12 @@ def execute(self, context: Context) -> None:
         self.connector_type = self._get_connector_type()
         if self.flow_update:
             self._update_flow()
+            # previous code had a wait between update and run without explaining why.
+            # since I don't have a way to actually test this behavior,
+            # I'm reproducing it out of fear of breaking workflows.
+            # It might be unnecessary.

Review Comment:
   If I recall correctly, the updates are not propagated atomically. So this sleep was to avoid runs using the previous configuration.
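A minimal sketch of the update-then-pause pattern discussed here. It assumes a boto3-style AppFlow client with `update_flow` and `start_flow`. It also assumes, following the boto3 `update_flow` request shape, that the update arguments carry a `triggerConfig` with a `triggerType`. The function name, `settle_seconds` and the injectable `sleep` are hypothetical. The actual delay the operator uses is not stated in this thread. Per vandonr-amz's follow-up, the propagation issue was confirmed for on-demand flows, so the pause applies only to those.

```python
import time
from typing import Any, Callable, Dict


def update_then_start(
    client: Any,
    flow_name: str,
    update_kwargs: Dict[str, Any],
    settle_seconds: float,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Update a flow, let the change propagate if needed, then start a run."""
    client.update_flow(flowName=flow_name, **update_kwargs)
    # Read the trigger type from the update request itself rather than
    # re-describing the (eventually consistent) flow.
    if update_kwargs["triggerConfig"]["triggerType"] == "OnDemand":
        # Updates are not propagated atomically: pause so the run does not
        # pick up the previous configuration.
        sleep(settle_seconds)
    return client.start_flow(flowName=flow_name)["executionId"]
```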





[GitHub] [airflow] vincbeck commented on a diff in pull request #28869: rewrite polling code for appflow hook

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #28869:
URL: https://github.com/apache/airflow/pull/28869#discussion_r1123525633


##########
airflow/providers/amazon/aws/hooks/appflow.py:
##########
@@ -54,49 +50,35 @@ def conn(self) -> AppflowClient:
         """Get the underlying boto3 Appflow client (cached)"""
         return super().conn
 
-    def run_flow(self, flow_name: str, poll_interval: int = 20) -> str:
+    def run_flow(self, flow_name: str, poll_interval: int = 20, wait_for_completion: bool = True) -> str:
         """
         Execute an AppFlow run.
 
         :param flow_name: The flow name
         :param poll_interval: Time (seconds) to wait between two consecutive calls to check the run status
+        :param wait_for_completion: whether to wait for the run to end to return
         :return: The run execution ID
         """
-        ts_before: datetime = datetime.now(timezone.utc)
-        sleep(self.EVENTUAL_CONSISTENCY_OFFSET)
         response_start = self.conn.start_flow(flowName=flow_name)
         execution_id = response_start["executionId"]
         self.log.info("executionId: %s", execution_id)
 
-        response_desc = self.conn.describe_flow(flowName=flow_name)
-        last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        # Wait Appflow eventual consistence
-        self.log.info("Waiting for Appflow eventual consistence...")
-        while (
-            response_desc.get("lastRunExecutionDetails", {}).get(
-                "mostRecentExecutionTime", datetime(1970, 1, 1, tzinfo=timezone.utc)
-            )
-            < ts_before
-        ):
-            sleep(self.EVENTUAL_CONSISTENCY_POLLING)
-            response_desc = self.conn.describe_flow(flowName=flow_name)
-            last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        # Wait flow stops
-        self.log.info("Waiting for flow run...")
-        while (
-            "mostRecentExecutionStatus" not in last_exec_details
-            or last_exec_details["mostRecentExecutionStatus"] == "InProgress"
-        ):
-            sleep(poll_interval)
-            response_desc = self.conn.describe_flow(flowName=flow_name)
-            last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        self.log.info("lastRunExecutionDetails: %s", last_exec_details)
-
-        if last_exec_details["mostRecentExecutionStatus"] == "Error":
-            raise Exception(f"Flow error:\n{json.dumps(response_desc, default=str)}")
+        if wait_for_completion:

Review Comment:
   To me it should be done in this PR. It is basically the same problem with two different implementations; let's aim directly for the desired implementation.





[GitHub] [airflow] vandonr-amz commented on a diff in pull request #28869: rewrite polling code for appflow hook

Posted by "vandonr-amz (via GitHub)" <gi...@apache.org>.
vandonr-amz commented on code in PR #28869:
URL: https://github.com/apache/airflow/pull/28869#discussion_r1123823559


##########
airflow/providers/amazon/aws/hooks/appflow.py:
##########
@@ -54,49 +50,35 @@ def conn(self) -> AppflowClient:
         """Get the underlying boto3 Appflow client (cached)"""
         return super().conn
 
-    def run_flow(self, flow_name: str, poll_interval: int = 20) -> str:
+    def run_flow(self, flow_name: str, poll_interval: int = 20, wait_for_completion: bool = True) -> str:
         """
         Execute an AppFlow run.
 
         :param flow_name: The flow name
         :param poll_interval: Time (seconds) to wait between two consecutive calls to check the run status
+        :param wait_for_completion: whether to wait for the run to end to return
         :return: The run execution ID
         """
-        ts_before: datetime = datetime.now(timezone.utc)
-        sleep(self.EVENTUAL_CONSISTENCY_OFFSET)
         response_start = self.conn.start_flow(flowName=flow_name)
         execution_id = response_start["executionId"]
         self.log.info("executionId: %s", execution_id)
 
-        response_desc = self.conn.describe_flow(flowName=flow_name)
-        last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        # Wait Appflow eventual consistence
-        self.log.info("Waiting for Appflow eventual consistence...")
-        while (
-            response_desc.get("lastRunExecutionDetails", {}).get(
-                "mostRecentExecutionTime", datetime(1970, 1, 1, tzinfo=timezone.utc)
-            )
-            < ts_before
-        ):
-            sleep(self.EVENTUAL_CONSISTENCY_POLLING)
-            response_desc = self.conn.describe_flow(flowName=flow_name)
-            last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        # Wait flow stops
-        self.log.info("Waiting for flow run...")
-        while (
-            "mostRecentExecutionStatus" not in last_exec_details
-            or last_exec_details["mostRecentExecutionStatus"] == "InProgress"
-        ):
-            sleep(poll_interval)
-            response_desc = self.conn.describe_flow(flowName=flow_name)
-            last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        self.log.info("lastRunExecutionDetails: %s", last_exec_details)
-
-        if last_exec_details["mostRecentExecutionStatus"] == "Error":
-            raise Exception(f"Flow error:\n{json.dumps(response_desc, default=str)}")
+        if wait_for_completion:

Review Comment:
   OK, but after taking a look at it, waiter configurations don't allow implementing the logic needed here.
   
   Because the API doesn't allow querying status by run ID, we have to employ other means. The old logic was somewhat flawed, but it still checked the timestamp to make sure we weren't picking up the previous run. The new logic relies on the run ID. Neither of those behaviors is possible with regular waiters, so I wouldn't block this PR on this.
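A small, self-contained illustration of why tracking the run by ID matters, independent of waiters. The record shape (`executionId`, `executionStatus`, `startedAt`) is modeled on AppFlow execution records, but it is an assumption here. Both helper functions are hypothetical. `status_of_latest` is a conceptual stand-in for the old code's reliance on `lastRunExecutionDetails`. As soon as another run overlaps ours, "the most recent run" and "the run we started" diverge.

```python
from datetime import datetime, timezone
from typing import List, Optional


def status_of_latest(records: List[dict]) -> Optional[str]:
    """Old idea: assume the most recently started run is the one we care about."""
    if not records:
        return None
    return max(records, key=lambda r: r["startedAt"])["executionStatus"]


def status_of(records: List[dict], execution_id: str) -> Optional[str]:
    """New idea: look up the run we started by its executionId."""
    for record in records:
        if record["executionId"] == execution_id:
            return record["executionStatus"]
    return None


# We started run "a"; someone else started run "b" a minute later, and it failed.
runs = [
    {"executionId": "a", "executionStatus": "InProgress",
     "startedAt": datetime(2023, 1, 12, 10, 0, tzinfo=timezone.utc)},
    {"executionId": "b", "executionStatus": "Error",
     "startedAt": datetime(2023, 1, 12, 10, 1, tzinfo=timezone.utc)},
]
print(status_of_latest(runs))  # Error (wrong run)
print(status_of(runs, "a"))    # InProgress (our run)
```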





[GitHub] [airflow] Taragolis commented on pull request #28869: rewrite polling code for appflow hook

Posted by "Taragolis (via GitHub)" <gi...@apache.org>.
Taragolis commented on PR #28869:
URL: https://github.com/apache/airflow/pull/28869#issuecomment-1411051606

   @o-nikolas Oh, I don't really use AppFlow. I just pinged Igor because I remembered (and git blame confirmed) that he wrote the initial implementation. But let me have a look.




[GitHub] [airflow] vincbeck commented on a diff in pull request #28869: rewrite polling code for appflow hook

Posted by "vincbeck (via GitHub)" <gi...@apache.org>.
vincbeck commented on code in PR #28869:
URL: https://github.com/apache/airflow/pull/28869#discussion_r1123283048


##########
airflow/providers/amazon/aws/hooks/appflow.py:
##########
@@ -54,49 +50,35 @@ def conn(self) -> AppflowClient:
         """Get the underlying boto3 Appflow client (cached)"""
         return super().conn
 
-    def run_flow(self, flow_name: str, poll_interval: int = 20) -> str:
+    def run_flow(self, flow_name: str, poll_interval: int = 20, wait_for_completion: bool = True) -> str:
         """
         Execute an AppFlow run.
 
         :param flow_name: The flow name
         :param poll_interval: Time (seconds) to wait between two consecutive calls to check the run status
+        :param wait_for_completion: whether to wait for the run to end to return
         :return: The run execution ID
         """
-        ts_before: datetime = datetime.now(timezone.utc)
-        sleep(self.EVENTUAL_CONSISTENCY_OFFSET)
         response_start = self.conn.start_flow(flowName=flow_name)
         execution_id = response_start["executionId"]
         self.log.info("executionId: %s", execution_id)
 
-        response_desc = self.conn.describe_flow(flowName=flow_name)
-        last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        # Wait Appflow eventual consistence
-        self.log.info("Waiting for Appflow eventual consistence...")
-        while (
-            response_desc.get("lastRunExecutionDetails", {}).get(
-                "mostRecentExecutionTime", datetime(1970, 1, 1, tzinfo=timezone.utc)
-            )
-            < ts_before
-        ):
-            sleep(self.EVENTUAL_CONSISTENCY_POLLING)
-            response_desc = self.conn.describe_flow(flowName=flow_name)
-            last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        # Wait flow stops
-        self.log.info("Waiting for flow run...")
-        while (
-            "mostRecentExecutionStatus" not in last_exec_details
-            or last_exec_details["mostRecentExecutionStatus"] == "InProgress"
-        ):
-            sleep(poll_interval)
-            response_desc = self.conn.describe_flow(flowName=flow_name)
-            last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        self.log.info("lastRunExecutionDetails: %s", last_exec_details)
-
-        if last_exec_details["mostRecentExecutionStatus"] == "Error":
-            raise Exception(f"Flow error:\n{json.dumps(response_desc, default=str)}")
+        if wait_for_completion:

Review Comment:
   Instead of having this custom logic, wouldn't it be possible to use a custom waiter as in #29822?





[GitHub] [airflow] igorborgest commented on pull request #28869: rewrite polling code for appflow hook

Posted by GitBox <gi...@apache.org>.
igorborgest commented on PR #28869:
URL: https://github.com/apache/airflow/pull/28869#issuecomment-1382458573

   Great improvements @vandonr-amz, thank you!
   
   Just one point, I recommend testing it against the real service.
   AppFlow is one of the most annoying services I have worked with. It seems everything is eventually consistent.




[GitHub] [airflow] Taragolis commented on a diff in pull request #28869: rewrite polling code for appflow hook

Posted by GitBox <gi...@apache.org>.
Taragolis commented on code in PR #28869:
URL: https://github.com/apache/airflow/pull/28869#discussion_r1069517827


##########
airflow/providers/amazon/aws/operators/appflow.py:
##########
@@ -93,6 +98,12 @@ def execute(self, context: Context) -> None:
         self.connector_type = self._get_connector_type()
         if self.flow_update:
             self._update_flow()
+            # previous code had a wait between update and run without explaining why.
+            # since I don't have a way to actually test this behavior,
+            # I'm reproducing it out of fear of breaking workflows.
+            # It might be unnecessary.

Review Comment:
   @igorborgest maybe you could add an explanation?





[GitHub] [airflow] o-nikolas commented on pull request #28869: rewrite polling code for appflow hook

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas commented on PR #28869:
URL: https://github.com/apache/airflow/pull/28869#issuecomment-1410894943

   @Taragolis @igorborgest are you happy with the state of this PR or would you like to see any other changes/testing?




[GitHub] [airflow] o-nikolas merged pull request #28869: rewrite polling code for appflow hook

Posted by "o-nikolas (via GitHub)" <gi...@apache.org>.
o-nikolas merged PR #28869:
URL: https://github.com/apache/airflow/pull/28869




[GitHub] [airflow] ferruzzi commented on a diff in pull request #28869: rewrite polling code for appflow hook

Posted by "ferruzzi (via GitHub)" <gi...@apache.org>.
ferruzzi commented on code in PR #28869:
URL: https://github.com/apache/airflow/pull/28869#discussion_r1123575711


##########
airflow/providers/amazon/aws/hooks/appflow.py:
##########
@@ -54,49 +50,35 @@ def conn(self) -> AppflowClient:
         """Get the underlying boto3 Appflow client (cached)"""
         return super().conn
 
-    def run_flow(self, flow_name: str, poll_interval: int = 20) -> str:
+    def run_flow(self, flow_name: str, poll_interval: int = 20, wait_for_completion: bool = True) -> str:
         """
         Execute an AppFlow run.
 
         :param flow_name: The flow name
         :param poll_interval: Time (seconds) to wait between two consecutive calls to check the run status
+        :param wait_for_completion: whether to wait for the run to end to return
         :return: The run execution ID
         """
-        ts_before: datetime = datetime.now(timezone.utc)
-        sleep(self.EVENTUAL_CONSISTENCY_OFFSET)
         response_start = self.conn.start_flow(flowName=flow_name)
         execution_id = response_start["executionId"]
         self.log.info("executionId: %s", execution_id)
 
-        response_desc = self.conn.describe_flow(flowName=flow_name)
-        last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        # Wait Appflow eventual consistence
-        self.log.info("Waiting for Appflow eventual consistence...")
-        while (
-            response_desc.get("lastRunExecutionDetails", {}).get(
-                "mostRecentExecutionTime", datetime(1970, 1, 1, tzinfo=timezone.utc)
-            )
-            < ts_before
-        ):
-            sleep(self.EVENTUAL_CONSISTENCY_POLLING)
-            response_desc = self.conn.describe_flow(flowName=flow_name)
-            last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        # Wait flow stops
-        self.log.info("Waiting for flow run...")
-        while (
-            "mostRecentExecutionStatus" not in last_exec_details
-            or last_exec_details["mostRecentExecutionStatus"] == "InProgress"
-        ):
-            sleep(poll_interval)
-            response_desc = self.conn.describe_flow(flowName=flow_name)
-            last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        self.log.info("lastRunExecutionDetails: %s", last_exec_details)
-
-        if last_exec_details["mostRecentExecutionStatus"] == "Error":
-            raise Exception(f"Flow error:\n{json.dumps(response_desc, default=str)}")
+        if wait_for_completion:

Review Comment:
   I definitely agree that this should be a waiter.





[GitHub] [airflow] ferruzzi commented on a diff in pull request #28869: rewrite polling code for appflow hook

Posted by "ferruzzi (via GitHub)" <gi...@apache.org>.
ferruzzi commented on code in PR #28869:
URL: https://github.com/apache/airflow/pull/28869#discussion_r1124997260


##########
airflow/providers/amazon/aws/hooks/appflow.py:
##########
@@ -54,49 +50,35 @@ def conn(self) -> AppflowClient:
         """Get the underlying boto3 Appflow client (cached)"""
         return super().conn
 
-    def run_flow(self, flow_name: str, poll_interval: int = 20) -> str:
+    def run_flow(self, flow_name: str, poll_interval: int = 20, wait_for_completion: bool = True) -> str:
         """
         Execute an AppFlow run.
 
         :param flow_name: The flow name
         :param poll_interval: Time (seconds) to wait between two consecutive calls to check the run status
+        :param wait_for_completion: whether to wait for the run to end to return
         :return: The run execution ID
         """
-        ts_before: datetime = datetime.now(timezone.utc)
-        sleep(self.EVENTUAL_CONSISTENCY_OFFSET)
         response_start = self.conn.start_flow(flowName=flow_name)
         execution_id = response_start["executionId"]
         self.log.info("executionId: %s", execution_id)
 
-        response_desc = self.conn.describe_flow(flowName=flow_name)
-        last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        # Wait Appflow eventual consistence
-        self.log.info("Waiting for Appflow eventual consistence...")
-        while (
-            response_desc.get("lastRunExecutionDetails", {}).get(
-                "mostRecentExecutionTime", datetime(1970, 1, 1, tzinfo=timezone.utc)
-            )
-            < ts_before
-        ):
-            sleep(self.EVENTUAL_CONSISTENCY_POLLING)
-            response_desc = self.conn.describe_flow(flowName=flow_name)
-            last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        # Wait flow stops
-        self.log.info("Waiting for flow run...")
-        while (
-            "mostRecentExecutionStatus" not in last_exec_details
-            or last_exec_details["mostRecentExecutionStatus"] == "InProgress"
-        ):
-            sleep(poll_interval)
-            response_desc = self.conn.describe_flow(flowName=flow_name)
-            last_exec_details = response_desc["lastRunExecutionDetails"]
-
-        self.log.info("lastRunExecutionDetails: %s", last_exec_details)
-
-        if last_exec_details["mostRecentExecutionStatus"] == "Error":
-            raise Exception(f"Flow error:\n{json.dumps(response_desc, default=str)}")
+        if wait_for_completion:

Review Comment:
   We talked offline, and it does look like switching to AppFlow waiters would be a non-trivial change: AppFlow handles its waiters completely differently from other services. I still think a custom-waiters-first approach is the best one, but in this case I'm willing to approve this as is.
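   For illustration only, and not the code merged in this PR, here is a minimal sketch of the kind of loop the new `if wait_for_completion:` branch needs. It polls until the run identified by `execution_id` leaves the `InProgress` state (the same status the removed loop checked), using a single configurable `poll_interval` and checking before it sleeps. It assumes a boto3-style AppFlow client exposing `describe_flow_execution_records(flowName=...)`, whose response lists `flowExecutions` entries carrying `executionId` and `executionStatus`. The helper name `wait_for_flow_run` is hypothetical, and pagination (`nextToken`) is ignored to keep it short.

```python
import time
from typing import Any


def wait_for_flow_run(client: Any, flow_name: str, execution_id: str, poll_interval: float = 20) -> str:
    """Hypothetical helper: block until one specific AppFlow run stops.

    Assumes client.describe_flow_execution_records(flowName=...) returns
    {"flowExecutions": [{"executionId": ..., "executionStatus": ...}, ...]}.
    Only the first page of records is inspected (no nextToken handling).
    """
    while True:
        response = client.describe_flow_execution_records(flowName=flow_name)
        # Match on the execution ID instead of trusting the "last run" details,
        # since another run of the same flow may have started in the meantime.
        run = next(
            (r for r in response.get("flowExecutions", []) if r.get("executionId") == execution_id),
            None,
        )
        # A missing record means the run is not visible yet (eventual consistency);
        # "InProgress" means it is still running. In both cases, wait and retry.
        if run is not None and run.get("executionStatus") != "InProgress":
            status = run["executionStatus"]
            if status == "Error":
                raise RuntimeError(f"Flow {flow_name!r} run {execution_id} failed: {run}")
            return status
        time.sleep(poll_interval)
```

   Because the wait is keyed on the execution ID, a single interval covers both the eventual-consistency delay and the run itself, which is the simplification the PR description argues for.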



