You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2023/06/27 22:57:31 UTC
[airflow] branch main updated: D205 Support - Providers: Airbyte and Alibaba (#32214)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0bc689ee6d D205 Support - Providers: Airbyte and Alibaba (#32214)
0bc689ee6d is described below
commit 0bc689ee6d4b6967d7ae99a202031aac14d181a2
Author: D. Ferruzzi <fe...@amazon.com>
AuthorDate: Tue Jun 27 15:57:23 2023 -0700
D205 Support - Providers: Airbyte and Alibaba (#32214)
---
airflow/providers/airbyte/operators/airbyte.py | 3 +--
.../providers/alibaba/cloud/hooks/analyticdb_spark.py | 4 +++-
airflow/providers/alibaba/cloud/hooks/oss.py | 10 ++--------
.../providers/alibaba/cloud/log/oss_task_handler.py | 19 +++++++++----------
.../alibaba/cloud/operators/analyticdb_spark.py | 6 ++----
airflow/providers/alibaba/cloud/sensors/oss_key.py | 6 +++---
6 files changed, 20 insertions(+), 28 deletions(-)
diff --git a/airflow/providers/airbyte/operators/airbyte.py b/airflow/providers/airbyte/operators/airbyte.py
index 7137ca80f1..6d101662db 100644
--- a/airflow/providers/airbyte/operators/airbyte.py
+++ b/airflow/providers/airbyte/operators/airbyte.py
@@ -28,8 +28,7 @@ if TYPE_CHECKING:
class AirbyteTriggerSyncOperator(BaseOperator):
"""
- This operator allows you to submit a job to an Airbyte server to run a integration
- process between your source and destination.
+ Submits a job to an Airbyte server to run an integration process between your source and destination.
.. seealso::
For more information on how to use this operator, take a look at the guide:
diff --git a/airflow/providers/alibaba/cloud/hooks/analyticdb_spark.py b/airflow/providers/alibaba/cloud/hooks/analyticdb_spark.py
index bf3eca1722..49761f18b2 100644
--- a/airflow/providers/alibaba/cloud/hooks/analyticdb_spark.py
+++ b/airflow/providers/alibaba/cloud/hooks/analyticdb_spark.py
@@ -39,7 +39,9 @@ from airflow.utils.log.logging_mixin import LoggingMixin
class AppState(Enum):
"""
- AnalyticDB Spark application states doc:
+ AnalyticDB Spark application states.
+
+ See:
https://www.alibabacloud.com/help/en/analyticdb-for-mysql/latest/api-doc-adb-2021-12-01-api-struct
-sparkappinfo.
diff --git a/airflow/providers/alibaba/cloud/hooks/oss.py b/airflow/providers/alibaba/cloud/hooks/oss.py
index a5a862b384..19991cf350 100644
--- a/airflow/providers/alibaba/cloud/hooks/oss.py
+++ b/airflow/providers/alibaba/cloud/hooks/oss.py
@@ -35,10 +35,7 @@ T = TypeVar("T", bound=Callable)
def provide_bucket_name(func: T) -> T:
- """
- Function decorator that unifies bucket name and key taken from the key
- in case no bucket name and at least a key has been passed to the function.
- """
+ """Function decorator that unifies bucket name and key if a key is provided but not a bucket name."""
function_signature = signature(func)
@wraps(func)
@@ -56,10 +53,7 @@ def provide_bucket_name(func: T) -> T:
def unify_bucket_name_and_key(func: T) -> T:
- """
- Function decorator that unifies bucket name and key taken from the key
- in case no bucket name and at least a key has been passed to the function.
- """
+ """Function decorator that unifies bucket name and key if a key is provided but not a bucket name."""
function_signature = signature(func)
@wraps(func)
diff --git a/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/airflow/providers/alibaba/cloud/log/oss_task_handler.py
index 7d35f0861c..484ebe4f8e 100644
--- a/airflow/providers/alibaba/cloud/log/oss_task_handler.py
+++ b/airflow/providers/alibaba/cloud/log/oss_task_handler.py
@@ -45,9 +45,9 @@ def get_default_delete_local_copy():
class OSSTaskHandler(FileTaskHandler, LoggingMixin):
"""
- OSSTaskHandler is a python log handler that handles and reads
- task instance logs. It extends airflow FileTaskHandler and
- uploads to and reads from OSS remote storage.
+ OSSTaskHandler is a python log handler that handles and reads task instance logs.
+
+ Extends airflow FileTaskHandler and uploads to and reads from OSS remote storage.
"""
def __init__(self, base_log_folder, oss_log_folder, filename_template=None, **kwargs):
@@ -120,6 +120,7 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
def _read(self, ti, try_number, metadata=None):
"""
Read logs of given task instance and try_number from OSS remote storage.
+
If failed, read the log from task instance host machine.
:param ti: task instance object
@@ -128,8 +129,8 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
can be used for streaming log reading and auto-tailing.
"""
# Explicitly getting log relative path is necessary as the given
- # task instance might be different than task instance passed in
- # in set_context method.
+ # task instance might be different from task instance passed in
+ # set_context method.
log_relative_path = self._render_filename(ti, try_number)
remote_loc = log_relative_path
@@ -156,12 +157,11 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
def oss_read(self, remote_log_location, return_error=False):
"""
- Returns the log found at the remote_log_location. Returns '' if no
- logs are found or there is an error.
+ Returns the log at the remote_log_location. Returns '' if no logs are found or there is an error.
:param remote_log_location: the log's location in remote storage
:param return_error: if True, returns a string error message if an
- error occurs. Otherwise returns '' when an error occurs.
+ error occurs. Otherwise, returns '' when an error occurs.
"""
try:
oss_remote_log_location = f"{self.base_folder}/{remote_log_location}"
@@ -176,8 +176,7 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin):
def oss_write(self, log, remote_log_location, append=True) -> bool:
"""
- Writes the log to the remote_log_location and return `True` when done. Fails silently
- and return `False` if no log was created.
+ Write the log to remote_log_location and return `True`; fails silently and returns `False` on error.
:param log: the log to write to the remote_log_location
:param remote_log_location: the log's location in remote storage
diff --git a/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py b/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
index 6ddd47dab2..0fd65e1ac6 100644
--- a/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
+++ b/airflow/providers/alibaba/cloud/operators/analyticdb_spark.py
@@ -99,8 +99,7 @@ class AnalyticDBSparkBaseOperator(BaseOperator):
class AnalyticDBSparkSQLOperator(AnalyticDBSparkBaseOperator):
"""
- This operator warps the AnalyticDB Spark REST API, allowing to submit a Spark sql
- application to the underlying cluster.
+ Submits a Spark SQL application to the underlying cluster; wraps the AnalyticDB Spark REST API.
:param sql: The SQL query to execute.
:param conf: Spark configuration properties.
@@ -153,8 +152,7 @@ class AnalyticDBSparkSQLOperator(AnalyticDBSparkBaseOperator):
class AnalyticDBSparkBatchOperator(AnalyticDBSparkBaseOperator):
"""
- This operator warps the AnalyticDB Spark REST API, allowing to submit a Spark batch
- application to the underlying cluster.
+ Submits a Spark batch application to the underlying cluster; wraps the AnalyticDB Spark REST API.
:param file: path of the file containing the application to execute.
:param class_name: name of the application Java/Spark main class.
diff --git a/airflow/providers/alibaba/cloud/sensors/oss_key.py b/airflow/providers/alibaba/cloud/sensors/oss_key.py
index 1b218f2444..e798f38b5b 100644
--- a/airflow/providers/alibaba/cloud/sensors/oss_key.py
+++ b/airflow/providers/alibaba/cloud/sensors/oss_key.py
@@ -31,9 +31,9 @@ if TYPE_CHECKING:
class OSSKeySensor(BaseSensorOperator):
"""
- Waits for a key (a file-like instance on OSS) to be present in a OSS bucket.
- OSS being a key/value it does not support folders. The path is just a key
- a resource.
+ Waits for a key (a file-like instance on OSS) to be present in an OSS bucket.
+
+ OSS being a key/value, it does not support folders. The path is just a key resource.
:param bucket_key: The key being waited on. Supports full oss:// style url
or relative path from root level. When it's specified as a full oss://