Posted to commits@airflow.apache.org by po...@apache.org on 2023/06/27 22:57:31 UTC

[airflow] branch main updated: D205 Support - Providers: Airbyte and Alibaba (#32214)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0bc689ee6d D205 Support - Providers: Airbyte and Alibaba (#32214)
0bc689ee6d is described below

commit 0bc689ee6d4b6967d7ae99a202031aac14d181a2
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Tue Jun 27 15:57:23 2023 -0700

    D205 Support - Providers: Airbyte and Alibaba (#32214)
---
 airflow/providers/airbyte/operators/airbyte.py        |  3 +--
 .../providers/alibaba/cloud/hooks/analyticdb_spark.py |  4 +++-
 airflow/providers/alibaba/cloud/hooks/oss.py          | 10 ++--------
 .../providers/alibaba/cloud/log/oss_task_handler.py   | 19 +++++++++----------
 .../alibaba/cloud/operators/analyticdb_spark.py       |  6 ++----
 airflow/providers/alibaba/cloud/sensors/oss_key.py    |  6 +++---
 6 files changed, 20 insertions(+), 28 deletions(-)

diff --git a/airflow/providers/airbyte/operators/airbyte.py b/airflow/providers/airbyte/operators/airbyte.py
index 7137ca80f1..6d101662db 100644
--- a/airflow/providers/airbyte/operators/airbyte.py
+++ b/airflow/providers/airbyte/operators/airbyte.py
@@ -28,8 +28,7 @@ if TYPE_CHECKING:
 
 class AirbyteTriggerSyncOperator(BaseOperator):
     """
-    This operator allows you to submit a job to an Airbyte server to run a integration
-    process between your source and destination.
+    Submits a job to an Airbyte server to run an integration process between your source and destination.
 
     .. seealso::
         For more information on how to use this operator, take a look at the guide:
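A minimal usage sketch of the operator touched above, for context; the DAG wiring and the connection_id value are illustrative assumptions, not part of this commit:

    from pendulum import datetime

    from airflow import DAG
    from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

    with DAG(dag_id="airbyte_sync_example", start_date=datetime(2023, 1, 1), schedule=None):
        AirbyteTriggerSyncOperator(
            task_id="airbyte_sync",
            airbyte_conn_id="airbyte_default",  # Airflow connection to the Airbyte server
            connection_id="0000-placeholder",   # placeholder Airbyte connection id
            asynchronous=False,                 # block until the sync job finishes
        )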
diff --git a/airflow/providers/alibaba/cloud/hooks/analyticdb_spark.py b/airflow/providers/alibaba/cloud/hooks/analyticdb_spark.py
index bf3eca1722..49761f18b2 100644
--- a/airflow/providers/alibaba/cloud/hooks/analyticdb_spark.py
+++ b/airflow/providers/alibaba/cloud/hooks/analyticdb_spark.py
@@ -39,7 +39,9 @@ from airflow.utils.log.logging_mixin import LoggingMixin
 
 class AppState(Enum):
     """
-    AnalyticDB Spark application states doc:
+    AnalyticDB Spark application states.
+
+    See:
     https://www.alibabacloud.com/help/en/analyticdb-for-mysql/latest/api-doc-adb-2021-12-01-api-struct
     -sparkappinfo.
 
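The AppState enum above enumerates AnalyticDB Spark application lifecycle states; a hedged sketch of the usual polling pattern around such an enum (the state names here are an illustrative subset, and the linked Alibaba doc is authoritative):

    import time
    from enum import Enum

    class AppState(Enum):
        # Illustrative subset; see the Alibaba API doc linked above
        # for the full, authoritative list of states.
        SUBMITTED = "SUBMITTED"
        RUNNING = "RUNNING"
        COMPLETED = "COMPLETED"
        FAILED = "FAILED"

    TERMINAL_STATES = {AppState.COMPLETED, AppState.FAILED}

    def wait_for_terminal_state(get_state, poll_interval=10):
        # Poll get_state() until the application reaches a terminal state.
        state = get_state()
        while state not in TERMINAL_STATES:
            time.sleep(poll_interval)
            state = get_state()
        return state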
diff --git a/airflow/providers/alibaba/cloud/hooks/oss.py b/airflow/providers/alibaba/cloud/hooks/oss.py
index a5a862b384..19991cf350 100644
--- a/airflow/providers/alibaba/cloud/hooks/oss.py
+++ b/airflow/providers/alibaba/cloud/hooks/oss.py
@@ -35,10 +35,7 @@ T = TypeVar("T", bound=Callable)
 
 
 def provide_bucket_name(func: T) -> T:
-    """
-    Function decorator that unifies bucket name and key taken from the key
-    in case no bucket name and at least a key has been passed to the function.
-    """
+    """Function decorator that unifies bucket name and key  is a key is provided but not a bucket name."""
     function_signature = signature(func)
 
     @wraps(func)
@@ -56,10 +53,7 @@ def provide_bucket_name(func: T) -> T:
 
 
 def unify_bucket_name_and_key(func: T) -> T:
-    """
-    Function decorator that unifies bucket name and key taken from the key
-    in case no bucket name and at least a key has been passed to the function.
-    """
+    """Function decorator that unifies bucket name and key  is a key is provided but not a bucket name."""
     function_signature = signature(func)
 
     @wraps(func)
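As the corrected docstrings now read, these decorators fill in the bucket name when the caller passes only a full key. A minimal sketch of that pattern, assuming an oss://bucket/key URL layout (this is not the provider's actual implementation):

    from functools import wraps
    from urllib.parse import urlsplit

    def unify_bucket_name_and_key(func):
        # If bucket_name is missing and key is a full oss:// URL,
        # split the URL into bucket_name and key before calling func.
        @wraps(func)
        def wrapper(*args, bucket_name=None, key=None, **kwargs):
            if bucket_name is None and key and key.startswith("oss://"):
                parts = urlsplit(key)
                bucket_name = parts.netloc    # "oss://b/k" -> "b"
                key = parts.path.lstrip("/")  # "oss://b/k" -> "k"
            return func(*args, bucket_name=bucket_name, key=key, **kwargs)
        return wrapper

    @unify_bucket_name_and_key
    def head_object(bucket_name=None, key=None):
        print(f"bucket={bucket_name} key={key}")

    head_object(key="oss://my-bucket/logs/run-1.log")  # bucket=my-bucket key=logs/run-1.log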
diff --git a/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/airflow/providers/alibaba/cloud/log/oss_task_handler.py
index 7d35f0861c..484ebe4f8e 100644
--- a/airflow/providers/alibaba/cloud/log/oss_task_handler.py
+++ b/airflow/providers/alibaba/cloud/log/oss_task_handler.py
@@ -45,9 +45,9 @@ def get_default_delete_local_copy():
 
 class OSSTaskHandler(FileTaskHandler, LoggingMixin):
     """
-    OSSTaskHandler is a python log handler that handles and reads
-    task instance logs. It extends airflow FileTaskHandler and
-    uploads to and reads from OSS remote storage.
+    OSSTaskHandler is a Python log handler that handles and reads task instance logs.
+
+    It extends the Airflow FileTaskHandler and uploads to and reads from OSS remote storage.
     """
 
     def __init__(self, base_log_folder, oss_log_folder, filename_template=None, **kwargs):
@@ -120,6 +120,7 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
     def _read(self, ti, try_number, metadata=None):
         """
         Read logs of given task instance and try_number from OSS remote storage.
+
         If failed, read the log from task instance host machine.
 
         :param ti: task instance object
@@ -128,8 +129,8 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
                         can be used for streaming log reading and auto-tailing.
         """
         # Explicitly getting log relative path is necessary as the given
-        # task instance might be different than task instance passed in
-        # in set_context method.
+        # task instance might be different from task instance passed in
+        # set_context method.
         log_relative_path = self._render_filename(ti, try_number)
         remote_loc = log_relative_path
 
@@ -156,12 +157,11 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
 
     def oss_read(self, remote_log_location, return_error=False):
         """
-        Returns the log found at the remote_log_location. Returns '' if no
-        logs are found or there is an error.
+        Returns the log at the remote_log_location. Returns '' if no logs are found or there is an error.
 
         :param remote_log_location: the log's location in remote storage
         :param return_error: if True, returns a string error message if an
-            error occurs. Otherwise returns '' when an error occurs.
+            error occurs. Otherwise, returns '' when an error occurs.
         """
         try:
             oss_remote_log_location = f"{self.base_folder}/{remote_log_location}"
@@ -176,8 +176,7 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
 
     def oss_write(self, log, remote_log_location, append=True) -> bool:
         """
-        Writes the log to the remote_log_location and return `True` when done. Fails silently
-         and return `False` if no log was created.
+        Write the log to remote_log_location and return `True`; fail silently and return `False` on error.
 
         :param log: the log to write to the remote_log_location
         :param remote_log_location: the log's location in remote storage
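The contract spelled out in the oss_read/oss_write docstrings (empty string on read failure unless return_error is set, boolean from write) supports the host-machine fallback that _read describes; a hedged sketch, with the local_read callable as an assumption:

    def read_with_fallback(handler, remote_loc, local_read):
        # Per the docstring above, oss_read returns '' when no logs are
        # found or an error occurs (with return_error=False).
        log = handler.oss_read(remote_loc, return_error=False)
        # Fall back to the task instance host machine, as _read documents;
        # local_read is a caller-supplied callable, illustrative only.
        return log if log else local_read()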
diff --git a/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py b/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
index 6ddd47dab2..0fd65e1ac6 100644
--- a/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
+++ b/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
@@ -99,8 +99,7 @@ class AnalyticDBSparkBaseOperator(BaseOperator):
 
 class AnalyticDBSparkSQLOperator(AnalyticDBSparkBaseOperator):
     """
-    This operator warps the AnalyticDB Spark REST API, allowing to submit a Spark sql
-    application to the underlying cluster.
+    Submits a Spark SQL application to the underlying cluster; wraps the AnalyticDB Spark REST API.
 
     :param sql: The SQL query to execute.
     :param conf: Spark configuration properties.
@@ -153,8 +152,7 @@ class AnalyticDBSparkSQLOperator(AnalyticDBSparkBaseOperator):
 
 class AnalyticDBSparkBatchOperator(AnalyticDBSparkBaseOperator):
     """
-    This operator warps the AnalyticDB Spark REST API, allowing to submit a Spark batch
-    application to the underlying cluster.
+    Submits a Spark batch application to the underlying cluster; wraps the AnalyticDB Spark REST API.
 
     :param file: path of the file containing the application to execute.
     :param class_name: name of the application Java/Spark main class.
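A hedged usage sketch of the SQL operator described above; the cluster and resource-group values are placeholders, and the parameter names are assumptions about the operator's signature rather than something this commit touches:

    from airflow.providers.alibaba.cloud.operators.analyticdb_spark import (
        AnalyticDBSparkSQLOperator,
    )

    spark_sql = AnalyticDBSparkSQLOperator(
        task_id="adb_spark_sql",
        sql="SELECT 1",                 # the SQL query to execute
        cluster_id="amv-placeholder",   # placeholder AnalyticDB MySQL cluster id
        rg_name="spark",                # placeholder resource group name
    )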
diff --git a/airflow/providers/alibaba/cloud/sensors/oss_key.py b/airflow/providers/alibaba/cloud/sensors/oss_key.py
index 1b218f2444..e798f38b5b 100644
--- a/airflow/providers/alibaba/cloud/sensors/oss_key.py
+++ b/airflow/providers/alibaba/cloud/sensors/oss_key.py
@@ -31,9 +31,9 @@ if TYPE_CHECKING:
 
 class OSSKeySensor(BaseSensorOperator):
     """
-    Waits for a key (a file-like instance on OSS) to be present in a OSS bucket.
-    OSS being a key/value it does not support folders. The path is just a key
-    a resource.
+    Waits for a key (a file-like instance on OSS) to be present in an OSS bucket.
+
+    OSS being a key/value store, it does not support folders. The path is just a key to a resource.
 
     :param bucket_key: The key being waited on. Supports full oss:// style url
         or relative path from root level. When it's specified as a full oss://