You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/07/19 18:43:23 UTC

[airflow] branch main updated: AirbyteHook - Consider incomplete status (#16965)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fea2911  AirbyteHook - Consider incomplete status (#16965)
fea2911 is described below

commit fea29112bef4ad8787ae1482d829046bbba39f7e
Author: jaimefr <ja...@21buttons.com>
AuthorDate: Mon Jul 19 20:43:00 2021 +0200

    AirbyteHook - Consider incomplete status (#16965)
---
 airflow/providers/airbyte/hooks/airbyte.py    |  3 ++-
 tests/providers/airbyte/hooks/test_airbyte.py | 11 +++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/airflow/providers/airbyte/hooks/airbyte.py b/airflow/providers/airbyte/hooks/airbyte.py
index 2f9bf6d..20aa7b8 100644
--- a/airflow/providers/airbyte/hooks/airbyte.py
+++ b/airflow/providers/airbyte/hooks/airbyte.py
@@ -44,6 +44,7 @@ class AirbyteHook(HttpHook):
     PENDING = "pending"
     FAILED = "failed"
     ERROR = "error"
+    INCOMPLETE = "incomplete"
 
     def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: Optional[str] = "v1") -> None:
         super().__init__(http_conn_id=airbyte_conn_id)
@@ -76,7 +77,7 @@ class AirbyteHook(HttpHook):
                 self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
                 continue
 
-            if state in (self.RUNNING, self.PENDING):
+            if state in (self.RUNNING, self.PENDING, self.INCOMPLETE):
                 continue
             if state == self.SUCCEEDED:
                 break
diff --git a/tests/providers/airbyte/hooks/test_airbyte.py b/tests/providers/airbyte/hooks/test_airbyte.py
index 3c7e00d..f72d538 100644
--- a/tests/providers/airbyte/hooks/test_airbyte.py
+++ b/tests/providers/airbyte/hooks/test_airbyte.py
@@ -90,6 +90,17 @@ class TestAirbyteHook(unittest.TestCase):
         assert mock_get_job.has_calls(calls)
 
     @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_wait_for_job_incomplete_succeeded(self, mock_get_job):
+        mock_get_job.side_effect = [
+            self.return_value_get_job(self.hook.INCOMPLETE),
+            self.return_value_get_job(self.hook.SUCCEEDED),
+        ]
+        self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0)
+
+        calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)]
+        assert mock_get_job.has_calls(calls)
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
     def test_wait_for_job_timeout(self, mock_get_job):
         mock_get_job.side_effect = [
             self.return_value_get_job(self.hook.PENDING),