Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/05/19 19:25:06 UTC

[GitHub] [airflow] mik-laj commented on a change in pull request #8550: Add DataflowStartFlexTemplateOperator

mik-laj commented on a change in pull request #8550:
URL: https://github.com/apache/airflow/pull/8550#discussion_r427545395



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -583,6 +623,49 @@ def start_template_dataflow(
         jobs_controller.wait_for_done()
         return response["job"]
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def start_flex_template(
+        self,
+        body: Dict,
+        location: str,
+        project_id: str,
+        on_new_job_id_callback: Optional[Callable[[str], None]] = None
+    ):
+        """
+        Starts flex templates with the Dataflow pipeline.
+
+        :param body: The request body
+        :param location: The location of the Dataflow job (for example europe-west1)
+        :type location: str
+        :param project_id: The ID of the GCP project that owns the job.
+            If set to ``None`` or missing, the default project_id from the GCP connection is used.
+        :type project_id: Optional[str]
+        :param on_new_job_id_callback: A callback that is called when a Job ID is detected.
+        :return: the Job
+        """
+        service = self.get_conn()
+        request = service.projects().locations().flexTemplates().launch(  # pylint: disable=no-member
+            projectId=project_id,
+            body=body,
+            location=location
+        )
+        response = request.execute(num_retries=self.num_retries)
+        job_id = response['job']['id']
+
+        if on_new_job_id_callback:
+            on_new_job_id_callback(job_id)
+
+        jobs_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            job_id=job_id,
+            location=location,
+            poll_sleep=self.poll_sleep,
+            num_retries=self.num_retries)
+        jobs_controller.wait_for_done()

Review comment:
       I am trying to follow the naming conventions already used in this integration: the existing DataflowTemplatedJobStartOperator uses the verb "Start" for the same operation.
   
   I plan to add blocking and non-blocking modes to all operators in a separate PR, because that requires developing sensors. For now, I wanted to focus on a single issue.
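   For illustration only, here is a minimal sketch of the blocking vs. non-blocking split discussed above. It mirrors the flow of the quoted hook (launch, read response['job']['id'], fire the callback, then optionally poll until done). It assumes a hypothetical wait_until_finished flag and an in-memory FakeDataflowClient. FakeDataflowClient and start_job are illustrative names, not Airflow's or the Dataflow API's, and the terminal-state set is a simplified subset of Dataflow job states.

```python
import time
from typing import Callable, Dict, Optional


class FakeDataflowClient:
    """Hypothetical in-memory stand-in for the Dataflow API (illustration only)."""

    def __init__(self, polls_until_done: int = 2) -> None:
        self._polls_left: Dict[str, int] = {}
        self._polls_until_done = polls_until_done
        self._counter = 0

    def launch(self, body: Dict) -> Dict:
        # Pretend to launch a job and return a response shaped like {'job': {...}}.
        self._counter += 1
        job_id = f"job-{self._counter}"
        self._polls_left[job_id] = self._polls_until_done
        return {"job": {"id": job_id, "name": body.get("jobName"),
                        "currentState": "JOB_STATE_RUNNING"}}

    def get_state(self, job_id: str) -> str:
        # Report RUNNING for a fixed number of polls, then DONE.
        if self._polls_left[job_id] > 0:
            self._polls_left[job_id] -= 1
            return "JOB_STATE_RUNNING"
        return "JOB_STATE_DONE"


# Simplified subset of terminal Dataflow job states (assumption for this sketch).
TERMINAL_STATES = {"JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}


def start_job(
    client: FakeDataflowClient,
    body: Dict,
    on_new_job_id_callback: Optional[Callable[[str], None]] = None,
    wait_until_finished: bool = True,  # hypothetical flag
    poll_sleep: float = 0.0,
) -> Dict:
    """Launch a job, report its ID, and optionally block until it finishes."""
    job = client.launch(body)["job"]
    if on_new_job_id_callback:
        on_new_job_id_callback(job["id"])
    if not wait_until_finished:
        # Non-blocking mode: return immediately; a sensor could poll later.
        return job
    # Blocking mode: poll until the job reaches a terminal state.
    state = client.get_state(job["id"])
    while state not in TERMINAL_STATES:
        time.sleep(poll_sleep)
        state = client.get_state(job["id"])
    job["currentState"] = state
    return job
```

   In non-blocking mode the caller gets the job back while it is still running, which is where a separate sensor would take over the waiting.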




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org