Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/03/03 15:51:45 UTC

[GitHub] [airflow] dstandish commented on a change in pull request #14492: Adds new community provider Airbyte

dstandish commented on a change in pull request #14492:
URL: https://github.com/apache/airflow/pull/14492#discussion_r586523021



##########
File path: airflow/providers/airbyte/sensors/airbyte.py
##########
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Airbyte Job sensor."""
+
+from airflow.exceptions import AirflowException
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteJobSensor(BaseSensorOperator):
+    """
+    Check for the state of a previously submitted Airbyte job.
+
+    :param job_id: Id of the Airbyte job
+    :type job_id: str
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    """
+
+    template_fields = ('airbyte_job_id',)
+    ui_color = '#6C51FD'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        airbyte_job_id: str,
+        airbyte_conn_id: str = 'default_airbyte_conn',

Review comment:
       I think `'airbyte_default'` is more conventional. Why not stick with that convention?

##########
File path: airflow/providers/airbyte/hooks/airbyte.py
##########
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import time
+from typing import Any, Optional
+
+from airflow.exceptions import AirflowException
+from airflow.providers.http.hooks.http import HttpHook
+
+
+class AirbyteHook(HttpHook):
+    """
+    Hook for Airbyte API
+
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Required. Airbyte API version.
+    :type api_version: str
+    """
+
+    RUNNING = "running"
+    SUCCEEDED = "succeeded"
+    CANCELLED = "cancelled"
+    PENDING = "pending"
+    FAILED = "failed"
+    ERROR = "error"
+
+    def __init__(self, airbyte_conn_id: str = "default_airbyte_conn", api_version: str = "v1") -> None:
+        super().__init__(http_conn_id=airbyte_conn_id)
+        self.api_version: str = api_version
+
+    def wait_for_job(self, job_id: str, wait_seconds: int = 3, timeout: Optional[float] = None) -> None:
+        """
+        Helper method which polls a job to check if it finishes.
+
+        :param job_id: Id of the Airbyte job
+        :type job_id: str
+        :param wait_seconds: Number of seconds between checks
+        :type wait_seconds: int
+        :param timeout: How many seconds wait for job to be ready. Used only if ``asynchronous`` is False
+        :type timeout: float
+        """
+        state = None
+        start = time.monotonic()
+        while state not in (self.ERROR, self.SUCCEEDED, self.CANCELLED):
+            if timeout and start + timeout < time.monotonic():
+                raise AirflowException(f"Timeout: Airbyte job {job_id} is not ready after {timeout}s")
+            time.sleep(wait_seconds)
+            try:
+                job = self.get_job(job_id=job_id)
+                state = job.json().get("job").get("status")

Review comment:
       Here's another instance where, as Kamil pointed out, it makes more sense to access the key with `['job']`.
   
   Under certain circumstances you might end up with `state = None` here, which could produce confusing errors.
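   
   A minimal sketch of the difference, using made-up payloads (the dict shapes and helper names below are assumptions for illustration, not the actual Airbyte response or hook code). With chained `.get()`, a missing `status` quietly becomes `None`, and since `None` is not one of the terminal states the polling loop would just keep going until the timeout; indexing raises a `KeyError` that names the missing key.

```python
# Hypothetical payloads; the real Airbyte API response may look different.
complete = {"job": {"id": 1, "status": "succeeded"}}
missing_status = {"job": {"id": 1}}
missing_job = {}


def status_with_get(payload):
    # Chained .get(): a missing "status" silently yields None, and a
    # missing "job" fails later with AttributeError on NoneType.
    return payload.get("job").get("status")


def status_with_index(payload):
    # Indexing fails fast with a KeyError naming the missing key.
    return payload["job"]["status"]


print(status_with_get(complete))        # succeeded
print(status_with_get(missing_status))  # None

try:
    status_with_index(missing_status)
except KeyError as err:
    print(f"KeyError: {err}")           # KeyError: 'status'
```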

##########
File path: airflow/providers/airbyte/sensors/airbyte.py
##########
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains a Airbyte Job sensor."""
+
+from airflow.exceptions import AirflowException
+from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
+from airflow.sensors.base import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class AirbyteJobSensor(BaseSensorOperator):
+    """
+    Check for the state of a previously submitted Airbyte job.
+
+    :param job_id: Id of the Airbyte job
+    :type job_id: str
+    :param airbyte_conn_id: Required. The name of the Airflow connection to get
+        connection information for Airbyte.
+    :type airbyte_conn_id: str
+    :param api_version: Optional. Airbyte API version.
+    :type api_version: str
+    """
+
+    template_fields = ('airbyte_job_id',)
+    ui_color = '#6C51FD'
+
+    @apply_defaults
+    def __init__(
+        self,
+        *,
+        airbyte_job_id: str,
+        airbyte_conn_id: str = 'default_airbyte_conn',
+        api_version: str = "v1",
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.airbyte_conn_id = airbyte_conn_id
+        self.airbyte_job_id = airbyte_job_id
+        self.api_version = api_version
+
+    def poke(self, context: dict) -> bool:
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
+        job = hook.get_job(job_id=self.airbyte_job_id)
+        status = job.json().get('job').get('status')

Review comment:
       Use `[]` indexing here as well.

##########
File path: docs/apache-airflow-providers-airbyte/operators/airbyte.rst
##########
@@ -0,0 +1,39 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+.. _howto/operator:AirbyteTriggerSyncOperator:
+
+AirbyteTriggerSyncOperator
+==========================
+
+Use the :class:`~airflow.providers.airbyte.operators.AirbyteTriggerSyncOperator` to
+trigger an existing ConnectionId sync job in Airbyte.
+
+.. warning::
+  This operator triggers a synchronization job in Airbyte.
+  If triggered again, this operator does not guarantee idempotency.
+  You must be aware of the source (database, API, etc) you are updating/sync and
+  the method applied to perform the operation in Airbyte.
+
+
+Using the Operator

Review comment:
       It is probably worth calling out here that the operator can be used in sync or async mode, and that in async mode you pair it with the sensor.
   
   Also, I don't see a doc for the sensor. I'm not familiar with provider conventions, but I suspect there should be a minimal doc for each operator / sensor.
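   
   To make the sync / async distinction concrete, here is a rough sketch in plain Python. Everything in it is a stand-in: `FakeAirbyteClient`, `trigger`, and `poke` are hypothetical names written for this comment, not the operator, sensor, or hook from this PR. In sync mode the trigger blocks until the job finishes; in async mode it returns the job id right away and a separate sensor-style check is polled later.

```python
import itertools


class FakeAirbyteClient:
    """Hypothetical stand-in for the Airbyte API; not the real hook."""

    def __init__(self, polls_until_done=2):
        self._ids = itertools.count(1)
        self._remaining = {}
        self._polls_until_done = polls_until_done

    def submit_sync(self):
        # Register a new job that reports "running" for a fixed number of polls.
        job_id = next(self._ids)
        self._remaining[job_id] = self._polls_until_done
        return job_id

    def get_status(self, job_id):
        if self._remaining[job_id] > 0:
            self._remaining[job_id] -= 1
            return "running"
        return "succeeded"


def trigger(client, asynchronous=False):
    """Submit a job; in sync mode, block until it has succeeded."""
    job_id = client.submit_sync()
    if not asynchronous:
        while client.get_status(job_id) != "succeeded":
            pass  # a real operator would sleep between polls
    return job_id


def poke(client, job_id):
    """Sensor-style check: True once the job has succeeded."""
    return client.get_status(job_id) == "succeeded"


client = FakeAirbyteClient()

# Sync mode: by the time trigger() returns, the job is done.
sync_job = trigger(client)
print(poke(client, sync_job))  # True

# Async mode: trigger() returns immediately; the sensor does the waiting.
async_job = trigger(client, asynchronous=True)
print(poke(client, async_job))  # False
```

   In a DAG the async variant would be two tasks, with the sensor task downstream of the trigger task and receiving the job id from it.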




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org