Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2022/02/24 08:06:45 UTC

[GitHub] [airflow] EricGao888 opened a new pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

EricGao888 opened a new pull request #21785:
URL: https://github.com/apache/airflow/pull/21785


   - To extend support for Alibaba Cloud (related: #17200), this PR adds oss_task_handler to the alibaba provider to enable remote logging to OSS (Alibaba Cloud Object Storage Service).
   - There is currently no Python library that provides a mocked OSS for use in tests. We may find a way to mock OSS and add more unit tests later, but not in this PR, since it will probably take more time. Related: #17617
   - This PR closes: #21748
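
As a rough illustration of how a remote log location could be derived from an `oss://` folder URL, here is a minimal sketch. `split_oss_url` and `remote_log_key` are hypothetical helpers written for this example; the provider's own `OSSHook.parse_oss_url` may behave differently. The key layout (base folder, a slash, then the relative log path) follows the handler code quoted later in this thread.

```python
from urllib.parse import urlsplit


def split_oss_url(oss_url):
    """Split 'oss://bucket/some/folder' into (bucket, folder).

    Hypothetical helper for illustration only, not the provider's parser.
    """
    parts = urlsplit(oss_url)
    if parts.scheme != "oss" or not parts.netloc:
        raise ValueError(f"Not a valid OSS URL: {oss_url!r}")
    return parts.netloc, parts.path.strip("/")


def remote_log_key(base_folder, log_relative_path):
    """Join the base folder and the relative log path into one object key."""
    return base_folder + "/" + log_relative_path


bucket_name, base_folder = split_oss_url("oss://my-bucket/airflow/logs")
key = remote_log_key(base_folder, "example_dag/example_task/1.log")
```

The bucket name, folder, and log path above are made-up values.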


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [airflow] EricGao888 commented on pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
EricGao888 commented on pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#issuecomment-1061504288


   > You need to rebase @EricGao888
   
   @potiuk Thanks for the reminder! I will rebase it, resolve the comments above and submit a new commit today.





[GitHub] [airflow] mik-laj commented on a change in pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#discussion_r821967276



##########
File path: airflow/providers/alibaba/cloud/hooks/oss.py
##########
@@ -269,6 +279,73 @@ def create_bucket(
             self.log.error(e)
             raise AirflowException(f"Errors when create bucket: {bucket_name}")
 
+    @provide_bucket_name
+    @unify_bucket_name_and_key
+    def append_string(self, bucket_name: Optional[str], content: str, key: str, pos: int) -> None:
+        """
+        Append string to a remote existing file
+
+        :param bucket_name: the name of the bucket
+        :param content: content to be appended
+        :param key: oss bucket key
+        :param pos: position of the existing file where the content will be appended
+        """
+        self.log.info("Write oss bucket. key: %s, pos: %s", key, pos)
+        try:
+            self.get_bucket(bucket_name).append_object(key, pos, content)
+        except Exception as e:
+            self.log.error(e)
+            raise AirflowException(f"Errors when append string for object: {key}")
+
+    @provide_bucket_name
+    @unify_bucket_name_and_key
+    def read_key(self, bucket_name: Optional[str], key: str) -> str:
+        """
+        Read oss remote object content with the specified key
+
+        :param bucket_name: the name of the bucket
+        :param key: oss bucket key
+        """
+        self.log.info("Read oss key: %s", key)
+        try:
+            return self.get_bucket(bucket_name).get_object(key).read().decode("utf-8")
+        except Exception as e:
+            self.log.error(e)
+            raise AirflowException(f"Errors when read bucket object: {key}")
+
+    @provide_bucket_name
+    @unify_bucket_name_and_key
+    def head_key(self, bucket_name: Optional[str], key: str) -> oss2.models.HeadObjectResult:
+        """
+        Get meta info of the specified remote object
+
+        :param bucket_name: the name of the bucket
+        :param key: oss bucket key
+        """
+        self.log.info("Head Object oss key: " + key)

Review comment:
       ```suggestion
           self.log.info("Head Object oss key: %s", key)
   ```







[GitHub] [airflow] mik-laj commented on a change in pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#discussion_r817117343



##########
File path: airflow/providers/alibaba/cloud/hooks/oss.py
##########
@@ -269,6 +279,74 @@ def create_bucket(
             self.log.error(e)
             raise AirflowException(f"Errors when create bucket: {bucket_name}")
 
+    @provide_bucket_name
+    @unify_bucket_name_and_key
+    def append_string(self, bucket_name: Optional[str], content: str, key: str, pos: int):
+        """
+        Append string to a remote existing file
+
+        :param bucket_name: the name of the bucket
+        :param content: content to be appended
+        :param key: oss bucket key
+        :param pos: position of the existing file where the content will be appended
+        """
+        self.log.info("Write oss bucket key: " + key)
+        self.log.info("Write oss bucket pos: " + str(pos))

Review comment:
       ```suggestion
           self.log.info("Write oss bucket. key: %s, pos: %s", key, pos)
   ```
   Pass the text and arguments to the logger separately so that they can be formatted depending on the logger configuration. See: https://github.com/apache/airflow/blob/8505d2f0a4524313e3eff7a4f16b9a9439c7a79f/airflow/utils/log/colored_log.py#L39
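
The difference between the two logging styles can be seen with the standard `logging` module alone. The sketch below is not Airflow code: `CapturingHandler`, the logger name, and the sample key and position are assumptions made for illustration. It shows that when the template and arguments are passed separately, the handler receives both and can decide how to render them, whereas string concatenation hands it only a finished string.

```python
import logging


class CapturingHandler(logging.Handler):
    """Hypothetical handler that stores the raw records it receives."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


logger = logging.getLogger("oss_logging_sketch")
logger.setLevel(logging.INFO)
logger.propagate = False
handler = CapturingHandler()
logger.addHandler(handler)

key, pos = "logs/task.log", 128

# Template and arguments passed separately: formatting is deferred.
logger.info("Write oss bucket. key: %s, pos: %s", key, pos)
# Pre-formatted string: the arguments are no longer visible to the handler.
logger.info("Write oss bucket key: " + key)

lazy, eager = handler.records
rendered = lazy.getMessage()  # the template is filled in only on demand
```

Here `lazy.msg` still holds the `%s` template and `lazy.args` holds the values, which is what lets a formatter treat the arguments specially. `eager.args` is empty.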

##########
File path: airflow/providers/alibaba/cloud/hooks/oss.py
##########
@@ -269,6 +279,74 @@ def create_bucket(
             self.log.error(e)
             raise AirflowException(f"Errors when create bucket: {bucket_name}")
 
+    @provide_bucket_name
+    @unify_bucket_name_and_key
+    def append_string(self, bucket_name: Optional[str], content: str, key: str, pos: int):
+        """
+        Append string to a remote existing file
+
+        :param bucket_name: the name of the bucket
+        :param content: content to be appended
+        :param key: oss bucket key
+        :param pos: position of the existing file where the content will be appended
+        """
+        self.log.info("Write oss bucket key: " + key)
+        self.log.info("Write oss bucket pos: " + str(pos))
+        try:
+            self.get_bucket(bucket_name).append_object(key, pos, content)
+        except Exception as e:
+            self.log.error(e)
+            raise AirflowException(f"Errors when append string for object: {key}")
+
+    @provide_bucket_name
+    @unify_bucket_name_and_key
+    def read_key(self, bucket_name: Optional[str], key: str) -> str:
+        """
+        Read oss remote object content with the specified key
+
+        :param bucket_name: the name of the bucket
+        :param key: oss bucket key
+        """
+        self.log.info("Read oss key: " + key)

Review comment:
       ```suggestion
           self.log.info("Read oss key: %s", key)
   ```







[GitHub] [airflow] mik-laj commented on a change in pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#discussion_r821968801



##########
File path: airflow/providers/alibaba/cloud/log/oss_task_handler.py
##########
@@ -0,0 +1,186 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+import sys
+
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.providers.alibaba.cloud.hooks.oss import OSSHook
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class OSSTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    OSSTaskHandler is a python log handler that handles and reads
+    task instance logs. It extends airflow FileTaskHandler and
+    uploads to and reads from OSS remote storage.
+    """
+
+    def __init__(self, base_log_folder, oss_log_folder, filename_template):
+        self.log.info("Using oss_task_handler for remote logging...")
+        super().__init__(base_log_folder, filename_template)
+        (self.bucket_name, self.base_folder) = OSSHook.parse_oss_url(oss_log_folder)
+        self.log_relative_path = ''
+        self._hook = None
+        self.closed = False
+        self.upload_on_close = True
+
+    @cached_property
+    def hook(self):
+        remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID')
+        self.log.info("remote_conn_id: %s", remote_conn_id)
+        try:
+            return OSSHook(oss_conn_id=remote_conn_id)
+        except Exception as e:
+            self.log.error(e, exc_info=True)
+            self.log.error(
+                'Could not create an OSSHook with connection id "%s". '
+                'Please make sure that airflow[oss] is installed and '
+                'the OSS connection exists.',
+                remote_conn_id,
+            )
+
+    def set_context(self, ti):
+        super().set_context(ti)
+        # Local location and remote location is needed to open and
+        # upload local log file to OSS remote storage.
+        self.log_relative_path = self._render_filename(ti, ti.try_number)
+        self.upload_on_close = not ti.raw
+
+        # Clear the file first so that duplicate data is not uploaded
+        # when re-using the same path (e.g. with rescheduled sensors)
+        if self.upload_on_close:
+            with open(self.handler.baseFilename, 'w'):
+                pass
+
+    def close(self):
+        """Close and upload local log file to remote storage OSS."""
+        # When application exit, system shuts down all handlers by
+        # calling close method. Here we check if logger is already
+        # closed to prevent uploading the log to remote storage multiple
+        # times when `logging.shutdown` is called.
+        if self.closed:
+            return
+
+        super().close()
+
+        if not self.upload_on_close:
+            return
+
+        local_loc = os.path.join(self.local_base, self.log_relative_path)
+        remote_loc = self.log_relative_path
+        if os.path.exists(local_loc):
+            # read log and remove old logs to get just the latest additions
+            with open(local_loc) as logfile:
+                log = logfile.read()
+            self.oss_write(log, remote_loc)
+
+        # Mark closed so we don't double write if close is called twice
+        self.closed = True
+
+    def _read(self, ti, try_number, metadata=None):
+        """
+        Read logs of given task instance and try_number from OSS remote storage.
+        If failed, read the log from task instance host machine.
+
+        :param ti: task instance object
+        :param try_number: task instance try_number to read logs from
+        :param metadata: log metadata,
+                         can be used for streaming log reading and auto-tailing.
+        """
+        # Explicitly getting log relative path is necessary as the given
+        # task instance might be different than task instance passed in
+        # in set_context method.
+        log_relative_path = self._render_filename(ti, try_number)
+        remote_loc = log_relative_path
+
+        if self.oss_log_exists(remote_loc):
+            # If OSS remote file exists, we do not fetch logs from task instance
+            # local machine even if there are errors reading remote logs, as
+            # returned remote_log will contain error messages.
+            remote_log = self.oss_read(remote_loc, return_error=True)
+            log = f'*** Reading remote log from {remote_loc}.\n{remote_log}\n'
+            return log, {'end_of_log': True}
+        else:
+            return super()._read(ti, try_number)
+
+    def oss_log_exists(self, remote_log_location):
+        """
+        Check if remote_log_location exists in remote storage
+
+        :param remote_log_location: log's location in remote storage
+        :return: True if location exists else False
+        """
+        oss_remote_log_location = self.base_folder + '/' + remote_log_location
+        try:
+            return self.hook.key_exist(self.bucket_name, oss_remote_log_location)
+        except Exception:
+            pass
+        return False
+
+    def oss_read(self, remote_log_location, return_error=False):
+        """
+        Returns the log found at the remote_log_location. Returns '' if no
+        logs are found or there is an error.
+
+        :param remote_log_location: the log's location in remote storage
+        :param return_error: if True, returns a string error message if an
+            error occurs. Otherwise returns '' when an error occurs.
+        """
+        try:
+            oss_remote_log_location = self.base_folder + '/' + remote_log_location
+            self.log.info("read remote log: " + oss_remote_log_location)

Review comment:
       ```suggestion
               self.log.info("read remote log: %s", oss_remote_log_location)
   ```







[GitHub] [airflow] EricGao888 commented on a change in pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
EricGao888 commented on a change in pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#discussion_r817324462



##########
File path: tests/providers/alibaba/cloud/hooks/test_oss.py
##########
@@ -35,6 +35,7 @@ def setUp(self):
         try:
             self.hook = OSSHook(region=TEST_REGION, oss_conn_id=TEST_CONN_ID)
             self.hook.object_exists(key='test-obj', bucket_name=TEST_BUCKET)
+            print(self.hook.read_key(TEST_BUCKET, 'test-obj'))

Review comment:
       Sure, will fix it







[GitHub] [airflow] mik-laj commented on a change in pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#discussion_r817118879



##########
File path: airflow/providers/alibaba/cloud/hooks/oss.py
##########
@@ -269,6 +279,74 @@ def create_bucket(
             self.log.error(e)
             raise AirflowException(f"Errors when create bucket: {bucket_name}")
 
+    @provide_bucket_name
+    @unify_bucket_name_and_key
+    def append_string(self, bucket_name: Optional[str], content: str, key: str, pos: int):

Review comment:
       These new methods are not covered by tests.







[GitHub] [airflow] mik-laj commented on a change in pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#discussion_r817118117



##########
File path: tests/providers/alibaba/cloud/hooks/test_oss.py
##########
@@ -35,6 +35,7 @@ def setUp(self):
         try:
             self.hook = OSSHook(region=TEST_REGION, oss_conn_id=TEST_CONN_ID)
             self.hook.object_exists(key='test-obj', bucket_name=TEST_BUCKET)
+            print(self.hook.read_key(TEST_BUCKET, 'test-obj'))

Review comment:
       Can you add a real assertion here instead of printing text?
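
One way to turn the `print` into a check is to assert on the returned value. The sketch below is self-contained and does not import Airflow: `read_key` here is a hypothetical free function that copies only the body of the hook method from the diff, and the bucket is a `unittest.mock.Mock` rather than a real OSS bucket.

```python
import unittest
from unittest import mock


def read_key(bucket, key):
    """Hypothetical stand-in mirroring the body of the hook's read_key."""
    return bucket.get_object(key).read().decode("utf-8")


class ReadKeyTest(unittest.TestCase):
    def test_read_key_returns_decoded_content(self):
        bucket = mock.Mock()
        # get_object(...).read() yields raw bytes, as an object store would.
        bucket.get_object.return_value.read.return_value = b"hello oss"

        content = read_key(bucket, "test-obj")

        # A real assertion: the test fails if the content is wrong.
        self.assertEqual(content, "hello oss")
        bucket.get_object.assert_called_once_with("test-obj")
```

Unlike a `print`, this fails the test run when the value differs, and it also checks that the expected key was requested.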







[GitHub] [airflow] EricGao888 commented on a change in pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
EricGao888 commented on a change in pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#discussion_r822163072



##########
File path: airflow/providers/alibaba/cloud/log/oss_task_handler.py
##########
@@ -0,0 +1,186 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+import sys
+
+if sys.version_info >= (3, 8):
+    from functools import cached_property
+else:
+    from cached_property import cached_property
+
+from airflow.configuration import conf
+from airflow.providers.alibaba.cloud.hooks.oss import OSSHook
+from airflow.utils.log.file_task_handler import FileTaskHandler
+from airflow.utils.log.logging_mixin import LoggingMixin
+
+
+class OSSTaskHandler(FileTaskHandler, LoggingMixin):
+    """
+    OSSTaskHandler is a python log handler that handles and reads
+    task instance logs. It extends airflow FileTaskHandler and
+    uploads to and reads from OSS remote storage.
+    """
+
+    def __init__(self, base_log_folder, oss_log_folder, filename_template):
+        self.log.info("Using oss_task_handler for remote logging...")
+        super().__init__(base_log_folder, filename_template)
+        (self.bucket_name, self.base_folder) = OSSHook.parse_oss_url(oss_log_folder)
+        self.log_relative_path = ''
+        self._hook = None
+        self.closed = False
+        self.upload_on_close = True
+
+    @cached_property
+    def hook(self):
+        remote_conn_id = conf.get('logging', 'REMOTE_LOG_CONN_ID')
+        self.log.info("remote_conn_id: %s", remote_conn_id)
+        try:
+            return OSSHook(oss_conn_id=remote_conn_id)
+        except Exception as e:
+            self.log.error(e, exc_info=True)
+            self.log.error(
+                'Could not create an OSSHook with connection id "%s". '
+                'Please make sure that airflow[oss] is installed and '
+                'the OSS connection exists.',
+                remote_conn_id,
+            )
+
+    def set_context(self, ti):
+        super().set_context(ti)
+        # Local location and remote location is needed to open and
+        # upload local log file to OSS remote storage.
+        self.log_relative_path = self._render_filename(ti, ti.try_number)
+        self.upload_on_close = not ti.raw
+
+        # Clear the file first so that duplicate data is not uploaded
+        # when re-using the same path (e.g. with rescheduled sensors)
+        if self.upload_on_close:
+            with open(self.handler.baseFilename, 'w'):
+                pass
+
+    def close(self):
+        """Close and upload local log file to remote storage OSS."""
+        # When application exit, system shuts down all handlers by
+        # calling close method. Here we check if logger is already
+        # closed to prevent uploading the log to remote storage multiple
+        # times when `logging.shutdown` is called.
+        if self.closed:
+            return
+
+        super().close()
+
+        if not self.upload_on_close:
+            return
+
+        local_loc = os.path.join(self.local_base, self.log_relative_path)
+        remote_loc = self.log_relative_path
+        if os.path.exists(local_loc):
+            # read log and remove old logs to get just the latest additions
+            with open(local_loc) as logfile:
+                log = logfile.read()
+            self.oss_write(log, remote_loc)
+
+        # Mark closed so we don't double write if close is called twice
+        self.closed = True
+
+    def _read(self, ti, try_number, metadata=None):
+        """
+        Read logs of given task instance and try_number from OSS remote storage.
+        If failed, read the log from task instance host machine.
+
+        :param ti: task instance object
+        :param try_number: task instance try_number to read logs from
+        :param metadata: log metadata,
+                         can be used for streaming log reading and auto-tailing.
+        """
+        # Explicitly getting log relative path is necessary as the given
+        # task instance might be different than task instance passed in
+        # in set_context method.
+        log_relative_path = self._render_filename(ti, try_number)
+        remote_loc = log_relative_path
+
+        if self.oss_log_exists(remote_loc):
+            # If OSS remote file exists, we do not fetch logs from task instance
+            # local machine even if there are errors reading remote logs, as
+            # returned remote_log will contain error messages.
+            remote_log = self.oss_read(remote_loc, return_error=True)
+            log = f'*** Reading remote log from {remote_loc}.\n{remote_log}\n'
+            return log, {'end_of_log': True}
+        else:
+            return super()._read(ti, try_number)
+
+    def oss_log_exists(self, remote_log_location):
+        """
+        Check if remote_log_location exists in remote storage
+
+        :param remote_log_location: log's location in remote storage
+        :return: True if location exists else False
+        """
+        oss_remote_log_location = self.base_folder + '/' + remote_log_location
+        try:
+            return self.hook.key_exist(self.bucket_name, oss_remote_log_location)
+        except Exception:
+            pass
+        return False
+
+    def oss_read(self, remote_log_location, return_error=False):
+        """
+        Returns the log found at the remote_log_location. Returns '' if no
+        logs are found or there is an error.
+
+        :param remote_log_location: the log's location in remote storage
+        :param return_error: if True, returns a string error message if an
+            error occurs. Otherwise returns '' when an error occurs.
+        """
+        try:
+            oss_remote_log_location = self.base_folder + '/' + remote_log_location
+            self.log.info("read remote log: " + oss_remote_log_location)

Review comment:
       Have fixed in the latest commit, thx







[GitHub] [airflow] EricGao888 commented on pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
EricGao888 commented on pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#issuecomment-1054940943


   @mik-laj Could you please take a look at this PR? BTW, I will fix the unit tests in the alibaba provider in a separate PR, using unittest.mock to replace the real external storage during unit tests. THX





[GitHub] [airflow] mik-laj commented on a change in pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
mik-laj commented on a change in pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#discussion_r821967565



##########
File path: airflow/providers/alibaba/cloud/hooks/oss.py
##########
@@ -269,6 +279,73 @@ def create_bucket(
             self.log.error(e)
             raise AirflowException(f"Errors when create bucket: {bucket_name}")
 
+    @provide_bucket_name
+    @unify_bucket_name_and_key
+    def append_string(self, bucket_name: Optional[str], content: str, key: str, pos: int) -> None:
+        """
+        Append string to a remote existing file
+
+        :param bucket_name: the name of the bucket
+        :param content: content to be appended
+        :param key: oss bucket key
+        :param pos: position of the existing file where the content will be appended
+        """
+        self.log.info("Write oss bucket. key: %s, pos: %s", key, pos)
+        try:
+            self.get_bucket(bucket_name).append_object(key, pos, content)
+        except Exception as e:
+            self.log.error(e)
+            raise AirflowException(f"Errors when append string for object: {key}")
+
+    @provide_bucket_name
+    @unify_bucket_name_and_key
+    def read_key(self, bucket_name: Optional[str], key: str) -> str:
+        """
+        Read oss remote object content with the specified key
+
+        :param bucket_name: the name of the bucket
+        :param key: oss bucket key
+        """
+        self.log.info("Read oss key: %s", key)
+        try:
+            return self.get_bucket(bucket_name).get_object(key).read().decode("utf-8")
+        except Exception as e:
+            self.log.error(e)
+            raise AirflowException(f"Errors when read bucket object: {key}")
+
+    @provide_bucket_name
+    @unify_bucket_name_and_key
+    def head_key(self, bucket_name: Optional[str], key: str) -> oss2.models.HeadObjectResult:
+        """
+        Get meta info of the specified remote object
+
+        :param bucket_name: the name of the bucket
+        :param key: oss bucket key
+        """
+        self.log.info("Head Object oss key: " + key)
+        try:
+            return self.get_bucket(bucket_name).head_object(key)
+        except Exception as e:
+            self.log.error(e)
+            raise AirflowException(f"Errors when head bucket object: {key}")
+
+    @provide_bucket_name
+    @unify_bucket_name_and_key
+    def key_exist(self, bucket_name: Optional[str], key: str) -> bool:
+        """
+        Find out whether the specified key exists in the oss remote storage
+
+        :param bucket_name: the name of the bucket
+        :param key: oss bucket key
+        """
+        # full_path = None
+        self.log.info(f"Looking up oss bucket {bucket_name} for bucket key {key} ...")

Review comment:
       ```suggestion
           self.log.info("Looking up oss bucket %s for bucket key %s ...", bucket_name, key)
   ```







[GitHub] [airflow] potiuk commented on pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#issuecomment-1062099048


   @EricGao888 - just the few suggestions from @mik-laj and we can merge it :)





[GitHub] [airflow] mik-laj commented on pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
mik-laj commented on pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#issuecomment-1064308372


   @EricGao888 I rebased this PR.





[GitHub] [airflow] EricGao888 commented on pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
EricGao888 commented on pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#issuecomment-1053993101


   > Some static checks/docs are failing.
   
   I've fixed the checks/docs failures in the latest commit. Thx for the reminder.





[GitHub] [airflow] EricGao888 commented on pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
EricGao888 commented on pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#issuecomment-1061996094


   @potiuk @mik-laj In the latest commit, I've resolved all the comments above, fixed minor bugs, and added more unit tests for the OSS hook and the OSS task handler. The OSS task handler unit tests now all use mocks instead of connecting to a real OSS. PTAL. Thx : )
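
   For readers unfamiliar with the approach, here is a minimal sketch of testing against a mock instead of a real OSS. It is not the test code from this PR: `FakeLogUploader`, its `write` method, and the hook method `upload_text` are hypothetical names used only for illustration.

```python
from unittest import mock


class FakeLogUploader:
    """Hypothetical stand-in for a handler that writes logs through a hook."""

    def __init__(self, hook):
        self.hook = hook

    def write(self, log: str, remote_path: str) -> bool:
        # Delegate to the hook and report failure instead of raising.
        try:
            self.hook.upload_text(remote_path, log)  # hypothetical hook method
        except Exception:
            return False
        return True


def run_sketch() -> bool:
    # MagicMock records calls, so no network connection is needed.
    hook = mock.MagicMock()
    uploader = FakeLogUploader(hook)
    ok = uploader.write("task finished", "logs/dag/task/1.log")
    # Verify the handler passed the expected arguments to the hook.
    hook.upload_text.assert_called_once_with("logs/dag/task/1.log", "task finished")
    return ok
```

   Setting `side_effect` on the mocked method lets a test exercise the failure path as well, again without touching a real bucket.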





[GitHub] [airflow] EricGao888 commented on pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
EricGao888 commented on pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#issuecomment-1062379121


   > @EricGao888 - just the few suggestions from @mik-laj and we can merge it :)
   
   Done : ) Thx for the review @potiuk @mik-laj 





[GitHub] [airflow] EricGao888 commented on pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
EricGao888 commented on pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#issuecomment-1064008814


   > It seems the docs build failed. Not sure whether I should rebase again? It looks related to #22100.
   
   @potiuk May I ask whether there is anything else I need to do to get this PR merged?





[GitHub] [airflow] boring-cyborg[bot] commented on pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
boring-cyborg[bot] commented on pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#issuecomment-1049592532


   Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
   Here are some useful points:
   - Pay attention to the quality of your code (flake8, mypy and type annotations). Our [pre-commits](https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#prerequisites-for-pre-commit-hooks) will help you with that.
   - In case of a new feature, add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/docs/apache-airflow/howto/custom-operator.rst). Consider adding an example DAG that shows how users should use it.
   - Consider using the [Breeze environment](https://github.com/apache/airflow/blob/main/BREEZE.rst) for testing locally; it's a heavy Docker image, but it ships with a working Airflow and a lot of integrations.
   - Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
   - Please follow the [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
   - Be sure to read the [Airflow Coding style](https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst#coding-style-and-best-practices).
   Apache Airflow is a community-driven project and together we are making it better 🚀.
   In case of doubts contact the developers at:
   Mailing List: dev@airflow.apache.org
   Slack: https://s.apache.org/airflow-slack
   





[GitHub] [airflow] EricGao888 commented on a change in pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
EricGao888 commented on a change in pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#discussion_r817324387



##########
File path: airflow/providers/alibaba/cloud/hooks/oss.py
##########
@@ -269,6 +279,74 @@ def create_bucket(
             self.log.error(e)
             raise AirflowException(f"Errors when create bucket: {bucket_name}")
 
+    @provide_bucket_name
+    @unify_bucket_name_and_key
+    def append_string(self, bucket_name: Optional[str], content: str, key: str, pos: int):
+        """
+        Append string to a remote existing file
+
+        :param bucket_name: the name of the bucket
+        :param content: content to be appended
+        :param key: oss bucket key
+        :param pos: position of the existing file where the content will be appended
+        """
+        self.log.info("Write oss bucket key: " + key)
+        self.log.info("Write oss bucket pos: " + str(pos))

Review comment:
       Thx for the suggestions, will fix it in the following commit.
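
The reviewer's suggestion itself is not quoted in this message. Assuming it concerns the two `self.log.info` calls that build their messages with `+`, one common alternative is lazy %-style logging arguments. The sketch below uses a standalone logger and a hypothetical helper name, `log_append_target`, rather than the hook's own code.

```python
import logging

# Standalone logger for the sketch; the hook itself uses self.log.
log = logging.getLogger("oss_hook_sketch")


def log_append_target(key: str, pos: int) -> None:
    """Log the append target using lazy %-style arguments."""
    # Arguments are interpolated only if the record is actually emitted,
    # and no explicit str() conversion of pos is needed.
    log.info("Write oss bucket key: %s", key)
    log.info("Write oss bucket pos: %s", pos)
```

Passing the values as arguments keeps the message template constant, which also makes the records easier to group in log aggregation tools.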







[GitHub] [airflow] EricGao888 commented on a change in pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
EricGao888 commented on a change in pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#discussion_r817325120



##########
File path: airflow/providers/alibaba/cloud/hooks/oss.py
##########
@@ -269,6 +279,74 @@ def create_bucket(
             self.log.error(e)
             raise AirflowException(f"Errors when create bucket: {bucket_name}")
 
+    @provide_bucket_name
+    @unify_bucket_name_and_key
+    def append_string(self, bucket_name: Optional[str], content: str, key: str, pos: int):

Review comment:
       Related: #17617. I think I should create a new PR to fix the tests in the OSS hook first, and then fix this.







[GitHub] [airflow] potiuk commented on pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#issuecomment-1060721941


   You need to rebase @EricGao888 





[GitHub] [airflow] EricGao888 commented on pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
EricGao888 commented on pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#issuecomment-1065060866


   @potiuk @mik-laj This seems ready to be merged. Could you help me merge the commits? Thx!





[GitHub] [airflow] github-actions[bot] commented on pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#issuecomment-1053633406


   The PR most likely needs to run the full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly, please rebase it onto the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.





[GitHub] [airflow] potiuk commented on pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#issuecomment-1053774118


   Some static checks/docs are failing.





[GitHub] [airflow] EricGao888 commented on pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
EricGao888 commented on pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#issuecomment-1062696319


   It seems the docs build failed. Not sure whether I should rebase again? It looks related to #22100.





[GitHub] [airflow] potiuk commented on pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
potiuk commented on pull request #21785:
URL: https://github.com/apache/airflow/pull/21785#issuecomment-1062011628


   LGTM. @mik-laj ?





[GitHub] [airflow] potiuk merged pull request #21785: Add oss_task_handler into alibaba-provider and enable remote logging to OSS

Posted by GitBox <gi...@apache.org>.
potiuk merged pull request #21785:
URL: https://github.com/apache/airflow/pull/21785


   

