You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/07/19 18:43:23 UTC
[airflow] branch main updated: AirbyteHook - Consider incomplete status (#16965)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fea2911 AirbyteHook - Consider incomplete status (#16965)
fea2911 is described below
commit fea29112bef4ad8787ae1482d829046bbba39f7e
Author: jaimefr <ja...@21buttons.com>
AuthorDate: Mon Jul 19 20:43:00 2021 +0200
AirbyteHook - Consider incomplete status (#16965)
---
airflow/providers/airbyte/hooks/airbyte.py | 3 ++-
tests/providers/airbyte/hooks/test_airbyte.py | 11 +++++++++++
2 files changed, 13 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/airbyte/hooks/airbyte.py b/airflow/providers/airbyte/hooks/airbyte.py
index 2f9bf6d..20aa7b8 100644
--- a/airflow/providers/airbyte/hooks/airbyte.py
+++ b/airflow/providers/airbyte/hooks/airbyte.py
@@ -44,6 +44,7 @@ class AirbyteHook(HttpHook):
PENDING = "pending"
FAILED = "failed"
ERROR = "error"
+ INCOMPLETE = "incomplete"
def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: Optional[str] = "v1") -> None:
super().__init__(http_conn_id=airbyte_conn_id)
@@ -76,7 +77,7 @@ class AirbyteHook(HttpHook):
self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
continue
- if state in (self.RUNNING, self.PENDING):
+ if state in (self.RUNNING, self.PENDING, self.INCOMPLETE):
continue
if state == self.SUCCEEDED:
break
diff --git a/tests/providers/airbyte/hooks/test_airbyte.py b/tests/providers/airbyte/hooks/test_airbyte.py
index 3c7e00d..f72d538 100644
--- a/tests/providers/airbyte/hooks/test_airbyte.py
+++ b/tests/providers/airbyte/hooks/test_airbyte.py
@@ -90,6 +90,17 @@ class TestAirbyteHook(unittest.TestCase):
assert mock_get_job.has_calls(calls)
@mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+ def test_wait_for_job_incomplete_succeeded(self, mock_get_job):
+ mock_get_job.side_effect = [
+ self.return_value_get_job(self.hook.INCOMPLETE),
+ self.return_value_get_job(self.hook.SUCCEEDED),
+ ]
+ self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0)
+
+ calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)]
+ assert mock_get_job.has_calls(calls)
+
+ @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
def test_wait_for_job_timeout(self, mock_get_job):
mock_get_job.side_effect = [
self.return_value_get_job(self.hook.PENDING),