You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/11/22 13:07:32 UTC

[airflow] branch main updated: Added wait mechanism to the DataprocJobSensor to avoid 509 errors when Job is not available (#19740)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b2e1a8  Added wait mechanism to the DataprocJobSensor to avoid 509 errors when Job is not available (#19740)
0b2e1a8 is described below

commit 0b2e1a8744ac0d5965cb11f6a6fa74cee1d03f3d
Author: Łukasz Wyszomirski <wy...@google.com>
AuthorDate: Mon Nov 22 14:07:01 2021 +0100

    Added wait mechanism to the DataprocJobSensor to avoid 509 errors when Job is not available (#19740)
---
 airflow/providers/google/cloud/sensors/dataproc.py | 37 ++++++++++++++++--
 .../google/cloud/sensors/test_dataproc.py          | 44 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 4 deletions(-)

diff --git a/airflow/providers/google/cloud/sensors/dataproc.py b/airflow/providers/google/cloud/sensors/dataproc.py
index fd5ead0..5f2b235 100644
--- a/airflow/providers/google/cloud/sensors/dataproc.py
+++ b/airflow/providers/google/cloud/sensors/dataproc.py
@@ -17,9 +17,11 @@
 # under the License.
 """This module contains a Dataproc Job sensor."""
 # pylint: disable=C0302
+import time
 import warnings
-from typing import Optional
+from typing import Dict, Optional
 
+from google.api_core.exceptions import ServerError
 from google.cloud.dataproc_v1.types import JobStatus
 
 from airflow.exceptions import AirflowException
@@ -42,6 +44,8 @@ class DataprocJobSensor(BaseSensorOperator):
     :type location: str
     :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
     :type gcp_conn_id: str
+    :param wait_timeout: How many seconds to wait for the job to be ready.
+    :type wait_timeout: int
     """
 
     template_fields = ('project_id', 'region', 'dataproc_job_id')
@@ -55,6 +59,7 @@ class DataprocJobSensor(BaseSensorOperator):
         region: str = None,
         location: Optional[str] = None,
         gcp_conn_id: str = 'google_cloud_default',
+        wait_timeout: Optional[int] = None,
         **kwargs,
     ) -> None:
         if region is None:
@@ -73,12 +78,36 @@ class DataprocJobSensor(BaseSensorOperator):
         self.gcp_conn_id = gcp_conn_id
         self.dataproc_job_id = dataproc_job_id
         self.region = region
+        self.wait_timeout = wait_timeout
+        self.start_sensor_time = None
 
-    def poke(self, context: dict) -> bool:
+    def execute(self, context: Dict):
+        self.start_sensor_time = time.monotonic()
+        super().execute(context)
+
+    def _duration(self):
+        return time.monotonic() - self.start_sensor_time
+
+    def poke(self, context: Dict) -> bool:
         hook = DataprocHook(gcp_conn_id=self.gcp_conn_id)
-        job = hook.get_job(job_id=self.dataproc_job_id, region=self.region, project_id=self.project_id)
-        state = job.status.state
+        if self.wait_timeout:
+            try:
+                job = hook.get_job(
+                    job_id=self.dataproc_job_id, region=self.region, project_id=self.project_id
+                )
+            except ServerError as err:
+                self.log.info(f"DURATION RUN: {self._duration()}")
+                if self._duration() > self.wait_timeout:
+                    raise AirflowException(
+                        f"Timeout: dataproc job {self.dataproc_job_id} "
+                        f"is not ready after {self.wait_timeout}s"
+                    )
+                self.log.info("Retrying. Dataproc API returned server error when waiting for job: %s", err)
+                return False
+        else:
+            job = hook.get_job(job_id=self.dataproc_job_id, region=self.region, project_id=self.project_id)
 
+        state = job.status.state
         if state == JobStatus.State.ERROR:
             raise AirflowException(f'Job failed:\n{job}')
         elif state in {
diff --git a/tests/providers/google/cloud/sensors/test_dataproc.py b/tests/providers/google/cloud/sensors/test_dataproc.py
index 0f9f096..35fa969 100644
--- a/tests/providers/google/cloud/sensors/test_dataproc.py
+++ b/tests/providers/google/cloud/sensors/test_dataproc.py
@@ -17,8 +17,10 @@
 
 import unittest
 from unittest import mock
+from unittest.mock import Mock
 
 import pytest
+from google.api_core.exceptions import ServerError
 from google.cloud.dataproc_v1.types import JobStatus
 
 from airflow import AirflowException
@@ -164,3 +166,45 @@ class TestDataprocJobSensor(unittest.TestCase):
                 timeout=TIMEOUT,
             )
             sensor.poke(context={})
+
+    @mock.patch(DATAPROC_PATH.format("DataprocHook"))
+    def test_wait_timeout(self, mock_hook):
+        job_id = "job_id"
+        mock_hook.return_value.get_job.side_effect = ServerError("Job are not ready")
+
+        sensor = DataprocJobSensor(
+            task_id=TASK_ID,
+            region=GCP_LOCATION,
+            project_id=GCP_PROJECT,
+            dataproc_job_id=job_id,
+            gcp_conn_id=GCP_CONN_ID,
+            timeout=TIMEOUT,
+            wait_timeout=300,
+        )
+
+        sensor._duration = Mock()
+        sensor._duration.return_value = 200
+
+        result = sensor.poke(context={})
+        assert not result
+
+    @mock.patch(DATAPROC_PATH.format("DataprocHook"))
+    def test_wait_timeout_raise_exception(self, mock_hook):
+        job_id = "job_id"
+        mock_hook.return_value.get_job.side_effect = ServerError("Job are not ready")
+
+        sensor = DataprocJobSensor(
+            task_id=TASK_ID,
+            region=GCP_LOCATION,
+            project_id=GCP_PROJECT,
+            dataproc_job_id=job_id,
+            gcp_conn_id=GCP_CONN_ID,
+            timeout=TIMEOUT,
+            wait_timeout=300,
+        )
+
+        sensor._duration = Mock()
+        sensor._duration.return_value = 301
+
+        with pytest.raises(AirflowException, match="Timeout: dataproc job job_id is not ready after 300s"):
+            sensor.poke(context={})