Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/02/26 23:37:18 UTC

[GitHub] [airflow] marcosmarxm opened a new pull request #14492: Added new community provider Airbyte

marcosmarxm opened a new pull request #14492:
URL: https://github.com/apache/airflow/pull/14492


   This PR adds a new community provider for Airbyte.
   [Airbyte](www.airbyte.io) is an open-source integration tool for moving data between sources and destinations.
   I think this integration will be handy: I work as a data engineer, and being able to control Airbyte jobs through Airflow will help a lot in my projects.
   
   The Airbyte API was released at the beginning of the month, so I took the opportunity to start building an operator and a hook. In a discussion on the [Airbyte GitHub](https://github.com/airbytehq/airbyte/issues/836), it was suggested to identify the connection by its own UUID (`connectionId`). This way, the operator acts as a trigger that calls the Airbyte API.
   
   I am still working on some additional files, such as the logo and documentation edits. Any comments on them would be appreciated.
   
   ---
   
   **About the execution steps from the Operator and Hook**
   The **AirbyteTriggerSyncOperator** calls the **AirbyteHook** (as most providers do).
   The AirbyteHook calls its `submit_job` function, which sends a request to the API and returns the `job_id`.
   Because jobs can take a long time, the hook then waits for a status other than running.
   Using the `job_id`, the `wait_for_job` function monitors the job until it reaches a **success** or **error** status, or until a **timeout** occurs.
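The submit-then-poll flow described above can be sketched in plain Python. This is a minimal sketch, not the PR's code: `get_status` is a hypothetical callable standing in for the hook's `get_job` request, built-in exceptions replace `AirflowException`, and the status strings mirror the constants quoted later in this thread.

```python
import time
from typing import Callable, Optional

# Status strings mirroring the hook constants quoted in this PR (assumed values).
SUCCEEDED = "succeeded"
FAILED = "failed"
CANCELLED = "cancelled"
ERROR = "error"


def wait_for_job(
    get_status: Callable[[str], str],  # hypothetical stand-in for the API call
    job_id: str,
    wait_seconds: float = 3,
    timeout: Optional[float] = None,
) -> str:
    """Poll get_status(job_id) until a terminal status is reached or the timeout expires."""
    start = time.monotonic()
    while True:
        if timeout is not None and time.monotonic() - start > timeout:
            raise TimeoutError(f"Airbyte job {job_id} is not ready after {timeout}s")
        status = get_status(job_id)
        if status == SUCCEEDED:
            return status
        if status in (FAILED, ERROR, CANCELLED):
            raise RuntimeError(f"Airbyte job {job_id} ended with status {status!r}")
        # Any other status (pending, running, ...) is treated as non-terminal.
        time.sleep(wait_seconds)
```

Checking the timeout before each request, rather than only after sleeping, keeps a stuck job from overrunning the limit by more than one polling interval.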
   
   ---
   
   I used the Google Dataproc and OpsGenie providers as the basis for building mine.
   OpsGenie also makes API calls based on HttpHook.
   Dataproc helped me build the logic that waits for the job to finish before releasing the DAG's flow.
   
   I have already run the tests locally, but using my own setup. I had a problem configuring Airflow by following CONTRIBUTORS_QUICK_START. As this is my first contribution, I think there are some points for improvement, and I will be very grateful for your feedback.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r588746034



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,106 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Required. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while True:
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
+                continue
+
+            if state == self.RUNNING:

Review comment:
       nice catch!







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r587893347



##########
File path: airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
##########
@@ -0,0 +1,66 @@
+
+"""Example DAG demonstrating the usage of the AirbyteTriggerSyncOperator and AirbyteJobSensor."""
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+}
+
+with DAG(
+    dag_id='example_airbyte_operator',
+    default_args=args,
+    schedule_interval=None,
+    start_date=days_ago(1),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=['example'],
+) as dag:
+
+    # [START howto_operator_airbyte_synchronous]
+    sync_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_sync_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+    )
+    # [END howto_operator_airbyte_synchronous]
+
+    # [START howto_operator_airbyte_asynchronous]
+    async_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_async_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+        asynchronous=True,
+    )
+
+    airbyte_sensor = AirbyteJobSensor(
+        task_id='airbyte_sensor_source_dest_example',
+        airbyte_job_id=async_source_destination.output,

Review comment:
       @dstandish and @tuanchris, CI broke on this line in the **Backport packages: wheel** step, complaining that `AirbyteTriggerSyncOperator` has no `output` property. Any suggestions?
   I could use `task_instance.xcom_pull` instead, as the Dataproc example does:
   ```
           dataproc_job_id="{{task_instance.xcom_pull(task_ids='spark_task_async')}}",
   ```
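A small helper could make the `xcom_pull` workaround reusable. This is a sketch under two assumptions not confirmed in this thread: the operator's `execute` returns the job id (so Airflow pushes it under the default `return_value` key), and the sensor lists `airbyte_job_id` in its `template_fields` so the string is rendered before the sensor runs. `xcom_pull_template` is a hypothetical name.

```python
def xcom_pull_template(task_id: str, key: str = "return_value") -> str:
    """Build a Jinja expression that pulls the XCom pushed by ``task_id`` at render time."""
    # Quadrupled braces in the f-string emit literal "{{" and "}}" for Jinja.
    return f"{{{{ task_instance.xcom_pull(task_ids='{task_id}', key='{key}') }}}}"
```

In the example DAG it would then be passed as `airbyte_job_id=xcom_pull_template('airbyte_async_source_dest_example')`.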
   







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584225044



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:

Review comment:
       Very generous of you to share so many examples, thank you.
   
   > I'm personally leaning to op >> sensor approach but many users want to do "atomic" operations 
   
   Before Airflow 2.0, I was really not a big fan of the op >> sensor approach. The task latency bothered me, and I tended to combine more operations into a single operator (an approach that still has some appeal for me). But with 2.0, and specifically the improved scheduler, there is much less reason to do so, and the separation becomes relatively more compelling. Then, however, you have the issue of coordination (e.g. XCom keys).







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584368457



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_time: Number of seconds between checks
+        :type wait_time: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: int
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_time)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json().get("job").get("status")
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
+
+        if state == self.ERROR:
+            raise AirflowException(f"Job failed:\n{job}")
+        if state == self.CANCELLED:
+            raise AirflowException(f"Job was cancelled:\n{job}")
+
+    def submit_job(self, connection_id: str) -> dict:

Review comment:
       You are correct! I have already changed it.







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r587747285



##########
File path: airflow/providers/airbyte/sensors/airbyte.py
##########
@@ -0,0 +1,72 @@
+"""This module contains an Airbyte Job sensor."""
+
+from airflow.exceptions import AirflowException
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteJobSensor(BaseSensorOperator):
+    """
+    Check for the state of a previously submitted Airbyte job.
+
+    :param job_id: Id of the Airbyte job
+    :type job_id: str

Review comment:
       The constructor takes `airbyte_job_id`, so we should align the name here.







[GitHub] [airflow] github-actions[bot] commented on pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#issuecomment-786989481


   [The Workflow run](https://github.com/apache/airflow/actions/runs/604604607) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r585727430



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,75 @@
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AirbyteTriggerSyncOperator`
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get connection
+        information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API.
+    :type asynchronous: bool
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+

Review comment:
       ```suggestion
   template_fields = ('connection_id',)
   ```
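To illustrate what `template_fields` buys here, the toy below renders only the attributes listed in `template_fields` before execution. It is a deliberate simplification: a regex substitution of `{{ name }}` placeholders stands in for Airflow's Jinja rendering, and `ToyOperator` and `render_fields` are hypothetical names, not Airflow's implementation.

```python
import re
from typing import Any, Dict

# Matches "{{ name }}" placeholders; real Airflow uses full Jinja instead.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


class ToyOperator:
    # Only attributes named here are rendered, mirroring the idea of template_fields.
    template_fields = ("connection_id",)

    def __init__(self, connection_id: str, airbyte_conn_id: str) -> None:
        self.connection_id = connection_id
        self.airbyte_conn_id = airbyte_conn_id

    def render_fields(self, context: Dict[str, Any]) -> None:
        for name in self.template_fields:
            raw = getattr(self, name)
            rendered = _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), raw)
            setattr(self, name, rendered)
```

Fields left out of `template_fields`, such as `airbyte_conn_id` here, keep their literal value.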







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r586933619



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,83 @@
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AirbyteTriggerSyncOperator`
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get connection
+        information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API.
+    :type asynchronous: bool
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    template_fields = ('connection_id',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        connection_id: str,
+        airbyte_conn_id: str = "default_airbyte_conn",
+        asynchronous: Optional[bool] = False,
+        api_version: Optional[str] = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)
+        job_id = job_object.json().get('job').get('id')
+        if not job_id:

Review comment:
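       I changed it to the `[]` pattern. On reflection, `[]` makes debugging more straightforward: a missing key raises a `KeyError` that names the key, whereas chained `.get()` calls return `None` and then fail with a less informative `AttributeError`.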
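The difference in failure modes shows up with a response payload that lacks the `job` key. A minimal sketch; the two helper names are hypothetical, and only the lookup styles mirror the PR.

```python
from typing import Any, Dict


def job_id_with_get(payload: Dict[str, Any]) -> Any:
    # Chained .get(): a missing "job" key yields None, and the second .get()
    # then fails with an AttributeError that does not name the missing key.
    return payload.get("job").get("id")


def job_id_with_index(payload: Dict[str, Any]) -> Any:
    # Indexing: a missing key raises a KeyError naming exactly which key was absent.
    return payload["job"]["id"]
```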







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584090240



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):

Review comment:
       This seems a little odd.
   
   I think a more standard approach would be one of the following:
   
   1. define the constants in the module directly
   2. define the constants directly on the AirbyteHook class
   3. define the constants as part of an enum class (you can make an enum class whose elements also behave like strings) and just reference the values directly rather than through inheritance
   
   I would suggest using one of these three alternatives. `AirbyteJobController` makes it sound much more elaborate than it is, and there is really no need for inheritance here.
   
   ...
   
   I think the naming and the inheritance are the main things sticking out to me...
   
   Even if you just called it `AirbyteJobStatus` and didn't use inheritance, that would be fine.
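Option 3 could look like the following: a `str`-mixin `Enum` referenced directly instead of inherited. A sketch only; the class name follows the `AirbyteJobStatus` suggestion above, `is_failure` is a hypothetical helper, and the `cancelled` spelling is an assumption to check against what the Airbyte API actually returns (this revision of the PR used `canceled`).

```python
from enum import Enum


class AirbyteJobStatus(str, Enum):
    """Airbyte job states; members compare equal to their plain-string values."""

    RUNNING = "running"
    SUCCEEDED = "succeeded"
    CANCELLED = "cancelled"  # spelling assumed; verify against the API
    PENDING = "pending"
    FAILED = "failed"
    ERROR = "error"


TERMINAL_FAILURES = {AirbyteJobStatus.FAILED, AirbyteJobStatus.ERROR, AirbyteJobStatus.CANCELLED}


def is_failure(raw_status: str) -> bool:
    # AirbyteJobStatus(raw) looks the member up by value and raises ValueError on unknown strings.
    return AirbyteJobStatus(raw_status) in TERMINAL_FAILURES
```

Because members are also `str` instances, existing comparisons against the raw JSON value keep working without inheritance on the hook.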

##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:

Review comment:
       This is very close to a sensor. You might consider making it a sensor (run with `mode='reschedule'`) so that the worker slot is freed while waiting for the job to complete.
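The core of such a sensor is a `poke` that checks the status once and returns a boolean, leaving the waiting to the scheduler. The plain-Python stand-in below avoids importing Airflow; in a real sensor the class would subclass `BaseSensorOperator` and implement `poke(self, context)`. `AirbyteJobPoker` and `get_status` are hypothetical names.

```python
from typing import Callable


class AirbyteJobPoker:
    """Plain-Python stand-in for a sensor's poke(): one status check per call."""

    def __init__(self, airbyte_job_id: str, get_status: Callable[[str], str]) -> None:
        self.airbyte_job_id = airbyte_job_id
        self.get_status = get_status  # hypothetical stand-in for the hook's get_job call

    def poke(self) -> bool:
        status = self.get_status(self.airbyte_job_id)
        if status in ("failed", "error", "cancelled"):
            raise RuntimeError(f"Airbyte job {self.airbyte_job_id} ended with status {status!r}")
        # True means the job is done; False means "check again after the poke interval".
        return status == "succeeded"
```

Unlike the `wait_for_job` loop, nothing here sleeps, so the waiting happens outside the worker when the sensor is rescheduled.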

##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:

Review comment:
       Maybe `wait_seconds` would be more explicit.

##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_time: Number of seconds between checks
+        :type wait_time: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: int
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_time)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json().get("job").get("status")
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
+
+        if state == self.ERROR:
+            raise AirflowException(f"Job failed:\n{job}")
+        if state == self.CANCELLED:
+            raise AirflowException(f"Job was cancelled:\n{job}")
+
+    def submit_job(self, connection_id: str) -> dict:

Review comment:
       Is `connections/sync` the only type of job?
   
   The name `submit_job` sounds much more generic than just syncing a defined connection. With EMR, for example, a `submit_job` method might let you pass all kinds of parameters, like a jar file and args. Here you can really only do one thing, so it would be better to name the method more specifically, e.g. `submit_sync_connection`. Presumably Airbyte lets you submit more kinds of jobs than this one case?
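A minimal sketch of the narrower naming, assuming the Airbyte v1 `connections/sync` endpoint takes a JSON body with a `connectionId` key (as the PR's test fixture `CONNECTION_ID = {"connectionId": "test"}` suggests). The helper name `build_sync_connection_request` and the `api/` path prefix are hypothetical; the helper only builds the endpoint and payload, so it can be checked without a running Airbyte server.

```python
from typing import Any, Dict, Tuple


def build_sync_connection_request(
    connection_id: str, api_version: str = "v1"
) -> Tuple[str, Dict[str, Any]]:
    """Return (endpoint, json_body) for triggering a sync of one Airbyte connection.

    Hypothetical helper: the endpoint path and payload shape are assumptions
    based on this PR discussion, not a confirmed Airbyte API contract.
    """
    if not connection_id:
        raise ValueError("connection_id must be a non-empty Airbyte ConnectionId")
    endpoint = f"api/{api_version}/connections/sync"
    return endpoint, {"connectionId": connection_id}
```

A method with a name like `submit_sync_connection` could then pass these to `HttpHook.run`, e.g. `self.run(endpoint=endpoint, json=body)`, assuming `run` forwards extra keyword arguments to `requests`.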




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584315717



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:

Review comment:
       > Although then you have the issue of coordination (e.g. xcom keys...)
   
   Exactly. IMHO, having an `async` switch gives us the best of both worlds: those who care about performance (yes, not everyone does) can use `op >> sensor`, and those who want simpler DAGs can rely on polling in the operator 👍 
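One way to read the switch idea, sketched with a fake client so it runs standalone. Note that `async` has been a reserved keyword since Python 3.7 and cannot be used as a parameter name, so this sketch uses `asynchronous`. `FakeAirbyteClient`, `trigger_sync` and the status strings are illustrative assumptions, not the provider's API.

```python
from typing import List


class FakeAirbyteClient:
    """Stand-in for the hook: returns a job id, then a scripted list of statuses."""

    def __init__(self, statuses: List[str]) -> None:
        self._statuses = list(statuses)
        self.polls = 0

    def submit_sync_connection(self, connection_id: str) -> int:
        return 42  # pretend Airbyte assigned job id 42

    def get_job_status(self, job_id: int) -> str:
        self.polls += 1
        return self._statuses.pop(0)


def trigger_sync(client: FakeAirbyteClient, connection_id: str, asynchronous: bool = False) -> int:
    """Submit a sync; unless asynchronous, poll until a terminal status (no sleeping here)."""
    job_id = client.submit_sync_connection(connection_id)
    if asynchronous:
        # Hand the job id to a downstream sensor (e.g. via XCom) instead of blocking.
        return job_id
    while True:
        status = client.get_job_status(job_id)
        if status == "succeeded":
            return job_id
        if status in ("failed", "cancelled"):
            raise RuntimeError(f"Airbyte job {job_id} ended with status {status!r}")
        # Any other status (e.g. "pending", "running") means: poll again.
```

With `asynchronous=True` the operator returns immediately and the waiting moves into an `op >> sensor` pair; with the default it blocks, which keeps the DAG to a single task.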







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r585746606



##########
File path: docs/apache-airflow-providers-airbyte/operators/airbyte.rst
##########
@@ -0,0 +1,38 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:AirbyteTriggerSyncOperator:
+
+AirbyteTriggerSyncOperator
+==========================
+
+Use the :class:`~airflow.providers.airbyte.operators.AirbyteTriggerSyncOperator` to
+trigger an existing ConnectionId sync job in Airbyte.
+
+WARNING: This operator triggers a synchronization job in Airbyte.

Review comment:
       I think it would be best to make `job_id` optional and generate it as a `uuid` if the user does not pass one. This way users won't have to care about the uniqueness of `job_id`. WDYT?
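A minimal sketch of the suggestion: accept an optional `job_id` and fall back to a random UUID4 string. `resolve_job_id` is a hypothetical helper name, and whether the Airbyte API itself accepts caller-supplied job IDs is not established in this thread.

```python
import uuid
from typing import Optional


def resolve_job_id(job_id: Optional[str] = None) -> str:
    """Return the caller's job_id, or generate a unique one if none was given."""
    if job_id:
        return job_id
    # uuid4() is random, so collisions are practically impossible.
    return str(uuid.uuid4())
```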







[GitHub] [airflow] github-actions[bot] commented on pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#issuecomment-786956084


   [The Workflow run](https://github.com/apache/airflow/actions/runs/604320326) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r585728122



##########
File path: airflow/providers/airbyte/sensors/airbyte.py
##########
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains an Airbyte Job sensor."""
+
+from airflow.exceptions import AirflowException
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteJobSensor(BaseSensorOperator):
+    """
+    Check for the state of a previously submitted Airbyte job.
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    """
+
+    template_fields = 'airbyte_job_id'
+    ui_color = '#6C51FD'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        airbyte_job_id: str,
+        airbyte_conn_id: str = 'default_airbyte_conn',
+        api_version: str = "v1",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.airbyte_job_id = airbyte_job_id
+        self.api_version = api_version
+
+    def poke(self, context: dict) -> bool:
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job = hook.get_job(job_id=self.airbyte_job_id)
+        status = job.json().get('job').get('status')
+
+        if status == hook.FAILED:
+            raise AirflowException(f'Job failed: \n{job}')
+        elif status == hook.CANCELLED:
+            raise AirflowException(f'Job was cancelled: \n{job}')
+        elif status == hook.SUCCEEDED:
+            self.log.debug("Job %s completed successfully.", self.airbyte_job_id)

Review comment:
       ```suggestion
               self.log.info("Job %s completed successfully.", self.airbyte_job_id)
   ```
   It's OK to use `info`; this will give users more information.
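The sensor's `poke` boils down to mapping an Airbyte status string to "done", "keep waiting" or "fail". A standalone sketch of that mapping, using plain `logging` and `RuntimeError` in place of Airflow's task logger and `AirflowException`; the status strings mirror the hook constants in this diff, and `status_to_poke_result` is a hypothetical name.

```python
import logging

log = logging.getLogger("airbyte_sensor_sketch")

SUCCEEDED = "succeeded"
FAILED = "failed"
CANCELLED = "cancelled"


def status_to_poke_result(job_id: str, status: str) -> bool:
    """Return True when the job is done, False to keep poking; raise on failure."""
    if status == FAILED:
        raise RuntimeError(f"Job {job_id} failed")
    if status == CANCELLED:
        raise RuntimeError(f"Job {job_id} was cancelled")
    if status == SUCCEEDED:
        # info (not debug) so users see it in the task log by default
        log.info("Job %s completed successfully.", job_id)
        return True
    log.info("Waiting for job %s to complete (status: %s).", job_id, status)
    return False
```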







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r588430595



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Required. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
+                continue
+
+        if state == self.ERROR:
+            raise AirflowException(f"Job failed:\n{job}")
+        if state == self.CANCELLED:
+            raise AirflowException(f"Job was cancelled:\n{job}")

Review comment:
       ```suggestion
           state = None
           start = time.monotonic()
           while True:
               if timeout and start + timeout < time.monotonic():
                   raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
               time.sleep(wait_seconds)
               try:
                   job = self.get_job(job_id=job_id)
                   state = job.json()["job"]["status"]
               except AirflowException as err:
                   self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
                   continue
               if state == self.ERROR:
                   raise AirflowException(f"Job failed:\n{job}")
               elif state == self.CANCELLED:
                   raise AirflowException(f"Job was cancelled:\n{job}")
               elif state == self.SUCCEEDED:
                   break
               elif state in (self.RUNNING, self.PENDING):
                   continue
               else:
                   raise Exception(f"Encountered unexpected value for state: {state}")
   ```
   Explanation:
   * evaluates state in only one place (previously it was part of the loop condition and was also evaluated again after the loop completed)
   * fails when the job `status` has an unexpected value (instead of silently completing)
   * consolidates the logic within the loop, so there's no post-loop processing to consider
   
   What do you think, @marcosmarxm?
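A runnable sketch of this single-loop structure, in which non-terminal states (`pending`, `running`) keep polling, terminal states return or raise, and only unrecognized values hit the "unexpected value" branch. `get_status`, `sleep` and `clock` are injected here (hypothetical parameters) so the loop can be exercised without a server or real waiting; the hook's retry-on-API-error `try/except` is left out for brevity, and built-in exceptions stand in for `AirflowException`.

```python
import time
from typing import Callable, Optional

SUCCEEDED = "succeeded"
CANCELLED = "cancelled"
ERROR = "error"
PENDING = "pending"
RUNNING = "running"


def wait_for_job(
    job_id: str,
    get_status: Callable[[str], str],
    wait_seconds: float = 3,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Poll get_status(job_id) until the job succeeds; raise on failure or timeout."""
    start = clock()
    while True:
        if timeout is not None and clock() - start > timeout:
            raise TimeoutError(f"Airbyte job {job_id} is not ready after {timeout}s")
        sleep(wait_seconds)
        state = get_status(job_id)
        if state in (PENDING, RUNNING):
            continue  # not finished yet, poll again
        if state == SUCCEEDED:
            return
        if state == ERROR:
            raise RuntimeError(f"Job {job_id} failed")
        if state == CANCELLED:
            raise RuntimeError(f"Job {job_id} was cancelled")
        # Any other value is surfaced instead of silently ending the loop.
        raise ValueError(f"Encountered unexpected value for state: {state}")
```

Because every branch lives inside the loop, there is no post-loop state check to keep in sync with the loop condition.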







[GitHub] [airflow] github-actions[bot] commented on pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#issuecomment-791787027


   [The Workflow run](https://github.com/apache/airflow/actions/runs/625973076) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] dstandish commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584379136



##########
File path: tests/providers/airbyte/hooks/test_airbyte.py
##########
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+from unittest import mock
+
+import pytest
+import requests
+import requests_mock
+
+from airflow.exceptions import AirflowException
+from airflow.models import Connection
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+
+AIRBYTE_CONN_ID = 'test'
+CONNECTION_ID = {"connectionId": "test"}
+JOB_ID = 1
+
+
+def get_airbyte_connection(unused_conn_id=None):
+    return Connection(conn_id='test', conn_type='http', host='test:8001/')
+
+
+def mock_init(*args, **kwargs):
+    pass
+
+
+class TestAirbyteHook(unittest.TestCase):
+    """Test get, post and raise_for_status"""
+
+    def setUp(self):
+        session = requests.Session()
+        adapter = requests_mock.Adapter()
+        session.mount('mock', adapter)
+        get_airbyte_connection()

Review comment:
       Does this have any effect?
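Calling `get_airbyte_connection()` and discarding the result does not register the connection anywhere. A common way to make such a fixture take effect is to patch the hook's connection lookup so it returns the test connection. The sketch shows the pattern with `unittest.mock.patch.object` on a stand-in class; `FakeHook` and `get_test_connection` are hypothetical so it runs without Airflow installed, and in the real test the patch target would presumably be `AirbyteHook.get_connection`, which `HttpHook` inherits from `BaseHook`.

```python
from unittest import mock


class FakeHook:
    """Stand-in for a hook whose connection normally comes from the metadata DB."""

    @classmethod
    def get_connection(cls, conn_id: str) -> dict:
        raise LookupError(f"connection {conn_id!r} not found")

    def base_url(self, conn_id: str) -> str:
        conn = self.get_connection(conn_id)
        return f"http://{conn['host']}"


def get_test_connection(unused_conn_id=None) -> dict:
    return {"conn_id": "test", "host": "test:8001"}


# Merely calling get_test_connection() changes nothing; patching makes the hook use it.
with mock.patch.object(FakeHook, "get_connection", side_effect=get_test_connection):
    url = FakeHook().base_url("test")
```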







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r585726942



##########
File path: airflow/providers/airbyte/sensors/airbyte.py
##########
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains an Airbyte Job sensor."""
+
+from airflow.exceptions import AirflowException
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteJobSensor(BaseSensorOperator):
+    """
+    Check for the state of a previously submitted Airbyte job.
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    """
+
+    template_fields = 'airbyte_job_id'

Review comment:
       ```suggestion
       template_fields = ('airbyte_job_id',)
   ```
   `template_fields` needs to be an iterable of field names; a bare string is iterated character by character.
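Why a bare string is a problem: Airflow loops over `template_fields`, and iterating a `str` yields single characters rather than the field name. Plain Python shows the difference, including the trailing comma that a one-element tuple needs.

```python
as_string = 'airbyte_job_id'
as_tuple = ('airbyte_job_id',)
without_comma = ('airbyte_job_id')  # parentheses alone do not make a tuple

string_items = list(as_string)  # individual characters
tuple_items = list(as_tuple)    # the single field name
```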







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r585066065



##########
File path: tests/providers/airbyte/hooks/test_airbyte.py
##########
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+from unittest import mock
+
+import pytest
+import requests
+import requests_mock
+
+from airflow.exceptions import AirflowException
+from airflow.models import Connection
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+
+AIRBYTE_CONN_ID = 'test'
+CONNECTION_ID = {"connectionId": "test"}
+JOB_ID = 1
+
+
+def get_airbyte_connection(unused_conn_id=None):
+    return Connection(conn_id='test', conn_type='http', host='test:8001/')
+
+
+def mock_init(*args, **kwargs):
+    pass
+
+
+class TestAirbyteHook(unittest.TestCase):
+    """Test get, post and raise_for_status"""
+
+    def setUp(self):
+        session = requests.Session()
+        adapter = requests_mock.Adapter()
+        session.mount('mock', adapter)
+        get_airbyte_connection()

Review comment:
       You are right! Removed it :D







[GitHub] [airflow] turbaszek commented on pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#issuecomment-789553860


   @dstandish would you mind taking a second look?





[GitHub] [airflow] github-actions[bot] commented on pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#issuecomment-789567839


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584513785



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    :param airbyte_conn_id: Required. The name of the Airbyte connection to use

Review comment:
       ```suggestion
       :param airbyte_conn_id: Required. The name of the Airflow connection to use to get connection information for Airbyte
   ```
   Not sure if this is necessary, but since there are both `airbyte_conn_id` and `connection_id`, it may be a good idea to distinguish them more clearly.







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r586523021



##########
File path: airflow/providers/airbyte/sensors/airbyte.py
##########
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains an Airbyte Job sensor."""
+
+from airflow.exceptions import AirflowException
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteJobSensor(BaseSensorOperator):
+    """
+    Check for the state of a previously submitted Airbyte job.
+
+    :param job_id: Id of the Airbyte job
+    :type job_id: str
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    """
+
+    template_fields = ('airbyte_job_id',)
+    ui_color = '#6C51FD'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        airbyte_job_id: str,
+        airbyte_conn_id: str = 'default_airbyte_conn',

Review comment:
       I think `'airbyte_default'` is more conventional. Why not stick with that convention?
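With a conventional ID such as `airbyte_default`, the connection can also be supplied without the UI, since Airflow reads connections from environment variables named `AIRFLOW_CONN_<CONN_ID>` (upper-cased) that hold a connection URI. A small sketch; `conn_env_var` is a hypothetical helper, and the host and port are placeholders, not values taken from this PR.

```python
import os


def conn_env_var(conn_id: str) -> str:
    """Name of the environment variable Airflow checks for a given connection ID."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


# Placeholder URI: point the default Airbyte connection at a local API server.
os.environ[conn_env_var("airbyte_default")] = "http://localhost:8001"
```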

##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "default_airbyte_conn", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json().get("job").get("status")

Review comment:
       Here's another instance where, as Kamil pointed out, it makes more sense to access the key with `['job']`.
   
   Under certain circumstances you might end up with `state = None` here, which could produce confusing errors.
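To make the point about `.get()` versus `[]` concrete, here is a small sketch using plain dictionaries in place of the hook's `job.json()` result. The payload shapes are hypothetical stand-ins for Airbyte API responses, not captured output.

```python
from typing import Any, Dict, Optional

# Hypothetical payloads shaped like the job response used in the PR
# ({"job": {"id": ..., "status": ...}}); the real API returns more fields.
OK = {"job": {"id": 1, "status": "running"}}
NO_STATUS = {"job": {"id": 1}}
NO_JOB = {"message": "job not found"}


def status_with_get(payload: Dict[str, Any]) -> Optional[str]:
    # Chained .get(): a missing "status" silently returns None, and a
    # missing "job" fails one step later with AttributeError on None.get().
    return payload.get("job").get("status")


def status_with_index(payload: Dict[str, Any]) -> str:
    # Direct indexing: any missing key raises KeyError naming that key.
    return payload["job"]["status"]


print(status_with_get(OK))         # running
print(status_with_get(NO_STATUS))  # None, so a polling loop keeps waiting (until its timeout, if any)
for fn, payload in ((status_with_get, NO_JOB), (status_with_index, NO_STATUS)):
    try:
        fn(payload)
    except (AttributeError, KeyError) as err:
        print(f"{fn.__name__}: {type(err).__name__}: {err}")
```

With `[]` the failure happens at the line that reads the malformed response, instead of surfacing later as a loop that never sees a terminal state.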

##########
File path: airflow/providers/airbyte/sensors/airbyte.py
##########
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains an Airbyte Job sensor."""
+
+from airflow.exceptions import AirflowException
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteJobSensor(BaseSensorOperator):
+    """
+    Check for the state of a previously submitted Airbyte job.
+
+    :param airbyte_job_id: ID of the Airbyte job
+    :type airbyte_job_id: str
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    """
+
+    template_fields = ('airbyte_job_id',)
+    ui_color = '#6C51FD'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        airbyte_job_id: str,
+        airbyte_conn_id: str = 'default_airbyte_conn',
+        api_version: str = "v1",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.airbyte_job_id = airbyte_job_id
+        self.api_version = api_version
+
+    def poke(self, context: dict) -> bool:
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job = hook.get_job(job_id=self.airbyte_job_id)
+        status = job.json().get('job').get('status')

Review comment:
       use `[]`
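A sketch of how the sensor's `poke` could look once it indexes with `[]`, written as a plain function over an assumed `{"job": {"status": ...}}` payload. `JobFailedError` is a hypothetical stand-in for `AirflowException`, and raising on `failed`/`error`/`cancelled` rather than returning `False` is an assumption about the intended behavior.

```python
from typing import Any, Dict


class JobFailedError(Exception):
    """Stand-in for airflow.exceptions.AirflowException in this sketch."""


# Status strings mirror the constants declared on AirbyteHook in this PR.
IN_PROGRESS = ("running", "pending")
FAILED = ("error", "failed", "cancelled")


def poke(payload: Dict[str, Any]) -> bool:
    """One sensor check: True when done, False to poke again, raise on failure."""
    status = payload["job"]["status"]  # KeyError surfaces a malformed response
    if status == "succeeded":
        return True
    if status in IN_PROGRESS:
        return False
    if status in FAILED:
        raise JobFailedError(f"Airbyte job finished with status {status!r}")
    raise JobFailedError(f"Unexpected Airbyte job status {status!r}")
```

Raising on an unknown status keeps the sensor from poking until its own timeout when the API reports something it does not understand.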

##########
File path: docs/apache-airflow-providers-airbyte/operators/airbyte.rst
##########
@@ -0,0 +1,39 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:AirbyteTriggerSyncOperator:
+
+AirbyteTriggerSyncOperator
+==========================
+
+Use the :class:`~airflow.providers.airbyte.operators.airbyte.AirbyteTriggerSyncOperator` to
+trigger an existing ConnectionId sync job in Airbyte.
+
+.. warning::
+  This operator triggers a synchronization job in Airbyte.
+  If triggered again, this operator does not guarantee idempotency.
+  You must be aware of the source (database, API, etc.) you are updating or syncing and
+  the method Airbyte applies to perform the operation.
+
+
+Using the Operator

Review comment:
       It is probably worth calling out here that you can use it in sync or async mode, and that in async mode you pair it with the sensor.
   
   Also, I don't see a doc for the sensor. I'm not familiar with provider conventions, but I suspect there should be a minimal doc for each operator / sensor.
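The sync/async split can be sketched without Airflow. In the PR's terms, sync mode is `AirbyteTriggerSyncOperator(asynchronous=False)` blocking in `wait_for_job`, while async mode submits the job and leaves the waiting to `AirbyteJobSensor`, whose templated `airbyte_job_id` could be filled from the operator's XCom (assuming `execute` returns the job id). `FakeAirbyteApi`, `trigger` and `sense` below are hypothetical stand-ins, not the provider's API.

```python
import itertools
from typing import Dict


class FakeAirbyteApi:
    """In-memory stand-in for the Airbyte API: a job reports 'running' for a
    fixed number of polls, then 'succeeded'. Purely hypothetical."""

    def __init__(self, polls_until_done: int = 2) -> None:
        self._ids = itertools.count(1)
        self._remaining: Dict[int, int] = {}
        self._polls_until_done = polls_until_done

    def submit_sync(self, connection_id: str) -> dict:
        job_id = next(self._ids)
        self._remaining[job_id] = self._polls_until_done
        return {"job": {"id": job_id, "status": "pending"}}

    def get_job(self, job_id: int) -> dict:
        left = self._remaining[job_id]
        self._remaining[job_id] = max(left - 1, 0)
        status = "running" if left > 0 else "succeeded"
        return {"job": {"id": job_id, "status": status}}


def trigger(api: FakeAirbyteApi, connection_id: str, asynchronous: bool) -> int:
    """Operator-like step: submit the sync, and block until it finishes unless asynchronous."""
    job_id = api.submit_sync(connection_id)["job"]["id"]
    if not asynchronous:
        # Sync mode: the operator polls itself (the real hook sleeps between polls).
        while api.get_job(job_id)["job"]["status"] != "succeeded":
            pass
    # The return value is what Airflow would push to XCom for a downstream sensor.
    return job_id


def sense(api: FakeAirbyteApi, job_id: int) -> bool:
    """Sensor-like poke: a single status check, repeated every poke_interval."""
    return api.get_job(job_id)["job"]["status"] == "succeeded"


# Async mode: trigger returns at once, then a separate sensor step waits.
api = FakeAirbyteApi(polls_until_done=2)
job_id = trigger(api, "hypothetical-connection-uuid", asynchronous=True)
pokes = 0
while True:
    pokes += 1
    if sense(api, job_id):
        break
print(f"job {job_id} done after {pokes} pokes")
```

The design trade-off: sync mode holds a worker slot for the whole sync, while async mode frees it and lets the sensor (for example in `reschedule` mode) do the waiting.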




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r585221627



##########
File path: docs/apache-airflow-providers-airbyte/operators/airbyte.rst
##########
@@ -0,0 +1,38 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:AirbyteTriggerSyncOperator:
+
+AirbyteTriggerSyncOperator
+==========================
+
+Use the :class:`~airflow.providers.airbyte.operators.airbyte.AirbyteTriggerSyncOperator` to
+trigger an existing ConnectionId sync job in Airbyte.
+
+WARNING: This operator triggers a synchronization job in Airbyte.

Review comment:
       @turbaszek I wrote this comment about the problem with idempotency. What do you think?







[GitHub] [airflow] github-actions[bot] commented on pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#issuecomment-791787288


   [The Workflow run](https://github.com/apache/airflow/actions/runs/625939193) is cancelling this PR. Building image for the PR has been cancelled





[GitHub] [airflow] mik-laj commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584107949



##########
File path: airflow/providers/airbyte/provider.yaml
##########
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+---
+package-name: apache-airflow-providers-airbyte
+name: Airbyte
+description: |
+  `Airbyte <https://airbyte.io/>`__
+
+versions:
+  - 1.0.0

Review comment:
       ```suggestion
   versions: []
   ```







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r588726043



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,106 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while True:
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
+                continue
+
+            if state == self.RUNNING:

Review comment:
       should this be `state in (self.RUNNING, self.PENDING)`?
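Why `PENDING` matters: replaying a hypothetical sequence of polled states through the loop's branching shows that, if only `RUNNING` counts as in-progress, a job that is still queued hits the unexpected-state branch on the first poll. This assumes the rest of the loop follows the structure suggested in the earlier review, raising on any state it does not recognize.

```python
from typing import FrozenSet, Iterable, List


def run_polls(states: Iterable[str], in_progress: FrozenSet[str]) -> List[str]:
    """Replay polled states through the loop's branching; return the outcome of each poll."""
    outcomes = []
    for state in states:
        if state in in_progress:
            outcomes.append("continue")
            continue
        if state == "succeeded":
            outcomes.append("break")
            break
        # Failure states checked in the suggested loop (FAILED is declared but not checked there).
        if state in ("error", "cancelled"):
            outcomes.append("raise: failed")
            break
        outcomes.append(f"raise: unexpected state {state!r}")
        break
    return outcomes


polled = ["pending", "running", "succeeded"]  # hypothetical job lifecycle
print(run_polls(polled, frozenset({"running"})))
print(run_polls(polled, frozenset({"running", "pending"})))
```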







[GitHub] [airflow] github-actions[bot] commented on pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#issuecomment-789554998


   The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r588430595



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
+                continue
+
+        if state == self.ERROR:
+            raise AirflowException(f"Job failed:\n{job}")
+        if state == self.CANCELLED:
+            raise AirflowException(f"Job was cancelled:\n{job}")

Review comment:
       ```suggestion
           state = None
           start = time.monotonic()
           while True:
               if timeout and start + timeout < time.monotonic():
                   raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
               time.sleep(wait_seconds)
               try:
                   job = self.get_job(job_id=job_id)
                   state = job.json()["job"]["status"]
               except AirflowException as err:
                   self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
                   continue
               if state == self.ERROR:
                   raise AirflowException(f"Job failed:\n{job}")
               elif state == self.CANCELLED:
                   raise AirflowException(f"Job was cancelled:\n{job}")
               elif state == self.SUCCEEDED:
                   break
               elif state in (self.RUNNING, self.PENDING):
                   continue
               else:
                   raise Exception(f"Encountered unexpected state `{state}` for job_id `{job_id}`")
   ```
   explanation:
   * evaluates state in only one section of code (before it was part of looping logic and also evaluated after loop completes)
   * fails in the case where job `status` is some unexpected value (instead of silently completing)
   * consolidates logic within the loop so there's no post-loop processing to consider
   
   what do you think @marcosmarxm ?
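A testable sketch of the consolidated loop, with the status lookup, `time.sleep` and the clock injected as parameters. That injection is an assumption made here for testability; the PR's hook calls `self.get_job` directly. The sketch treats `running`/`pending` as in-progress, uses the status strings declared on `AirbyteHook`, checks `timeout is not None` so that `timeout=0` is not silently ignored, and omits the PR's retry on API errors for brevity.

```python
import time
from typing import Callable, Optional


class JobFailedError(Exception):
    """Stand-in for airflow.exceptions.AirflowException in this sketch."""


def wait_for_job(
    get_status: Callable[[], str],
    wait_seconds: float = 3,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Poll get_status() until the job reaches a terminal state.

    Returns on success; raises JobFailedError on failure, cancellation,
    an unrecognized state, or timeout.
    """
    start = clock()
    while True:
        # `is not None` so timeout=0 is enforced rather than disabling the check.
        if timeout is not None and clock() - start > timeout:
            raise JobFailedError(f"Timeout: job is not ready after {timeout}s")
        sleep(wait_seconds)
        state = get_status()
        if state in ("running", "pending"):
            continue
        if state == "succeeded":
            return
        if state in ("error", "failed"):
            raise JobFailedError(f"Job failed with state {state!r}")
        if state == "cancelled":
            raise JobFailedError("Job was cancelled")
        raise JobFailedError(f"Encountered unexpected state {state!r}")
```

Every state is evaluated in exactly one place, which is the main benefit of the consolidation suggested above.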







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584514097



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    :param airbyte_conn_id: Required. The name of the Airbyte connection to use
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination
+    :type connection_id: str
+
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    @apply_defaults
+    def __init__(
+        self,
+        airbyte_conn_id: str,
+        connection_id: str,
+        asynchronous: bool = False,
+        api_version: str = "v1",

Review comment:
       Missing in docstring 







[GitHub] [airflow] github-actions[bot] commented on pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#issuecomment-786989580


   [The Workflow run](https://github.com/apache/airflow/actions/runs/604567191) is cancelling this PR. Building image for the PR has been cancelled





[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r586565706



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AirbyteTriggerSyncOperator`
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get connection
+        information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API.
+    :type asynchronous: bool
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    template_fields = ('connection_id',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        connection_id: str,
+        airbyte_conn_id: str = "default_airbyte_conn",
+        asynchronous: Optional[bool] = False,
+        api_version: Optional[str] = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)
+        job_id = job_object.json().get('job').get('id')
+        if not job_id:

Review comment:
       I think you can leave it as you have it, but I defer to @turbaszek and @mik-laj here.







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584512624



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -31,23 +31,36 @@ class AirbyteTriggerSyncOperator(BaseOperator):
     :type airbyte_conn_id: str
     :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination
     :type connection_id: str
+
     :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
     :type timeout: float
     """
 
     @apply_defaults
     def __init__(
-        self, airbyte_conn_id: str, connection_id: str, timeout: Optional[int] = 3600, **kwargs
+        self,
+        airbyte_conn_id: str,

Review comment:
       ```suggestion
           airbyte_conn_id: str = "default_airbyte_conn",
   ```
   
   Most operators and hooks have a default conn_id.







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584108287



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    :param airbyte_conn_id: Required. The name of the Airbyte connection to use
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination
+    :type connection_id: str
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    @apply_defaults
+    def __init__(
+        self, airbyte_conn_id: str, connection_id: str, timeout: Optional[int] = 3600, **kwargs
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id)
+        job_object = hook.submit_job(connection_id=self.connection_id)
+        job_id = job_object.json().get('job').get('id')
+        hook.wait_for_job(job_id=job_id, timeout=self.timeout)
+        self.log.info('Job %s completed successfully', job_id)

Review comment:
       ```suggestion
           self.log.info('Job %s completed successfully', job_id)
           return job_object
   ```
   This way the object will be pushed to XCom and can be accessed by the following task (for example, to read some job info).
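One caveat worth checking: the hook appears to return a `requests.Response` (from `HttpHook.run`), and with Airflow 2.0's default `enable_xcom_pickling = False`, XCom values are JSON-serialized, so returning the response object itself would likely fail when it is pushed. Returning the decoded payload or just the job id avoids that. The sketch uses a hypothetical `FakeResponse` and plain `json.dumps` as a rough proxy for the XCom serializer.

```python
import json
from typing import Any


class FakeResponse:
    """Hypothetical stand-in for the requests.Response returned by the hook."""

    def __init__(self, payload: dict) -> None:
        self._payload = payload

    def json(self) -> dict:
        return self._payload


def xcom_safe(value: Any) -> bool:
    """Rough check: can this value be stored as JSON (as XCom does when pickling is disabled)?"""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False


resp = FakeResponse({"job": {"id": 42, "status": "succeeded"}})
print(xcom_safe(resp))                      # False: arbitrary objects are not JSON-serializable
print(xcom_safe(resp.json()))               # True: the decoded payload is a plain dict
print(xcom_safe(resp.json()["job"]["id"]))  # True: returning just the job id is simplest
```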







[GitHub] [airflow] RosterIn commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
RosterIn commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r586242972



##########
File path: airflow/providers/airbyte/sensors/airbyte.py
##########
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains an Airbyte Job sensor."""
+
+from airflow.exceptions import AirflowException
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteJobSensor(BaseSensorOperator):
+    """
+    Check for the state of a previously submitted Airbyte job.
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.

Review comment:
       Where is this parameter being used in the Sensor?

##########
File path: airflow/providers/airbyte/sensors/airbyte.py
##########
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains an Airbyte Job sensor."""
+
+from airflow.exceptions import AirflowException
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteJobSensor(BaseSensorOperator):
+    """
+    Check for the state of a previously submitted Airbyte job.
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    """
+
+    template_fields = ('airbyte_job_id',)
+    ui_color = '#6C51FD'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        airbyte_job_id: str,

Review comment:
       This parameter isn't listed in the doc-string.
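For context, here is one way the sensor's check could be structured. A `poke` typically returns `True` once the job is done and `False` while it should keep waiting. It raises on a terminal failure. This is a plain-Python sketch with the following assumptions:

* The status strings are taken from the hook's constants in this PR.
* `fetch_status` is a hypothetical callable standing in for the Airbyte API call.
* `RuntimeError` stands in for `AirflowException`.

```python
from typing import Callable

# Status strings mirror the AirbyteHook constants quoted in this PR.
SUCCEEDED = "succeeded"
IN_PROGRESS = {"running", "pending"}
FAILURES = {"failed", "error", "cancelled"}


def poke(job_id: int, fetch_status: Callable[[int], str]) -> bool:
    """Return True when the job succeeded, False while it is still in progress.

    ``fetch_status`` is a hypothetical callable standing in for a request to the
    Airbyte API; a real sensor would call the hook instead.
    """
    status = fetch_status(job_id)
    if status in FAILURES:
        # Raising here fails the sensor task instead of poking until timeout.
        raise RuntimeError(f"Airbyte job {job_id} ended with status {status!r}")
    if status == SUCCEEDED:
        return True
    if status in IN_PROGRESS:
        return False
    raise RuntimeError(f"Unexpected status {status!r} for Airbyte job {job_id}")


print(poke(1, lambda _id: "running"))    # False
print(poke(1, lambda _id: "succeeded"))  # True
```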







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r588430595



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Required. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds wait for job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
+                continue
+
+        if state == self.ERROR:
+            raise AirflowException(f"Job failed:\n{job}")
+        if state == self.CANCELLED:
+            raise AirflowException(f"Job was cancelled:\n{job}")

Review comment:
       ```suggestion
           state = None
           start = time.monotonic()
           while True:
               if timeout and start + timeout < time.monotonic():
                   raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
               time.sleep(wait_seconds)
               try:
                   job = self.get_job(job_id=job_id)
                   state = job.json()["job"]["status"]
               except AirflowException as err:
                   self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
                   continue
               if state == self.ERROR:
                   raise AirflowException(f"Job failed:\n{job}")
               elif state == self.CANCELLED:
                   raise AirflowException(f"Job was cancelled:\n{job}")
               elif state == self.SUCCEEDED:
                   break
               else:
                   raise Exception(f"Encountered unexpected value for state: {state}")
   ```
   explanation:
   * evaluates `state` in a single section of code (before, it was part of the looping logic and was also evaluated after the loop completed)
   * fails when the job `status` is an unexpected value (instead of silently completing)
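Below is a self-contained sketch of the single-evaluation loop the suggestion describes. A few assumptions are added so it can run outside Airflow:

* The hook's `running` and `pending` constants are treated as in-progress states, so the loop keeps polling instead of falling into the `else` branch.
* The `failed` constant is treated like `error`.
* A hypothetical `get_status` callable replaces the HTTP call.
* Injectable `sleep`/`clock` functions replace the `time` module.
* `RuntimeError` stands in for `AirflowException`.

The HTTP-error retry branch is left out to keep the focus on state handling.

```python
import time
from typing import Callable, Optional

RUNNING = "running"
PENDING = "pending"
SUCCEEDED = "succeeded"
CANCELLED = "cancelled"
FAILED = "failed"
ERROR = "error"


def wait_for_job(
    job_id: int,
    get_status: Callable[[int], str],
    wait_seconds: float = 3,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Poll until the job succeeds; raise on failure, cancellation, timeout or unknown state."""
    start = clock()
    while True:
        if timeout and start + timeout < clock():
            raise RuntimeError(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
        sleep(wait_seconds)
        state = get_status(job_id)
        # Every state is evaluated in this one block.
        if state in (RUNNING, PENDING):
            continue  # still in progress: poll again
        elif state == SUCCEEDED:
            return
        elif state in (ERROR, FAILED):
            raise RuntimeError(f"Job {job_id} failed with state {state!r}")
        elif state == CANCELLED:
            raise RuntimeError(f"Job {job_id} was cancelled")
        else:
            raise RuntimeError(f"Encountered unexpected value for state: {state}")


# Usage: a scripted status sequence and a no-op sleep keep the example instant.
states = iter([PENDING, RUNNING, SUCCEEDED])
wait_for_job(7, lambda _id: next(states), sleep=lambda _s: None)
print("job 7 succeeded")
```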
   







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584633201



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    :param airbyte_conn_id: Required. The name of the Airbyte connection to use
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination
+    :type connection_id: str
+
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    @apply_defaults
+    def __init__(
+        self,
+        airbyte_conn_id: str,
+        connection_id: str,
+        asynchronous: bool = False,
+        api_version: str = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)

Review comment:
       It triggers a new job for the connection, which causes Airbyte to fetch new data. Airbyte currently cannot guarantee idempotency through the API, but I have already raised both issues with Airbyte: the ability to send additional parameters and the lack of idempotency.







[GitHub] [airflow] turbaszek merged pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek merged pull request #14492:
URL: https://github.com/apache/airflow/pull/14492


   





[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r588699532



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Required. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds wait for job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
+                continue
+
+        if state == self.ERROR:
+            raise AirflowException(f"Job failed:\n{job}")
+        if state == self.CANCELLED:
+            raise AirflowException(f"Job was cancelled:\n{job}")

Review comment:
       ok so @marcosmarxm i think we're just missing the `running` state. currently the only way this continues to the next loop iteration is if we get an HTTP error. we just need to add the valid "still running" states to the if / else block.
   _then_ i think we should be good.
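One way to make "add the still-running states" hard to forget is to classify every known status in a single lookup table. An in-progress value then becomes an explicit entry rather than a fall-through, and an unknown value fails loudly. This is a plain-Python sketch. The outcome names (`WAIT`, `DONE`, `FAIL`) are hypothetical, and the status strings come from the hook's constants.

```python
from enum import Enum


class Outcome(Enum):
    WAIT = "wait"  # job still in progress: poll again
    DONE = "done"  # job finished successfully: stop polling
    FAIL = "fail"  # job ended badly: raise


# Every status the hook knows about appears exactly once.
STATUS_OUTCOME = {
    "pending": Outcome.WAIT,
    "running": Outcome.WAIT,
    "succeeded": Outcome.DONE,
    "failed": Outcome.FAIL,
    "error": Outcome.FAIL,
    "cancelled": Outcome.FAIL,
}


def classify(status: str) -> Outcome:
    """Map an Airbyte job status to the action the polling loop should take."""
    try:
        return STATUS_OUTCOME[status]
    except KeyError:
        raise ValueError(f"Encountered unexpected value for state: {status}") from None


print(classify("running"))  # Outcome.WAIT
```

The loop body then reduces to a single branch on the returned `Outcome`. A spelling mismatch between the constants and the values the API actually returns would also surface here as an error instead of as a silent timeout.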







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584225044



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:

Review comment:
       Very generous of you to share so many examples -- thank you.
   
   > I'm personally leaning to op >> sensor approach but many users want to do "atomic" operations 
   
   Before Airflow 2.0, I was really not a fan of sensors. The task latency bothered me, and I tended to combine more operations into a single operator (an approach that still has some appeal for me). But with 2.0, and specifically the improved scheduler, there's much less reason to do so, and the separation becomes relatively more compelling. Then, though, you have the issue of coordination (e.g. XCom keys...).







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r588067984



##########
File path: airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the AirbyteTriggerSyncOperator."""
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+}
+
+with DAG(
+    dag_id='example_airbyte_operator',
+    default_args=args,
+    schedule_interval=None,
+    start_date=days_ago(1),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=['example'],
+) as dag:
+
+    # [START howto_operator_airbyte_synchronous]
+    sync_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_sync_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+    )
+    # [END howto_operator_airbyte_synchronous]
+
+    # [START howto_operator_airbyte_asynchronous]
+    async_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_async_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+        asynchronous=True,
+    )
+
+    airbyte_sensor = AirbyteJobSensor(
+        task_id='airbyte_sensor_source_dest_example',
+        airbyte_job_id=async_source_destination.output,

Review comment:
       Ok, forgive me: XComArg is new to me, and so is the backports process.
   
   I found this line, which suggests that for "legacy" operators (which I presume means < 2.0?) we must use Jinja to use XComArgs.
   
   https://github.com/apache/airflow/blob/master/airflow/models/xcom_arg.py#L46
   
   It looks like the backports assume the 1.10.x version of BaseOperator, which suggests that you need to change your example to use Jinja.
   
   @potiuk may have some guidance on the best thing to do here. Perhaps examples should always be written in a backport-compatible way, or perhaps there could be versioned example files.
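For reference, the Jinja-based form that works with a 1.10-style `BaseOperator` passes a template string instead of `.output`. This relies on `airbyte_job_id` being listed in the sensor's `template_fields`, as in the sensor diff quoted earlier in this thread. The helper below is a hypothetical convenience that only builds the string; it does not render it, so it runs without Airflow installed.

```python
def xcom_pull_template(task_id: str) -> str:
    """Build a Jinja expression that pulls a task's return-value XCom at render time.

    Hypothetical helper: Airflow renders the returned string only for operator
    arguments listed in ``template_fields``.
    """
    return f"{{{{ task_instance.xcom_pull(task_ids='{task_id}') }}}}"


template = xcom_pull_template("airbyte_async_source_dest_example")
print(template)
# {{ task_instance.xcom_pull(task_ids='airbyte_async_source_dest_example') }}
```

In the example DAG, this string would be passed as `airbyte_job_id=...` in place of `async_source_destination.output`. Under classic templating the rendered value arrives as a string, which matches the sensor's `airbyte_job_id: str` annotation.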







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r587763575



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Required. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds wait for job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)

Review comment:
       yeah i mean looking at it more closely...
   
   one thing is i'd put `continue` after logging the retry. for one, this would signal more strongly what it's doing. but additionally, in a retry scenario you don't need to proceed within this loop iteration; you need to try again.
   
   but also, if the service is down or inaccessible, do you want to keep trying for the full timeout duration? maybe it's better to limit retries to a given number, with a stated retry interval that might differ from the job poke loop interval
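Here is a sketch of the bounded-retry idea, built on assumptions that are not in the PR:

* Transient API failures are modeled as a hypothetical `TransientAPIError`.
* The retry budget (`max_retries`) and `retry_interval` are separate from the poll interval.
* The failure counter resets after any successful call, so only consecutive failures count.
* `get_status` and `sleep` are injected so the example runs offline.

```python
import time
from typing import Callable


class TransientAPIError(Exception):
    """Hypothetical error for a failed request to the Airbyte API."""


def poll_until_done(
    get_status: Callable[[], str],
    poll_interval: float = 3,
    max_retries: int = 3,
    retry_interval: float = 10,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until a terminal status, giving up after max_retries consecutive API errors."""
    failures = 0
    while True:
        try:
            status = get_status()
        except TransientAPIError:
            failures += 1
            if failures > max_retries:
                raise  # service looks down: stop instead of waiting out the timeout
            sleep(retry_interval)  # back off on the retry schedule
            continue  # skip the status checks below and try again
        failures = 0  # a successful call resets the retry budget
        if status in ("pending", "running"):
            sleep(poll_interval)  # normal poke interval
            continue
        return status  # terminal status; the caller decides success vs failure


def flaky_status_source() -> Callable[[], str]:
    """Scripted responses: two API errors interleaved with real statuses."""
    responses = [TransientAPIError(), "running", TransientAPIError(), "succeeded"]

    def get_status() -> str:
        item = responses.pop(0)
        if isinstance(item, Exception):
            raise item
        return item

    return get_status


print(poll_until_done(flaky_status_source(), sleep=lambda _s: None))  # succeeded
```

An overall deadline like the hook's existing `timeout` check could still wrap this loop; the retry cap only shortens the wait when the API itself is unreachable.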







[GitHub] [airflow] marcosmarxm edited a comment on pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm edited a comment on pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#issuecomment-790202277


   > Also I think it's important to add example dag & documentation about the provider itself. Maybe some information about setting up the connection like:
   > https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/connections/postgres.html#postgresql-connection
   
   @RosterIn I added the `Connection type` section to the documentation and also added two examples, with sync and async execution. Can you check them?





[GitHub] [airflow] mik-laj commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r586379037



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AirbyteTriggerSyncOperator`
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get connection
+        information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API.
+    :type asynchronous: bool
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    template_fields = ('connection_id',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        connection_id: str,
+        airbyte_conn_id: str = "default_airbyte_conn",
+        asynchronous: Optional[bool] = False,
+        api_version: Optional[str] = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)
+        job_id = job_object.json().get('job').get('id')

Review comment:
       ``job_object.json().get('job')`` can return `None`, and we will get an AttributeError. We should use [] to access dict elements or add more checks for None values.
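Here is a minimal sketch of the `[]`-based access the comment suggests. It assumes the submit response body has the `{"job": {"id": ...}}` shape used in the operator above. The function name is hypothetical, and `ValueError` stands in for `AirflowException` so the snippet runs without Airflow. The point is that a malformed payload produces an explicit error instead of an `AttributeError` on `None`.

```python
from typing import Any, Dict


def extract_job_id(payload: Dict[str, Any]) -> int:
    """Return payload["job"]["id"], failing with a descriptive error if it is missing."""
    try:
        return payload["job"]["id"]
    except (KeyError, TypeError) as err:
        # KeyError: a key is absent; TypeError: "job" is None or not a mapping.
        raise ValueError(f"Unexpected Airbyte API response: {payload!r}") from err


print(extract_job_id({"job": {"id": 42, "status": "pending"}}))  # 42
```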







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r586933619



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AirbyteTriggerSyncOperator`
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get connection
+        information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API.
+    :type asynchronous: bool
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    template_fields = ('connection_id',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        connection_id: str,
+        airbyte_conn_id: str = "default_airbyte_conn",
+        asynchronous: Optional[bool] = False,
+        api_version: Optional[str] = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)
+        job_id = job_object.json().get('job').get('id')
+        if not job_id:

Review comment:
       I switched to the standard `[]`. Thinking about it, using `[]` will make debugging more straightforward for people.







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584304954



##########
File path: docs/apache-airflow-providers-airbyte/index.rst
##########
@@ -0,0 +1,107 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+``apache-airflow-providers-airbyte``
+========================================

Review comment:
       ```suggestion
   ``apache-airflow-providers-airbyte``
   ====================================
   ```







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r587935083



##########
File path: airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the AirbyteTriggerSyncOperator and AirbyteJobSensor."""
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+}
+
+with DAG(
+    dag_id='example_airbyte_operator',
+    default_args=args,
+    schedule_interval=None,
+    start_date=days_ago(1),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=['example'],
+) as dag:
+
+    # [START howto_operator_airbyte_synchronous]
+    sync_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_sync_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+    )
+    # [END howto_operator_airbyte_synchronous]
+
+    # [START howto_operator_airbyte_asynchronous]
+    async_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_async_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+        asynchronous=True,
+    )
+
+    airbyte_sensor = AirbyteJobSensor(
+        task_id='airbyte_sensor_source_dest_example',
+        airbyte_job_id=async_source_destination.output,

Review comment:
       > I could use the task_instance xcom_pull
   
   Yeah, that `output` thing is new... I think it may only work when you use the `TaskFlow` API.
   
   I cannot look more deeply until possibly this evening.







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584094198



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_time: Number of seconds between checks
+        :type wait_time: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: int
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_time)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json().get("job").get("status")
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
+
+        if state == self.ERROR:
+            raise AirflowException(f"Job failed:\n{job}")
+        if state == self.CANCELLED:
+            raise AirflowException(f"Job was cancelled:\n{job}")
+
+    def submit_job(self, connection_id: str) -> dict:
+        """
+        Submits a job to an Airbyte server.
+
+        :param connection_id: Required. The ConnectionId of the Airbyte Connection.
+        :type connection_id: str
+        """
+        return self.run(
+            endpoint="api/v1/connections/sync",

Review comment:
       Should we make the API version configurable? I think it may change in the future.
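A minimal sketch of what a configurable version could look like, assuming the path layout of the hard-coded `api/v1/connections/sync` stays the same across versions. `airbyte_endpoint` is a hypothetical helper, not part of the hook, and `v2` below is only an illustration, not a released Airbyte API version.

```python
def airbyte_endpoint(path: str, api_version: str = "v1") -> str:
    """Hypothetical helper: prefix an Airbyte API path with a configurable version.

    Mirrors the hard-coded "api/v1/connections/sync" in the hook under review,
    but lets callers pass api_version instead of editing the string.
    """
    # Strip stray slashes so "connections/sync" and "/connections/sync" agree.
    return f"api/{api_version.strip('/')}/{path.lstrip('/')}"


print(airbyte_endpoint("connections/sync"))         # api/v1/connections/sync
print(airbyte_endpoint("/connections/sync", "v2"))  # api/v2/connections/sync
```

The hook could then store `api_version` in `__init__` and build every endpoint through one place, so a version bump touches a single argument.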







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r586535877



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Required. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "default_airbyte_conn", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json().get("job").get("status")

Review comment:
       Got it!







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584516947



##########
File path: airflow/providers/airbyte/sensors/airbyte.py
##########
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains an Airbyte Job sensor."""
+
+from airflow.exceptions import AirflowException
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteJobSensor(BaseSensorOperator):
+    """
+    Check for the state of a previously submitted Dataproc job.
+
+    :param project_id: The ID of the google cloud project in which
+        to create the cluster. (templated)
+    :type project_id: str
+    :param dataproc_job_id: The Dataproc job ID to poll. (templated)
+    :type dataproc_job_id: str
+    :param location: Required. The Cloud Dataproc region in which to handle the request. (templated)
+    :type location: str
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
+    :type gcp_conn_id: str
+    """
+
+    template_fields = ('airbyte_job_id',)
+    ui_color = '#f0eee4'

Review comment:
       ```suggestion
       ui_color = '#6C51FD'
   ```
   How about something more Airbyte-like?







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r585728411



##########
File path: airflow/providers/airbyte/sensors/airbyte.py
##########
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains an Airbyte Job sensor."""
+
+from airflow.exceptions import AirflowException
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteJobSensor(BaseSensorOperator):
+    """
+    Check for the state of a previously submitted Airbyte job.
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    """
+
+    template_fields = ('airbyte_job_id',)
+    ui_color = '#6C51FD'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        airbyte_job_id: str,
+        airbyte_conn_id: str = 'default_airbyte_conn',
+        api_version: str = "v1",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.airbyte_job_id = airbyte_job_id
+        self.api_version = api_version
+
+    def poke(self, context: dict) -> bool:
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job = hook.get_job(job_id=self.airbyte_job_id)
+        status = job.json().get('job').get('status')
+
+        if status == hook.FAILED:
+            raise AirflowException(f'Job failed: \n{job}')
+        elif status == hook.CANCELLED:
+            raise AirflowException(f'Job was cancelled: \n{job}')
+        elif status == hook.SUCCEEDED:
+            self.log.debug("Job %s completed successfully.", self.airbyte_job_id)
+            return True
+        elif status == hook.ERROR:
+            self.log.debug("Job %s attempt has failed.", self.airbyte_job_id)

Review comment:
       ```suggestion
               self.log.info("Job %s attempt has failed.", self.airbyte_job_id)
   ```
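A minimal sketch of the status handling in `poke()` as quoted above, runnable without Airflow. Assumptions: `JobFailed` is a hypothetical stand-in for `AirflowException`, and because the quoted hunk is cut off after the `ERROR` branch, the sketch assumes that branch (and any non-terminal status such as `running` or `pending`) ends with `return False`, which tells the sensor to poke again. Under that assumption an `error` status is a retryable attempt rather than a final outcome, which is one reason logging it at info level seems more useful than debug.

```python
# Status strings copied from the AirbyteHook constants in this PR.
SUCCEEDED = "succeeded"
FAILED = "failed"
CANCELLED = "cancelled"
ERROR = "error"


class JobFailed(Exception):
    """Hypothetical stand-in for AirflowException so the sketch runs without Airflow."""


def poke_decision(status: str, log=print) -> bool:
    """Mirror the sensor's branches: raise, finish (True), or poke again (False)."""
    if status == FAILED:
        raise JobFailed("Job failed")
    if status == CANCELLED:
        raise JobFailed("Job was cancelled")
    if status == SUCCEEDED:
        return True
    if status == ERROR:
        # Surfaced at info level, per the suggestion above, so retries are visible.
        log(f"Job attempt has failed (status={status!r}); poking again.")
    # Assumed: every other status (running, pending, error) means "not done yet".
    return False


for s in ("pending", "running", "error", "succeeded"):
    print(s, "->", poke_decision(s))
```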







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584517335



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    :param airbyte_conn_id: Required. The name of the Airbyte connection to use
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination
+    :type connection_id: str
+
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    @apply_defaults
+    def __init__(
+        self,
+        airbyte_conn_id: str,
+        connection_id: str,
+        asynchronous: bool = False,
+        api_version: str = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)

Review comment:
       How will this operator behave if the same job is submitted twice?







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r586564036



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AirbyteTriggerSyncOperator`
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get connection
+        information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API.
+    :type asynchronous: bool
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    template_fields = ('connection_id',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        connection_id: str,
+        airbyte_conn_id: str = "default_airbyte_conn",
+        asynchronous: Optional[bool] = False,
+        api_version: Optional[str] = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)
+        job_id = job_object.json().get('job').get('id')
+        if not job_id:

Review comment:
       @dstandish So, can I remove the `if not job_id` logic and let the default error be raised when the key is not found?
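One nuance to weigh before dropping the check entirely, shown in a small sketch that runs without Airflow: `[]` already raises `KeyError` when `job` or `id` is absent, but it returns `None` if the API sends the key with a null value, and a plain `if not job_id` would also reject a falsy id such as `0`. `extract_job_id` and `JobSubmitError` are hypothetical names; in the operator the error would presumably be an `AirflowException`.

```python
from typing import Any, Mapping


class JobSubmitError(Exception):
    """Hypothetical stand-in for AirflowException so the sketch runs without Airflow."""


def extract_job_id(body: Mapping[str, Any]) -> int:
    # [] already raises KeyError when "job" or "id" is missing entirely...
    job_id = body["job"]["id"]
    # ...but a key that is present with a null value slips through, so an
    # explicit None check still adds something. "is None" (not "not job_id")
    # avoids rejecting a legitimate id of 0.
    if job_id is None:
        raise JobSubmitError(f"Airbyte returned no job id: {body!r}")
    return job_id
```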







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r588720598



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Required. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
+                continue
+
+        if state == self.ERROR:
+            raise AirflowException(f"Job failed:\n{job}")
+        if state == self.CANCELLED:
+            raise AirflowException(f"Job was cancelled:\n{job}")

Review comment:
       I already added :D
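One thing worth noting in the `wait_for_job` loop above: it only exits on `error`, `succeeded` or `cancelled`, so a job that reports `failed` keeps being polled until the timeout fires, and with `timeout=None` it never stops. The revisions quoted in this thread also spell the cancelled status differently (`canceled` vs `cancelled`), and only the spelling the API actually returns will ever match. Below is a minimal, Airflow-free sketch of a loop that handles every terminal status explicitly. It is an illustration, not the provider's code: `fetch_status` is a hypothetical callable standing in for `get_job(job_id=...).json()["job"]["status"]`, `ConnectionError` stands in for the `AirflowException` that `HttpHook` raises on HTTP errors, and `RuntimeError`/`TimeoutError` stand in for `AirflowException`.

```python
import time
from typing import Callable, Optional

# Status strings assumed from the hook's constants; they must match the API exactly.
RUNNING, PENDING = "running", "pending"
SUCCEEDED = "succeeded"
FAILED, ERROR, CANCELLED = "failed", "error", "cancelled"


def wait_for_job(
    job_id: str,
    fetch_status: Callable[[str], str],
    wait_seconds: float = 3,
    timeout: Optional[float] = None,
) -> str:
    """Poll ``fetch_status`` until the job reaches a terminal status."""
    start = time.monotonic()
    while True:
        if timeout is not None and time.monotonic() - start > timeout:
            raise TimeoutError(f"Airbyte job {job_id} is not ready after {timeout}s")
        time.sleep(wait_seconds)
        try:
            state = fetch_status(job_id)
        except ConnectionError as err:
            # Transient server error: report it and poll again on the next iteration.
            print(f"Retrying, status check for job {job_id} failed: {err}")
            continue
        if state in (RUNNING, PENDING):
            continue
        if state == SUCCEEDED:
            return state
        if state in (FAILED, ERROR):
            raise RuntimeError(f"Airbyte job {job_id} failed with status {state!r}")
        if state == CANCELLED:
            raise RuntimeError(f"Airbyte job {job_id} was cancelled")
        # Unknown status: fail loudly instead of polling until the timeout.
        raise RuntimeError(f"Unexpected status {state!r} for Airbyte job {job_id}")
```

Failing on an unrecognized status is a design choice: it trades a possible false failure for never hanging silently on a status string the loop does not know about.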




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584094840



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:

Review comment:
       yeah like an operator that is also a sensor? like it submits the job and immediately starts sensing it?  is there an example of this you can point me to @turbaszek?  i was gonna suggest this but you have to store job_id somewhere and i know folks ran into issues before where xcom clears itself when sensor wakes up again thus purging the job id.  i'm curious how it has been implemented with google.  maybe xcom persistence issue was resolved with introduction of xcom backends?







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r587809789



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AirbyteTriggerSyncOperator`
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get connection
+        information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API.
+    :type asynchronous: bool
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    template_fields = ('connection_id',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        connection_id: str,
+        airbyte_conn_id: str = "airbyte_default",
+        asynchronous: Optional[bool] = False,
+        api_version: Optional[str] = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)

Review comment:
       Agreed; I also changed it in the Sensor class.
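The `execute` method above submits the sync and, unless `asynchronous` is set, blocks until the job finishes; in the asynchronous case the job id is handed to `AirbyteJobSensor` instead, as in the example DAG. A sketch of that branching, assuming a hypothetical `FakeAirbyteHook` whose `submit_sync_connection` returns an already-decoded dict of the shape `{"job": {"id": ...}}` rather than a `requests.Response`:

```python
from typing import Any, Dict, List, Optional


class FakeAirbyteHook:
    """Hypothetical stand-in for AirbyteHook, for illustration only."""

    def __init__(self) -> None:
        self.waited_for: List[int] = []

    def submit_sync_connection(self, connection_id: str) -> Dict[str, Any]:
        # Assumed response body shape: {"job": {"id": <int>, ...}}
        return {"job": {"id": 42, "status": "pending"}}

    def wait_for_job(self, job_id: int, timeout: Optional[float] = None) -> None:
        # The real hook would poll here; the fake only records the call.
        self.waited_for.append(job_id)


def trigger_sync(
    hook: FakeAirbyteHook,
    connection_id: str,
    asynchronous: bool = False,
    timeout: Optional[float] = 3600,
) -> int:
    """Submit a sync and block until it ends, unless ``asynchronous`` is True."""
    job_id = hook.submit_sync_connection(connection_id=connection_id)["job"]["id"]
    if not asynchronous:
        hook.wait_for_job(job_id=job_id, timeout=timeout)
    # Returned from an operator's execute(), this value is pushed to XCom by
    # default, where a downstream sensor can pick it up.
    return job_id
```

In both modes the job id is returned; only the synchronous path spends worker time waiting on the job.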







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584102199



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:

Review comment:
       >  is there an example of this you can point me to @turbaszek?
   
   Configurable waiting:
   https://github.com/apache/airflow/blob/13854c32a38787af6d8a52ab2465cb6185c0b74c/airflow/providers/google/cloud/operators/dataproc.py#L895
   https://github.com/apache/airflow/blob/13854c32a38787af6d8a52ab2465cb6185c0b74c/airflow/providers/google/cloud/operators/dlp.py#L263
   https://github.com/apache/airflow/blob/13854c32a38787af6d8a52ab2465cb6185c0b74c/airflow/providers/google/cloud/operators/dataflow.py#L93
   
   Waiting in operator:
   https://github.com/apache/airflow/blob/13854c32a38787af6d8a52ab2465cb6185c0b74c/airflow/providers/google/cloud/operators/datafusion.py#L104
   In general, search for any `.result()` call in the GCP operators; those do polling:
   https://github.com/apache/airflow/blob/13854c32a38787af6d8a52ab2465cb6185c0b74c/airflow/providers/google/cloud/operators/bigquery.py#L2143
   
   > i'm curious how it has been implemented with google.
   
   I assume your question is mostly about cases like rerunning tasks. The answer is that idempotence is key. Some examples:
   https://github.com/apache/airflow/blob/13854c32a38787af6d8a52ab2465cb6185c0b74c/airflow/providers/google/cloud/operators/bigquery.py#L2030-L2037
   
   This, of course, requires an API that supports some form of idempotency handling, which is not always available. As a rule of thumb, this can be done by:
   1. Generating a unique, deterministic id from (dag_id, task_id, exec_date) that includes a hash of the user's input
   1. Reattaching to existing jobs/operations
   
   That has a lot of edge cases, of course, like "reattach only to running jobs" or letting users override the operation id.
   
   I'm personally leaning toward the `op >> sensor` approach, but many users want "atomic" operations like `create` or `submit`.
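The two rules of thumb above can be sketched in plain Python. Both helpers are hypothetical, not part of Airflow or Airbyte: `make_job_id` hashes the user's input and combines it with `(dag_id, task_id, execution_date)`, so rerunning the same task instance with the same input yields the same id, while `submit_or_reattach` reuses a job already registered under that id (a plain dict stands in for querying the external service). As noted later in this thread, Airbyte's sync endpoint assigns a fresh `job_id` on every trigger, so this pattern only applies to APIs that accept a caller-chosen id.

```python
import hashlib
import json
from datetime import datetime
from typing import Any, Callable, Dict


def make_job_id(
    dag_id: str, task_id: str, execution_date: datetime, user_input: Dict[str, Any]
) -> str:
    """Deterministic id: same task instance + same input -> same id."""
    # sort_keys makes the hash independent of dict key order.
    payload = json.dumps(user_input, sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:8]
    return f"{dag_id}_{task_id}_{execution_date:%Y%m%dT%H%M%S}_{digest}"


def submit_or_reattach(
    job_id: str, running_jobs: Dict[str, Any], submit: Callable[[str], Any]
) -> Any:
    """Reattach to a job already registered under this id instead of submitting a duplicate."""
    if job_id not in running_jobs:
        running_jobs[job_id] = submit(job_id)
    return running_jobs[job_id]
```

A real implementation would also have to decide the edge cases mentioned above, such as whether to reattach to a job that has already finished.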







[GitHub] [airflow] github-actions[bot] commented on pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#issuecomment-791766204


   [The Workflow run](https://github.com/apache/airflow/actions/runs/625797986) is cancelling this PR. Building image for the PR has been cancelled





[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r587893347



##########
File path: airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the AirbyteTriggerSyncOperator."""
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+}
+
+with DAG(
+    dag_id='example_airbyte_operator',
+    default_args=args,
+    schedule_interval=None,
+    start_date=days_ago(1),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=['example'],
+) as dag:
+
+    # [START howto_operator_airbyte_synchronous]
+    sync_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_sync_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+    )
+    # [END howto_operator_airbyte_synchronous]
+
+    # [START howto_operator_airbyte_asynchronous]
+    async_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_async_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+        asynchronous=True,
+    )
+
+    airbyte_sensor = AirbyteJobSensor(
+        task_id='airbyte_sensor_source_dest_example',
+        airbyte_job_id=async_source_destination.output,

Review comment:
       @dstandish and @tuanchris: CI broke at this line in the **Backport packages: wheel** step. It complained that `AirbyteTriggerSyncOperator` doesn't have an `output` property. Any suggestions?
   I could use `task_instance.xcom_pull` instead:
   ```
           airbyte_job_id="{{task_instance.xcom_pull(task_ids='airbyte_async_source_dest_example')}}",
   ```
   







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r586510009



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AirbyteTriggerSyncOperator`
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get connection
+        information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API.
+    :type asynchronous: bool
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    template_fields = ('connection_id',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        connection_id: str,
+        airbyte_conn_id: str = "default_airbyte_conn",
+        asynchronous: Optional[bool] = False,
+        api_version: Optional[str] = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)
+        job_id = job_object.json().get('job').get('id')
+        if not job_id:

Review comment:
       @mik-laj I kept the `.get()` calls and added an exception that is raised when the `job_id` is not found. I added the same check to the **sensor** as well.







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r588156429



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)

Review comment:
       I kept the exception handling and added a `continue`.







[GitHub] [airflow] turbaszek commented on pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#issuecomment-791959947


   Nice work! 🚀 Merging as all comments were addressed and the CI failures seem to be unrelated.





[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r587934022



##########
File path: airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the AirbyteTriggerSyncOperator."""
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+}
+
+with DAG(
+    dag_id='example_airbyte_operator',
+    default_args=args,
+    schedule_interval=None,
+    start_date=days_ago(1),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=['example'],
+) as dag:
+
+    # [START howto_operator_airbyte_synchronous]
+    sync_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_sync_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+    )
+    # [END howto_operator_airbyte_synchronous]
+
+    # [START howto_operator_airbyte_asynchronous]
+    async_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_async_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+        asynchronous=True,
+    )
+
+    airbyte_sensor = AirbyteJobSensor(
+        task_id='airbyte_sensor_source_dest_example',
+        airbyte_job_id=async_source_destination.output,

Review comment:
       maybe you meant to tag @turbaszek 







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r585738747



##########
File path: docs/apache-airflow-providers-airbyte/operators/airbyte.rst
##########
@@ -0,0 +1,38 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:AirbyteTriggerSyncOperator:
+
+AirbyteTriggerSyncOperator
+==========================
+
+Use the :class:`~airflow.providers.airbyte.operators.airbyte.AirbyteTriggerSyncOperator` to
+trigger an existing ConnectionId sync job in Airbyte.
+
+WARNING: This operator triggers a synchronization job in Airbyte.

Review comment:
       At the moment, after the job is triggered with the `connectionId`, the only way to follow the status of the running job is the `/jobs/get` endpoint. Triggering the operator again queues a new synchronization job with a new `job_id`.
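The two calls described above can be sketched with the standard library. The paths `api/v1/connections/sync` (body `{"connectionId": ...}`) and `api/v1/jobs/get` (body `{"id": ...}`) are assumptions based on the Airbyte Config API of this period, so check them against your server version; `BASE_URL` is a placeholder. The sketch only builds the requests; sending one with `urllib.request.urlopen` requires a reachable Airbyte server.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # placeholder Airbyte server address (assumption)


def _post(path: str, body: dict) -> urllib.request.Request:
    """Build a JSON POST request against the Airbyte API."""
    return urllib.request.Request(
        url=f"{BASE_URL}/{path}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


def sync_request(connection_id: str, api_version: str = "v1") -> urllib.request.Request:
    """Every call queues a *new* sync job; the response carries its job id."""
    return _post(f"api/{api_version}/connections/sync", {"connectionId": connection_id})


def job_status_request(job_id: int, api_version: str = "v1") -> urllib.request.Request:
    """Fetch the current status of one job by id."""
    return _post(f"api/{api_version}/jobs/get", {"id": job_id})
```

Because the sync call is not idempotent, a task retry that re-sends `sync_request` starts a second job rather than reattaching to the first.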







[GitHub] [airflow] github-actions[bot] commented on pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#issuecomment-790205365


   [The Workflow run](https://github.com/apache/airflow/actions/runs/619447959) is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.





[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r586538602



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AirbyteTriggerSyncOperator`
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get connection
+        information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API.
+    :type asynchronous: bool
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    template_fields = ('connection_id',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        connection_id: str,
+        airbyte_conn_id: str = "default_airbyte_conn",
+        asynchronous: Optional[bool] = False,
+        api_version: Optional[str] = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)
+        job_id = job_object.json().get('job').get('id')
+        if not job_id:

Review comment:
       yeah i mean i think either approach is fine
   
   if i were doing it maybe i'd use `[]` because it's less code and does the same thing (and it doesn't cause people to double check the logic ;) ).  but yeah either way fine with me!
   
   _(after mik-laj brought it up i just wanted to chime in with agreement... i had also noticed all the usage of `get` in my first round of review but did not mention it because it doesn't make much difference)_
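A minimal sketch of the `.get` vs `[]` trade-off discussed here, assuming the response body has the `{"job": {"id": ...}}` shape implied by the operator code. The payloads are made up for illustration. Chained `.get()` fails with an `AttributeError` on `None` when `job` is missing, while `[]` fails with a `KeyError` that names the missing key.

```python
from typing import Any, Dict


def job_id_with_get(payload: Dict[str, Any]) -> Any:
    # Chained .get(): a missing "job" key yields None, and the second
    # .get() then raises AttributeError on NoneType.
    return payload.get("job").get("id")


def job_id_with_index(payload: Dict[str, Any]) -> Any:
    # Plain indexing: a missing key raises KeyError naming that key.
    return payload["job"]["id"]


ok = {"job": {"id": 42, "status": "pending"}}
broken = {"error": "not found"}

print(job_id_with_get(ok), job_id_with_index(ok))  # prints: 42 42

for fn in (job_id_with_get, job_id_with_index):
    try:
        fn(broken)
    except (AttributeError, KeyError) as err:
        # Shows which exception type each style produces.
        print(f"{fn.__name__}: {type(err).__name__}")
```

One difference worth noting: when `job` is present but `id` is missing, the `.get()` chain returns `None` instead of raising, which is the case the `if not job_id:` check in the diff covers.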




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r588723220



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Required. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
+                continue
+
+        if state == self.ERROR:
+            raise AirflowException(f"Job failed:\n{job}")
+        if state == self.CANCELLED:
+            raise AirflowException(f"Job was cancelled:\n{job}")

Review comment:
       > I already added :D
   
   as usual, nice :) 







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r586934907



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AirbyteTriggerSyncOperator`
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get connection
+        information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API.
+    :type asynchronous: bool
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    template_fields = ('connection_id',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        connection_id: str,
+        airbyte_conn_id: str = "default_airbyte_conn",
+        asynchronous: Optional[bool] = False,
+        api_version: Optional[str] = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)
+        job_id = job_object.json().get('job').get('id')

Review comment:
       @mik-laj if I use `[]`, should I wrap it in a try/except block to handle the error, or can I let it throw like this?
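One possible answer to the try/except question, sketched under assumptions: catch the `KeyError`/`TypeError` raised by indexing and re-raise it as an `AirflowException` that includes the raw response body, so the task log shows what the API actually returned. `extract_job_id` is a hypothetical helper, not part of the PR, and the fallback exception class exists only so the snippet runs without Airflow installed.

```python
from typing import Any, Dict

try:
    from airflow.exceptions import AirflowException
except ImportError:  # stand-in so the sketch runs without Airflow installed
    class AirflowException(Exception):
        pass


def extract_job_id(payload: Dict[str, Any]) -> int:
    """Hypothetical helper: pull job.id out of an Airbyte sync response body."""
    try:
        return payload["job"]["id"]
    except (KeyError, TypeError) as err:
        # KeyError: a key is missing; TypeError: "job" is None or not a dict.
        # Chain the original error and include the body for debugging.
        raise AirflowException(f"Unexpected Airbyte response, no job id: {payload!r}") from err
```

Letting the bare `KeyError` propagate would also fail the task; the wrapper only adds context to the error message.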







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r588413058



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Required. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)

Review comment:
       sorry the continue was actually a bad suggestion.  in my mind this bit was inside the loop after your retry check
   ```python
   
           if state == self.ERROR:
               raise AirflowException(f"Job failed:\n{job}")
           if state == self.CANCELLED:
               raise AirflowException(f"Job was cancelled:\n{job}")
   ```
   but my view of the indents was obscured by all the comments.
   
   there's no point in using continue when it's already the end of the loop... 
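A minimal sketch of the restructuring described in this comment, assuming a `get_status` callable that returns the job's status string (standing in for `self.get_job(job_id=job_id).json()["job"]["status"]`). The terminal-state checks sit inside the loop, in the `else` branch of the `try`, so no `continue` is needed. `poll_job` and `JobError` are hypothetical names. The sketch also treats `failed` as terminal because the hook defines a `FAILED` constant; which of `error`/`failed` the API actually emits is an assumption to verify.

```python
import logging
import time
from typing import Callable, Optional

log = logging.getLogger(__name__)

# Status strings copied from the hook's constants in the diff.
SUCCEEDED, CANCELLED, FAILED, ERROR = "succeeded", "cancelled", "failed", "error"


class JobError(Exception):
    """Hypothetical stand-in for AirflowException."""


def poll_job(
    get_status: Callable[[], str],
    wait_seconds: float = 3,
    timeout: Optional[float] = None,
) -> str:
    """Poll until a terminal state; return it on success, raise otherwise."""
    start = time.monotonic()
    while True:
        if timeout and start + timeout < time.monotonic():
            raise JobError(f"Timeout: job is not ready after {timeout}s")
        time.sleep(wait_seconds)
        try:
            state = get_status()
        except JobError as err:
            # Transient API error: log it; the loop simply polls again.
            log.info("Retrying after API error: %s", err)
        else:
            # Terminal-state checks live inside the loop, right after a good poll.
            if state == SUCCEEDED:
                return state
            if state in (ERROR, FAILED):
                raise JobError(f"Job failed with state {state!r}")
            if state == CANCELLED:
                raise JobError("Job was cancelled")


statuses = iter(["pending", "running", "succeeded"])
print(poll_job(lambda: next(statuses), wait_seconds=0))  # prints: succeeded
```

Any other state, including `pending` and `running`, keeps the loop polling until the timeout is reached.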







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584633201



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    :param airbyte_conn_id: Required. The name of the Airbyte connection to use
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination
+    :type connection_id: str
+
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    @apply_defaults
+    def __init__(
+        self,
+        airbyte_conn_id: str,
+        connection_id: str,
+        asynchronous: bool = False,
+        api_version: str = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)

Review comment:
       It triggers a new job for the connection, which makes Airbyte fetch new data. Airbyte currently cannot guarantee idempotency through the API, but I have already raised the issue of being able to send other parameters, along with the idempotency problem.
   
   The user will need to guarantee idempotency through the way the DAG is built. Should I make this explicit in the documentation?







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r586538602



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AirbyteTriggerSyncOperator`
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get connection
+        information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API.
+    :type asynchronous: bool
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    template_fields = ('connection_id',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        connection_id: str,
+        airbyte_conn_id: str = "default_airbyte_conn",
+        asynchronous: Optional[bool] = False,
+        api_version: Optional[str] = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)
+        job_id = job_object.json().get('job').get('id')
+        if not job_id:

Review comment:
       yeah i mean i think either approach is defensible here.
   if i were doing it maybe i'd use `[]` because it's less code and does the same thing (and it doesn't cause people to double check the logic ;) ).  but yeah either way fine :) ... at least with me
   
   i too noticed the `get` usage in the first round of review but did not mention it; since it was brought up, i just wanted to chime in with agreement







[GitHub] [airflow] marcosmarxm commented on pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#issuecomment-790202277


   > Also I think it's important to add example dag & documentation about the provider itself. Maybe some information about setting up the connection like:
   > https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/connections/postgres.html#postgresql-connection
   
   I added the `Connection type` section to the documentation and also added two examples, one with sync and one with async execution. Can you check them?





[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r585729418



##########
File path: docs/apache-airflow-providers-airbyte/operators/airbyte.rst
##########
@@ -0,0 +1,38 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:AirbyteTriggerSyncOperator:
+
+AirbyteTriggerSyncOperator
+==========================
+
+Use the :class:`~airflow.providers.airbyte.operators.airbyte.AirbyteTriggerSyncOperator` to
+trigger an existing ConnectionId sync job in Airbyte.
+
+WARNING: This operator triggers a synchronization job in Airbyte.

Review comment:
       Is it possible to reattach to an already running job?







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584094091



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:

Review comment:
       I think we could make it configurable (async/sync); that's how many of the Google operators work.
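A rough sketch of the async/sync switch suggested here. It assumes a hook-like object with `submit_sync_connection(connection_id=...)` and `wait_for_job(job_id=..., timeout=...)`, mirroring the names in the diff. `FakeHook` and `trigger_sync` are hypothetical, and `FakeHook` returns the parsed JSON body directly so the snippet runs on its own. In asynchronous mode the job id comes back immediately; otherwise the call blocks until the job finishes. In an Airflow operator, the value returned from `execute` is pushed to XCom by default, which is how a downstream task could pick up the id.

```python
from typing import Any, Dict, List, Optional


class FakeHook:
    """Hypothetical stand-in for AirbyteHook that records wait calls."""

    def __init__(self) -> None:
        self.waited: List[int] = []

    def submit_sync_connection(self, connection_id: str) -> Dict[str, Any]:
        # The real hook returns an HTTP response; here we return its parsed JSON.
        return {"job": {"id": 123, "status": "pending"}}

    def wait_for_job(self, job_id: int, timeout: Optional[float] = None) -> None:
        self.waited.append(job_id)


def trigger_sync(
    hook: Any,
    connection_id: str,
    asynchronous: bool = False,
    timeout: Optional[float] = 3600,
) -> int:
    """Submit a sync job; block on it only when asynchronous is False."""
    job_id = hook.submit_sync_connection(connection_id=connection_id)["job"]["id"]
    if not asynchronous:
        hook.wait_for_job(job_id=job_id, timeout=timeout)
    # Returning the id lets a downstream task (e.g. a sensor) pick it up.
    return job_id
```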








[GitHub] [airflow] dstandish commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584094840



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:

Review comment:
       yeah, like an operator that is also a sensor? like it submits the job and immediately starts sensing it?  is there an example of this you can point me to @turbaszek?  this came to mind here, but you have to store the job_id somewhere, and i know folks ran into issues before where xcom clears itself when the sensor wakes up again, thus purging the job id.  i'm curious how it has been implemented with google.  maybe the xcom persistence issue was resolved with the introduction of xcom backends?
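One way to picture the split between submitting and sensing, as a sketch under assumptions. The submitting task returns the job id, and a separate sensor-style `poke` checks the status once per call using only that id. Because the id comes from the upstream task rather than from the sensor's own state, the sensor has nothing to persist between pokes. `poke_job` and `JobFailed` are hypothetical names. This does not settle the XCom persistence question raised in the comment.

```python
from typing import Callable

# Terminal states as defined by the hook's constants in the diff.
SUCCEEDED, CANCELLED, ERROR = "succeeded", "cancelled", "error"


class JobFailed(Exception):
    """Hypothetical stand-in for AirflowException."""


def poke_job(job_id: int, get_status: Callable[[int], str]) -> bool:
    """Single sensor check: True when done, False to poke again later."""
    state = get_status(job_id)
    if state == SUCCEEDED:
        return True
    if state in (ERROR, CANCELLED):
        raise JobFailed(f"Airbyte job {job_id} ended with state {state!r}")
    # pending/running (or anything unrecognized): not done yet.
    return False
```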







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584633201



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    :param airbyte_conn_id: Required. The name of the Airbyte connection to use
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination
+    :type connection_id: str
+
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    @apply_defaults
+    def __init__(
+        self,
+        airbyte_conn_id: str,
+        connection_id: str,
+        asynchronous: bool = False,
+        api_version: str = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)

Review comment:
       Each call triggers a new job for the connection and returns a new `job_id`, which causes Airbyte to fetch new data. Airbyte currently cannot guarantee idempotency through the API, but I have already raised with them both the need to send additional parameters and the idempotency problem.
   
   Users will need to guarantee idempotency through the way they build the DAG. Should I make this explicit in the documentation?







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r587736397



##########
File path: airflow/__init__.py
##########
@@ -1,91 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-Authentication is implemented using flask_login and different environments can
-implement their own login mechanisms by providing an `airflow_login` module
-in their PYTHONPATH. airflow_login should be based off the
-`airflow.www.login`
-
-isort:skip_file

Review comment:
       I think deleting this file is unintentional







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r588092881



##########
File path: airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the AirbyteTriggerSyncOperator and AirbyteJobSensor."""
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+}
+
+with DAG(
+    dag_id='example_airbyte_operator',
+    default_args=args,
+    schedule_interval=None,
+    start_date=days_ago(1),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=['example'],
+) as dag:
+
+    # [START howto_operator_airbyte_synchronous]
+    sync_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_sync_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+    )
+    # [END howto_operator_airbyte_synchronous]
+
+    # [START howto_operator_airbyte_asynchronous]
+    async_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_async_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+        asynchronous=True,
+    )
+
+    airbyte_sensor = AirbyteJobSensor(
+        task_id='airbyte_sensor_source_dest_example',
+        airbyte_job_id=async_source_destination.output,

Review comment:
       Yes, in example DAGs we still use the old approach to enable backporting. `XComArg`s do not work with Airflow 1.10.x.







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r588750313



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,106 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Required. Airbyte API version.
+    :type api_version: str

Review comment:
       just doing a "last" once-over... looks like these are actually optional, but you've specified `Required`. you may want to double-check all instances like this, but other than that, looks good to me 👍







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r585725808



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Required. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str, api_version: str = "v1") -> None:

Review comment:
       ```suggestion
       def __init__(self, airbyte_conn_id: str = default_airbyte_conn, api_version: str = "v1") -> None:
   ```







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r585069973



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    :param airbyte_conn_id: Required. The name of the Airbyte connection to use

Review comment:
       Totally agreed







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584102199



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:

Review comment:
       >  is there an example of this you can point me to @turbaszek?
   
   Configurable waiting:
   https://github.com/apache/airflow/blob/13854c32a38787af6d8a52ab2465cb6185c0b74c/airflow/providers/google/cloud/operators/dataproc.py#L895
   https://github.com/apache/airflow/blob/13854c32a38787af6d8a52ab2465cb6185c0b74c/airflow/providers/google/cloud/operators/dlp.py#L263
   https://github.com/apache/airflow/blob/13854c32a38787af6d8a52ab2465cb6185c0b74c/airflow/providers/google/cloud/operators/dataflow.py#L93
   
   Waiting in operator:
   https://github.com/apache/airflow/blob/13854c32a38787af6d8a52ab2465cb6185c0b74c/airflow/providers/google/cloud/operators/datafusion.py#L104
   in general search for any `.result()` in GCP that does polling
   https://github.com/apache/airflow/blob/13854c32a38787af6d8a52ab2465cb6185c0b74c/airflow/providers/google/cloud/operators/bigquery.py#L2143
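A minimal, Airflow-independent sketch of the configurable-waiting pattern linked above, mirroring the `asynchronous` flag of the operator in this PR: submit the job, then either return its ID right away (for a downstream sensor) or poll until it finishes. `FakeAirbyteClient`, `submit_sync`, `get_status` and `trigger_sync` are hypothetical names, not part of the Airbyte API or of this provider.

```python
import time
from typing import List


class FakeAirbyteClient:
    """Hypothetical stand-in for an Airbyte API client (not a real library)."""

    def __init__(self, statuses: List[str]) -> None:
        self._statuses = list(statuses)  # statuses returned on successive polls
        self.submitted = 0

    def submit_sync(self, connection_id: str) -> int:
        self.submitted += 1
        return 42  # pretend the API assigned job id 42

    def get_status(self, job_id: int) -> str:
        # Return the next status, repeating the last one once the list runs out.
        if len(self._statuses) > 1:
            return self._statuses.pop(0)
        return self._statuses[0]


def trigger_sync(client, connection_id: str, asynchronous: bool = False,
                 wait_seconds: float = 0.0, timeout: float = 60.0) -> int:
    """Submit a sync; wait for it to finish only when asynchronous is False."""
    job_id = client.submit_sync(connection_id)
    if asynchronous:
        # In an Airflow operator, returning this value would push it to XCom
        # so that a downstream sensor can pick it up.
        return job_id
    deadline = time.monotonic() + timeout
    while True:
        status = client.get_status(job_id)
        if status == "succeeded":
            return job_id
        if status in ("failed", "error", "cancelled"):
            raise RuntimeError(f"Airbyte job {job_id} ended with status {status!r}")
        if time.monotonic() > deadline:
            raise TimeoutError(f"Airbyte job {job_id} not finished after {timeout}s")
        time.sleep(wait_seconds)
```

The synchronous branch keeps the whole job inside one task, while the asynchronous branch frees the worker slot and leaves the waiting to a sensor.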
   
   > i'm curious how it has been implemented with google.
   
   I assume your question is mostly about cases like rerunning tasks. So the answer is that idempotence is the key; some examples:
   https://github.com/apache/airflow/blob/13854c32a38787af6d8a52ab2465cb6185c0b74c/airflow/providers/google/cloud/operators/bigquery.py#L2030-L2037
   
   This, of course, requires an API that allows some idempotency handling, which is not always possible. As a rule of thumb, this can be done by:
   1. Generating a deterministic unique ID from (dag_id, task_id, exec_date) that includes a hash of the user's input
   1. Reattaching to existing jobs/operations
   
   That, of course, has many edge cases, like "reattach only to running jobs" or letting users override the operation ID.
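A minimal sketch of both rules of thumb, assuming a job API that accepts a caller-supplied ID and can report a job's status by that ID (the Airbyte API in this PR does not, which is the limitation raised above). `make_job_id` and `submit_or_reattach` are hypothetical helpers, and the `existing` dict stands in for a "get job by ID" call. It also shows the "reattach only to running jobs" edge case.

```python
import hashlib
import json
from typing import Any, Callable, Dict


def make_job_id(dag_id: str, task_id: str, execution_date: str,
                user_input: Dict[str, Any]) -> str:
    """Build a deterministic job id from the task instance key and the user's input."""
    # sort_keys makes the JSON (and therefore the hash) independent of dict ordering.
    payload = json.dumps(user_input, sort_keys=True)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:8]
    # Many APIs restrict id characters, so replace ':' and '+' from ISO timestamps.
    safe_date = execution_date.replace(":", "_").replace("+", "_")
    return f"{dag_id}_{task_id}_{safe_date}_{digest}"


def submit_or_reattach(existing: Dict[str, str], job_id: str,
                       submit: Callable[[str], None]) -> str:
    """Reattach to a running job with this id; otherwise submit a new one."""
    if existing.get(job_id) == "running":
        return job_id  # reattach instead of starting a duplicate job
    # Finished or unknown job: submit again (a rerun after failure, for example).
    submit(job_id)
    existing[job_id] = "running"
    return job_id
```

A rerun of the same task instance with the same input produces the same ID, so it reattaches instead of duplicating work.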
   
   I'm personally leaning toward the `op >> sensor` approach, but many users want "atomic" operations like `create` or `submit`.
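With the `op >> sensor` approach, each sensor poke only has to map one status lookup to "done", "not yet", or "failed". A minimal, Airflow-independent sketch of that decision; `poke_decision` and `get_status` are hypothetical names, and the status strings follow the constants in the later revisions of this PR's hook (which spell it `cancelled`).

```python
from typing import Callable, Optional


def poke_decision(get_status: Callable[[str], Optional[str]], job_id: str) -> bool:
    """Return True when the job succeeded, False to poke again, raise on failure."""
    status = get_status(job_id)
    if status in ("failed", "error"):
        raise RuntimeError(f"Airbyte job {job_id} failed (status {status!r})")
    if status == "cancelled":
        raise RuntimeError(f"Airbyte job {job_id} was cancelled")
    # Anything else (e.g. "pending", "running") means poke again later.
    return status == "succeeded"
```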







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584370491



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,91 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """Hook for Airbyte API"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str, api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json().get("job").get("status")
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
+
+        if state == self.ERROR:
+            raise AirflowException(f"Job failed:\n{job}")
+        if state == self.CANCELLED:
+            raise AirflowException(f"Job was cancelled:\n{job}")
+
+    def submit_sync_connection(self, connection_id: str) -> dict:
+        """
+        Submits a job to an Airbyte server.
+
+        :param connection_id: Required. The ConnectionId of the Airbyte Connection.
+        :type connection_id: str
+        """
+        return self.run(
+            endpoint=f"api/{self.api_version}/connections/sync",
+            json={"connectionId": connection_id},
+            headers={"accept": "application/json"},
+        )
+
+    def get_job(self, job_id: str) -> dict:
+        """
+        Gets the resource representation for a job in Airbyte.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        """
+        return self.run(
+            endpoint=f"api/{self.api_version}/jobs/get",

Review comment:
       @turbaszek I added `api_version` this way. Users can pass the version they want to use through the **AirbyteTriggerSyncOperator**.
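A minimal sketch of how `api_version` ends up in the request path, using the endpoint strings and sync payload from the quoted hook. `build_sync_request` and `build_get_job_request` are hypothetical helpers that only assemble the request, and the `{"id": job_id}` body for `jobs/get` is an assumption, since that part of the hook is cut off above.

```python
from typing import Any, Dict, Tuple


def build_sync_request(connection_id: str, api_version: str = "v1") -> Tuple[str, Dict[str, Any]]:
    """Return the (endpoint, JSON body) pair the hook posts to trigger a sync."""
    return f"api/{api_version}/connections/sync", {"connectionId": connection_id}


def build_get_job_request(job_id: int, api_version: str = "v1") -> Tuple[str, Dict[str, Any]]:
    """Return the (endpoint, JSON body) pair for polling a job."""
    # Assumption: the body shape for jobs/get is not visible in the quoted diff.
    return f"api/{api_version}/jobs/get", {"id": job_id}
```

Changing `api_version` on the operator only changes the path prefix; the payloads stay the same.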







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r587858660



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Required. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)

Review comment:
       if you want to leave it, i'm good with it, but i would suggest adding `continue` after the retry scenario is encountered, because there's no reason to finish the loop body at that point
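A minimal sketch of the suggestion as a standalone function. `fetch_status` and `TransientAPIError` are hypothetical stand-ins for `get_job` and `AirflowException`. In the quoted loop the `try`/`except` is already the last statement of the body, so `continue` there mainly makes the retry path explicit; it matters once status checks follow the fetch inside the loop, as here, where it also keeps `status` from being read while unbound after an error on the first poll. Unlike the quoted loop, this sketch also treats `failed` as a terminal state.

```python
import logging
import time
from typing import Callable, Optional

logger = logging.getLogger(__name__)

TERMINAL_FAILURES = ("failed", "error", "cancelled")


class TransientAPIError(Exception):
    """Hypothetical stand-in for a retryable server error."""


def wait_for_job(fetch_status: Callable[[str], str], job_id: str,
                 wait_seconds: float = 3, timeout: Optional[float] = None) -> str:
    """Poll until the job succeeds; raise on a failed state or on timeout."""
    start = time.monotonic()
    while True:
        if timeout is not None and time.monotonic() - start > timeout:
            raise TimeoutError(f"Airbyte job {job_id} is not ready after {timeout}s")
        time.sleep(wait_seconds)
        try:
            status = fetch_status(job_id)
        except TransientAPIError as err:
            logger.info("Retrying. Airbyte API returned an error for job %s: %s", job_id, err)
            continue  # nothing new was fetched, so skip the status checks below
        if status == "succeeded":
            return status
        if status in TERMINAL_FAILURES:
            raise RuntimeError(f"Airbyte job {job_id} ended with status {status!r}")
```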







[GitHub] [airflow] github-actions[bot] commented on pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#issuecomment-786981133


   [The Workflow run](https://github.com/apache/airflow/actions/runs/604491274) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r586538602



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AirbyteTriggerSyncOperator`
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get connection
+        information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API.
+    :type asynchronous: bool
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    template_fields = ('connection_id',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        connection_id: str,
+        airbyte_conn_id: str = "default_airbyte_conn",
+        asynchronous: Optional[bool] = False,
+        api_version: Optional[str] = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)
+        job_id = job_object.json().get('job').get('id')
+        if not job_id:

Review comment:
       yeah i mean i think either approach is fine
   
   if i were doing it maybe i'd use `[]` because it's less code and does the same thing (and it doesn't cause people to double check the logic ;) ).  but yeah either way fine with me!
   
   _(after mik-laj brought it up i just wanted to chime in with agreement.... i had noticed all the usage of `get` too in my first round of review but did not mention it because it doesn't make much difference)_
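A small illustration of the two styles, on a response body shaped like the one the operator parses (`{"job": {"id": ...}}`). They return the same value when both keys exist; they differ only in how a malformed body fails. Chained `.get()` raises `AttributeError` when `job` is missing and returns `None` when only `id` is missing (which the `if not job_id` check then catches), while `[]` raises a `KeyError` naming the missing key. The function names are hypothetical.

```python
from typing import Any, Dict, Optional


def job_id_with_get(body: Dict[str, Any]) -> Optional[int]:
    # Missing "job" -> AttributeError ('NoneType' has no .get); missing "id" -> None.
    return body.get("job").get("id")


def job_id_with_brackets(body: Dict[str, Any]) -> int:
    # Missing "job" or "id" -> KeyError naming the absent key.
    return body["job"]["id"]
```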







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r587949402



##########
File path: airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the AirbyteTriggerSyncOperator and AirbyteJobSensor."""
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+}
+
+with DAG(
+    dag_id='example_airbyte_operator',
+    default_args=args,
+    schedule_interval=None,
+    start_date=days_ago(1),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=['example'],
+) as dag:
+
+    # [START howto_operator_airbyte_synchronous]
+    sync_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_sync_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+    )
+    # [END howto_operator_airbyte_synchronous]
+
+    # [START howto_operator_airbyte_asynchronous]
+    async_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_async_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+        asynchronous=True,
+    )
+
+    airbyte_sensor = AirbyteJobSensor(
+        task_id='airbyte_sensor_source_dest_example',
+        airbyte_job_id=async_source_destination.output,

Review comment:
       No problem @dstandish. I tested the example_dag in a local setup, and it ran without problems using `output` with Airflow 2.0. I want to check with you how to proceed with this CI conflict.







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r588690394



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,100 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Required. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
+                continue
+
+        if state == self.ERROR:
+            raise AirflowException(f"Job failed:\n{job}")
+        if state == self.CANCELLED:
+            raise AirflowException(f"Job was cancelled:\n{job}")

Review comment:
       I agree this approach is more straightforward.
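Below is a dependency-free sketch of the same polling pattern. The status fetcher, sleep and clock are injected so the loop can be exercised without a live Airbyte server. The names `wait_for_status` and `JobFailed` are hypothetical and not part of the provider.

There is one deliberate difference from the quoted hook. The hook defines `FAILED = "failed"`, but its loop only exits on `error`, `succeeded` or `cancelled`, so a job reporting `failed` would keep polling until the timeout. This sketch treats `failed` as terminal too; whether Airbyte actually reports that status is an assumption. It also checks `timeout is not None`, so a timeout of `0` is not silently ignored.

```python
import time
from typing import Callable, Optional

# Status strings mirroring the constants defined on the quoted AirbyteHook.
SUCCEEDED = "succeeded"
CANCELLED = "cancelled"
FAILED = "failed"
ERROR = "error"
TERMINAL_STATES = (SUCCEEDED, CANCELLED, FAILED, ERROR)


class JobFailed(Exception):
    """Raised when a job ends in a terminal state other than success."""


def wait_for_status(
    get_status: Callable[[], str],
    wait_seconds: float = 3,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll get_status() until a terminal state is reached or timeout expires."""
    start = clock()
    state: Optional[str] = None
    while state not in TERMINAL_STATES:
        if timeout is not None and clock() - start > timeout:
            raise TimeoutError(f"job not finished after {timeout}s")
        sleep(wait_seconds)
        state = get_status()
    if state != SUCCEEDED:
        # error, failed and cancelled all end the wait with an exception.
        raise JobFailed(f"job ended with status {state!r}")
    return state
```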







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584377006



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:

Review comment:
       > I worry a little about the async calls. Airbyte is a data integration/synchronization tool. It is strange that I want to trigger subsequent tasks before getting any results from AirbyteOperator. I agree that using the `op >> sensor` is more performative in terms of allocating Airflow resources, but I think it is good to leave these two options and create examples about them.
   
   right it's largely about freeing up airflow resources.  but also by separating, there might be a case where you'd want to clear the sensor and not retrigger the submit.
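The sketch below shows why splitting submit and wait helps. The sensor's check only reads the job status, so clearing and re-running it never submits a second sync. `FakeAirbyteApi` and `poke` are hypothetical in-memory stand-ins, not the provider's `AirbyteJobSensor`. Which statuses count as failures is an assumption based on the hook's constants.

```python
from typing import Callable, Iterable

# Terminal failure statuses, assumed from the hook's constants.
FAILURE_STATES = ("error", "failed", "cancelled")


class JobFailed(Exception):
    """Raised when the polled job ends in a failure state."""


def poke(get_status: Callable[[], str]) -> bool:
    """One sensor check: True when done, False to check again later."""
    status = get_status()
    if status in FAILURE_STATES:
        raise JobFailed(f"job ended with status {status!r}")
    return status == "succeeded"


class FakeAirbyteApi:
    """Hypothetical in-memory stand-in for the Airbyte API."""

    def __init__(self, statuses: Iterable[str]) -> None:
        self._statuses = iter(statuses)
        self.submissions = 0

    def submit(self) -> int:
        # The "operator" side: each call would start a new sync.
        self.submissions += 1
        return self.submissions  # used as a fake job id

    def status(self, job_id: int) -> str:
        # The "sensor" side: read-only, safe to repeat.
        return next(self._statuses)
```

Re-running only the sensor calls `status` again but leaves `submissions` unchanged. That is the "clear the sensor and not retrigger the submit" case.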







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584689111



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    :param airbyte_conn_id: Required. The name of the Airbyte connection to use
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination
+    :type connection_id: str
+
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    @apply_defaults
+    def __init__(
+        self,
+        airbyte_conn_id: str,
+        connection_id: str,
+        asynchronous: bool = False,
+        api_version: str = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)

Review comment:
       I think it would be good to mention the idempotency issue 







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584512114



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -31,23 +31,36 @@ class AirbyteTriggerSyncOperator(BaseOperator):
     :type airbyte_conn_id: str
     :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination
     :type connection_id: str
+

Review comment:
       ```suggestion
   ```
   
   Unnecessary new line







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r587821411



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Required. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)

Review comment:
       The `get_job` function calls `run`, which then calls `check_response` from HttpHook. That function already handles responses other than 2XX or 3XX, so maybe this try/except could be removed.
   https://github.com/apache/airflow/blob/9f37af25ae7eb85fa8dbb70b7dbb23bbd5505323/airflow/providers/http/hooks/http.py#L143-L156







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r587737276



##########
File path: airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the AirbyteTriggerSyncOperator and AirbyteJobSensor."""
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+}
+
+with DAG(
+    dag_id='example_airbyte_operator',
+    default_args=args,
+    schedule_interval=None,
+    start_date=days_ago(1),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=['example'],
+) as dag:
+
+    # [START howto_operator_airbyte_synchronous]
+    sync_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_sync_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+    )
+    # [END howto_operator_airbyte_synchronous]
+
+    # [START howto_operator_airbyte_asynchronous]
+    async_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_async_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+        asynchronous=True,
+    )
+
+    airbyte_sensor = AirbyteJobSensor(
+        task_id='airbyte_sensor_source_dest_example',
+        airbyte_job_id=async_source_destination.output,
+        airbyte_conn_id='airbyte_default',
+    )
+    # [END howto_operator_airbyte_asynchronous]
+
+    sync_source_destination

Review comment:
       has no effect
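Here is some context on why the bare `sync_source_destination` line does nothing. An expression statement is evaluated and its value discarded, while Airflow's `a >> b` registers a dependency through `__rshift__`. The `Task` class below is a hypothetical, minimal stand-in for an operator, not Airflow's `BaseOperator`.

```python
from typing import List


class Task:
    """Hypothetical stand-in for an operator; only models the >> operator."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: List["Task"] = []

    def __rshift__(self, other: "Task") -> "Task":
        # Record the dependency and return the right-hand task so chains work.
        self.downstream.append(other)
        return other


submit = Task("airbyte_async_source_dest_example")
sensor = Task("airbyte_sensor_source_dest_example")

submit  # expression statement: evaluated, result discarded, nothing recorded
assert submit.downstream == []

submit >> sensor  # __rshift__ records the edge
assert submit.downstream == [sensor]
```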

##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,78 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AirbyteTriggerSyncOperator`
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get connection
+        information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API.
+    :type asynchronous: bool
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    template_fields = ('connection_id',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        connection_id: str,
+        airbyte_conn_id: str = "airbyte_default",
+        asynchronous: Optional[bool] = False,
+        api_version: Optional[str] = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)

Review comment:
       super mega nit ... mightn't this be better described as a `response` object?  job object makes it sound like it's an instance of Job class. but this is requests.Response and i think calling it `response` might be more conventional.  

##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Required. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json()["job"]["status"]
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)

Review comment:
       one thing that sticks out about this is, how do you know it's an error that should be a retry?
   
   for example what if the requested job_id doesn't exist?
   
   maybe that's just what you have to accept when using the httphook?
   
   there is actually a `run_with_advanced_retry` which might be better and give you finer-grained control if desired.  but i don't think that needs to block this from being merged if you want to leave it as is
   
   perhaps @turbaszek has a thought here?
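One way to make the retry decision explicit is sketched below without Airflow. It retries only errors that can plausibly succeed later (5xx, 429) and fails fast on others, such as a 404 for an unknown `job_id`. `HttpError`, `is_retryable` and `call_with_retry` are hypothetical. HttpHook's `run_with_advanced_retry` delegates retries to tenacity instead. Which status codes Airbyte returns for a missing job is an assumption.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class HttpError(Exception):
    """Hypothetical error carrying the HTTP status code of a failed request."""

    def __init__(self, status_code: int) -> None:
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


def is_retryable(err: HttpError) -> bool:
    # 5xx (server side) and 429 (rate limited) may succeed later;
    # other 4xx, e.g. 404 for an unknown job_id, will not.
    return err.status_code >= 500 or err.status_code == 429


def call_with_retry(
    fn: Callable[[], T],
    attempts: int = 3,
    wait_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying only retryable HttpErrors, at most `attempts` times."""
    attempt = 1
    while True:
        try:
            return fn()
        except HttpError as err:
            if not is_retryable(err) or attempt >= attempts:
                raise
            attempt += 1
            sleep(wait_seconds)
```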
   
   







[GitHub] [airflow] RosterIn commented on pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
RosterIn commented on pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#issuecomment-789570566


   Also, I think it's important to add an example DAG and documentation about the provider itself, e.g. some information about setting up the connection, like:
   https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/connections/postgres.html#postgresql-connection
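For the connection docs, here is a sketch of the environment-variable route. Airflow can read a connection from `AIRFLOW_CONN_<CONN_ID>` (upper-cased), holding a connection URI. The host and port (`localhost:8001`) are assumptions about a local Airbyte deployment. `airflow_conn_env` is a hypothetical helper.

```python
from typing import Tuple
from urllib.parse import urlunsplit


def airflow_conn_env(conn_id: str, host: str, port: int, scheme: str = "http") -> Tuple[str, str]:
    """Return (env var name, connection URI) for an HTTP-style Airflow connection."""
    # Airflow looks up connection ids as AIRFLOW_CONN_<CONN_ID in upper case>.
    name = "AIRFLOW_CONN_" + conn_id.upper()
    # The URI scheme becomes the connection type; host and port come from the netloc.
    uri = urlunsplit((scheme, f"{host}:{port}", "", "", ""))
    return name, uri
```

The pair could then be exported in the scheduler/worker environment, for example via `os.environ[name] = uri` in a test setup.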





[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r588160607



##########
File path: airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the AirbyteTriggerSyncOperator and AirbyteJobSensor."""
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+}
+
+with DAG(
+    dag_id='example_airbyte_operator',
+    default_args=args,
+    schedule_interval=None,
+    start_date=days_ago(1),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=['example'],
+) as dag:
+
+    # [START howto_operator_airbyte_synchronous]
+    sync_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_sync_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+    )
+    # [END howto_operator_airbyte_synchronous]
+
+    # [START howto_operator_airbyte_asynchronous]
+    async_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_async_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+        asynchronous=True,
+    )
+
+    airbyte_sensor = AirbyteJobSensor(
+        task_id='airbyte_sensor_source_dest_example',
+        airbyte_job_id=async_source_destination.output,

Review comment:
       I changed it to support backporting.
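The sketch below shows a backport-friendly alternative to `.output`. This is an assumption about what the change looked like. `.output`/`XComArg` was added in Airflow 2.0, while a Jinja `xcom_pull` string also renders on 1.10. This only works if the receiving argument is in the sensor's `template_fields`, which is also an assumption here. `xcom_pull_template` is a hypothetical helper.

```python
def xcom_pull_template(task_id: str, key: str = "return_value") -> str:
    """Build a Jinja string that pulls an upstream task's XCom at render time."""
    # Doubled braces survive str.format as literal "{{" and "}}".
    return "{{{{ task_instance.xcom_pull(task_ids='{}', key='{}') }}}}".format(task_id, key)
```

Usage would be `airbyte_job_id=xcom_pull_template('airbyte_async_source_dest_example')`. Unlike `.output`, a template string does not create the upstream dependency. An explicit `async_source_destination >> airbyte_sensor` is then needed.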







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r586932770



##########
File path: docs/apache-airflow-providers-airbyte/operators/airbyte.rst
##########
@@ -0,0 +1,39 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:AirbyteTriggerSyncOperator:
+
+AirbyteTriggerSyncOperator
+==========================
+
+Use the :class:`~airflow.providers.airbyte.operators.airbyte.AirbyteTriggerSyncOperator` to
+trigger an existing ConnectionId sync job in Airbyte.
+
+.. warning::
+  This operator triggers a synchronization job in Airbyte.
+  If triggered again, this operator does not guarantee idempotency.
+  You must be aware of the source (database, API, etc.) you are updating/syncing and
+  the method applied to perform the operation in Airbyte.
+
+
+Using the Operator

Review comment:
       I added more documentation on using the operator, including an explanation of the sync/async flag with links to the example DAGs.







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584092842



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_time: Number of seconds between checks
+        :type wait_time: int
+        :param timeout: How many seconds to wait for the job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: int
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_time)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json().get("job").get("status")
+            except AirflowException as err:
+                self.log.info("Retrying. Airbyte API returned server error when waiting for job: %s", err)
+
+        if state == self.ERROR:
+            raise AirflowException(f"Job failed:\n{job}")
+        if state == self.CANCELLED:
+            raise AirflowException(f"Job was cancelled:\n{job}")
+
+    def submit_job(self, connection_id: str) -> dict:

Review comment:
       Is `connections/sync` the only type of job?
   
   The name `submit_job` sounds much more generic than just syncing a defined connection. With EMR, for example, `submit_job` might let you pass all kinds of parameters, such as a JAR file and arguments, but here you can really only do one thing. In that case it is better to name the method more clearly, perhaps `submit_sync_connection`. Presumably Airbyte lets you submit more kinds of jobs than this one case (or at least will soon)?
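Here is a minimal sketch of the naming the reviewer suggests. It assumes a generic HTTP transport rather than Airflow's `HttpHook`, and each method is named after the single Airbyte endpoint it calls. The endpoint paths `api/v1/connections/sync` and `api/v1/jobs/get` are the ones mocked in the test file in this thread. The `{"id": job_id}` request body for `jobs/get` and the class `AirbyteClientSketch` are assumptions for illustration.

```python
from typing import Any, Callable, Dict

# Hypothetical transport: takes (endpoint, JSON payload) and returns the parsed JSON response.
Transport = Callable[[str, Dict[str, Any]], Dict[str, Any]]


class AirbyteClientSketch:
    """Illustrative client: each method name says which Airbyte endpoint it calls."""

    def __init__(self, transport: Transport, api_version: str = "v1") -> None:
        self._transport = transport
        self.api_version = api_version

    def submit_sync_connection(self, connection_id: str) -> Dict[str, Any]:
        # Starts a sync of an existing connection; no other job type is possible here.
        return self._transport(
            f"api/{self.api_version}/connections/sync", {"connectionId": connection_id}
        )

    def get_job(self, job_id: int) -> Dict[str, Any]:
        # Assumed request body for jobs/get: {"id": <job id>}.
        return self._transport(f"api/{self.api_version}/jobs/get", {"id": job_id})


# Usage with a fake transport that records calls instead of doing HTTP.
calls = []


def fake_transport(endpoint: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    calls.append((endpoint, payload))
    return {"job": {"id": 1, "status": "pending"}}


client = AirbyteClientSketch(fake_transport)
job = client.submit_sync_connection("15bc3800-82e4-48c3-a32d-620661273f28")
```

If Airbyte later adds other job types, each one would get its own explicitly named method. No single `submit_job` would need to grow a pile of optional parameters.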







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584225044



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:

Review comment:
       Very generous of you to share so many examples -- thank you.
   
   > I'm personally leaning to op >> sensor approach but many users want to do "atomic" operations 
   
   Before Airflow 2.0, I was really not a fan of sensors. The task latency bothered me, and I tended to combine more operations into a single operator (an approach that still has some appeal for me). But with 2.0, and specifically the improved scheduler, there is not as much reason to do so, and having the separation becomes relatively more compelling. Then, though, you have the issue of coordination (e.g. XCom values...).
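This sketch illustrates the `op >> sensor` split. A sensor needs only a single, non-blocking status check per poke, and the scheduler handles the waiting between pokes. The function `poke` below is a hypothetical stand-in, not the provider's `AirbyteJobSensor`. The status strings are the ones listed in `AirbyteJobController`.

```python
# Status strings taken from AirbyteJobController in the hook under review.
SUCCEEDED = "succeeded"
IN_PROGRESS = {"pending", "running"}
FAILURES = {"error", "failed", "canceled"}


class AirbyteJobFailed(Exception):
    """Raised when the job reaches a terminal, unsuccessful state."""


def poke(status: str) -> bool:
    """One sensor check: True means done, False means check again on the next poke."""
    if status == SUCCEEDED:
        return True
    if status in IN_PROGRESS:
        return False
    if status in FAILURES:
        raise AirbyteJobFailed(f"Airbyte job ended with status {status!r}")
    # Fail loudly on anything unknown instead of waiting forever.
    raise ValueError(f"Unexpected Airbyte job status {status!r}")
```

Because each poke returns immediately, the scheduler decides how often to check, and no single task holds the job's full duration inside one long call.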







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584518437



##########
File path: tests/providers/airbyte/hooks/test_airbyte.py
##########
@@ -0,0 +1,107 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+import unittest
+from unittest import mock
+
+import pytest
+import requests
+import requests_mock
+
+from airflow.exceptions import AirflowException
+from airflow.models import Connection
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+
+AIRBYTE_CONN_ID = 'test'
+CONNECTION_ID = {"connectionId": "test"}
+JOB_ID = 1
+
+
+def get_airbyte_connection(unused_conn_id=None):
+    return Connection(conn_id='test', conn_type='http', host='test:8001/')
+
+
+def mock_init(*args, **kwargs):
+    pass
+
+
+class TestAirbyteHook(unittest.TestCase):
+    """Test get, post and raise_for_status"""
+
+    def setUp(self):
+        session = requests.Session()
+        adapter = requests_mock.Adapter()
+        session.mount('mock', adapter)
+        get_airbyte_connection()
+        self.hook = AirbyteHook(airbyte_conn_id=AIRBYTE_CONN_ID)
+
+    def return_value_get_job(self, status):
+        response = mock.Mock()
+        response.json.return_value = {'job': {'status': status}}
+        return response
+
+    @requests_mock.mock()
+    def test_submit_job(self, m):
+        m.post(
+            'http://test:8001/api/v1/connections/sync', status_code=200, text='{"job":{"id": 1}}', reason='OK'
+        )
+        with mock.patch('airflow.hooks.base.BaseHook.get_connection', side_effect=get_airbyte_connection):
+            resp = self.hook.submit_sync_connection(connection_id=CONNECTION_ID)
+            assert resp.text == '{"job":{"id": 1}}'
+
+    @requests_mock.mock()
+    def test_get_job(self, m):
+        m.post(
+            'http://test:8001/api/v1/jobs/get',
+            status_code=200,
+            text='{"job":{"status": "succeeded"}}',
+            reason='OK',
+        )
+        with mock.patch('airflow.hooks.base.BaseHook.get_connection', side_effect=get_airbyte_connection):

Review comment:
       Should we use it as a decorator, as we do in other places?
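Here is a sketch of the decorator form. It assumes a stand-in `Settings.get_connection` in place of `airflow.hooks.base.BaseHook.get_connection`, so the example runs without Airflow. With the real hook, the decorator would read `@mock.patch('airflow.hooks.base.BaseHook.get_connection', side_effect=get_airbyte_connection)`. The mock then arrives as an extra argument to the test method instead of the body being wrapped in a `with` block.

```python
import unittest
from unittest import mock


class Settings:
    """Hypothetical stand-in for BaseHook."""

    @staticmethod
    def get_connection(conn_id):
        raise RuntimeError("would hit the metadata database")


def fetch_host(conn_id):
    return Settings.get_connection(conn_id)["host"]


def fake_connection(conn_id):
    return {"host": "test:8001/"}


class TestFetchHost(unittest.TestCase):
    # The patch is active for the whole test, and the mock is passed in as an argument.
    @mock.patch.object(Settings, "get_connection", side_effect=fake_connection)
    def test_fetch_host(self, mock_get_connection):
        self.assertEqual(fetch_host("test"), "test:8001/")
        mock_get_connection.assert_called_once_with("test")
```

The decorator keeps every test body at the same indentation level and makes the patched target visible in the method signature.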







[GitHub] [airflow] github-actions[bot] commented on pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#issuecomment-791765979


   [The Workflow run](https://github.com/apache/airflow/actions/runs/625877636) is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.





[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584514749



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    :param airbyte_conn_id: Required. The name of the Airbyte connection to use
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination
+    :type connection_id: str
+
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    @apply_defaults
+    def __init__(
+        self,
+        airbyte_conn_id: str,
+        connection_id: str,
+        asynchronous: bool = False,
+        api_version: str = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)
+        job_id = job_object.json().get('job').get('id')
+        if not self.asynchronous:
+            self.log.info('Waiting for job %s to complete', job_id)

Review comment:
       How about adding one more line of log before the `if` to say "The job was submitted" or something?
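A minimal sketch of the reviewer's suggestion follows. It uses a hypothetical `FakeHook` in place of `AirbyteHook`, returning a parsed dict instead of a `requests` response, so the flow runs without Airflow. The submission is logged unconditionally, before branching on `asynchronous`.

```python
import logging
from typing import Any, Dict, List, Optional

log = logging.getLogger("airbyte_operator_sketch")


class FakeHook:
    """Hypothetical stand-in for AirbyteHook; returns parsed JSON instead of a response."""

    def __init__(self) -> None:
        self.waited_for: List[int] = []

    def submit_sync_connection(self, connection_id: str) -> Dict[str, Any]:
        return {"job": {"id": 42, "status": "pending"}}

    def wait_for_job(self, job_id: int, timeout: Optional[float] = None) -> None:
        self.waited_for.append(job_id)


def execute(
    hook: FakeHook, connection_id: str, asynchronous: bool = False, timeout: Optional[float] = 3600
) -> int:
    job_id = hook.submit_sync_connection(connection_id=connection_id)["job"]["id"]
    # Logged unconditionally, before branching on the mode.
    log.info("Job %s was submitted to the Airbyte server", job_id)
    if not asynchronous:
        log.info("Waiting for job %s to complete", job_id)
        hook.wait_for_job(job_id=job_id, timeout=timeout)
        log.info("Job %s completed successfully", job_id)
    # Returning the id lets Airflow push it to XCom for a downstream sensor.
    return job_id
```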







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Added new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r584370130



##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteJobController:
+    """Airbyte job status"""
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "canceled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+
+class AirbyteHook(HttpHook, AirbyteJobController):
+    """Hook for Airbyte API"""
+
+    def __init__(self, airbyte_conn_id: str) -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+
+    def wait_for_job(self, job_id: str, wait_time: int = 3, timeout: Optional[int] = None) -> None:

Review comment:
       @turbaszek and @dstandish, I created the **AirbyteSensor** class. At this point I think it works like the **DataprocOperator**: I can pass the parameter `asynchronous = True`, and **AirbyteOperator** then returns the `job_id` right after the job-creation call, so the user can follow the job using AirbyteSensor.
   The DAG would look something like `op >> sensor`.
   
   For cases where performance and worker slots aren't a problem, you can use `asynchronous = False` (the default): the operator then calls the `wait_for_job` function and waits for a result (error, canceled, or succeeded).
   
   I worry a little about the async calls. Airbyte is a data integration/synchronization tool, so it would be strange to trigger downstream tasks before getting any result from AirbyteOperator. I agree that `op >> sensor` is more efficient in terms of allocating Airflow resources, but I think it is good to keep both options and provide examples for each.
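In the synchronous mode described above, the operator blocks in `wait_for_job`. The loop in the hook under review stops only on `error`, `succeeded` or `canceled`. If the API ever reported `failed`, the operator would keep polling until the timeout, or indefinitely when `timeout` is `None`.

Below is a sketch of a polling loop that handles every known terminal status explicitly. `get_status` is a hypothetical callable standing in for `get_job(...)`, and the injectable `sleep` exists only to make the loop testable. The original's retry on API errors is omitted.

```python
import time
from typing import Callable, Optional

SUCCEEDED = "succeeded"
IN_PROGRESS = {"pending", "running"}
FAILURES = {"error", "failed", "canceled"}


class AirbyteJobError(Exception):
    """Raised on a failed, cancelled, unknown or timed-out job."""


def wait_for_job(
    get_status: Callable[[], str],
    wait_seconds: float = 3,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    start = time.monotonic()
    while True:
        if timeout is not None and time.monotonic() - start > timeout:
            raise AirbyteJobError(f"Timeout: job not finished after {timeout}s")
        sleep(wait_seconds)
        status = get_status()
        if status == SUCCEEDED:
            return
        if status in FAILURES:
            raise AirbyteJobError(f"Job ended with status {status!r}")
        if status not in IN_PROGRESS:
            # Unknown states end the wait instead of polling forever.
            raise AirbyteJobError(f"Unexpected status {status!r}")
```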
   
   Example response from the Airbyte API endpoint `/connections/sync`, which triggers a job:
   ```json
   {
     "job": {
       "id": 0,
       "configType": "check_connection_source",
       "configId": "string",
       "createdAt": 0,
       "updatedAt": 0,
       "status": "pending"
     },
     "attempts": [
       {
         "attempt": {
           "id": 0,
           "status": "running",
           "createdAt": 0,
           "updatedAt": 0,
           "endedAt": 0,
           "bytesSynced": 0,
           "recordsSynced": 0
         },
         "logs": {
           "logLines": [
             "string"
           ]
         }
       }
     ]
   }
   ```
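Here is a short sketch of reading the job id and status from a trimmed copy of that response. It indexes with `[]` rather than the `.get('job').get('id')` chain used in the operator. A missing key then raises a `KeyError` that names the key, instead of an `AttributeError` on `None`. That choice is a suggestion, not what the PR does.

```python
import json
from typing import Any, Dict, Tuple

# Trimmed from the sample /connections/sync response above.
SAMPLE = """
{
  "job": {"id": 0, "configType": "check_connection_source", "status": "pending"},
  "attempts": [{"attempt": {"id": 0, "status": "running", "recordsSynced": 0}}]
}
"""


def job_id_and_status(payload: Dict[str, Any]) -> Tuple[int, str]:
    # Plain indexing: a missing key raises KeyError('job') or KeyError('id')
    # instead of AttributeError: 'NoneType' object has no attribute 'get'.
    job = payload["job"]
    return job["id"], job["status"]


job_id, status = job_id_and_status(json.loads(SAMPLE))
```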







[GitHub] [airflow] marcosmarxm commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
marcosmarxm commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r587893347



##########
File path: airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
##########
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the AirbyteTriggerSyncOperator."""
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+}
+
+with DAG(
+    dag_id='example_airbyte_operator',
+    default_args=args,
+    schedule_interval=None,
+    start_date=days_ago(1),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=['example'],
+) as dag:
+
+    # [START howto_operator_airbyte_synchronous]
+    sync_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_sync_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+    )
+    # [END howto_operator_airbyte_synchronous]
+
+    # [START howto_operator_airbyte_asynchronous]
+    async_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_async_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+        asynchronous=True,
+    )
+
+    airbyte_sensor = AirbyteJobSensor(
+        task_id='airbyte_sensor_source_dest_example',
+        airbyte_job_id=async_source_destination.output,

Review comment:
       @dstandish and @tuanchris, the CI broke at the **Backport packages: wheel** step on this line. It complained that `AirbyteTriggerSyncOperator` doesn't have an `output` property. Any suggestions?
   I could use `task_instance.xcom_pull` instead:
   ```python
   airbyte_job_id="{{task_instance.xcom_pull(task_ids='airbyte_async_source_dest_example')}}",
   ```
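One possible way to keep the example DAG importable where `BaseOperator.output` is unavailable, as in the backport check above, is to fall back to the template string suggested in the comment. This is a hypothetical helper, not part of the provider. The fallback only works if `airbyte_job_id` is listed in the sensor's `template_fields`.

```python
from typing import Any


def airbyte_job_id_arg(upstream: Any) -> Any:
    """Hypothetical helper: value to pass as the sensor's ``airbyte_job_id``."""
    output = getattr(upstream, "output", None)
    if output is not None:
        # Airflow 2.x operators expose .output (an XComArg) for their pushed return value.
        return output
    # Older Airflow: a Jinja template rendered at run time; works only for a templated field.
    return "{{ task_instance.xcom_pull(task_ids='%s') }}" % upstream.task_id
```

Using the template string unconditionally, as proposed in the comment, is simpler and works on both versions. The helper only matters if `.output` is preferred where it exists.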
   







[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r587808062



##########
File path: airflow/providers/airbyte/operators/airbyte.py
##########
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import Optional
+
+from airflow.models import BaseOperator
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteTriggerSyncOperator(BaseOperator):
+    """
+    This operator allows you to submit a job to an Airbyte server to run an integration
+    process between your source and destination.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:AirbyteTriggerSyncOperator`
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get connection
+        information for Airbyte.
+    :type airbyte_conn_id: str
+    :param connection_id: Required. The Airbyte ConnectionId UUID between a source and destination.
+    :type connection_id: str
+    :param asynchronous: Optional. Flag to get job_id after submitting the job to the Airbyte API.
+    :type asynchronous: bool
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+    :type timeout: float
+    """
+
+    template_fields = ('connection_id',)
+
+    @apply_defaults
+    def __init__(
+        self,
+        connection_id: str,
+        airbyte_conn_id: str = "default_airbyte_conn",
+        asynchronous: Optional[bool] = False,
+        api_version: Optional[str] = "v1",
+        timeout: Optional[float] = 3600,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.connection_id = connection_id
+        self.timeout = timeout
+        self.api_version = api_version
+        self.asynchronous = asynchronous
+
+    def execute(self, context) -> None:
+        """Create Airbyte Job and wait to finish"""
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job_object = hook.submit_sync_connection(connection_id=self.connection_id)
+        job_id = job_object.json().get('job').get('id')

Review comment:
       @marcosmarxm I think you only need to catch an error if you will change the behavior depending on the error.
   
   In general, simply catching and re-raising is not helpful, except in rare cases where the error message is misleading or completely useless and you need to produce a more helpful message.
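Here is a sketch of the distinction the reviewer draws, using hypothetical `ServerError` and `ClientError` exceptions. The `except` clause is justified only because it changes behavior (a retry). Any other error propagates unchanged.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


class ServerError(Exception):
    """Hypothetical retryable failure (e.g. an HTTP 5xx)."""


class ClientError(Exception):
    """Hypothetical non-retryable failure (e.g. an HTTP 4xx)."""


def call_with_retry(func: Callable[[], T], attempts: int = 3) -> T:
    """Catch ServerError only because we act on it (retry); everything else propagates as-is."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except ServerError:
            if attempt == attempts:
                raise  # out of retries: re-raise the original error unchanged
    raise AssertionError("unreachable")


# Anti-pattern for contrast: the wrapper adds nothing the original traceback didn't say.
# try:
#     job_id = hook.submit_sync_connection(...).json()["job"]["id"]
# except Exception as err:
#     raise AirflowException(err)
```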







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r587259628



##########
File path: airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
##########
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the AirbyteTriggerSyncOperator."""
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+}
+
+with DAG(
+    dag_id='example_airbyte_operator',
+    default_args=args,
+    schedule_interval='0 0 * * *',

Review comment:
       ```suggestion
       schedule_interval=None,
   ```
   This way it will not be triggered when users turn on the examples.







[GitHub] [airflow] turbaszek commented on a change in pull request #14492: Adds new community provider Airbyte

Posted by GitBox <gi...@apache.org>.
turbaszek commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r587259941



##########
File path: airflow/providers/airbyte/example_dags/example_airbyte_trigger_job.py
##########
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating the usage of the AirbyteTriggerSyncOperator."""
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
+from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
+from airflow.utils.dates import days_ago
+
+args = {
+    'owner': 'airflow',
+}
+
+with DAG(
+    dag_id='example_airbyte_operator',
+    default_args=args,
+    schedule_interval='0 0 * * *',
+    start_date=days_ago(2),
+    dagrun_timeout=timedelta(minutes=60),
+    tags=['example'],
+) as dag:
+
+    # [START howto_operator_airbyte_synchronous]
+    sync_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_sync_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+    )
+    # [END howto_operator_airbyte_synchronous]
+
+    # [START howto_operator_airbyte_asynchronous]
+    async_source_destination = AirbyteTriggerSyncOperator(
+        task_id='airbyte_async_source_dest_example',
+        airbyte_conn_id='airbyte_default',
+        connection_id='15bc3800-82e4-48c3-a32d-620661273f28',
+        asynchronous=True,
+        do_xcom_push=True,

Review comment:
       Is it necessary? Isn't it the default?



