You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2021/04/06 23:19:31 UTC

[airflow] 03/03: Adds new Airbyte provider (#14492)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5d05835e1d54c87f3a05b5c2bd5f1850b8506dfc
Author: Marcos Marx <ma...@users.noreply.github.com>
AuthorDate: Sat Mar 6 11:19:30 2021 -0300

    Adds new Airbyte provider (#14492)
    
    This commit add hook, operators and sensors to interact with
    Airbyte external service.
    
    (cherry picked from commit 20b72aea4dc1e25f2aa3cfe62b45ca1ff29d1cbb)
---
 CONTRIBUTING.rst                                   |  23 ++--
 INSTALL                                            |  22 ++--
 airflow/providers/airbyte/CHANGELOG.rst            |  25 ++++
 airflow/providers/airbyte/__init__.py              |  17 +++
 airflow/providers/airbyte/example_dags/__init__.py |  16 +++
 .../example_dags/example_airbyte_trigger_job.py    |  64 +++++++++++
 airflow/providers/airbyte/hooks/__init__.py        |  17 +++
 airflow/providers/airbyte/hooks/airbyte.py         | 109 ++++++++++++++++++
 airflow/providers/airbyte/operators/__init__.py    |  17 +++
 airflow/providers/airbyte/operators/airbyte.py     |  85 ++++++++++++++
 airflow/providers/airbyte/provider.yaml            |  51 +++++++++
 airflow/providers/airbyte/sensors/__init__.py      |  16 +++
 airflow/providers/airbyte/sensors/airbyte.py       |  73 ++++++++++++
 airflow/providers/dependencies.json                |   3 +
 docs/apache-airflow-providers-airbyte/commits.rst  |  27 +++++
 .../connections.rst                                |  36 ++++++
 docs/apache-airflow-providers-airbyte/index.rst    | 121 ++++++++++++++++++++
 .../operators/airbyte.rst                          |  58 ++++++++++
 docs/apache-airflow/extra-packages-ref.rst         |   2 +
 docs/integration-logos/airbyte/Airbyte.png         | Bin 0 -> 7405 bytes
 docs/spelling_wordlist.txt                         |   2 +
 setup.py                                           |   1 +
 tests/core/test_providers_manager.py               |   1 +
 tests/providers/airbyte/__init__.py                |  16 +++
 tests/providers/airbyte/hooks/__init__.py          |  16 +++
 tests/providers/airbyte/hooks/test_airbyte.py      | 126 +++++++++++++++++++++
 tests/providers/airbyte/operators/__init__.py      |  16 +++
 tests/providers/airbyte/operators/test_airbyte.py  |  55 +++++++++
 tests/providers/airbyte/sensors/__init__.py        |  16 +++
 tests/providers/airbyte/sensors/test_airbyte.py    |  93 +++++++++++++++
 30 files changed, 1102 insertions(+), 22 deletions(-)

diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index e82fd4e..7ac115c 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -585,17 +585,17 @@ This is the full list of those extras:
 
   .. START EXTRAS HERE
 
-all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid, apache.hdfs,
-apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop,
-apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes,
-crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc,
-docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github_enterprise, google,
-google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, jira, kerberos, kubernetes,
-ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas,
-opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole,
-rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack,
-snowflake, spark, sqlite, ssh, statsd, tableau, telegram, trino, vertica, virtualenv, webhdfs,
-winrm, yandex, zendesk
+airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid,
+apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark,
+apache.sqoop, apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
+cncf.kubernetes, crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop,
+dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api,
+github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins,
+jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql,
+mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus,
+postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry,
+sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, trino, vertica,
+virtualenv, webhdfs, winrm, yandex, zendesk
 
   .. END EXTRAS HERE
 
@@ -653,6 +653,7 @@ Here is the list of packages and their extras:
 ========================== ===========================
 Package                    Extras
 ========================== ===========================
+airbyte                    http
 amazon                     apache.hive,google,imap,mongo,mysql,postgres,ssh
 apache.beam                google
 apache.druid               apache.hive
diff --git a/INSTALL b/INSTALL
index 34fccd2..46d15f6 100644
--- a/INSTALL
+++ b/INSTALL
@@ -97,17 +97,17 @@ The list of available extras:
 
 # START EXTRAS HERE
 
-all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid, apache.hdfs,
-apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark, apache.sqoop,
-apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, cncf.kubernetes,
-crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop, dingding, discord, doc,
-docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api, github_enterprise, google,
-google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins, jira, kerberos, kubernetes,
-ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, neo4j, odbc, openfaas,
-opsgenie, oracle, pagerduty, papermill, password, pinot, plexus, postgres, presto, qds, qubole,
-rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry, sftp, singularity, slack,
-snowflake, spark, sqlite, ssh, statsd, tableau, telegram, trino, vertica, virtualenv, webhdfs,
-winrm, yandex, zendesk
+airbyte, all, all_dbs, amazon, apache.atlas, apache.beam, apache.cassandra, apache.druid,
+apache.hdfs, apache.hive, apache.kylin, apache.livy, apache.pig, apache.pinot, apache.spark,
+apache.sqoop, apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant,
+cncf.kubernetes, crypto, dask, databricks, datadog, devel, devel_all, devel_ci, devel_hadoop,
+dingding, discord, doc, docker, druid, elasticsearch, exasol, facebook, ftp, gcp, gcp_api,
+github_enterprise, google, google_auth, grpc, hashicorp, hdfs, hive, http, imap, jdbc, jenkins,
+jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql, microsoft.winrm, mongo, mssql,
+mysql, neo4j, odbc, openfaas, opsgenie, oracle, pagerduty, papermill, password, pinot, plexus,
+postgres, presto, qds, qubole, rabbitmq, redis, s3, salesforce, samba, segment, sendgrid, sentry,
+sftp, singularity, slack, snowflake, spark, sqlite, ssh, statsd, tableau, telegram, trino, vertica,
+virtualenv, webhdfs, winrm, yandex, zendesk
 
 # END EXTRAS HERE
 
diff --git a/airflow/providers/airbyte/CHANGELOG.rst b/airflow/providers/airbyte/CHANGELOG.rst
new file mode 100644
index 0000000..cef7dda
--- /dev/null
+++ b/airflow/providers/airbyte/CHANGELOG.rst
@@ -0,0 +1,25 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Changelog
+---------
+
+1.0.0
+.....
+
+Initial version of the provider.
diff --git a/airflow/providers/airbyte/__init__.py b/airflow/providers/airbyte/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/airbyte/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/airbyte/example_dags/__init__.py b/airflow/providers/airbyte/example_dags/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/airbyte/example_dags/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py b/airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
new file mode 100644
index 0000000..1ac62a8
--- /dev/null
+++ b/airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the AirbyteTriggerSyncOperator."""
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+}
+
+with DAG(
+    dag_id='example_airbyte_operator',
+    default_args=args,
+    schedule_interval=None,
+    start_date=days_ago(1),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=['example'],
+) as dag:
+
+    # [START howto_operator_airbyte_synchronous]
+    sync_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_sync_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+    )
+    # [END howto_operator_airbyte_synchronous]
+
+    # [START howto_operator_airbyte_asynchronous]
+    async_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_async_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+        asynchronous=True,
+    )
+
+    airbyte_sensor = AirbyteJobSensor(
+        task_id='airbyte_sensor_source_dest_example',
+        airbyte_job_id="{{task_instance.xcom_pull(task_ids='airbyte_async_source_dest_example')}}",
+        airbyte_conn_id='airbyte_default',
+    )
+    # [END howto_operator_airbyte_asynchronous]
+
+    async_source_destination >> airbyte_sensor
diff --git a/airflow/providers/airbyte/hooks/__init__.py b/airflow/providers/airbyte/hooks/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/airbyte/hooks/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/airbyte/hooks/airbyte.py b/airflow/providers/airbyte/hooks/airbyte.py
new file mode 100644
index 0000000..0aeb4f8
--- /dev/null
+++ b/airflow/providers/airbyte/hooks/airbyte.py
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: Optional[str] = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(
+        self, job_id: str, wait_seconds: Optional[float] = 3, timeout: Optional[float] = 3600
+    ) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Required. Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Optional. Number of seconds between checks.
+        :type wait_seconds: float
+        :param timeout: Optional. How many seconds wait for job to be ready.
+            Used only if ``asynchronous`` is False.
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while True:
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
+                continue
+
+            if state in (self.RUNNING, self.PENDING):
+                continue
+            if state == self.SUCCEEDED:
+                break
+            if state == self.ERROR:
+                raise AirflowException(f"Job failed:\n{job}")
+            elif state == self.CANCELLED:
+                raise AirflowException(f"Job was cancelled:\n{job}")
+            else:
+                raise Exception(f"Encountered unexpected state `{state}` for job_id `{job_id}`")
+
+    def submit_sync_connection(self, connection_id: str) -> Any:
+        """
+        Submits a job to a Airbyte server.
+
+        :param connection_id: Required. The ConnectionId of the Airbyte Connection.
+        :type connectiond_id: str
+        """
+        return self.run(
+            endpoint=f"api/{self.api_version}/connections/sync",
+            json={"connectionId": connection_id},
+            headers={"accept": "application/json"},
+        )
+
+    def get_job(self, job_id: int) -> Any:
+        """
+        Gets the resource representation for a job in Airbyte.
+
+        :param job_id: Required. Id of the Airbyte job
+        :type job_id: int
+        """
+        return self.run(
+            endpoint=f"api/{self.api_version}/jobs/get",
+            json={"id": job_id},
+            headers={"accept": "application/json"},
+        )
diff --git a/airflow/providers/airbyte/operators/__init__.py b/airflow/providers/airbyte/operators/__init__.py
new file mode 100644
index 0000000..217e5db
--- /dev/null
+++ b/airflow/providers/airbyte/operators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/airbyte/operators/airbyte.py b/airflow/providers/airbyte/operators/airbyte.py
new file mode 100644
index 0000000..6932fa3
--- /dev/null
+++ b/airflow/providers/airbyte/operators/airbyte.py
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run a integration
+    process between your source and destination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AirbyteTriggerSyncOperator`
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get connection
+        information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API.
+        This is useful for submitting long running jobs and
+        waiting on them asynchronously using the AirbyteJobSensor.
+    :type asynchronous: bool
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    :param wait_seconds: Optional. Number of seconds between checks. Only used when ``asynchronous`` is False.
+    :type wait_seconds: float
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+        Only used when ``asynchronous`` is False.
+    :type timeout: float
+    """
+
+    template_fields = ('connection_id',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        connection_id: str,
+        airbyte_conn_id: str = "airbyte_default",
+        asynchronous: Optional[bool] = False,
+        api_version: Optional[str] = "v1",
+        wait_seconds: Optional[float] = 3,
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.wait_seconds = wait_seconds
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)
+        job_id = job_object.json()['job']['id']
+
+        self.log.info("Job %s was submitted to Airbyte Server", job_id)
+        if not self.asynchronous:
+            self.log.info('Waiting for job %s to complete', job_id)
+            hook.wait_for_job(job_id=job_id, wait_seconds=self.wait_seconds, timeout=self.timeout)
+            self.log.info('Job %s completed successfully', job_id)
+
+        return job_id
diff --git a/airflow/providers/airbyte/provider.yaml b/airflow/providers/airbyte/provider.yaml
new file mode 100644
index 0000000..77b109f
--- /dev/null
+++ b/airflow/providers/airbyte/provider.yaml
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-airbyte
+name: Airbyte
+description: |
+  `Airbyte <https://airbyte.io/>`__
+
+versions:
+  - 1.0.0
+
+integrations:
+  - integration-name: Airbyte
+    external-doc-url: https://www.airbyte.io/
+    logo: /integration-logos/airbyte/Airbyte.png
+    how-to-guide:
+      - /docs/apache-airflow-providers-airbyte/operators/airbyte.rst
+    tags: [service]
+
+operators:
+  - integration-name: Airbyte
+    python-modules:
+      - airflow.providers.airbyte.operators.airbyte
+
+hooks:
+  - integration-name: Airbyte
+    python-modules:
+      - airflow.providers.airbyte.hooks.airbyte
+
+sensors:
+  - integration-name: Airbyte
+    python-modules:
+      - airflow.providers.airbyte.sensors.airbyte
+
+hook-class-names:
+  - airflow.providers.airbyte.hooks.airbyte.AirbyteHook
diff --git a/airflow/providers/airbyte/sensors/__init__.py b/airflow/providers/airbyte/sensors/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/airflow/providers/airbyte/sensors/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/airbyte/sensors/airbyte.py b/airflow/providers/airbyte/sensors/airbyte.py
new file mode 100644
index 0000000..9799ade
--- /dev/null
+++ b/airflow/providers/airbyte/sensors/airbyte.py
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Airbyte Job sensor."""
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteJobSensor(BaseSensorOperator):
+    """
+    Check for the state of a previously submitted Airbyte job.
+
+    :param airbyte_job_id: Required. Id of the Airbyte job
+    :type airbyte_job_id: str
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    """
+
+    template_fields = ('airbyte_job_id',)
+    ui_color = '#6C51FD'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        airbyte_job_id: str,
+        airbyte_conn_id: str = 'airbyte_default',
+        api_version: Optional[str] = "v1",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.airbyte_job_id = airbyte_job_id
+        self.api_version = api_version
+
+    def poke(self, context: dict) -> bool:
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job = hook.get_job(job_id=self.airbyte_job_id)
+        status = job.json()['job']['status']
+
+        if status == hook.FAILED:
+            raise AirflowException(f"Job failed: \n{job}")
+        elif status == hook.CANCELLED:
+            raise AirflowException(f"Job was cancelled: \n{job}")
+        elif status == hook.SUCCEEDED:
+            self.log.info("Job %s completed successfully.", self.airbyte_job_id)
+            return True
+        elif status == hook.ERROR:
+            self.log.info("Job %s attempt has failed.", self.airbyte_job_id)
+
+        self.log.info("Waiting for job %s to complete.", self.airbyte_job_id)
+        return False
diff --git a/airflow/providers/dependencies.json b/airflow/providers/dependencies.json
index 81a3ba4..6027656 100644
--- a/airflow/providers/dependencies.json
+++ b/airflow/providers/dependencies.json
@@ -1,4 +1,7 @@
 {
+  "airbyte": [
+    "http"
+  ],
   "amazon": [
     "apache.hive",
     "google",
diff --git a/docs/apache-airflow-providers-airbyte/commits.rst b/docs/apache-airflow-providers-airbyte/commits.rst
new file mode 100644
index 0000000..cae1272
--- /dev/null
+++ b/docs/apache-airflow-providers-airbyte/commits.rst
@@ -0,0 +1,27 @@
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Package apache-airflow-providers-airbyte
+----------------------------------------
+
+`Airbyte <https://airbyte.io/>`__
+
+
+This is detailed commit list of changes for versions provider package: ``airbyte``.
+For high-level changelog, see :doc:`package information including changelog <index>`.
diff --git a/docs/apache-airflow-providers-airbyte/connections.rst b/docs/apache-airflow-providers-airbyte/connections.rst
new file mode 100644
index 0000000..31b69c7
--- /dev/null
+++ b/docs/apache-airflow-providers-airbyte/connections.rst
@@ -0,0 +1,36 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+
+Airbyte Connection
+==================
+The Airbyte connection type use the HTTP protocol.
+
+Configuring the Connection
+--------------------------
+Host(required)
+    The host to connect to the Airbyte server.
+
+Port (required)
+    The port for the Airbyte server.
+
+Login (optional)
+    Specify the user name to connect.
+
+Password (optional)
+    Specify the password to connect.
diff --git a/docs/apache-airflow-providers-airbyte/index.rst b/docs/apache-airflow-providers-airbyte/index.rst
new file mode 100644
index 0000000..d83f5e0
--- /dev/null
+++ b/docs/apache-airflow-providers-airbyte/index.rst
@@ -0,0 +1,121 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+``apache-airflow-providers-airbyte``
+====================================
+
+Content
+-------
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Guides
+
+    Operators <operators/airbyte>
+    Connection types <connections>
+
+.. toctree::
+    :maxdepth: 1
+    :caption: References
+
+    Python API <_api/airflow/providers/airbyte/index>
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Resources
+
+    Example DAGs <https://github.com/apache/airflow/tree/master/airflow/providers/airbyte/example_dags>
+    PyPI Repository <https://pypi.org/project/apache-airflow-providers-airbyte/>
+
+.. toctree::
+    :maxdepth: 1
+    :caption: Commits
+
+    Detailed list of commits <commits>
+
+Package apache-airflow-providers-airbyte
+----------------------------------------
+
+`Airbyte <https://www.airbyte.io/>`__
+
+
+Release: 1.0.0
+
+Provider package
+----------------
+
+This is a provider package for ``airbyte`` provider. All classes for this provider package
+are in ``airflow.providers.airbyte`` python package.
+
+Installation
+------------
+
+.. note::
+
+    On November 2020, new version of PIP (20.3) has been released with a new, 2020 resolver. This resolver
+    does not yet work with Apache Airflow and might lead to errors in installation - depends on your choice
+    of extras. In order to install Airflow you need to either downgrade pip to version 20.2.4
+    ``pip install --upgrade pip==20.2.4`` or, in case you use Pip 20.3, you need to add option
+    ``--use-deprecated legacy-resolver`` to your pip install command.
+
+
+You can install this package on top of an existing airflow 2.* installation via
+``pip install apache-airflow-providers-airbyte``
+
+Cross provider package dependencies
+-----------------------------------
+
+Those are dependencies that might be needed in order to use all the features of the package.
+You need to install the specified backport providers package in order to use them.
+
+You can install such cross-provider dependencies when installing from PyPI. For example:
+
+.. code-block:: bash
+
+    pip install apache-airflow-providers-airbyte[http]
+
+
+================================================================================================  ========
+Dependent package                                                                                 Extra
+================================================================================================  ========
+`apache-airflow-providers-http <https://airflow.apache.org/docs/apache-airflow-providers-http>`_  ``http``
+================================================================================================  ========
+
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Changelog
+---------
+
+1.0.0
+.....
+
+Initial version of the provider.
diff --git a/docs/apache-airflow-providers-airbyte/operators/airbyte.rst b/docs/apache-airflow-providers-airbyte/operators/airbyte.rst
new file mode 100644
index 0000000..b674627
--- /dev/null
+++ b/docs/apache-airflow-providers-airbyte/operators/airbyte.rst
@@ -0,0 +1,58 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:AirbyteTriggerSyncOperator:
+
+AirbyteTriggerSyncOperator
+==========================
+
+Use the :class:`~airflow.providers.airbyte.operators.AirbyteTriggerSyncOperator` to
+trigger an existing ConnectionId sync job in Airbyte.
+
+.. warning::
+  This operator triggers a synchronization job in Airbyte.
+  If triggered again, this operator does not guarantee idempotency.
+  You must be aware of the source (database, API, etc) you are updating/sync and
+  the method applied to perform the operation in Airbyte.
+
+
+Using the Operator
+^^^^^^^^^^^^^^^^^^
+
+The AirbyteTriggerSyncOperator requires the ``connection_id`` this is the uuid identifier
+create in Airbyte between a source and destination synchronization job.
+Use the ``airbyte_conn_id`` parameter to specify the Airbyte connection to use to
+connect to your account.
+
+You can trigger a synchronization job in Airflow in two ways with the Operator. The first one
+is a synchronous process. This will trigger the Airbyte job and the Operator manage the status
+of the job. Another way is use the flag ``async = True`` so the Operator only trigger the job and
+return the ``job_id`` that should be pass to the AirbyteSensor.
+
+An example using the synchronous way:
+
+.. exampleinclude:: /../../airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
+    :language: python
+    :start-after: [START howto_operator_airbyte_synchronous]
+    :end-before: [END howto_operator_airbyte_synchronous]
+
+An example using the async way:
+
+.. exampleinclude:: /../../airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
+    :language: python
+    :start-after: [START howto_operator_airbyte_asynchronous]
+    :end-before: [END howto_operator_airbyte_asynchronous]
diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst
index 601c6bc..b902868 100644
--- a/docs/apache-airflow/extra-packages-ref.rst
+++ b/docs/apache-airflow/extra-packages-ref.rst
@@ -141,6 +141,8 @@ Those are extras that add dependencies needed for integration with external serv
 +---------------------+-----------------------------------------------------+-----------------------------------------------------+
 | extra               | install command                                     | enables                                             |
 +=====================+=====================================================+=====================================================+
+| airbyte             | ``pip install 'apache-airflow[airbyte]'``           | Airbyte hooks and operators                         |
++---------------------+-----------------------------------------------------+-----------------------------------------------------+
 | amazon              | ``pip install 'apache-airflow[amazon]'``            | Amazon Web Services                                 |
 +---------------------+-----------------------------------------------------+-----------------------------------------------------+
 | azure               | ``pip install 'apache-airflow[microsoft.azure]'``   | Microsoft Azure                                     |
diff --git a/docs/integration-logos/airbyte/Airbyte.png b/docs/integration-logos/airbyte/Airbyte.png
new file mode 100644
index 0000000..0cc1d07
Binary files /dev/null and b/docs/integration-logos/airbyte/Airbyte.png differ
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index ace29d3..2ebb5d1 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -1,6 +1,7 @@
 Ack
 Acyclic
 Airbnb
+Airbyte
 AirflowException
 Aizhamal
 Alphasort
@@ -420,6 +421,7 @@ acyclic
 adhoc
 aijamalnk
 airbnb
+airbyte
 airfl
 airflowignore
 ajax
diff --git a/setup.py b/setup.py
index 9ccd60e..5ec7d37 100644
--- a/setup.py
+++ b/setup.py
@@ -523,6 +523,7 @@ devel_hadoop = devel_minreq + hdfs + hive + kerberos + presto + webhdfs
 
 # Dict of all providers which are part of the Apache Airflow repository together with their requirements
 PROVIDERS_REQUIREMENTS: Dict[str, List[str]] = {
+    'airbyte': [],
     'amazon': amazon,
     'apache.beam': apache_beam,
     'apache.cassandra': cassandra,
diff --git a/tests/core/test_providers_manager.py b/tests/core/test_providers_manager.py
index 5fd0af4..7299971 100644
--- a/tests/core/test_providers_manager.py
+++ b/tests/core/test_providers_manager.py
@@ -21,6 +21,7 @@ import unittest
 from airflow.providers_manager import ProvidersManager
 
 ALL_PROVIDERS = [
+    'apache-airflow-providers-airbyte',
     'apache-airflow-providers-amazon',
     'apache-airflow-providers-apache-beam',
     'apache-airflow-providers-apache-cassandra',
diff --git a/tests/providers/airbyte/__init__.py b/tests/providers/airbyte/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/providers/airbyte/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/airbyte/hooks/__init__.py b/tests/providers/airbyte/hooks/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/providers/airbyte/hooks/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/airbyte/hooks/test_airbyte.py b/tests/providers/airbyte/hooks/test_airbyte.py
new file mode 100644
index 0000000..09f10be
--- /dev/null
+++ b/tests/providers/airbyte/hooks/test_airbyte.py
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+from unittest import mock
+
+import pytest
+import requests_mock
+
+from airflow.exceptions import AirflowException
+from airflow.models import Connection
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils import db
+
+
+class TestAirbyteHook(unittest.TestCase):
+    """
+    Test all functions from Airbyte Hook
+    """
+
+    airbyte_conn_id = 'airbyte_conn_id_test'
+    connection_id = 'conn_test_sync'
+    job_id = 1
+    sync_connection_endpoint = 'http://test-airbyte:8001/api/v1/connections/sync'
+    get_job_endpoint = 'http://test-airbyte:8001/api/v1/jobs/get'
+    _mock_sync_conn_success_response_body = {'job': {'id': 1}}
+    _mock_job_status_success_response_body = {'job': {'status': 'succeeded'}}
+
+    def setUp(self):
+        db.merge_conn(
+            Connection(
+                conn_id='airbyte_conn_id_test', conn_type='http', host='http://test-airbyte', port=8001
+            )
+        )
+        self.hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id)
+
+    def return_value_get_job(self, status):
+        response = mock.Mock()
+        response.json.return_value = {'job': {'status': status}}
+        return response
+
+    @requests_mock.mock()
+    def test_submit_sync_connection(self, m):
+        m.post(
+            self.sync_connection_endpoint, status_code=200, json=self._mock_sync_conn_success_response_body
+        )
+        resp = self.hook.submit_sync_connection(connection_id=self.connection_id)
+        assert resp.status_code == 200
+        assert resp.json() == self._mock_sync_conn_success_response_body
+
+    @requests_mock.mock()
+    def test_get_job_status(self, m):
+        m.post(self.get_job_endpoint, status_code=200, json=self._mock_job_status_success_response_body)
+        resp = self.hook.get_job(job_id=self.job_id)
+        assert resp.status_code == 200
+        assert resp.json() == self._mock_job_status_success_response_body
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_wait_for_job_succeeded(self, mock_get_job):
+        mock_get_job.side_effect = [self.return_value_get_job(self.hook.SUCCEEDED)]
+        self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0)
+        mock_get_job.assert_called_once_with(job_id=self.job_id)
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_wait_for_job_error(self, mock_get_job):
+        mock_get_job.side_effect = [
+            self.return_value_get_job(self.hook.RUNNING),
+            self.return_value_get_job(self.hook.ERROR),
+        ]
+        with pytest.raises(AirflowException, match="Job failed"):
+            self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0)
+
+        calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)]
+        assert mock_get_job.has_calls(calls)
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_wait_for_job_timeout(self, mock_get_job):
+        mock_get_job.side_effect = [
+            self.return_value_get_job(self.hook.PENDING),
+            self.return_value_get_job(self.hook.RUNNING),
+            self.return_value_get_job(self.hook.RUNNING),
+        ]
+        with pytest.raises(AirflowException, match="Timeout"):
+            self.hook.wait_for_job(job_id=self.job_id, wait_seconds=2, timeout=1)
+
+        calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)]
+        assert mock_get_job.has_calls(calls)
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_wait_for_job_state_unrecognized(self, mock_get_job):
+        mock_get_job.side_effect = [
+            self.return_value_get_job(self.hook.RUNNING),
+            self.return_value_get_job("UNRECOGNIZED"),
+        ]
+        with pytest.raises(Exception, match="unexpected state"):
+            self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0)
+
+        calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)]
+        assert mock_get_job.has_calls(calls)
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_wait_for_job_cancelled(self, mock_get_job):
+        mock_get_job.side_effect = [
+            self.return_value_get_job(self.hook.RUNNING),
+            self.return_value_get_job(self.hook.CANCELLED),
+        ]
+        with pytest.raises(AirflowException, match="Job was cancelled"):
+            self.hook.wait_for_job(job_id=self.job_id, wait_seconds=0)
+
+        calls = [mock.call(job_id=self.job_id), mock.call(job_id=self.job_id)]
+        assert mock_get_job.has_calls(calls)
diff --git a/tests/providers/airbyte/operators/__init__.py b/tests/providers/airbyte/operators/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/providers/airbyte/operators/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/airbyte/operators/test_airbyte.py b/tests/providers/airbyte/operators/test_airbyte.py
new file mode 100644
index 0000000..bc56c5d
--- /dev/null
+++ b/tests/providers/airbyte/operators/test_airbyte.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+from unittest import mock
+
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+
+
+class TestAirbyteTriggerSyncOp(unittest.TestCase):
+    """
+    Test execute function from Airbyte Operator
+    """
+
+    airbyte_conn_id = 'test_airbyte_conn_id'
+    connection_id = 'test_airbyte_connection'
+    job_id = 1
+    wait_seconds = 0
+    timeout = 360
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.submit_sync_connection')
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.wait_for_job', return_value=None)
+    def test_execute(self, mock_wait_for_job, mock_submit_sync_connection):
+        mock_submit_sync_connection.return_value = mock.Mock(
+            **{'json.return_value': {'job': {'id': self.job_id}}}
+        )
+
+        op = AirbyteTriggerSyncOperator(
+            task_id='test_Airbyte_op',
+            airbyte_conn_id=self.airbyte_conn_id,
+            connection_id=self.connection_id,
+            wait_seconds=self.wait_seconds,
+            timeout=self.timeout,
+        )
+        op.execute({})
+
+        mock_submit_sync_connection.assert_called_once_with(connection_id=self.connection_id)
+        mock_wait_for_job.assert_called_once_with(
+            job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout
+        )
diff --git a/tests/providers/airbyte/sensors/__init__.py b/tests/providers/airbyte/sensors/__init__.py
new file mode 100644
index 0000000..13a8339
--- /dev/null
+++ b/tests/providers/airbyte/sensors/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/airbyte/sensors/test_airbyte.py b/tests/providers/airbyte/sensors/test_airbyte.py
new file mode 100644
index 0000000..5bd69b8
--- /dev/null
+++ b/tests/providers/airbyte/sensors/test_airbyte.py
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from unittest import mock
+
+import pytest
+
+from airflow import AirflowException
+from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
+
+
+class TestAirbyteJobSensor(unittest.TestCase):
+
+    task_id = "task-id"
+    airbyte_conn_id = "airbyte-conn-test"
+    job_id = 1
+    timeout = 120
+
+    def get_job(self, status):
+        response = mock.Mock()
+        response.json.return_value = {'job': {'status': status}}
+        return response
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_done(self, mock_get_job):
+        mock_get_job.return_value = self.get_job('succeeded')
+
+        sensor = AirbyteJobSensor(
+            task_id=self.task_id,
+            airbyte_job_id=self.job_id,
+            airbyte_conn_id=self.airbyte_conn_id,
+        )
+        ret = sensor.poke(context={})
+        mock_get_job.assert_called_once_with(job_id=self.job_id)
+        assert ret
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_failed(self, mock_get_job):
+        mock_get_job.return_value = self.get_job('failed')
+
+        sensor = AirbyteJobSensor(
+            task_id=self.task_id,
+            airbyte_job_id=self.job_id,
+            airbyte_conn_id=self.airbyte_conn_id,
+        )
+        with pytest.raises(AirflowException, match="Job failed"):
+            sensor.poke(context={})
+
+        mock_get_job.assert_called_once_with(job_id=self.job_id)
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_running(self, mock_get_job):
+        mock_get_job.return_value = self.get_job('running')
+
+        sensor = AirbyteJobSensor(
+            task_id=self.task_id,
+            airbyte_job_id=self.job_id,
+            airbyte_conn_id=self.airbyte_conn_id,
+        )
+        ret = sensor.poke(context={})
+
+        mock_get_job.assert_called_once_with(job_id=self.job_id)
+
+        assert not ret
+
+    @mock.patch('airflow.providers.airbyte.hooks.airbyte.AirbyteHook.get_job')
+    def test_cancelled(self, mock_get_job):
+        mock_get_job.return_value = self.get_job('cancelled')
+
+        sensor = AirbyteJobSensor(
+            task_id=self.task_id,
+            airbyte_job_id=self.job_id,
+            airbyte_conn_id=self.airbyte_conn_id,
+        )
+        with pytest.raises(AirflowException, match="Job was cancelled"):
+            sensor.poke(context={})
+
+        mock_get_job.assert_called_once_with(job_id=self.job_id)