You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2020/10/09 16:45:21 UTC

[GitHub] [airflow] aaltay commented on a change in pull request #11374: [WIP] Add drain option when canceling Dataflow pipelines

aaltay commented on a change in pull request #11374:
URL: https://github.com/apache/airflow/pull/11374#discussion_r502552189



##########
File path: airflow/providers/google/cloud/hooks/dataflow.py
##########
@@ -298,22 +303,27 @@ def get_jobs(self) -> List[Dict]:
 
     def cancel(self) -> None:
         """
-        Cancels current job
+        Cancels or drains current job
         """
         jobs = self._get_current_jobs()
         batch = self._dataflow.new_batch_http_request()
         job_ids = [job['id'] for job in jobs]
         self.log.info("Canceling jobs: %s", ", ".join(job_ids))
-        for job_id in job_ids:
+        for job in jobs:
+            requested_state = (
+                DataflowJobType.JOB_TYPE_STREAMING

Review comment:
       `DataflowJobStatus.JOB_STATE_DRAINED` instead of `DataflowJobType.JOB_TYPE_STREAMING` ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org