Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/21 19:48:49 UTC

[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r428873640



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -67,6 +85,49 @@ def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
             raise AirflowException(operation["error"])
         return operation["response"]
 
+    def wait_for_pipeline_state(
+        self,
+        success_states: List[str],

Review comment:
      Should `success_states` have a reasonable default, similar to `failure_states`?
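   For example, a minimal sketch (the `SUCCESS_STATES` constant is hypothetical, mirroring the existing `FAILURE_STATES`; the now-optional parameter would also need to move after the required ones or become keyword-only):
   ```python3
       from typing import List, Optional

       # Hypothetical module-level default, defined next to FAILURE_STATES.
       SUCCESS_STATES = ["COMPLETED"]

       def wait_for_pipeline_state(
           self,
           success_states: Optional[List[str]] = None,
           # ... remaining parameters unchanged ...
       ):
           # Fall back to the module default when the caller does not override it.
           success_states = success_states or SUCCESS_STATES
   ```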

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -67,6 +85,49 @@ def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
             raise AirflowException(operation["error"])
         return operation["response"]
 
+    def wait_for_pipeline_state(
+        self,
+        success_states: List[str],
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+        failure_states: List[str] = None,
+        timeout: int = 5 * 60,
+    ):
+        """
+        Pools pipeline state and raises an exception if the state is one of
+        `failure_states` or the operation timeouted.
+        """
+        failure_states = failure_states or FAILURE_STATES
+        start_time = monotonic()
+        current_state = None
+        while True:

Review comment:
      `while True` seems infinite-loop prone. Any reason not to merge it with the timeout `if` below? Something like:
   ```python3
       while monotonic() - start_time < timeout:
           ...
       raise AirflowException(...)
   ```
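   A fuller sketch of the merged loop body (same names as in the diff above; restructuring only, not a definitive implementation):
   ```python3
       failure_states = failure_states or FAILURE_STATES
       start_time = monotonic()
       current_state = None
       while monotonic() - start_time < timeout:
           # ... fetch the run and update current_state here ...
           if current_state in success_states:
               return
           if current_state in failure_states:
               raise AirflowException(
                   f"Pipeline {pipeline_name} failed with state {current_state}"
               )
           sleep(30)
       # The loop exits only when the timeout has elapsed.
       raise AirflowException(
           f"Pipeline {pipeline_name} state {current_state} is not "
           f"one of {success_states} after {timeout}s"
       )
   ```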

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -386,15 +480,29 @@ def start_pipeline(
             pipeline_name,
             "workflows",
             "DataPipelineWorkflow",
-            "start"
+            "start",
         )
+        runtime_args = runtime_args or {}
+        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.
+        # So we are adding a faux job id.
+        job_id = str(uuid.uuid4())

Review comment:
      It would be easier for the user to understand what started a job (in the CDAP UI) if this contained the Airflow task ID or DAG run ID.
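   e.g. a hypothetical helper (assumes the operator passes its identifiers down to the hook):
   ```python3
       import uuid

       def _make_faux_job_id(dag_id: str, task_id: str, run_id: str) -> str:
           # Embed the Airflow identifiers so a user browsing the CDAP UI can
           # tell which DAG/task started the run; keep a uuid for uniqueness.
           return f"{dag_id}__{task_id}__{run_id}__{uuid.uuid4()}"
   ```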

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -386,15 +480,29 @@ def start_pipeline(
             pipeline_name,
             "workflows",
             "DataPipelineWorkflow",
-            "start"
+            "start",
         )
+        runtime_args = runtime_args or {}
+        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.

Review comment:
      nit: IMO this should link to the relevant CDAP bug, https://issues.cask.co/browse/CDAP-7641, for context.
   That way, once that bug is closed, we will remember to simplify this.
   ```suggestion
           # Unfortunately making the start call to CDAP does not return a run_id to poll for state.
           # https://issues.cask.co/browse/CDAP-7641
   ```

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -67,6 +85,49 @@ def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
             raise AirflowException(operation["error"])
         return operation["response"]
 
+    def wait_for_pipeline_state(
+        self,
+        success_states: List[str],
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+        failure_states: List[str] = None,
+        timeout: int = 5 * 60,
+    ):
+        """
+        Pools pipeline state and raises an exception if the state is one of
+        `failure_states` or the operation timeouted.
+        """
+        failure_states = failure_states or FAILURE_STATES
+        start_time = monotonic()
+        current_state = None
+        while True:
+            if monotonic() - start_time > timeout:
+                raise AirflowException(
+                    f"Pipeline {pipeline_name} state {current_state} is not "
+                    f"one of {success_states} after {timeout}s"
+                )
+            sleep(30)

Review comment:
      nit: if less than 30 seconds remain when we get here (e.g. `monotonic() - start_time == timeout` exactly, or more generally `timeout - (monotonic() - start_time) < 30`), we should not sleep the full 30 seconds only to raise the exception on the next iteration.
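   Something like this would cap the wait (sketch):
   ```python3
       # Sleep only for the time remaining so the timeout check above fires
       # promptly instead of overshooting by up to 30 seconds.
       remaining = timeout - (monotonic() - start_time)
       sleep(min(30.0, max(0.0, remaining)))
   ```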

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -357,12 +418,45 @@ def list_pipelines(
             )
         return json.loads(response.data)
 
+    def _get_pipeline(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> Optional[Dict]:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        response = self._cdap_request(url=url, method="GET")
+        if response.status != 200:
+            raise AirflowException(
+                f"Retrieving a pipeline failed with code {response.status}"
+            )
+
+        pipelines_list = json.loads(response.data)
+        for pipe in pipelines_list:

Review comment:
      It looks like this `_get_pipeline` gets called on every iteration of the polling while loop,
   which means that on each poll we loop through all of the program runs (and there may be many).

   We should run this loop only once, to look up the program run by its faux id and obtain the "real" CDAP run ID.
   That real run ID can then be used during polling to fetch the program run directly with a GET request.
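   Rough sketch (hypothetical: the `runid`/`status` field names assume the CDAP runs API, and the faux-id match shown is illustrative):
   ```python3
       # One-time lookup: scan the runs list once to map the faux id to the
       # real CDAP run id.
       run_id = None
       for pipe in json.loads(response.data):
           if pipe.get("properties", {}).get("faux_id") == faux_pipeline_id:
               run_id = pipe["runid"]
               break

       # Each later poll can then GET the run directly, e.g.
       # .../workflows/DataPipelineWorkflow/runs/<run_id>
       run_url = os.path.join(url, run_id)
       run = json.loads(self._cdap_request(url=run_url, method="GET").data)
       current_state = run["status"]
   ```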

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -67,6 +85,49 @@ def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
             raise AirflowException(operation["error"])
         return operation["response"]
 
+    def wait_for_pipeline_state(
+        self,
+        success_states: List[str],
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+        failure_states: List[str] = None,
+        timeout: int = 5 * 60,
+    ):
+        """
+        Pools pipeline state and raises an exception if the state is one of

Review comment:
       typo
   ```suggestion
           Polls pipeline state and raises an exception if the state is one of
   ```

##########
File path: airflow/providers/google/cloud/operators/datafusion.py
##########
@@ -674,14 +676,23 @@ def execute(self, context: Dict):
             project_id=self.project_id,
         )
         api_url = instance["apiEndpoint"]
-        hook.start_pipeline(
+        faux_pipeline_id = hook.start_pipeline(
             pipeline_name=self.pipeline_name,
             instance_url=api_url,
             namespace=self.namespace,
             runtime_args=self.runtime_args
 
         )
         self.log.info("Pipeline started")
+        hook.wait_for_pipeline_state(
+            success_states=[PipelineStates.COMPLETED],

Review comment:
      This operator is named `CloudDataFusionStartPipelineOperator` (key word: start).
   IMO that means `PipelineStates.RUNNING` should be a success state by default.
   However, it might be worth letting the user optionally control these success states to cover more use cases.
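   e.g. a hypothetical `success_states` parameter on the operator:
   ```python3
       def __init__(
           self,
           *,
           success_states: Optional[List[str]] = None,
           **kwargs,
       ) -> None:
           super().__init__(**kwargs)
           # "Start" has succeeded once the pipeline is RUNNING; callers that
           # want to block until completion can pass [PipelineStates.COMPLETED].
           self.success_states = success_states or [PipelineStates.RUNNING]
   ```
   and `execute` would then pass `success_states=self.success_states` to `hook.wait_for_pipeline_state`.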



