Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/21 16:00:42 UTC

[GitHub] [airflow] turbaszek opened a new pull request #8954: Wait for pipeline state in Data Fusion operators

turbaszek opened a new pull request #8954:
URL: https://github.com/apache/airflow/pull/8954


   Closes: #8673
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429662507



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
       Ah I see, my mistake.
    You'll get 200 and an empty collection if there are no runs.







[GitHub] [airflow] turbaszek commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r430910630



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
       @sreevatsanraman how can we distinguish those two types? According to Data Fusion CDAP API reference users should use the same endpoint to start both batch and streaming pipelines:
   https://cloud.google.com/data-fusion/docs/reference/cdap-reference#start_a_batch_pipeline







[GitHub] [airflow] turbaszek commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429855053



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
       Yes, so I will retry the request. I am not sure if there's anything we can do about this.







[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r438261904



##########
File path: airflow/providers/google/cloud/operators/datafusion.py
##########
@@ -636,13 +648,15 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
     template_fields = ("instance_name", "pipeline_name", "runtime_args")
 
     @apply_defaults
-    def __init__(
+    def __init__(  # pylint: disable=too-many-arguments
         self,
         pipeline_name: str,
         instance_name: str,
         location: str,
         runtime_args: Optional[Dict[str, Any]] = None,
+        success_states: Optional[List[str]] = None,
         namespace: str = "default",
+        pipeline_timeout: int = 10 * 60,

Review comment:
       The contract of this operator is to start a pipeline, not to wait until pipeline completion.
    10 mins is a reasonable timeout.
    COMPLETED is just a success state in case it's a super quick pipeline that completes between polls.
    We can add a sensor for waiting on pipeline completion (which should use reschedule mode if it expects to wait that long); see the sketch below.
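    A minimal sketch of what such a sensor could look like (hypothetical class name and wiring, reusing the `_get_workflow_state` helper discussed elsewhere in this thread; not part of this PR):
    ```python
    from airflow.providers.google.cloud.hooks.datafusion import DataFusionHook
    from airflow.sensors.base_sensor_operator import BaseSensorOperator
    from airflow.utils.decorators import apply_defaults


    class CloudDataFusionPipelineStateSensor(BaseSensorOperator):
        """Polls a started pipeline run until it reaches one of the expected states."""

        @apply_defaults
        def __init__(self, pipeline_name, pipeline_id, instance_url,
                     expected_states=("COMPLETED",), namespace="default", **kwargs):
            super().__init__(**kwargs)
            self.pipeline_name = pipeline_name
            self.pipeline_id = pipeline_id
            self.instance_url = instance_url
            self.expected_states = expected_states
            self.namespace = namespace

        def poke(self, context):
            # _get_workflow_state is the private hook helper added in this PR;
            # calling it directly here is only for illustration.
            state = DataFusionHook()._get_workflow_state(
                pipeline_name=self.pipeline_name,
                instance_url=self.instance_url,
                pipeline_id=self.pipeline_id,
                namespace=self.namespace,
            )
            return state in self.expected_states
    ```
    Instantiated with `mode="reschedule"`, the sensor frees its worker slot between pokes, which suits long-running pipelines.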







[GitHub] [airflow] turbaszek commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429616796



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
       You are requesting `.../runs/run-id`; the code here is calling `.../runs` to get the list of all runs, because we don't know the proper CDAP `run-id` yet. I assume that this request should be successful unless something is wrong with the API / network.







[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r428873640



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -67,6 +85,49 @@ def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
             raise AirflowException(operation["error"])
         return operation["response"]
 
+    def wait_for_pipeline_state(
+        self,
+        success_states: List[str],

Review comment:
       should `success_states` have a reasonable default similar to failure states?
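    For example, mirroring the `FAILURE_STATES` fallback already in this diff (a sketch only; the default actually chosen is discussed later in the thread):
    ```python
    # Module-level default next to FAILURE_STATES; RUNNING and COMPLETED both
    # count as "started successfully" for a start-pipeline operator.
    SUCCESS_STATES = [PipelineStates.RUNNING, PipelineStates.COMPLETED]

    def wait_for_pipeline_state(self, pipeline_name, faux_pipeline_id, instance_url,
                                namespace="default", success_states=None,
                                failure_states=None, timeout=5 * 60):
        success_states = success_states or SUCCESS_STATES
        failure_states = failure_states or FAILURE_STATES
        ...
    ```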

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -67,6 +85,49 @@ def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
             raise AirflowException(operation["error"])
         return operation["response"]
 
+    def wait_for_pipeline_state(
+        self,
+        success_states: List[str],
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+        failure_states: List[str] = None,
+        timeout: int = 5 * 60,
+    ):
+        """
+        Pools pipeline state and raises an exception if the state is one of
+        `failure_states` or the operation timeouted.
+        """
+        failure_states = failure_states or FAILURE_STATES
+        start_time = monotonic()
+        current_state = None
+        while True:

Review comment:
       While true seems infinite-loop prone. Any reason not to merge this with the below if?
    ```python3
        while monotonic() - start_time < timeout:
            ...
        raise AirflowException(....)
    ```

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -386,15 +480,29 @@ def start_pipeline(
             pipeline_name,
             "workflows",
             "DataPipelineWorkflow",
-            "start"
+            "start",
         )
+        runtime_args = runtime_args or {}
+        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.
+        # So we are adding a faux job id.
+        job_id = str(uuid.uuid4())

Review comment:
       It would be easier for the user to understand what started a job (in the CDAP UI) if this contained the airflow task id or dag run id.

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -386,15 +480,29 @@ def start_pipeline(
             pipeline_name,
             "workflows",
             "DataPipelineWorkflow",
-            "start"
+            "start",
         )
+        runtime_args = runtime_args or {}
+        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.

Review comment:
       nit: IMO this should provide context link to relevant  CDAP bug https://issues.cask.co/browse/CDAP-7641
   This way once that is closed we can remember to simplify this.
   ```suggestion
           # Unfortunately making the start call to CDAP does not return a run_id to poll for state.
           # https://issues.cask.co/browse/CDAP-7641
   ```

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -67,6 +85,49 @@ def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
             raise AirflowException(operation["error"])
         return operation["response"]
 
+    def wait_for_pipeline_state(
+        self,
+        success_states: List[str],
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+        failure_states: List[str] = None,
+        timeout: int = 5 * 60,
+    ):
+        """
+        Pools pipeline state and raises an exception if the state is one of
+        `failure_states` or the operation timeouted.
+        """
+        failure_states = failure_states or FAILURE_STATES
+        start_time = monotonic()
+        current_state = None
+        while True:
+            if monotonic() - start_time > timeout:
+                raise AirflowException(
+                    f"Pipeline {pipeline_name} state {current_state} is not "
+                    f"one of {success_states} after {timeout}s"
+                )
+            sleep(30)

Review comment:
       nit: if `monotonic() - start_time` is already at the timeout (or within 30s of it, i.e. `timeout - (monotonic() - start_time) < 30`) in the above if statement, we should not wait another 30 seconds just to raise the exception.
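    One way to implement that, as a sketch (hypothetical helper, not the PR's code): cap every sleep at the time remaining so the loop never waits past the deadline.
    ```python
    from time import monotonic, sleep

    def wait_until(check, timeout=5 * 60, interval=30):
        """Call `check` until it returns True or `timeout` seconds elapse."""
        deadline = monotonic() + timeout
        while monotonic() < deadline:
            if check():
                return
            # Sleep at most until the deadline, never a full interval past it.
            sleep(max(0.0, min(interval, deadline - monotonic())))
        raise TimeoutError(f"condition not met within {timeout}s")
    ```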

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -357,12 +418,45 @@ def list_pipelines(
             )
         return json.loads(response.data)
 
+    def _get_pipeline(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> Optional[Dict]:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        response = self._cdap_request(url=url, method="GET")
+        if response.status != 200:
+            raise AirflowException(
+                f"Retrieving a pipeline failed with code {response.status}"
+            )
+
+        pipelines_list = json.loads(response.data)
+        for pipe in pipelines_list:

Review comment:
       It looks like this `_get_pipeline` gets called on every iteration of the polling while loop.
    This means on each poll we loop through all the program runs (which may be numerous).
    
    We should only run this loop to "look up the program run by faux id" once, to get the "real" CDAP job ID.
    That real job ID can then be used in polling to fetch this program run directly with a GET request, as sketched below.
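    A sketch of that two-phase flow, using the helper names this PR introduces (`_get_pipeline_run_id`, `_get_workflow_state`); the terminal-state tuple here is an assumption:
    ```python
    from time import sleep

    def poll_pipeline(hook, pipeline_name, faux_job_id, instance_url,
                      namespace="default", interval=30):
        # Phase 1: list the runs once and match the faux id to the real CDAP run id.
        run_id = hook._get_pipeline_run_id(
            pipeline_name=pipeline_name,
            faux_pipeline_id=faux_job_id,
            instance_url=instance_url,
            namespace=namespace,
        )
        # Phase 2: poll that single run directly via GET .../runs/<run_id>.
        while True:
            state = hook._get_workflow_state(
                pipeline_name=pipeline_name,
                instance_url=instance_url,
                pipeline_id=run_id,
                namespace=namespace,
            )
            if state in ("COMPLETED", "FAILED", "KILLED"):
                return state
            sleep(interval)
    ```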

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -67,6 +85,49 @@ def wait_for_operation(self, operation: Dict[str, Any]) -> Dict[str, Any]:
             raise AirflowException(operation["error"])
         return operation["response"]
 
+    def wait_for_pipeline_state(
+        self,
+        success_states: List[str],
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+        failure_states: List[str] = None,
+        timeout: int = 5 * 60,
+    ):
+        """
+        Pools pipeline state and raises an exception if the state is one of

Review comment:
       typo
   ```suggestion
           Polls pipeline state and raises an exception if the state is one of
   ```

##########
File path: airflow/providers/google/cloud/operators/datafusion.py
##########
@@ -674,14 +676,23 @@ def execute(self, context: Dict):
             project_id=self.project_id,
         )
         api_url = instance["apiEndpoint"]
-        hook.start_pipeline(
+        faux_pipeline_id = hook.start_pipeline(
             pipeline_name=self.pipeline_name,
             instance_url=api_url,
             namespace=self.namespace,
             runtime_args=self.runtime_args
 
         )
         self.log.info("Pipeline started")
+        hook.wait_for_pipeline_state(
+            success_states=[PipelineStates.COMPLETED],

Review comment:
       This operator is named `CloudDataFusionStartPipelineOperator` (key word: start).
    IMO that means `PipelineStates.RUNNING` should be a success state by default.
    However, it might be worth allowing the user to optionally control these success states to span more use cases.







[GitHub] [airflow] turbaszek commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429307560



##########
File path: airflow/providers/google/cloud/operators/datafusion.py
##########
@@ -674,14 +676,23 @@ def execute(self, context: Dict):
             project_id=self.project_id,
         )
         api_url = instance["apiEndpoint"]
-        hook.start_pipeline(
+        faux_pipeline_id = hook.start_pipeline(
             pipeline_name=self.pipeline_name,
             instance_url=api_url,
             namespace=self.namespace,
             runtime_args=self.runtime_args
 
         )
         self.log.info("Pipeline started")
+        hook.wait_for_pipeline_state(
+            success_states=[PipelineStates.COMPLETED],

Review comment:
       I've added `success_states` as an optional argument. If not provided, the operator will wait for the operation to be `RUNNING`.







[GitHub] [airflow] RachaelDS commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
RachaelDS commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r437672204



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -386,15 +480,29 @@ def start_pipeline(
             pipeline_name,
             "workflows",
             "DataPipelineWorkflow",
-            "start"
+            "start",
         )
+        runtime_args = runtime_args or {}
+        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.

Review comment:
       You can avoid using the faux run id by making a call to the batch start pipeline endpoint - the run id will be returned in this case. For example:
    ```
    POST https://xxx.datafusion.googleusercontent.com/api/v3/namespaces/default/start

    [{"appId": "app_id", "programType": "workflow", "programId": "DataPipelineWorkflow", "runtimeargs": {}}]
    ```
    Batch start pipeline endpoint info:
    https://docs.cdap.io/cdap/current/en/reference-manual/http-restful-api/lifecycle.html#H3293 (the documentation does not currently reflect that the run id is returned)
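    For illustration, a hedged sketch of that batch start call (endpoint and body taken from the comment above; the exact response shape is an assumption, since the docs don't describe it):
    ```python
    import json

    import google.auth
    from google.auth.transport.requests import AuthorizedSession

    def start_pipeline_batch(instance_url, app_id, namespace="default", runtime_args=None):
        credentials, _ = google.auth.default()
        session = AuthorizedSession(credentials)
        url = f"{instance_url}/v3/namespaces/{namespace}/start"
        body = [{
            "appId": app_id,
            "programType": "workflow",
            "programId": "DataPipelineWorkflow",
            "runtimeargs": runtime_args or {},
        }]
        response = session.post(url, data=json.dumps(body))
        response.raise_for_status()
        # Per the comment above, each element of the response should carry the
        # run id of the program that was started.
        return response.json()
    ```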







[GitHub] [airflow] turbaszek commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429617638



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -386,15 +480,29 @@ def start_pipeline(
             pipeline_name,
             "workflows",
             "DataPipelineWorkflow",
-            "start"
+            "start",
         )
+        runtime_args = runtime_args or {}
+        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.
+        # So we are adding a faux job id.
+        job_id = str(uuid.uuid4())

Review comment:
       Done







[GitHub] [airflow] sreevatsanraman commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
sreevatsanraman commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r430650030



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
       @jaketf the API that CDAP exposes covers the basic building blocks of programs: workflows, spark, mapreduce jobs, etc. Data Fusion pipelines use workflows for batch jobs and spark streaming jobs for realtime. The operators should wait for the batch jobs and not wait for the streaming ones.
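    To make the distinction concrete, a sketch of how the polling path could branch on pipeline type (the `spark` / `DataStreamsSparkStreaming` segments for streaming pipelines are an assumption based on this discussion, not something the PR implements):
    ```python
    def runs_url(instance_url, namespace, pipeline_name, streaming=False):
        # Batch pipelines run as a workflow program; streaming pipelines run
        # as a spark program, so their runs live under a different path.
        if streaming:
            program_type, program_id = "spark", "DataStreamsSparkStreaming"
        else:
            program_type, program_id = "workflows", "DataPipelineWorkflow"
        return "/".join([instance_url, "v3", "namespaces", namespace,
                         "apps", pipeline_name, program_type, program_id, "runs"])
    ```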







[GitHub] [airflow] turbaszek commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r438177959



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -357,12 +419,40 @@ def list_pipelines(
             )
         return json.loads(response.data)
 
+    def _get_workflow_state(
+        self,
+        pipeline_name: str,
+        instance_url: str,
+        pipeline_id: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+            pipeline_id,

Review comment:
       Done







[GitHub] [airflow] turbaszek commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429617213



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:
+                raise AirflowException(
+                    f"Retrieving a pipeline failed with code {response.status}"
+                )
 
-        return None
+            pipelines_list = json.loads(response.data)
+            for pipe in pipelines_list:
+                runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
+                if runtime_args[job_id_key] == faux_pipeline_id:
+                    return pipe["runid"]
+            sleep(10)

Review comment:
       Usually it's successful on the 2nd loop but sometimes I get to the 3rd. I will move the `sleep` to the beginning as you suggested, so it should decrease the number of requests.
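    The reordered loop would then look roughly like this (same logic as the diff above, only with the `sleep` moved to the top of the body):
    ```python
    for _ in range(5):
        # Wait first: the run rarely shows up on the very first call.
        sleep(10)
        response = self._cdap_request(url=url, method="GET")
        if response.status != 200:
            raise AirflowException(
                f"Retrieving a pipeline failed with code {response.status}"
            )
        pipelines_list = json.loads(response.data)
        for pipe in pipelines_list:
            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
            if runtime_args[job_id_key] == faux_pipeline_id:
                return pipe["runid"]
    raise AirflowException(
        f"Unable to retrieve run id of `{pipeline_name}` pipeline."
    )
    ```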







[GitHub] [airflow] RachaelDS commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
RachaelDS commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r438284379



##########
File path: airflow/providers/google/cloud/operators/datafusion.py
##########
@@ -636,13 +648,15 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
     template_fields = ("instance_name", "pipeline_name", "runtime_args")
 
     @apply_defaults
-    def __init__(
+    def __init__(  # pylint: disable=too-many-arguments
         self,
         pipeline_name: str,
         instance_name: str,
         location: str,
         runtime_args: Optional[Dict[str, Any]] = None,
+        success_states: Optional[List[str]] = None,
         namespace: str = "default",
+        pipeline_timeout: int = 10 * 60,

Review comment:
       As part of these changes you can now pass in a parameter to have the operator wait for pipeline completion (not just pipeline start).
    A sensor + reschedule mode sounds like a good suggestion, thanks.







[GitHub] [airflow] jaketf commented on pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
jaketf commented on pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#issuecomment-641453114


   Thanks for following up @turbaszek.
   *tl;dr* I think we should merge this PR as it fixes the immediate issue. We can file a lower priority issue to handle streaming pipelines in the future. This can be an additional kwarg that accepts a streaming flag and uses a different path for polling.
   
   I've updated the threads. I agree I think we should keep this PR small and focused on patching the existing operator for starting data fusion batch pipelines.
   
   In general I think batch is more used than streaming and spark is more used than MR.
   In batch both MR and spark can be polled at the .../DataPipelineWorkflow/runs/run_id endpoint.





[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429358908



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
       I'm not sure this will have the behavior you expect.
    
    What happens when you do a GET on a program run id that isn't present yet? 404 or empty body?
    
    I personally get 404 on a 6.1.1 instance for a random uuid I generated. Has the API behavior changed?
   
   ![Screenshot 2020-05-22 at 9 58 30 AM](https://user-images.githubusercontent.com/11599048/82691494-db407380-9c12-11ea-92c3-7f5d4b94380d.png)
   

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:
+                raise AirflowException(
+                    f"Retrieving a pipeline failed with code {response.status}"
+                )
 
-        return None
+            pipelines_list = json.loads(response.data)
+            for pipe in pipelines_list:
+                runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
+                if runtime_args[job_id_key] == faux_pipeline_id:
+                    return pipe["runid"]
+            sleep(10)
+        raise AirflowException(
+            f"Unable to retrieve run id of `{pipeline_name}` pipeline."

Review comment:
       We can give a little more info here
   ```suggestion
               f"Unable to retrieve run id of `{pipeline_name}` pipeline with runtime arg {job_id_key}={faux_pipeline_id}."
   ```

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -484,24 +516,31 @@ def start_pipeline(
         )
         runtime_args = runtime_args or {}
         # Unfortunately making the start call to CDAP does not return a run_id to poll for state.
-        # So we are adding a faux job id.
-        job_id = str(uuid.uuid4())
-        runtime_args[job_id_key] = job_id
+        # So we are adding a faux job id. Context link to relevant CDAP bug:
+        # https://issues.cask.co/browse/CDAP-7641
+        faux_job_id = str(uuid.uuid4())
+        runtime_args[job_id_key] = faux_job_id
 
         response = self._cdap_request(url=url, method="POST", body=runtime_args)
         if response.status != 200:
             raise AirflowException(
                 f"Starting a pipeline failed with code {response.status}"
             )
 
+        pipeline_id = self._get_pipeline_run_id(
+            pipeline_name=pipeline_name,
+            faux_pipeline_id=faux_job_id,
+            namespace=namespace,
+            instance_url=instance_url,
+        )
         self.wait_for_pipeline_state(
             success_states=[PipelineStates.RUNNING, PipelineStates.COMPLETED],

Review comment:
       nit we already have these defined in module a constant
   ```suggestion
               success_states=SUCCESS_STATES,
   ```

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:
+                raise AirflowException(
+                    f"Retrieving a pipeline failed with code {response.status}"
+                )
 
-        return None
+            pipelines_list = json.loads(response.data)
+            for pipe in pipelines_list:
+                runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
+                if runtime_args[job_id_key] == faux_pipeline_id:
+                    return pipe["runid"]
+            sleep(10)

Review comment:
       How often do you notice the first request failing? If it fails more than 50% of the time, then the program run doesn't show up for some time (seconds) and we can expect the first iteration not to work; do we expect this loop to run at least 2-3 times?
    Could we move the sleep to the beginning of the loop body to increase the likelihood that the loop exits on an earlier iteration? Hopefully that saves API calls we don't expect to yield a successful get on the program run.

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",

Review comment:
       This CDAP API is very convoluted.
    
    Seems like there are [several program types](https://docs.cask.co/cdap/6.1.1/en/reference-manual/http-restful-api/lifecycle.html#H3581) and this is hard-coding `workflows` and `DataPipelineWorkflow`.
    
    I think `DataPipelineWorkflow` will not be present for streaming pipelines; instead you have to poll a `spark` program.
    
    I'm not sure how many other scenarios require other program types.
    It would be good to get someone from the CDAP community to review this.
   
   ![Screenshot 2020-05-22 at 10 21 00 AM](https://user-images.githubusercontent.com/11599048/82693211-f791df80-9c15-11ea-964b-dc8fcad3e25c.png)
   
   ![Screenshot 2020-05-22 at 10 20 27 AM](https://user-images.githubusercontent.com/11599048/82693231-037da180-9c16-11ea-8a0d-c6665c416f2a.png)
   
   

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",

Review comment:
       CC: @sreevatsanraman

##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
       FYI you can get to this convenient UI by clicking system admin > configuration > Make HTTP calls
   It's very useful for getting used to / testing the CDAP REST API.







[GitHub] [airflow] mik-laj commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r438150533



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -357,12 +419,40 @@ def list_pipelines(
             )
         return json.loads(response.data)
 
+    def _get_workflow_state(
+        self,
+        pipeline_name: str,
+        instance_url: str,
+        pipeline_id: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+            pipeline_id,

Review comment:
       Should we URL encode this parameter?
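    For example, each path segment could be quoted before joining (a sketch of one option; whether the PR should do this is the open question):
    ```python
    from urllib.parse import quote

    segments = ["v3", "namespaces", namespace, "apps", pipeline_name,
                "workflows", "DataPipelineWorkflow", "runs", pipeline_id]
    url = instance_url + "/" + "/".join(quote(s, safe="") for s in segments)
    ```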







[GitHub] [airflow] turbaszek commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429855053



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
       Yes, so I will retry the request. I am not sure if there's anything we can do about this. When we call this method we are expecting to see some runs.







[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r437578183



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        # Try 5 times to get the CDAP runid. We do this because the pipeline
+        # may not be present instantly
+        for _ in range(5):
+            response = self._cdap_request(url=url, method="GET")
+            if response.status != 200:

Review comment:
       I think we should just handle batch pipelines in this PR (as this is implicitly all the current operator does). Also, anecdotally, I think this covers 90% of use cases for Airflow. In the field I have not seen a lot of streaming orchestration with Airflow.







[GitHub] [airflow] turbaszek commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429236013



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -357,12 +418,45 @@ def list_pipelines(
             )
         return json.loads(response.data)
 
+    def _get_pipeline(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> Optional[Dict]:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",
+            "DataPipelineWorkflow",
+            "runs",
+        )
+        response = self._cdap_request(url=url, method="GET")
+        if response.status != 200:
+            raise AirflowException(
+                f"Retrieving a pipeline failed with code {response.status}"
+            )
+
+        pipelines_list = json.loads(response.data)
+        for pipe in pipelines_list:

Review comment:
       @jaketf do I correctly understand that the state of a program can be accessed under:
   ```
   https://xxx-dot-eun1.datafusion.googleusercontent.com/api/v3/namespaces/default/apps/airflow_test/workflows/DataPipelineWorkflow/runs/5d1a277b-17ae-4cab-b3a6-5d23a2f8f672
   ```
   at least that's how it seems to be done here
   https://github.com/GoogleCloudPlatform/terraform-provider-cdap/blob/e16a2c5188bf2299482d5c8261ca01f27cd19a44/cdap/resource_streaming_program_run.go#L244
   
   The problem is that I get a 404; any ideas?
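
For reference, a minimal sketch of the lookup described above: GET the run resource and read its `status` field, as the merged hunk earlier in the thread does. Authentication is elided (the real hook goes through `_cdap_request`), and a 404 right after starting a pipeline may simply mean the run is not visible yet:

```
import requests

# Instance endpoint and run ID copied from the example URL above; both are placeholders.
run_url = (
    "https://xxx-dot-eun1.datafusion.googleusercontent.com/api"
    "/v3/namespaces/default/apps/airflow_test"
    "/workflows/DataPipelineWorkflow/runs/5d1a277b-17ae-4cab-b3a6-5d23a2f8f672"
)

response = requests.get(run_url)
if response.status_code == 404:
    # Right after starting a pipeline, the run resource may not exist yet.
    print("Run not visible yet; retry after a short delay")
else:
    response.raise_for_status()
    print(response.json()["status"])  # e.g. RUNNING, COMPLETED, FAILED
```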




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429292921



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -386,15 +480,29 @@ def start_pipeline(
             pipeline_name,
             "workflows",
             "DataPipelineWorkflow",
-            "start"
+            "start",
         )
+        runtime_args = runtime_args or {}
+        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.
+        # So we are adding a faux job id.
+        job_id = str(uuid.uuid4())

Review comment:
       Then the ID will have to be generated in the operator. From the hook level we are unable (in a simple way) to access this information.
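
A sketch of the split discussed here, with the operator generating the faux ID and passing it down; the `faux_pipeline_id` keyword is an assumed signature for illustration, not the merged code:

```
import uuid


def execute_sketch(hook) -> str:
    """Illustrative only: generate the faux job ID at operator level,
    since the hook cannot easily surface one back to the caller."""
    faux_job_id = str(uuid.uuid4())  # generated in the operator, not the hook
    hook.start_pipeline(
        pipeline_name="example_pipeline",
        faux_pipeline_id=faux_job_id,  # assumed keyword argument
    )
    return faux_job_id  # kept so the operator can later resolve the real CDAP run
```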




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r437578759



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",

Review comment:
       As long as we cover batch pipelines (with a Spark or MR backend) I think we should be good.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] RachaelDS commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
RachaelDS commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r438223116



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -382,19 +472,32 @@ def start_pipeline(
             "v3",
             "namespaces",
             namespace,
-            "apps",
-            pipeline_name,
-            "workflows",
-            "DataPipelineWorkflow",
-            "start"
+            "start",
         )
-
-        response = self._cdap_request(url=url, method="POST", body=runtime_args)
+        runtime_args = runtime_args or {}
+        body = [{
+            "appId": pipeline_name,
+            "programType": "workflow",
+            "programId": "DataPipelineWorkflow",
+            "runtimeargs": runtime_args
+        }]
+        response = self._cdap_request(url=url, method="POST", body=body)

Review comment:
       Just an FYI: this is the API request to start multiple pipelines.
   There will eventually be a fix to return the run ID as part of the API request to run a _single_ pipeline.
   We can revert to your original URL when this is available. For context:
   https://issues.cask.co/browse/CDAP-7641
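
A sketch of that batch-start call, using the same body shape as the hunk above; the instance URL is a placeholder, and the exact response fields (`statusCode`, `runId`) should be verified against your CDAP version:

```
import requests

instance_url = "https://INSTANCE/api"  # placeholder CDAP endpoint
url = f"{instance_url}/v3/namespaces/default/start"

# The body is a list, even when starting a single pipeline.
body = [{
    "appId": "example_pipeline",
    "programType": "workflow",
    "programId": "DataPipelineWorkflow",
    "runtimeargs": {},
}]

response = requests.post(url, json=body)
response.raise_for_status()
for result in response.json():
    # Each entry should echo the program plus a status code and, on success, a run ID.
    print(result.get("statusCode"), result.get("runId"))
```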

##########
File path: airflow/providers/google/cloud/operators/datafusion.py
##########
@@ -636,13 +648,15 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
     template_fields = ("instance_name", "pipeline_name", "runtime_args")
 
     @apply_defaults
-    def __init__(
+    def __init__(  # pylint: disable=too-many-arguments
         self,
         pipeline_name: str,
         instance_name: str,
         location: str,
         runtime_args: Optional[Dict[str, Any]] = None,
+        success_states: Optional[List[str]] = None,
         namespace: str = "default",
+        pipeline_timeout: int = 10 * 60,

Review comment:
       If the success state is COMPLETED, we will time out before the pipeline run completes.
   It can take more than 5 minutes just to provision a Data Fusion pipeline run, and some pipelines can take hours to complete.
   Can we increase the default timeout to 1 hour?
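
The timeout concern translates directly into operator configuration. A hedged usage sketch with a one-hour timeout, taking the parameter names from the hunk above (all values are illustrative):

```
from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
)

start_pipeline = CloudDataFusionStartPipelineOperator(
    task_id="start_pipeline",
    pipeline_name="example_pipeline",
    instance_name="example-instance",
    location="europe-north1",
    success_states=["COMPLETED"],  # wait for the run to actually finish
    pipeline_timeout=60 * 60,      # 1 hour; provisioning alone can exceed 5 minutes
)
```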

##########
File path: airflow/providers/google/cloud/operators/datafusion.py
##########
@@ -616,6 +625,9 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
     :type pipeline_name: str
     :param instance_name: The name of the instance.
     :type instance_name: str
+    :param success_states: If provided the operator will wait for pipeline to be in one of
+        the provided states.
+    :type success_states: List[str]

Review comment:
       The docstring is missing info for the new _pipeline_timeout_ parameter.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] jaketf commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
jaketf commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429345281



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -386,15 +480,29 @@ def start_pipeline(
             pipeline_name,
             "workflows",
             "DataPipelineWorkflow",
-            "start"
+            "start",
         )
+        runtime_args = runtime_args or {}
+        # Unfortunately making the start call to CDAP does not return a run_id to poll for state.
+        # So we are adding a faux job id.
+        job_id = str(uuid.uuid4())

Review comment:
       Could the hook parameterize this faux ID to let the operator do this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r439988167



##########
File path: airflow/providers/google/cloud/operators/datafusion.py
##########
@@ -636,13 +648,15 @@ class CloudDataFusionStartPipelineOperator(BaseOperator):
     template_fields = ("instance_name", "pipeline_name", "runtime_args")
 
     @apply_defaults
-    def __init__(
+    def __init__(  # pylint: disable=too-many-arguments
         self,
         pipeline_name: str,
         instance_name: str,
         location: str,
         runtime_args: Optional[Dict[str, Any]] = None,
+        success_states: Optional[List[str]] = None,
         namespace: str = "default",
+        pipeline_timeout: int = 10 * 60,

Review comment:
       I've created an issue to limit the scope of this PR:
   https://github.com/apache/airflow/issues/9300




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#issuecomment-641147473


   Hi @jaketf @sreevatsanraman, what should we do to move this forward?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek merged pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
turbaszek merged pull request #8954:
URL: https://github.com/apache/airflow/pull/8954


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #8954: Wait for pipeline state in Data Fusion operators

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #8954:
URL: https://github.com/apache/airflow/pull/8954#discussion_r429617037



##########
File path: airflow/providers/google/cloud/hooks/datafusion.py
##########
@@ -435,20 +435,52 @@ def _get_pipeline(
             "workflows",
             "DataPipelineWorkflow",
             "runs",
+            pipeline_id,
         )
         response = self._cdap_request(url=url, method="GET")
         if response.status != 200:
             raise AirflowException(
-                f"Retrieving a pipeline failed with code {response.status}"
+                f"Retrieving a pipeline state failed with code {response.status}"
             )
+        workflow = json.loads(response.data)
+        return workflow["status"]
 
-        pipelines_list = json.loads(response.data)
-        for pipe in pipelines_list:
-            runtime_args = json.loads(pipe["properties"]["runtimeArgs"])
-            if runtime_args[job_id_key] == faux_pipeline_id:
-                return pipe
+    def _get_pipeline_run_id(
+        self,
+        pipeline_name: str,
+        faux_pipeline_id: str,
+        instance_url: str,
+        namespace: str = "default",
+    ) -> str:
+        url = os.path.join(
+            instance_url,
+            "v3",
+            "namespaces",
+            namespace,
+            "apps",
+            pipeline_name,
+            "workflows",

Review comment:
       This seems to be a bigger change because we will have to adjust each method that uses `DataPipelineWorkflow` in the URI. So, I would say we can do this, but in a follow-up PR.
   
   Btw. I was relying on the Google docs: https://cloud.google.com/data-fusion/docs/reference/cdap-reference#start_a_batch_pipeline
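
Per those docs, the single-workflow start is a plain POST; a minimal sketch, with a placeholder bearer token since the real hook handles authentication itself:

```
import requests

instance_url = "https://INSTANCE/api"  # placeholder CDAP endpoint
url = (
    f"{instance_url}/v3/namespaces/default/apps/example_pipeline"
    "/workflows/DataPipelineWorkflow/start"
)
headers = {"Authorization": "Bearer ACCESS_TOKEN"}  # placeholder credential

# This form returns no run ID in the response, which is what motivated
# the switch to the batch start endpoint earlier in the thread.
response = requests.post(url, headers=headers, json={})
response.raise_for_status()
```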




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


