You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2021/02/16 08:20:35 UTC

[GitHub] [airflow] ephraimbuddy commented on a change in pull request #14254: Features/sftp to wasb: Transfer to scan files from sftp source path and upload them to Azure Blob Storage

ephraimbuddy commented on a change in pull request #14254:
URL: https://github.com/apache/airflow/pull/14254#discussion_r576621861



##########
File path: airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
##########
@@ -0,0 +1,187 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP to Azure Blob Storage operator."""
+import os
+from collections import namedtuple
+from tempfile import NamedTemporaryFile
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.utils.decorators import apply_defaults
+
+WILDCARD = "*"
+SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')
+
+
+class SFTPToWasbOperator(BaseOperator):
+    """
+    Transfer files to Azure Blob Storage from SFTP server.
+
+    :param sftp_source_path: The sftp remote path. This is the specified file path
+        for downloading the single file or multiple files from the SFTP server.
+        You can use only one wildcard within your path. The wildcard can appear
+        inside the path or at the end of the path.
+    :type sftp_source_path: str
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param blob_prefix: Prefix to name a blob.
+    :type blob_prefix: str
+    :param sftp_conn_id: The sftp connection id. The name or identifier for
+        establishing a connection to the SFTP server.
+    :type sftp_conn_id: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param load_options: Optional keyword arguments that
+        `WasbHook.load_file()` takes.
+    :type load_options: dict
+    :param move_object: When move object is True, the object is moved instead
+        of copied to the new location. This is the equivalent of a mv command
+        as opposed to a cp command.
+    :type move_object: bool
+    """
+
+    template_fields = ("sftp_source_path", "container_name", "blob_name")
+
+    @apply_defaults
+    def __init__(
+        self,

Review comment:
       ```suggestion
           self,
           *,
   ```

##########
File path: airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
##########
@@ -0,0 +1,187 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP to Azure Blob Storage operator."""
+import os
+from collections import namedtuple
+from tempfile import NamedTemporaryFile
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.utils.decorators import apply_defaults
+
+WILDCARD = "*"
+SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')
+
+
+class SFTPToWasbOperator(BaseOperator):
+    """
+    Transfer files to Azure Blob Storage from SFTP server.
+
+    :param sftp_source_path: The sftp remote path. This is the specified file path
+        for downloading the single file or multiple files from the SFTP server.
+        You can use only one wildcard within your path. The wildcard can appear
+        inside the path or at the end of the path.
+    :type sftp_source_path: str
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param blob_prefix: Prefix to name a blob.
+    :type blob_prefix: str
+    :param sftp_conn_id: The sftp connection id. The name or identifier for
+        establishing a connection to the SFTP server.
+    :type sftp_conn_id: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param load_options: Optional keyword arguments that
+        `WasbHook.load_file()` takes.
+    :type load_options: dict
+    :param move_object: When move object is True, the object is moved instead
+        of copied to the new location. This is the equivalent of a mv command
+        as opposed to a cp command.
+    :type move_object: bool
+    """
+
+    template_fields = ("sftp_source_path", "container_name", "blob_name")
+
+    @apply_defaults
+    def __init__(
+        self,
+        sftp_source_path: str,
+        container_name: str,
+        blob_prefix: str = "",
+        sftp_conn_id: str = "sftp_default",
+        wasb_conn_id: str = 'wasb_default',
+        load_options: Optional[dict] = None,
+        move_object: bool = False,
+        *args,

Review comment:
       ```suggestion
   ```
   Operator arguments are now keyword-only, so `*args` should be removed from the signature.

##########
File path: airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
##########
@@ -0,0 +1,187 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP to Azure Blob Storage operator."""
+import os
+from collections import namedtuple
+from tempfile import NamedTemporaryFile
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.utils.decorators import apply_defaults
+
+WILDCARD = "*"
+SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')
+
+
+class SFTPToWasbOperator(BaseOperator):
+    """
+    Transfer files to Azure Blob Storage from SFTP server.
+
+    :param sftp_source_path: The sftp remote path. This is the specified file path
+        for downloading the single file or multiple files from the SFTP server.
+        You can use only one wildcard within your path. The wildcard can appear
+        inside the path or at the end of the path.
+    :type sftp_source_path: str
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param blob_prefix: Prefix to name a blob.
+    :type blob_prefix: str
+    :param sftp_conn_id: The sftp connection id. The name or identifier for
+        establishing a connection to the SFTP server.
+    :type sftp_conn_id: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param load_options: Optional keyword arguments that
+        `WasbHook.load_file()` takes.
+    :type load_options: dict
+    :param move_object: When move object is True, the object is moved instead
+        of copied to the new location. This is the equivalent of a mv command
+        as opposed to a cp command.
+    :type move_object: bool
+    """
+
+    template_fields = ("sftp_source_path", "container_name", "blob_name")
+
+    @apply_defaults
+    def __init__(
+        self,
+        sftp_source_path: str,
+        container_name: str,
+        blob_prefix: str = "",
+        sftp_conn_id: str = "sftp_default",
+        wasb_conn_id: str = 'wasb_default',
+        load_options: Optional[dict] = None,
+        move_object: bool = False,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)

Review comment:
       ```suggestion
           super().__init__(**kwargs)
   ```

##########
File path: airflow/providers/microsoft/azure/transfers/sftp_to_wasb.py
##########
@@ -0,0 +1,187 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains SFTP to Azure Blob Storage operator."""
+import os
+from collections import namedtuple
+from tempfile import NamedTemporaryFile
+from typing import Any, Dict, List, Optional, Tuple
+
+from cached_property import cached_property
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
+from airflow.providers.sftp.hooks.sftp import SFTPHook
+from airflow.utils.decorators import apply_defaults
+
+WILDCARD = "*"
+SftpFile = namedtuple('SftpFile', 'sftp_file_path, blob_name')
+
+
+class SFTPToWasbOperator(BaseOperator):
+    """
+    Transfer files to Azure Blob Storage from SFTP server.
+
+    :param sftp_source_path: The sftp remote path. This is the specified file path
+        for downloading the single file or multiple files from the SFTP server.
+        You can use only one wildcard within your path. The wildcard can appear
+        inside the path or at the end of the path.
+    :type sftp_source_path: str
+    :param container_name: Name of the container.
+    :type container_name: str
+    :param blob_prefix: Prefix to name a blob.
+    :type blob_prefix: str
+    :param sftp_conn_id: The sftp connection id. The name or identifier for
+        establishing a connection to the SFTP server.
+    :type sftp_conn_id: str
+    :param wasb_conn_id: Reference to the wasb connection.
+    :type wasb_conn_id: str
+    :param load_options: Optional keyword arguments that
+        `WasbHook.load_file()` takes.
+    :type load_options: dict
+    :param move_object: When move object is True, the object is moved instead
+        of copied to the new location. This is the equivalent of a mv command
+        as opposed to a cp command.
+    :type move_object: bool
+    """
+
+    template_fields = ("sftp_source_path", "container_name", "blob_name")
+
+    @apply_defaults
+    def __init__(
+        self,
+        sftp_source_path: str,
+        container_name: str,
+        blob_prefix: str = "",
+        sftp_conn_id: str = "sftp_default",
+        wasb_conn_id: str = 'wasb_default',
+        load_options: Optional[dict] = None,
+        move_object: bool = False,
+        *args,
+        **kwargs,
+    ) -> None:
+        super().__init__(*args, **kwargs)
+
+        self.sftp_source_path = sftp_source_path
+        self.blob_prefix = blob_prefix
+        self.sftp_conn_id = sftp_conn_id
+        self.wasb_conn_id = wasb_conn_id
+        self.container_name = container_name
+        self.wasb_conn_id = wasb_conn_id
+        self.load_options = load_options if load_options is not None else {}

Review comment:
       ```suggestion
           self.load_options = load_options or {}
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org