You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/12/14 17:10:46 UTC

[airflow] branch main updated: Fix mypy airbyte provider errors (#20271)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f5df92e  Fix mypy airbyte provider errors (#20271)
f5df92e is described below

commit f5df92e156cf933f34dc34239297184fb2d0c5b2
Author: Kanthi <su...@gmail.com>
AuthorDate: Tue Dec 14 12:10:16 2021 -0500

    Fix mypy airbyte provider errors (#20271)
---
 airflow/providers/airbyte/hooks/airbyte.py     | 8 ++++----
 airflow/providers/airbyte/operators/airbyte.py | 4 ++--
 airflow/providers/airbyte/sensors/airbyte.py   | 5 ++---
 3 files changed, 8 insertions(+), 9 deletions(-)

diff --git a/airflow/providers/airbyte/hooks/airbyte.py b/airflow/providers/airbyte/hooks/airbyte.py
index 20aa7b8..8ea82b2 100644
--- a/airflow/providers/airbyte/hooks/airbyte.py
+++ b/airflow/providers/airbyte/hooks/airbyte.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 import time
-from typing import Any, Optional
+from typing import Any, Optional, Union
 
 from airflow.exceptions import AirflowException
 from airflow.providers.http.hooks.http import HttpHook
@@ -46,12 +46,12 @@ class AirbyteHook(HttpHook):
     ERROR = "error"
     INCOMPLETE = "incomplete"
 
-    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: Optional[str] = "v1") -> None:
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
         super().__init__(http_conn_id=airbyte_conn_id)
         self.api_version: str = api_version
 
     def wait_for_job(
-        self, job_id: str, wait_seconds: Optional[float] = 3, timeout: Optional[float] = 3600
+        self, job_id: Union[str, int], wait_seconds: float = 3, timeout: Optional[float] = 3600
     ) -> None:
         """
         Helper method which polls a job to check if it finishes.
@@ -71,7 +71,7 @@ class AirbyteHook(HttpHook):
                 raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
             time.sleep(wait_seconds)
             try:
-                job = self.get_job(job_id=job_id)
+                job = self.get_job(job_id=(int(job_id)))
                 state = job.json()["job"]["status"]
             except AirflowException as err:
                 self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
diff --git a/airflow/providers/airbyte/operators/airbyte.py b/airflow/providers/airbyte/operators/airbyte.py
index aac4563..2e1400f 100644
--- a/airflow/providers/airbyte/operators/airbyte.py
+++ b/airflow/providers/airbyte/operators/airbyte.py
@@ -55,8 +55,8 @@ class AirbyteTriggerSyncOperator(BaseOperator):
         connection_id: str,
         airbyte_conn_id: str = "airbyte_default",
         asynchronous: Optional[bool] = False,
-        api_version: Optional[str] = "v1",
-        wait_seconds: Optional[float] = 3,
+        api_version: str = "v1",
+        wait_seconds: float = 3,
         timeout: Optional[float] = 3600,
         **kwargs,
     ) -> None:
diff --git a/airflow/providers/airbyte/sensors/airbyte.py b/airflow/providers/airbyte/sensors/airbyte.py
index dc202d0..a33d3a8 100644
--- a/airflow/providers/airbyte/sensors/airbyte.py
+++ b/airflow/providers/airbyte/sensors/airbyte.py
@@ -16,7 +16,6 @@
 # specific language governing permissions and limitations
 # under the License.
 """This module contains a Airbyte Job sensor."""
-from typing import Optional
 
 from airflow.exceptions import AirflowException
 from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
@@ -42,9 +41,9 @@ class AirbyteJobSensor(BaseSensorOperator):
     def __init__(
         self,
         *,
-        airbyte_job_id: str,
+        airbyte_job_id: int,
         airbyte_conn_id: str = 'airbyte_default',
-        api_version: Optional[str] = "v1",
+        api_version: str = "v1",
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)