You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/12/14 17:10:46 UTC
[airflow] branch main updated: Fix mypy airbyte provider errors (#20271)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f5df92e Fix mypy airbyte provider errors (#20271)
f5df92e is described below
commit f5df92e156cf933f34dc34239297184fb2d0c5b2
Author: Kanthi <su...@gmail.com>
AuthorDate: Tue Dec 14 12:10:16 2021 -0500
Fix mypy airbyte provider errors (#20271)
---
airflow/providers/airbyte/hooks/airbyte.py | 8 ++++----
airflow/providers/airbyte/operators/airbyte.py | 4 ++--
airflow/providers/airbyte/sensors/airbyte.py | 5 ++---
3 files changed, 8 insertions(+), 9 deletions(-)
diff --git a/airflow/providers/airbyte/hooks/airbyte.py b/airflow/providers/airbyte/hooks/airbyte.py
index 20aa7b8..8ea82b2 100644
--- a/airflow/providers/airbyte/hooks/airbyte.py
+++ b/airflow/providers/airbyte/hooks/airbyte.py
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
import time
-from typing import Any, Optional
+from typing import Any, Optional, Union
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
@@ -46,12 +46,12 @@ class AirbyteHook(HttpHook):
ERROR = "error"
INCOMPLETE = "incomplete"
- def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: Optional[str] = "v1") -> None:
+ def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
super().__init__(http_conn_id=airbyte_conn_id)
self.api_version: str = api_version
def wait_for_job(
- self, job_id: str, wait_seconds: Optional[float] = 3, timeout: Optional[float] = 3600
+ self, job_id: Union[str, int], wait_seconds: float = 3, timeout: Optional[float] = 3600
) -> None:
"""
Helper method which polls a job to check if it finishes.
@@ -71,7 +71,7 @@ class AirbyteHook(HttpHook):
raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
time.sleep(wait_seconds)
try:
- job = self.get_job(job_id=job_id)
+ job = self.get_job(job_id=(int(job_id)))
state = job.json()["job"]["status"]
except AirflowException as err:
self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
diff --git a/airflow/providers/airbyte/operators/airbyte.py b/airflow/providers/airbyte/operators/airbyte.py
index aac4563..2e1400f 100644
--- a/airflow/providers/airbyte/operators/airbyte.py
+++ b/airflow/providers/airbyte/operators/airbyte.py
@@ -55,8 +55,8 @@ class AirbyteTriggerSyncOperator(BaseOperator):
connection_id: str,
airbyte_conn_id: str = "airbyte_default",
asynchronous: Optional[bool] = False,
- api_version: Optional[str] = "v1",
- wait_seconds: Optional[float] = 3,
+ api_version: str = "v1",
+ wait_seconds: float = 3,
timeout: Optional[float] = 3600,
**kwargs,
) -> None:
diff --git a/airflow/providers/airbyte/sensors/airbyte.py b/airflow/providers/airbyte/sensors/airbyte.py
index dc202d0..a33d3a8 100644
--- a/airflow/providers/airbyte/sensors/airbyte.py
+++ b/airflow/providers/airbyte/sensors/airbyte.py
@@ -16,7 +16,6 @@
# specific language governing permissions and limitations
# under the License.
"""This module contains a Airbyte Job sensor."""
-from typing import Optional
from airflow.exceptions import AirflowException
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
@@ -42,9 +41,9 @@ class AirbyteJobSensor(BaseSensorOperator):
def __init__(
self,
*,
- airbyte_job_id: str,
+ airbyte_job_id: int,
airbyte_conn_id: str = 'airbyte_default',
- api_version: Optional[str] = "v1",
+ api_version: str = "v1",
**kwargs,
) -> None:
super().__init__(**kwargs)