You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/04/06 23:19:31 UTC
[airflow] 03/03: Adds new Airbyte provider (#14492)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5d05835e1d54c87f3a05b5c2bd5f1850b8506dfc
Author: Marcos Marx <ma...@users.noreply.github.com>
AuthorDate: Sat Mar 6 11:19:30 2021 -0300
Adds new Airbyte provider (#14492)
This commit add hook, operators and sensors to interact with
Airbyte external service.
(cherry picked from commit 20b72aea4dc1e25f2aa3cfe62b45ca1ff29d1cbb)
---
CONTRIBUTING.rst | 23 ++--
INSTALL | 22 ++--
airflow/providers/airbyte/CHANGELOG.rst | 25 ++++
airflow/providers/airbyte/__init__.py | 17 +++
airflow/providers/airbyte/example_dags/__init__.py | 16 +++
.../example_dags/example_airbyte_trigger_job.py | 64 +++++++++++
airflow/providers/airbyte/hooks/__init__.py | 17 +++
airflow/providers/airbyte/hooks/airbyte.py | 109 ++++++++++++++++++
airflow/providers/airbyte/operators/__init__.py | 17 +++
airflow/providers/airbyte/operators/airbyte.py | 85 ++++++++++++++
airflow/providers/airbyte/provider.yaml | 51 +++++++++
airflow/providers/airbyte/sensors/__init__.py | 16 +++
airflow/providers/airbyte/sensors/airbyte.py | 73 ++++++++++++
airflow/providers/dependencies.json | 3 +
docs/apache-airflow-providers-airbyte/commits.rst | 27 +++++
.../connections.rst | 36 ++++++
docs/apache-airflow-providers-airbyte/index.rst | 121 ++++++++++++++++++++
.../operators/airbyte.rst | 58 ++++++++++
docs/apache-airflow/extra-packages-ref.rst | 2 +
docs/integration-logos/airbyte/Airbyte.png | Bin 0 -> 7405 bytes
docs/spelling_wordlist.txt | 2 +
setup.py | 1 +
tests/core/test_providers_manager.py | 1 +
tests/providers/airbyte/__init__.py | 16 +++
tests/providers/airbyte/hooks/__init__.py | 16 +++
tests/providers/airbyte/hooks/test_airbyte.py | 126 +++++++++++++++++++++
tests/providers/airbyte/operators/__init__.py | 16 +++
tests/providers/airbyte/operators/test_airbyte.py | 55 +++++++++
tests/providers/airbyte/sensors/__init__.py | 16 +++
tests/providers/airbyte/sensors/test_airbyte.py | 93 +++++++++++++++
30 files changed, 1102 insertions(+), 22 deletions(-)
diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index e82fd4e..7ac115c 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -585,17 +585,17 @@ This is the full list of those extras:
.. START EXTRAS HERE
-all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid, apache.hdfs,
-apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop,
-apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes,
-crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc,
-docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github_enterprise, google,
-google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, jira, kerberos, kubernetes,
-ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas,
-opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole,
-rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack,
-snowflake, spark, sqlite, ssh, statsd, tableau, telegram, trino, vertica, virtualenv, webhdfs,
-winrm, yandex, zendesk
+airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid,
+apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark,
+apache.sqoop, apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
+cncf.kubernetes, crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop,
+dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api,
+github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins,
+jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql,
+mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus,
+postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry,
+sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, trino, vertica,
+virtualenv, webhdfs, winrm, yandex, zendesk
.. END EXTRAS HERE
@@ -653,6 +653,7 @@ Here is the list of packages and their extras:
========================== ===========================
Package Extras
========================== ===========================
+airbyte http
amazon apache.hive,google,imap,mongo,mysql,postgres,ssh
apache.beam google
apache.druid apache.hive
diff --git a/INSTALL b/INSTALL
index 34fccd2..46d15f6 100644
--- a/INSTALL
+++ b/INSTALL
@@ -97,17 +97,17 @@ The list of available extras:
# START EXTRAS HERE
-all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid, apache.hdfs,
-apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop,
-apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes,
-crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc,
-docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github_enterprise, google,
-google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, jira, kerberos, kubernetes,
-ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas,
-opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole,
-rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack,
-snowflake, spark, sqlite, ssh, statsd, tableau, telegram, trino, vertica, virtualenv, webhdfs,
-winrm, yandex, zendesk
+airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid,
+apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark,
+apache.sqoop, apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
+cncf.kubernetes, crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop,
+dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api,
+github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins,
+jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql,
+mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus,
+postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry,
+sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, trino, vertica,
+virtualenv, webhdfs, winrm, yandex, zendesk
# END EXTRAS HERE
diff --git a/airflow/providers/airbyte/CHANGELOG.rst b/airflow/providers/airbyte/CHANGELOG.rst
new file mode 100644
index 0000000..cef7dda
--- /dev/null
+++ b/airflow/providers/airbyte/CHANGELOG.rst
@@ -0,0 +1,25 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Changelog
+---------
+
+1.0.0
+.....
+
+Initial version of the provider.
diff --git a/airflow/providers/airbyte/__init__.py b/airflow/providers/airbyte/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/airbyte/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/airbyte/example_dags/__init__.py b/airflow/providers/airbyte/example_dags/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/airbyte/example_dags/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py b/airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
new file mode 100644
index 0000000..1ac62a8
--- /dev/null
+++ b/airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the AirbyteTriggerSyncOperator."""
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
+from airflow.utils.dates import days_ago
+
+# Defaults handed to every task in the DAG below.
+default_task_args = {'owner': 'airflow'}
+
+# Manually triggered DAG (no schedule) showing both usage modes of the Airbyte operator.
+with DAG(
+    dag_id='example_airbyte_operator',
+    tags=['example'],
+    schedule_interval=None,
+    start_date=days_ago(1),
+    dagrun_timeout=timedelta(minutes=60),
+    default_args=default_task_args,
+) as dag:
+
+    # [START howto_operator_airbyte_synchronous]
+    sync_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_sync_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+    )
+    # [END howto_operator_airbyte_synchronous]
+
+    # [START howto_operator_airbyte_asynchronous]
+    async_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_async_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+        asynchronous=True,
+    )
+
+    airbyte_sensor = AirbyteJobSensor(
+        task_id='airbyte_sensor_source_dest_example',
+        airbyte_job_id="{{task_instance.xcom_pull(task_ids='airbyte_async_source_dest_example')}}",
+        airbyte_conn_id='airbyte_default',
+    )
+    # [END howto_operator_airbyte_asynchronous]
+
+    async_source_destination >> airbyte_sensor
diff --git a/airflow/providers/airbyte/hooks/__init__.py b/airflow/providers/airbyte/hooks/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/airbyte/hooks/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/airbyte/hooks/airbyte.py b/airflow/providers/airbyte/hooks/airbyte.py
new file mode 100644
index 0000000..0aeb4f8
--- /dev/null
+++ b/airflow/providers/airbyte/hooks/airbyte.py
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for the Airbyte API.
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: Optional[str] = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(
+        self, job_id: int, wait_seconds: Optional[float] = 3, timeout: Optional[float] = 3600
+    ) -> None:
+        """
+        Helper method which polls a job until it reaches a final state.
+
+        :param job_id: Required. Id of the Airbyte job
+        :type job_id: int
+        :param wait_seconds: Optional. Number of seconds between checks.
+        :type wait_seconds: float
+        :param timeout: Optional. Seconds to wait for the job to finish; a falsy value disables the timeout.
+        :type timeout: float
+        :raises AirflowException: on timeout, or when the job failed or was cancelled.
+        """
+        start = time.monotonic()
+        while True:
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                # An AirflowException while polling is logged and retried until the timeout expires.
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
+                continue
+
+            if state in (self.RUNNING, self.PENDING):
+                continue
+            if state == self.SUCCEEDED:
+                return
+            # ``failed`` is a job failure too; it must not be reported as an unexpected state.
+            if state in (self.FAILED, self.ERROR):
+                raise AirflowException(f"Job failed:\n{job}")
+            if state == self.CANCELLED:
+                raise AirflowException(f"Job was cancelled:\n{job}")
+            raise Exception(f"Encountered unexpected state `{state}` for job_id `{job_id}`")
+
+    def submit_sync_connection(self, connection_id: str) -> Any:
+        """
+        Submits a job to an Airbyte server.
+
+        :param connection_id: Required. The ConnectionId of the Airbyte Connection.
+        :type connection_id: str
+        """
+        return self.run(
+            endpoint=f"api/{self.api_version}/connections/sync",
+            json={"connectionId": connection_id},
+            headers={"accept": "application/json"},
+        )
+
+    def get_job(self, job_id: int) -> Any:
+        """
+        Gets the resource representation for a job in Airbyte.
+
+        :param job_id: Required. Id of the Airbyte job
+        :type job_id: int
+        """
+        return self.run(
+            endpoint=f"api/{self.api_version}/jobs/get",
+            json={"id": job_id},
+            headers={"accept": "application/json"},
+        )
diff --git a/airflow/providers/airbyte/operators/__init__.py b/airflow/providers/airbyte/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/airbyte/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/airbyte/operators/airbyte.py b/airflow/providers/airbyte/operators/airbyte.py
new file mode 100644
index 0000000..6932fa3
--- /dev/null
+++ b/airflow/providers/airbyte/operators/airbyte.py
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AirbyteTriggerSyncOperator`
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get connection
+        information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API.
+        This is useful for submitting long running jobs and
+        waiting on them asynchronously using the AirbyteJobSensor.
+    :type asynchronous: bool
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    :param wait_seconds: Optional. Number of seconds between checks. Only used when ``asynchronous`` is False.
+    :type wait_seconds: float
+    :param timeout: Optional. The amount of time, in seconds, to wait for the job to complete.
+        Only used when ``asynchronous`` is False.
+    :type timeout: float
+    """
+
+    template_fields = ('connection_id',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        connection_id: str,
+        airbyte_conn_id: str = "airbyte_default",
+        asynchronous: Optional[bool] = False,
+        api_version: Optional[str] = "v1",
+        wait_seconds: Optional[float] = 3,
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.wait_seconds = wait_seconds
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> int:
+        """Submit the sync job, wait for it unless ``asynchronous`` is set, and return the job id."""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)
+        job_id = job_object.json()['job']['id']
+
+        self.log.info("Job %s was submitted to Airbyte Server", job_id)
+        if not self.asynchronous:
+            self.log.info('Waiting for job %s to complete', job_id)
+            hook.wait_for_job(job_id=job_id, wait_seconds=self.wait_seconds, timeout=self.timeout)
+            self.log.info('Job %s completed successfully', job_id)
+
+        return job_id
diff --git a/airflow/providers/airbyte/provider.yaml b/airflow/providers/airbyte/provider.yaml
new file mode 100644
index 0000000..77b109f
--- /dev/null
+++ b/airflow/providers/airbyte/provider.yaml
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-airbyte
+name: Airbyte
+description: |
+ `Airbyte <https://airbyte.io/>`__
+
+versions:
+ - 1.0.0
+
+integrations:
+ - integration-name: Airbyte
+ external-doc-url: https://www.airbyte.io/
+ logo: /integration-logos/airbyte/Airbyte.png
+ how-to-guide:
+ - /docs/apache-airflow-providers-airbyte/operators/airbyte.rst
+ tags: [service]
+
+operators:
+ - integration-name: Airbyte
+ python-modules:
+ - airflow.providers.airbyte.operators.airbyte
+
+hooks:
+ - integration-name: Airbyte
+ python-modules:
+ - airflow.providers.airbyte.hooks.airbyte
+
+sensors:
+ - integration-name: Airbyte
+ python-modules:
+ - airflow.providers.airbyte.sensors.airbyte
+
+hook-class-names:
+ - airflow.providers.airbyte.hooks.airbyte.AirbyteHook
diff --git a/airflow/providers/airbyte/sensors/__init__.py b/airflow/providers/airbyte/sensors/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/airbyte/sensors/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/airbyte/sensors/airbyte.py b/airflow/providers/airbyte/sensors/airbyte.py
new file mode 100644
index 0000000..9799ade
--- /dev/null
+++ b/airflow/providers/airbyte/sensors/airbyte.py
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains an Airbyte Job sensor."""
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteJobSensor(BaseSensorOperator):
+    """
+    Sensor that waits for an already submitted Airbyte job to finish.
+
+    :param airbyte_job_id: Required. Id of the Airbyte job to watch; the field is templated.
+    :type airbyte_job_id: str
+    :param airbyte_conn_id: Required. Name of the Airflow connection holding the
+        Airbyte connection information.
+    :type airbyte_conn_id: str
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    """
+
+    template_fields = ('airbyte_job_id',)
+    ui_color = '#6C51FD'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        airbyte_job_id: str,
+        airbyte_conn_id: str = 'airbyte_default',
+        api_version: Optional[str] = "v1",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_job_id = airbyte_job_id
+        self.api_version = api_version
+        self.airbyte_conn_id = airbyte_conn_id
+
+    def poke(self, context: dict) -> bool:
+        """Return ``True`` once the job has succeeded; raise when it failed or was cancelled."""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job = hook.get_job(job_id=self.airbyte_job_id)
+        job_status = job.json()['job']['status']
+        if job_status == hook.SUCCEEDED:
+            self.log.info("Job %s completed successfully.", self.airbyte_job_id)
+            return True
+        if job_status == hook.FAILED:
+            raise AirflowException(f"Job failed: \n{job}")
+        if job_status == hook.CANCELLED:
+            raise AirflowException(f"Job was cancelled: \n{job}")
+        if job_status == hook.ERROR:
+            self.log.info("Job %s attempt has failed.", self.airbyte_job_id)
+
+        self.log.info("Waiting for job %s to complete.", self.airbyte_job_id)
+        return False
diff --git a/airflow/providers/dependencies.json b/airflow/providers/dependencies.json
index 81a3ba4..6027656 100644
--- a/airflow/providers/dependencies.json
+++ b/airflow/providers/dependencies.json
@@ -1,4 +1,7 @@
{
+ "airbyte": [
+ "http"
+ ],
"amazon": [
"apache.hive",
"google",
diff --git a/docs/apache-airflow-providers-airbyte/commits.rst b/docs/apache-airflow-providers-airbyte/commits.rst
new file mode 100644
index 0000000..cae1272
--- /dev/null
+++ b/docs/apache-airflow-providers-airbyte/commits.rst
@@ -0,0 +1,27 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Package apache-airflow-providers-airbyte
+----------------------------------------
+
+`Airbyte <https://airbyte.io/>`__
+
+
+This is detailed commit list of changes for versions provider package: ``airbyte``.
+For high-level changelog, see :doc:`package information including changelog <index>`.
diff --git a/docs/apache-airflow-providers-airbyte/connections.rst b/docs/apache-airflow-providers-airbyte/connections.rst
new file mode 100644
index 0000000..31b69c7
--- /dev/null
+++ b/docs/apache-airflow-providers-airbyte/connections.rst
@@ -0,0 +1,36 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+
+Airbyte Connection
+==================
+The Airbyte connection type uses the HTTP protocol.
+
+Configuring the Connection
+--------------------------
+Host (required)
+ The host to connect to the Airbyte server.
+
+Port (required)
+ The port for the Airbyte server.
+
+Login (optional)
+ Specify the user name to connect.
+
+Password (optional)
+ Specify the password to connect.
diff --git a/docs/apache-airflow-providers-airbyte/index.rst b/docs/apache-airflow-providers-airbyte/index.rst
new file mode 100644
index 0000000..d83f5e0
--- /dev/null
+++ b/docs/apache-airflow-providers-airbyte/index.rst
@@ -0,0 +1,121 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+``apache-airflow-providers-airbyte``
+====================================
+
+Content
+-------
+
+.. toctree::
+ :maxdepth: 1
+ :caption: Guides
+
+ Operators <operators/airbyte>
+ Connection types <connections>
+
+.. toctree::
+ :maxdepth: 1
+ :caption: References
+
+ Python API <_api/airflow/providers/airbyte/index>
+
+.. toctree::
+ :maxdepth: 1
+ :caption: Resources
+
+ Example DAGs <https://github.com/apache/airflow/tree/master/airflow/providers/airbyte/example_dags>
+ PyPI Repository <https://pypi.org/project/apache-airflow-providers-airbyte/>
+
+.. toctree::
+ :maxdepth: 1
+ :caption: Commits
+
+ Detailed list of commits <commits>
+
+Package apache-airflow-providers-airbyte
+----------------------------------------
+
+`Airbyte <https://www.airbyte.io/>`__
+
+
+Release: 1.0.0
+
+Provider package
+----------------
+
+This is a provider package for the ``airbyte`` provider. All classes for this provider package
+are in the ``airflow.providers.airbyte`` Python package.
+
+Installation
+------------
+
+.. note::
+
+ In November 2020, a new version of pip (20.3) was released with a new resolver (the 2020 resolver).
+ This resolver does not yet work with Apache Airflow and might lead to installation errors, depending
+ on your choice of extras. In order to install Airflow, you need to either downgrade pip to version
+ 20.2.4 with ``pip install --upgrade pip==20.2.4`` or, if you use pip 20.3, add the option
+ ``--use-deprecated legacy-resolver`` to your pip install command.
+
+
+You can install this package on top of an existing airflow 2.* installation via
+``pip install apache-airflow-providers-airbyte``
+
+Cross provider package dependencies
+-----------------------------------
+
+These are dependencies that might be needed in order to use all the features of the package.
+You need to install the specified provider packages in order to use them.
+
+You can install such cross-provider dependencies when installing from PyPI. For example:
+
+.. code-block:: bash
+
+ pip install apache-airflow-providers-airbyte[http]
+
+
+================================================================================================ ========
+Dependent package Extra
+================================================================================================ ========
+`apache-airflow-providers-http <https://airflow.apache.org/docs/apache-airflow-providers-http>`_ ``http``
+================================================================================================ ========
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+Changelog
+---------
+
+1.0.0
+.....
+
+Initial version of the provider.
diff --git a/docs/apache-airflow-providers-airbyte/operators/airbyte.rst b/docs/apache-airflow-providers-airbyte/operators/airbyte.rst
new file mode 100644
index 0000000..b674627
--- /dev/null
+++ b/docs/apache-airflow-providers-airbyte/operators/airbyte.rst
@@ -0,0 +1,58 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+.. _howto/operator:AirbyteTriggerSyncOperator:
+
+AirbyteTriggerSyncOperator
+==========================
+
+Use the :class:`~airflow.providers.airbyte.operators.airbyte.AirbyteTriggerSyncOperator` to
+trigger an existing ConnectionId sync job in Airbyte.
+
+.. warning::
+ This operator triggers a synchronization job in Airbyte.
+ If triggered again, this operator does not guarantee idempotency.
+ You must be aware of the source (database, API, etc.) you are updating/syncing and
+ the method applied to perform the operation in Airbyte.
+
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+The AirbyteTriggerSyncOperator requires the ``connection_id``, which is the UUID identifier
+created in Airbyte for a synchronization job between a source and a destination.
+Use the ``airbyte_conn_id`` parameter to specify the Airbyte connection to use to
+connect to your account.
+
+You can trigger a synchronization job in Airflow in two ways with the Operator. The first one
+is a synchronous process: the Operator triggers the Airbyte job and then manages the status
+of the job. The other way is to use the flag ``async = True`` so that the Operator only triggers
+the job and returns the ``job_id``, which should be passed to the AirbyteJobSensor.
+
+An example using the synchronous way:
+
+.. exampleinclude:: /../../airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
+ :language: python
+ :start-after: [START howto_operator_airbyte_synchronous]
+ :end-before: [END howto_operator_airbyte_synchronous]
+
+An example using the async way:
+
+.. exampleinclude:: /../../airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
+ :language: python
+ :start-after: [START howto_operator_airbyte_asynchronous]
+ :end-before: [END howto_operator_airbyte_asynchronous]
diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst
index 601c6bc..b902868 100644
--- a/docs/apache-airflow/extra-packages-ref.rst
+++ b/docs/apache-airflow/extra-packages-ref.rst
@@ -141,6 +141,8 @@ Those are extras that add dependencies needed for integration with external serv
+---------------------+-----------------------------------------------------+-----------------------------------------------------+
| extra | install command | enables |
+=====================+=====================================================+=====================================================+
+| airbyte | ``pip install 'apache-airflow[airbyte]'`` | Airbyte hooks and operators |
++---------------------+-----------------------------------------------------+-----------------------------------------------------+
| amazon | ``pip install 'apache-airflow[amazon]'`` | Amazon Web Services |
+---------------------+-----------------------------------------------------+-----------------------------------------------------+
| azure | ``pip install 'apache-airflow[microsoft.azure]'`` | Microsoft Azure |
diff --git a/docs/integration-logos/airbyte/Airbyte.png b/docs/integration-logos/airbyte/Airbyte.png
new file mode 100644
index 0000000..0cc1d07
Binary files /dev/null and b/docs/integration-logos/airbyte/Airbyte.png differ
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index ace29d3..2ebb5d1 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1,6 +1,7 @@
Ack
Acyclic
Airbnb
+Airbyte
AirflowException
Aizhamal
Alphasort
@@ -420,6 +421,7 @@ acyclic
adhoc
aijamalnk
airbnb
+airbyte
airfl
airflowignore
ajax
diff --git a/setup.py b/setup.py
index 9ccd60e..5ec7d37 100644
--- a/setup.py
+++ b/setup.py
@@ -523,6 +523,7 @@ devel_hadoop = devel_minreq + hdfs + hive + kerberos + presto + webhdfs
# Dict of all providers which are part of the Apache Airflow repository together with their requirements
PROVIDERS_REQUIREMENTS: Dict[str, List[str]] = {
+ 'airbyte': [],
'amazon': amazon,
'apache.beam': apache_beam,
'apache.cassandra': cassandra,
diff --git a/tests/core/test_providers_manager.py b/tests/core/test_providers_manager.py
index 5fd0af4..7299971 100644
--- a/tests/core/test_providers_manager.py
+++ b/tests/core/test_providers_manager.py
@@ -21,6 +21,7 @@ import unittest
from airflow.providers_manager import ProvidersManager
ALL_PROVIDERS = [
+ 'apache-airflow-providers-airbyte',
'apache-airflow-providers-amazon',
'apache-airflow-providers-apache-beam',
'apache-airflow-providers-apache-cassandra',
diff --git a/tests/providers/airbyte/__init__.py b/tests/providers/airbyte/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/providers/airbyte/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/airbyte/hooks/__init__.py b/tests/providers/airbyte/hooks/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/providers/airbyte/hooks/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/airbyte/hooks/test_airbyte.py b/tests/providers/airbyte/hooks/test_airbyte.py
new file mode 100644
index 0000000..09f10be
--- /dev/null
+++ b/tests/providers/airbyte/hooks/test_airbyte.py
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+from unittest import mock
+
+import pytest
+import requests_mock
+
+from airflow.exceptions import AirflowException
+from airflow.models import Connection
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils import db
+
+
+class TestAirbyteHook(unittest.TestCase):
+    """Tests for AirbyteHook's HTTP requests and its wait_for_job status handling."""
+
+    airbyte_conn_id = 'airbyte_conn_id_test'
+    connection_id = 'conn_test_sync'
+    job_id = 1
+    sync_connection_endpoint = 'http://test-airbyte:8001/api/v1/connections/sync'
+    get_job_endpoint = 'http://test-airbyte:8001/api/v1/jobs/get'
+    _mock_sync_conn_success_response_body = {'job': {'id': 1}}
+    _mock_job_status_success_response_body = {'job': {'status': 'succeeded'}}
+
+    def setUp(self):
+        """Register the test connection via db.merge_conn and create the hook under test."""
+        db.merge_conn(
+            Connection(
+                conn_id='airbyte_conn_id_test', conn_type='http', host='http://test-airbyte', port=8001
+            )
+        )
+        self.hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id)
+
+    def return_value_get_job(self, status):
+        """Return a mock response whose ``json()`` reports a job with the given status."""
+        response = mock.Mock()
+        response.json.return_value = {'job': {'status': status}}
+        return response
+
+    @requests_mock.mock()
+    def test_submit_sync_connection(self, m):
+        m.post(
+            self.sync_connection_endpoint, status_code=200, json=self._mock_sync_conn_success_response_body
+        )
+        resp = self.hook.submit_sync_connection(connection_id=self.connection_id)
+        assert resp.status_code == 200
+        assert resp.json() == self._mock_sync_conn_success_response_body
+
+    @requests_mock.mock()
+    def test_get_job_status(self, m):
+        m.post(self.get_job_endpoint, status_code=200, json=self._mock_job_status_success_response_body)
+        resp = self.hook.get_job(job_id=self.job_id)
+        assert resp.status_code == 200
+        assert resp.json() == self._mock_job_status_success_response_body
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_wait_for_job_succeeded(self, mock_get_job):
+        mock_get_job.side_effect = [self.return_value_get_job(self.hook.SUCCEEDED)]
+        self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0)
+        mock_get_job.assert_called_once_with(job_id=self.job_id)
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_wait_for_job_error(self, mock_get_job):
+        mock_get_job.side_effect = [
+            self.return_value_get_job(self.hook.RUNNING),
+            self.return_value_get_job(self.hook.ERROR),
+        ]
+        with pytest.raises(AirflowException, match="Job failed"):
+            self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0)
+
+        calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)]
+        mock_get_job.assert_has_calls(calls)
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_wait_for_job_timeout(self, mock_get_job):
+        mock_get_job.side_effect = [
+            self.return_value_get_job(self.hook.PENDING),
+            self.return_value_get_job(self.hook.RUNNING),
+            self.return_value_get_job(self.hook.RUNNING),
+        ]
+        with pytest.raises(AirflowException, match="Timeout"):
+            self.hook.wait_for_job(job_id=self.job_id, wait_seconds=2, timeout=1)
+        # Poll count before the deadline is decided inside wait_for_job, so check arguments only.
+        expected_call = mock.call(job_id=self.job_id)
+        assert all(actual == expected_call for actual in mock_get_job.call_args_list)
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_wait_for_job_state_unrecognized(self, mock_get_job):
+        mock_get_job.side_effect = [
+            self.return_value_get_job(self.hook.RUNNING),
+            self.return_value_get_job("UNRECOGNIZED"),
+        ]
+        with pytest.raises(Exception, match="unexpected state"):
+            self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0)
+
+        calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)]
+        mock_get_job.assert_has_calls(calls)
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_wait_for_job_cancelled(self, mock_get_job):
+        mock_get_job.side_effect = [
+            self.return_value_get_job(self.hook.RUNNING),
+            self.return_value_get_job(self.hook.CANCELLED),
+        ]
+        with pytest.raises(AirflowException, match="Job was cancelled"):
+            self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0)
+
+        calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)]
+        mock_get_job.assert_has_calls(calls)
diff --git a/tests/providers/airbyte/operators/__init__.py b/tests/providers/airbyte/operators/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/providers/airbyte/operators/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/airbyte/operators/test_airbyte.py b/tests/providers/airbyte/operators/test_airbyte.py
new file mode 100644
index 0000000..bc56c5d
--- /dev/null
+++ b/tests/providers/airbyte/operators/test_airbyte.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+from unittest import mock
+
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+
+
+class TestAirbyteTriggerSyncOp(unittest.TestCase):
+    """Check that AirbyteTriggerSyncOperator.execute submits a sync and then waits for its job."""
+
+    airbyte_conn_id = 'test_airbyte_conn_id'
+    connection_id = 'test_airbyte_connection'
+    job_id = 1
+    wait_seconds = 0
+    timeout = 360
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.submit_sync_connection')
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.wait_for_job', return_value=None)
+    def test_execute(self, mock_wait_for_job, mock_submit_sync_connection):
+        # Submit response whose JSON body carries the job id expected in the wait_for_job call.
+        submit_response = mock.Mock()
+        submit_response.json.return_value = {'job': {'id': self.job_id}}
+        mock_submit_sync_connection.return_value = submit_response
+
+        # Build the operator from the class-level fixture values and run it with an empty context.
+        operator_kwargs = {
+            'task_id': 'test_Airbyte_op',
+            'airbyte_conn_id': self.airbyte_conn_id,
+            'connection_id': self.connection_id,
+            'wait_seconds': self.wait_seconds,
+            'timeout': self.timeout,
+        }
+        AirbyteTriggerSyncOperator(**operator_kwargs).execute({})
+
+        # Both hook methods must have been called exactly once with the configured values.
+        mock_submit_sync_connection.assert_called_once_with(connection_id=self.connection_id)
+        wait_kwargs = {'job_id': self.job_id, 'wait_seconds': self.wait_seconds, 'timeout': self.timeout}
+        mock_wait_for_job.assert_called_once_with(**wait_kwargs)
diff --git a/tests/providers/airbyte/sensors/__init__.py b/tests/providers/airbyte/sensors/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/providers/airbyte/sensors/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/airbyte/sensors/test_airbyte.py b/tests/providers/airbyte/sensors/test_airbyte.py
new file mode 100644
index 0000000..5bd69b8
--- /dev/null
+++ b/tests/providers/airbyte/sensors/test_airbyte.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+import pytest
+
+from airflow import AirflowException
+from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
+
+
+class TestAirbyteJobSensor(unittest.TestCase):
+    """
+    Exercise AirbyteJobSensor.poke against each mocked Airbyte job status.
+    """
+
+    task_id = "task-id"
+    airbyte_conn_id = "airbyte-conn-test"
+    job_id = 1
+    # NOTE: ``timeout`` is not referenced by any test in this class.
+    timeout = 120
+
+    def get_job(self, status):
+        """Build a stand-in for the hook's get_job response carrying ``status``."""
+        payload = {'job': {'status': status}}
+        response = mock.Mock()
+        response.json.return_value = payload
+        return response
+
+    def _build_sensor(self):
+        """Create the sensor under test from the class-level fixture values."""
+        return AirbyteJobSensor(
+            task_id=self.task_id,
+            airbyte_job_id=self.job_id,
+            airbyte_conn_id=self.airbyte_conn_id,
+        )
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_done(self, mock_get_job):
+        # A 'succeeded' job is expected to make poke() return a truthy value.
+        mock_get_job.return_value = self.get_job('succeeded')
+
+        poke_result = self._build_sensor().poke(context={})
+
+        mock_get_job.assert_called_once_with(job_id=self.job_id)
+        assert poke_result
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_failed(self, mock_get_job):
+        # A 'failed' job is expected to make poke() raise an AirflowException matching "Job failed".
+        mock_get_job.return_value = self.get_job('failed')
+        sensor = self._build_sensor()
+
+        with pytest.raises(AirflowException, match="Job failed"):
+            sensor.poke(context={})
+
+        mock_get_job.assert_called_once_with(job_id=self.job_id)
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_running(self, mock_get_job):
+        # A 'running' job is expected to make poke() return a falsy value.
+        mock_get_job.return_value = self.get_job('running')
+
+        poke_result = self._build_sensor().poke(context={})
+
+        mock_get_job.assert_called_once_with(job_id=self.job_id)
+        assert not poke_result
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_cancelled(self, mock_get_job):
+        # A 'cancelled' job is expected to raise an AirflowException matching "Job was cancelled".
+        mock_get_job.return_value = self.get_job('cancelled')
+        sensor = self._build_sensor()
+
+        with pytest.raises(AirflowException, match="Job was cancelled"):
+            sensor.poke(context={})
+
+        mock_get_job.assert_called_once_with(job_id=self.job_id)