You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by di...@apache.org on 2023/01/01 22:23:49 UTC

[airflow] branch main updated: Push job_id in xcom for dataproc submit job op (#28639)

This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fec1460eb7 Push job_id in xcom for dataproc submit job op (#28639)
fec1460eb7 is described below

commit fec1460eb7896da6bfad69e95c92b8e531e35485
Author: Pankaj Singh <98...@users.noreply.github.com>
AuthorDate: Mon Jan 2 03:53:28 2023 +0530

    Push job_id in xcom for dataproc submit job op (#28639)
---
 airflow/providers/google/cloud/operators/dataproc.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py
index 24ca7de401..ed47578e2e 100644
--- a/airflow/providers/google/cloud/operators/dataproc.py
+++ b/airflow/providers/google/cloud/operators/dataproc.py
@@ -1002,6 +1002,7 @@ class DataprocJobBaseOperator(BaseOperator):
         if job_state == JobStatus.State.CANCELLED:
             raise AirflowException(f"Job was cancelled:\n{job_id}")
         self.log.info("%s completed successfully.", self.task_id)
+        return job_id
 
     def on_kill(self) -> None:
         """
@@ -1899,7 +1900,6 @@ class DataprocSubmitJobOperator(BaseOperator):
                 job_id=new_job_id, region=self.region, project_id=self.project_id, timeout=self.wait_timeout
             )
             self.log.info("Job %s completed successfully.", new_job_id)
-
         return self.job_id
 
     def execute_complete(self, context, event=None) -> None:
@@ -1915,6 +1915,7 @@ class DataprocSubmitJobOperator(BaseOperator):
         if job_state == JobStatus.State.CANCELLED:
             raise AirflowException(f"Job was cancelled:\n{job_id}")
         self.log.info("%s completed successfully.", self.task_id)
+        return job_id
 
     def on_kill(self):
         if self.job_id and self.cancel_on_kill: