Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/10/25 15:34:08 UTC

[GitHub] [airflow] jaketf commented on a change in pull request #6386: [AIRFLOW-5716][part of AIRFLOW-5697][depends on AIRFLOW-5711] Simplify DataflowJobsController logic

jaketf commented on a change in pull request #6386: [AIRFLOW-5716][part of AIRFLOW-5697][depends on AIRFLOW-5711] Simplify DataflowJobsController logic
URL: https://github.com/apache/airflow/pull/6386#discussion_r339114360
 
 

 ##########
 File path: airflow/gcp/hooks/dataflow.py
 ##########
 @@ -334,15 +357,16 @@ def _start_dataflow(
         name: str,
         command_prefix: List[str],
         label_formatter: Callable[[Dict], List[str]],
-        multiple_jobs: bool = False
+        project_id: str,
+        multiple_jobs: bool = False,
     ) -> None:
         variables = self._set_variables(variables)
-        cmd = command_prefix + self._build_cmd(variables, label_formatter)
+        cmd = command_prefix + self._build_cmd(variables, label_formatter, project_id)
         runner = _DataflowRunner(cmd)
         job_id = runner.wait_for_done()
 
 Review comment:
   The clean up in this PR LGTM. My only thought for further clean up: IMO this function's name is a misnomer. It is called `_start_dataflow`, but it actually does two things, `start` and `wait_for_done`. We could untangle this so the hook provides one function for starting and another for waiting, and leave the orchestration details to the operator's `execute`. I think this would make things simpler once we sort out my reschedule-poking-operator PR. It would also let the hook start a Dataflow streaming job without waiting on it until some other system cancels it, which could be cool for streaming jobs we only need running at certain times of day. Of course, we'd have to add a function to the hook to stop / drain a Dataflow streaming job. Concretely: say you use a Dataflow job to do streaming analytics on IoT data, but only during an 8-hour working day. Your DAG could be `@daily`: a start-Dataflow-job task, followed by a stop-Dataflow-job task that reschedules itself for 8 hours after the start task succeeds. This "ephemeral streaming job" is a rather contrived use case, but it demonstrates the additional value of separating the `start` and `wait_for_done` operations in hooks like this one. Sketches of both ideas follow below.
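
   A hedged sketch of the interface split, just to make the proposal concrete. The method names are hypothetical (not part of the current `airflow.gcp.hooks.dataflow.DataflowHook`), and the bodies are elided since they would reuse the existing `_DataflowRunner` / `_DataflowJobsController` internals:

   ```python
   from typing import Callable, Dict, List


   class DataflowHook:  # sketch only, not the real airflow.gcp.hooks.dataflow.DataflowHook
       def start_dataflow_job(
           self,
           variables: Dict,
           name: str,
           command_prefix: List[str],
           label_formatter: Callable[[Dict], List[str]],
           project_id: str,
       ) -> str:
           """Launch the job and return its job_id as soon as it is known,
           without polling for completion."""
           ...

       def wait_for_done(self, job_id: str, project_id: str) -> None:
           """Block until the job reaches a terminal state; callable from an
           operator's execute() or, eventually, a reschedule-mode sensor."""
           ...

       def stop_job(self, job_id: str, project_id: str, drain: bool = True) -> None:
           """Drain (or cancel) a streaming job; this is the piece we'd need
           to add for the ephemeral streaming use case."""
           ...
   ```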
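   And a hedged illustration of the ephemeral streaming DAG, using `BashOperator` + `gcloud` so it doesn't depend on hook methods that don't exist yet. The bucket, template path, job name, and region are placeholders, and the `--format='value(id)'` field name is an assumption:

   ```python
   from datetime import timedelta

   from airflow import DAG
   from airflow.operators.bash_operator import BashOperator
   from airflow.sensors.time_delta_sensor import TimeDeltaSensor
   from airflow.utils.dates import days_ago

   with DAG(
       dag_id="ephemeral_streaming_analytics",
       schedule_interval="@daily",
       start_date=days_ago(1),
   ) as dag:
       # Launch the streaming job from a template at the start of the window.
       start_job = BashOperator(
           task_id="start_streaming_job",
           bash_command=(
               "gcloud dataflow jobs run iot-analytics-{{ ds_nodash }} "
               "--gcs-location gs://my-bucket/templates/iot-analytics "  # placeholder
               "--region us-central1"
           ),
       )

       # Fires 8 hours after the end of the schedule interval, i.e. roughly
       # 8 hours after this run triggers. reschedule mode frees the worker
       # slot between pokes (this is where the reschedule-poking PR helps).
       working_day_over = TimeDeltaSensor(
           task_id="wait_8_hours",
           delta=timedelta(hours=8),
           mode="reschedule",
       )

       # Drain rather than cancel, so in-flight records finish processing.
       # Looking the job id up by name; the 'id' format field is assumed.
       stop_job = BashOperator(
           task_id="drain_streaming_job",
           bash_command=(
               "gcloud dataflow jobs drain $(gcloud dataflow jobs list "
               "--filter='name=iot-analytics-{{ ds_nodash }}' --status=active "
               "--format='value(id)' --region us-central1) --region us-central1"
           ),
       )

       start_job >> working_day_over >> stop_job
   ```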

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services