You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2018/12/18 04:22:54 UTC

[GitHub] stale[bot] closed pull request #3526: [AIRFLOW-2651] Add file system hooks with a common interface

stale[bot] closed pull request #3526: [AIRFLOW-2651] Add file system hooks with a common interface
URL: https://github.com/apache/incubator-airflow/pull/3526
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/hooks/fs_hooks/__init__.py b/airflow/hooks/fs_hooks/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/airflow/hooks/fs_hooks/base.py b/airflow/hooks/fs_hooks/base.py
new file mode 100644
index 0000000000..8e4c4c8243
--- /dev/null
+++ b/airflow/hooks/fs_hooks/base.py
@@ -0,0 +1,264 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from builtins import super
+import errno
+import fnmatch
+import importlib
+import posixpath
+import re
+import shutil
+
+from airflow.hooks.base_hook import BaseHook
+
+_FS_BASE_MODULE = '.'.join(__name__.split('.')[:-1])  # parent package path
+
+
+class FsHook(BaseHook):
+    """Base FsHook defining the FsHook interface and providing some basic
+       functionality built on this interface.
+    """
+
+    _conn_classes = {
+        'ftp': _FS_BASE_MODULE + '.ftp.FtpHook',
+        'hdfs': _FS_BASE_MODULE + '.hdfs3.Hdfs3Hook',
+        'local': _FS_BASE_MODULE + '.local.LocalFsHook',
+        's3': _FS_BASE_MODULE + '.s3.S3FsHook',
+        'sftp': _FS_BASE_MODULE + '.sftp.SftpHook'
+    }
+
+    sep = posixpath.sep
+
+    def __init__(self, conn_id=None):
+        super().__init__(source=None)
+        self._conn_id = conn_id
+
+    @classmethod
+    def for_connection(cls, conn_id=None):
+        """Return appropriate hook for the given connection."""
+
+        if conn_id is None or conn_id == 'local':
+            conn_type = 'local'
+        else:
+            conn_type = cls.get_connection(conn_id).conn_type
+
+        try:
+            class_ = cls._conn_classes[conn_type]
+        except KeyError:
+            raise ValueError('Conn type {!r} is not supported'
+                             .format(conn_type))
+
+        if isinstance(class_, str):
+            # conn_class is a string identifier, import
+            # class from the indicated module.
+            split = class_.split('.')
+            module_name = '.'.join(split[:-1])
+            class_name = split[-1]
+
+            module = importlib.import_module(module_name)
+            class_ = getattr(module, class_name)
+
+        return class_(conn_id=conn_id)
+
+    @classmethod
+    def register_hook(cls, conn_type, class_):
+        """Register FsHook subclass for the given connection type.
+
+        Registered FsHook subclasses are used by `for_connection` when
+        instantiating the appropriate hook for a given connection, based
+        on its connection type.
+
+        :param str conn_type: Connection type.
+        :param class_: FsHook to register. Can either be the class itself
+            or a string specifying the full module path for the class.
+        """
+        cls._conn_classes[conn_type] = class_
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.disconnect()
+
+    def disconnect(self):
+        """Closes fs connection (if applicable)."""
+        pass
+
+    # Interface methods (should be implemented by sub-classes).
+
+    def open(self, file_path, mode='rb'):
+        """Returns file_obj for given file path.
+
+        :param str file_path: Path to the file to open.
+        :param str mode: Mode to open the file in.
+
+        :returns: An opened file object.
+        """
+        raise NotImplementedError()
+
+    def exists(self, file_path):
+        """Checks whether the given file path exists.
+
+        :param str file_path: File path.
+
+        :returns: True if the file exists, else False.
+        :rtype: bool
+        """
+        raise NotImplementedError()
+
+    def isdir(self, path):
+        """Returns true if the given path points to a directory.
+
+        :param str path: File or directory path.
+        """
+        raise NotImplementedError()
+
+    def listdir(self, dir_path):
+        """Lists names of entries in the given path."""
+        raise NotImplementedError()
+
+    def walk(self, dir_path):
+        """Yields (dir_path, dir_names, file_names) tuples, like os.walk."""
+        raise NotImplementedError()
+
+    def makedir(self, dir_path, mode=0o755, exist_ok=True):
+        """Creates the directory, without creating intermediate directories."""
+        raise NotImplementedError()
+
+    def makedirs(self, dir_path, mode=0o755, exist_ok=True):
+        """Creates directory, creating intermediate directories if needed.
+
+        :param str dir_path: Path to the directory to create.
+        :param int mode: Mode to use for directory (if created).
+        :param bool exist_ok: Whether the directory is already allowed to exist.
+            If false, an IOError is raised if the directory exists.
+        """
+        raise NotImplementedError()
+
+    def rm(self, file_path):
+        """Deletes the given file path.
+
+        :param str file_path: Path to file.
+        """
+        raise NotImplementedError()
+
+    def rmtree(self, dir_path):
+        """Deletes given directory tree recursively.
+
+        :param str dir_path: Path to directory to delete.
+        """
+        raise NotImplementedError()
+
+    @staticmethod
+    def _raise_dir_exists(dir_path):
+        raise IOError(errno.EEXIST,
+                      'Directory exists: {!r}'.format(dir_path))
+
+    # General utility methods built on the above interface.
+
+    # These methods can be overridden in sub-classes if more efficient
+    # implementations are available for a specific file system.
+
+    def glob(self, pattern, only_files=True):
+        """Yields file paths matching the glob pattern.
+
+        Recursive globbing is not supported.
+
+        :param str pattern: Pattern to match against file name.
+        :param bool only_files: If true, only files are returned
+            in the result (no directories).
+
+        :returns: Generator yielding the matched file paths.
+        :rtype: Iterator[str]
+        """
+
+        root = posixpath.dirname(pattern)
+        file_pattern = posixpath.basename(pattern)
+
+        matches = (posixpath.join(root, match) for match in
+                   fnmatch.filter(self.listdir(root), file_pattern))
+
+        if only_files:
+            matches = (match for match in matches if not self.isdir(match))
+
+        for match in matches:
+            yield match
+
+    def upload(self, src_path, dst_path, src_conn_id='local'):
+        """Uploads files to the given file system.
+
+        Supports copying multiple files via globbing in the `src_path`,
+        in which case `dst_path` is considered to be the destination
+        directory for these files.
+        """
+
+        with FsHook.for_connection(src_conn_id) as src_hook:
+            for src, dst in self._generate_paths(src_path, dst_path, src_hook):
+                with src_hook.open(src, 'rb') as src_file, \
+                     self.open(dst, 'wb') as dst_file:
+                    shutil.copyfileobj(src_file, dst_file)
+
+    def _generate_paths(self, src_path, dst_path, src_hook):
+        """Expands glob file path if given, else returns a list containing
+           a single tuple of (src_path, dst_path).
+        """
+
+        if self._is_glob_pattern(src_path):
+            paths = []
+
+            for src_file_path in src_hook.glob(src_path):
+                base_name = posixpath.basename(src_file_path)
+                dst_file_path = posixpath.join(dst_path, base_name)
+
+                paths.append((src_file_path, dst_file_path))
+        else:
+            paths = [(src_path, dst_path)]
+
+        return paths
+
+    @staticmethod
+    def _is_glob_pattern(path):
+        return re.search(r'\?|\[|\*', path) is not None
+
+    def upload_fileobj(self, file_obj, dst_path):
+        """Uploads a file object to `dst_path` on this file system."""
+
+        with self.open(dst_path, 'wb') as dst_file:
+            shutil.copyfileobj(file_obj, dst_file)
+
+    def download(self, src_path, dst_path, dst_conn_id='local'):
+        """Downloads files from the given file system.
+
+        Supports copying multiple files via globbing in the `src_path`,
+        in which case `dst_path` is considered to be the destination
+        directory for these files.
+        """
+
+        with FsHook.for_connection(dst_conn_id) as dst_hook:
+            for src, dst in self._generate_paths(src_path, dst_path, self):
+                with self.open(src, 'rb') as src_file, \
+                     dst_hook.open(dst, 'wb') as dst_file:
+                    shutil.copyfileobj(src_file, dst_file)
+
+class NotSupportedError(NotImplementedError):
+    """Exception that may be raised by FsHooks if they don't support
+       the given operation.
+    """
+    pass
diff --git a/airflow/hooks/fs_hooks/ftp.py b/airflow/hooks/fs_hooks/ftp.py
new file mode 100644
index 0000000000..33b179c881
--- /dev/null
+++ b/airflow/hooks/fs_hooks/ftp.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from builtins import super
+import ftplib
+
+import ftputil
+from ftputil import session as ftp_session
+
+from .base import FsHook
+
+
+class FtpHook(FsHook):
+    """Hook for interacting with files over FTP (optionally FTP over TLS)."""
+
+    def __init__(self, conn_id):
+        super().__init__(conn_id=conn_id)
+        self._conn = None
+
+    def get_conn(self):
+        if self._conn is None:
+            config = self.get_connection(self._conn_id)
+
+            secure = config.extra_dejson.get('tls', False)
+            base_class = ftplib.FTP_TLS if secure else ftplib.FTP
+
+            session_factory = ftp_session.session_factory(
+                base_class=base_class,
+                port=config.port or 21,
+                encrypt_data_channel=secure)
+
+            self._conn = ftputil.FTPHost(
+                config.host,
+                config.login,
+                config.password,
+                session_factory=session_factory)
+
+        return self._conn
+
+    def disconnect(self):
+        if self._conn is not None:
+            self._conn.close()
+            self._conn = None
+
+    def open(self, file_path, mode='rb'):
+        return self.get_conn().open(file_path, mode=mode)
+
+    def isdir(self, path):
+        return self.get_conn().path.isdir(path)  # isdir lives on FTPHost.path
+
+    def exists(self, file_path):
+        return self.get_conn().path.exists(file_path)
+
+    def makedir(self, dir_path, mode=0o755, exist_ok=True):
+        if not exist_ok and self.exists(dir_path):
+            self._raise_dir_exists(dir_path)
+        self.get_conn().mkdir(dir_path, mode=mode)
+
+    def makedirs(self, dir_path, mode=0o755, exist_ok=True):
+        if not exist_ok and self.exists(dir_path):
+            self._raise_dir_exists(dir_path)
+        self.get_conn().makedirs(dir_path, mode=mode)
+
+    def listdir(self, dir_path):
+        return self.get_conn().listdir(dir_path)
+
+    def walk(self, dir_path):
+        for tup in self.get_conn().walk(dir_path):
+            yield tup
+
+    def rm(self, file_path):
+        self.get_conn().remove(file_path)
+
+    def rmtree(self, dir_path):
+        self.get_conn().rmtree(dir_path, ignore_errors=False)
diff --git a/airflow/hooks/fs_hooks/hdfs3.py b/airflow/hooks/fs_hooks/hdfs3.py
new file mode 100644
index 0000000000..8c64fe162c
--- /dev/null
+++ b/airflow/hooks/fs_hooks/hdfs3.py
@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from builtins import super
+
+import hdfs3
+from hdfs3.utils import MyNone
+
+from .base import FsHook
+
+
+class Hdfs3Hook(FsHook):
+    """Hook for interacting with files over HDFS (using the hdfs3 library)."""
+
+    def __init__(self, conn_id=None):
+        super().__init__(conn_id=conn_id)
+        self._conn = None
+
+    def get_conn(self):
+        if self._conn is None:
+            if self._conn_id is None:
+                self._conn = hdfs3.HDFileSystem()
+            else:
+                config = self.get_connection(self._conn_id)
+                config_extra = config.extra_dejson
+
+                # Extract hadoop parameters from extra.
+                pars = config_extra.get('pars', {})
+
+                # Collect extra parameters to pass to kwargs.
+                extra_kws = {}
+                if config.login:
+                    extra_kws['user'] = config.login
+
+                # Build connection.
+                self._conn = hdfs3.HDFileSystem(
+                    host=config.host or MyNone,
+                    port=config.port or MyNone,
+                    pars=pars,
+                    **extra_kws)
+
+        return self._conn
+
+    def disconnect(self):
+        if self._conn is not None:
+            self._conn.disconnect()
+        self._conn = None
+
+    def open(self, file_path, mode='rb'):
+        return self.get_conn().open(file_path, mode=mode)
+
+    def isdir(self, path):
+        return self.get_conn().isdir(path)
+
+    def exists(self, file_path):
+        return self.get_conn().exists(file_path)
+
+    def makedir(self, dir_path, mode=0o755, exist_ok=True):
+        conn = self.get_conn()
+
+        if conn.exists(dir_path):
+            if not exist_ok:
+                self._raise_dir_exists(dir_path)
+        else:
+            conn.mkdir(dir_path)
+            conn.chmod(dir_path, mode=mode)
+
+    def makedirs(self, dir_path, mode=0o755, exist_ok=True):
+        if not exist_ok and self.exists(dir_path):
+            self._raise_dir_exists(dir_path)
+        self.get_conn().makedirs(dir_path, mode=mode)
+
+    def listdir(self, dir_path):
+        return self.get_conn().ls(dir_path)
+
+    def walk(self, dir_path):
+        for tup in self.get_conn().walk(dir_path):
+            yield tup
+
+    def rm(self, file_path):
+        self.get_conn().rm(file_path, recursive=False)
+
+    def rmtree(self, dir_path):
+        self.get_conn().rm(dir_path, recursive=True)
diff --git a/airflow/hooks/fs_hooks/local.py b/airflow/hooks/fs_hooks/local.py
new file mode 100644
index 0000000000..557f134efb
--- /dev/null
+++ b/airflow/hooks/fs_hooks/local.py
@@ -0,0 +1,78 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from builtins import open, str
+import os
+import shutil
+
+from .base import FsHook
+
+
+class LocalFsHook(FsHook):
+    """Hook for interacting with files on the local file system."""
+
+    sep = os.sep  # platform-specific path separator
+
+    def get_conn(self):
+        return None  # no connection object is used for local files
+
+    def open(self, file_path, mode='rb'):
+        return open(str(file_path), mode=mode)
+
+    def exists(self, file_path):
+        return os.path.exists(str(file_path))
+
+    def isdir(self, path):
+        return os.path.isdir(path)
+
+    def makedir(self, dir_path, mode=0o755, exist_ok=True):
+        if os.path.exists(dir_path):
+            if not exist_ok:
+                self._raise_dir_exists(dir_path)
+        else:
+            os.mkdir(dir_path, mode)
+
+    def makedirs(self, dir_path, mode=0o755, exist_ok=True):
+        if os.path.exists(dir_path):
+            if not exist_ok:
+                self._raise_dir_exists(dir_path)
+        else:
+            os.makedirs(str(dir_path), mode=mode)
+
+    def listdir(self, dir_path):
+        return os.listdir(dir_path)
+
+    def walk(self, dir_path):
+        for tup in os.walk(dir_path):
+            yield tup
+
+    def rm(self, file_path):
+        os.unlink(str(file_path))
+
+    def rmtree(self, dir_path):
+        shutil.rmtree(str(dir_path))
+
+    @staticmethod
+    def join(path, *paths):
+        return os.path.join(path, *paths)
+
+    @staticmethod
+    def split(path):
+        return os.path.split(path)
diff --git a/airflow/hooks/fs_hooks/s3.py b/airflow/hooks/fs_hooks/s3.py
new file mode 100644
index 0000000000..24638d512d
--- /dev/null
+++ b/airflow/hooks/fs_hooks/s3.py
@@ -0,0 +1,132 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from builtins import super
+import posixpath
+
+import s3fs
+
+from .base import FsHook
+
+
+class S3FsHook(FsHook):
+    """Hook for interacting with files in S3 (using the s3fs library)."""
+
+    def __init__(self, conn_id=None):
+        super().__init__(conn_id=conn_id)
+        self._conn = None
+
+    def get_conn(self):
+        if self._conn is None:
+            if self._conn_id is None:
+                self._conn = s3fs.S3FileSystem()
+            else:
+                # TODO: Use same logic as existing S3/AWS hooks.
+                config = self.get_connection(self._conn_id)
+
+                extra_kwargs = {}
+                if 'encryption' in config.extra_dejson:
+                    extra_kwargs['ServerSideEncryption'] = \
+                        config.extra_dejson['encryption']
+
+                self._conn = s3fs.S3FileSystem(
+                    key=config.login,
+                    secret=config.password,
+                    s3_additional_kwargs=extra_kwargs)
+
+        return self._conn
+
+    def disconnect(self):
+        self._conn = None
+
+    def open(self, file_path, mode='rb'):
+        return self.get_conn().open(file_path, mode=mode)
+
+    def exists(self, file_path):
+        return self.get_conn().exists(file_path)
+
+    def isdir(self, path):
+        path = _remove_s3_prefix(path)
+
+        if self.sep not in path:
+            # Path is bucket name.
+            return True
+
+        parent_dir = posixpath.dirname(path)
+
+        for child in self.get_conn().ls(parent_dir, detail=True):
+            if child['Key'] == path and \
+                    child['StorageClass'] == 'DIRECTORY':
+                return True
+
+        return False
+
+    def makedir(self, dir_path, mode=0o755, exist_ok=True):
+        self.makedirs(dir_path, mode=mode, exist_ok=exist_ok)
+
+    def makedirs(self, dir_path, mode=0o755, exist_ok=True):
+        if not exist_ok and self.exists(dir_path):
+            self._raise_dir_exists(dir_path)
+
+    def listdir(self, dir_path):
+        return self.get_conn().ls(dir_path, detail=False)
+
+    def walk(self, dir_path):
+        dir_path = _remove_s3_prefix(dir_path)
+        dir_path = _remove_trailing_slash(dir_path)
+
+        # Yield contents of current directory.
+        dir_names, file_names = [], []
+        for child in self.get_conn().ls(dir_path, detail=True):
+            # Get relative path by removing dir_path + trailing slash.
+            rel_path = child['Key'][len(dir_path) + 1:]
+            if child['StorageClass'] == 'DIRECTORY':
+                dir_names.append(rel_path)
+            else:
+                file_names.append(rel_path)
+
+        yield dir_path, dir_names, file_names
+
+        # Walk over sub-directories, in top-down fashion.
+        for dir_name in dir_names:
+            for tup in self.walk(posixpath.join(dir_path, dir_name)):
+                yield tup
+
+    def glob(self, pattern, only_files=True):
+        pattern = _remove_s3_prefix(pattern)
+        return super().glob(pattern=pattern, only_files=only_files)
+
+    def rm(self, file_path):
+        self.get_conn().rm(file_path, recursive=False)
+
+    def rmtree(self, dir_path):
+        self.get_conn().rm(dir_path, recursive=True)
+
+
+def _remove_s3_prefix(path):
+    # Strip a leading 's3://' scheme, if present.
+    scheme = 's3://'
+    return path[len(scheme):] if path.startswith(scheme) else path
+
+
+def _remove_trailing_slash(path):
+    # Drop a single trailing '/' (if any); other paths pass through.
+    ends_with_slash = path.endswith('/')
+    return path[:-1] if ends_with_slash else path
diff --git a/airflow/hooks/fs_hooks/sftp.py b/airflow/hooks/fs_hooks/sftp.py
new file mode 100644
index 0000000000..163fd31be7
--- /dev/null
+++ b/airflow/hooks/fs_hooks/sftp.py
@@ -0,0 +1,125 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from builtins import super
+import posixpath
+
+import pysftp
+
+from .base import FsHook
+
+
+class SftpHook(FsHook):
+    """Hook for interacting with files over SFTP (using the pysftp library)."""
+
+    def __init__(self, conn_id):
+        super().__init__(conn_id=conn_id)
+        self._conn = None
+
+    def get_conn(self):
+        if self._conn is None:
+            params = self.get_connection(self._conn_id)
+
+            cnopts = pysftp.CnOpts()
+
+            if params.extra_dejson.get('ignore_hostkey_verification', False):
+                cnopts.hostkeys = None
+
+            private_key = params.extra_dejson.get('private_key', None)
+
+            if not private_key:
+                self._conn = pysftp.Connection(
+                    params.host, port=params.port or 22, cnopts=cnopts,
+                    username=params.login,
+                    password=params.password)
+            elif params.password:
+                self._conn = pysftp.Connection(
+                    params.host, port=params.port or 22, cnopts=cnopts,
+                    username=params.login,
+                    private_key=private_key,
+                    private_key_pass=params.password)
+            else:
+                self._conn = pysftp.Connection(
+                    params.host, port=params.port or 22, cnopts=cnopts,
+                    username=params.login,
+                    private_key=private_key)
+
+        return self._conn
+
+    def disconnect(self):
+        if self._conn is not None:
+            self._conn.close()
+        self._conn = None
+
+    def open(self, file_path, mode='rb'):
+        return self.get_conn().open(file_path, mode=mode)
+
+    def exists(self, file_path):
+        return self.get_conn().exists(file_path)
+
+    def isdir(self, path):
+        return self.get_conn().isdir(path)
+
+    def makedir(self, dir_path, mode=0o755, exist_ok=True):
+        if not exist_ok and self.exists(dir_path):
+            self._raise_dir_exists(dir_path)
+        self.get_conn().mkdir(dir_path, mode=int('{:o}'.format(mode)))
+
+    def makedirs(self, dir_path, mode=0o755, exist_ok=True):
+        if not exist_ok and self.exists(dir_path):
+            self._raise_dir_exists(dir_path)
+        self.get_conn().makedirs(dir_path, mode=int('{:o}'.format(mode)))
+
+    def listdir(self, dir_path):
+        return self.get_conn().listdir(dir_path)
+
+    def walk(self, dir_path):
+        from stat import S_ISDIR, S_ISREG
+
+        conn = self.get_conn()
+        client = conn.sftp_client
+
+        # Yield contents of current directory.
+        dir_names, file_names = [], []
+        for entry in conn.listdir(dir_path):
+            full_path = posixpath.join(dir_path, entry)
+            mode = client.stat(full_path).st_mode
+
+            if S_ISDIR(mode):
+                dir_names.append(entry)
+            elif S_ISREG(mode):
+                file_names.append(entry)
+
+        yield dir_path, dir_names, file_names
+
+        # Walk over sub-directories, in top-down fashion.
+        for dir_name in dir_names:
+            for tup in self.walk(posixpath.join(dir_path, dir_name)):
+                yield tup
+
+    def rm(self, file_path):
+        self.get_conn().remove(file_path)
+
+    def rmtree(self, dir_path):
+        quoted = "'{}'".format(dir_path.replace("'", "'\\''"))  # shell-quoted
+        result = self.get_conn().execute('rm -r ' + quoted)
+        if result:
+            message = b'\n'.join(result)
+            raise OSError(message.decode())
diff --git a/setup.py b/setup.py
index 2882c88522..94752e2436 100644
--- a/setup.py
+++ b/setup.py
@@ -143,6 +143,7 @@ def write_version(filename=os.path.join(*['airflow',
     'elasticsearch-dsl>=5.0.0,<6.0.0'
 ]
 emr = ['boto3>=1.0.0']
+ftp = ['ftputil>=3.4']
 gcp_api = [
     'httplib2>=0.9.2',
     'google-api-python-client>=1.6.0, <2.0.0dev',
@@ -153,7 +154,7 @@ def write_version(filename=os.path.join(*['airflow',
     'pandas-gbq'
 ]
 github_enterprise = ['Flask-OAuthlib>=0.9.1']
-hdfs = ['snakebite>=2.7.8']
+hdfs = ['snakebite>=2.7.8', 'hdfs3>=0.3.0']
 hive = [
     'hmsclient>=0.1.0',
     'pyhive>=0.6.0',
@@ -180,7 +181,7 @@ def write_version(filename=os.path.join(*['airflow',
 qds = ['qds-sdk>=1.9.6']
 rabbitmq = ['librabbitmq>=1.6.1']
 redis = ['redis>=2.10.5']
-s3 = ['boto3>=1.7.0']
+s3 = ['boto3>=1.7.0', 's3fs>=0.1.5']
 salesforce = ['simple-salesforce>=0.72']
 samba = ['pysmbclient>=0.1.3']
 segment = ['analytics-python>=1.2.9']
@@ -224,7 +225,7 @@ def write_version(filename=os.path.join(*['airflow',
              docker + ssh + kubernetes + celery + azure_blob_storage + redis + gcp_api +
              datadog + zendesk + jdbc + ldap + kerberos + password + webhdfs + jenkins +
              druid + pinot + segment + snowflake + elasticsearch + azure_data_lake +
-             atlas)
+             atlas + ftp)
 
 # Snakebite & Google Cloud Dataflow are not Python 3 compatible :'(
 if PY3:
@@ -312,6 +313,7 @@ def do_setup():
             'druid': druid,
             'elasticsearch': elasticsearch,
             'emr': emr,
+            'ftp': ftp,
             'gcp_api': gcp_api,
             'github_enterprise': github_enterprise,
             'hdfs': hdfs,
diff --git a/tests/hooks/fs_hooks/__init__.py b/tests/hooks/fs_hooks/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/tests/hooks/fs_hooks/test_base.py b/tests/hooks/fs_hooks/test_base.py
new file mode 100644
index 0000000000..b817e2ef18
--- /dev/null
+++ b/tests/hooks/fs_hooks/test_base.py
@@ -0,0 +1,38 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+
+class TestBaseHook(unittest.TestCase):
+    """
+    Tests for the FsHook base class.
+
+    Note most concrete behaviours are tested in the LocalFsHook or the S3FsHook
+    tests, as these have (mock) file systems to test against.
+    """
+
+    def test_for_connection(self):
+        # TODO: Add test for the for_connection method.
+        pass
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/hooks/fs_hooks/test_ftp.py b/tests/hooks/fs_hooks/test_ftp.py
new file mode 100644
index 0000000000..da596200c9
--- /dev/null
+++ b/tests/hooks/fs_hooks/test_ftp.py
@@ -0,0 +1,211 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import ftplib
+import unittest
+
+import mock
+
+from airflow.hooks.fs_hooks.ftp import FtpHook, ftputil, ftp_session
+
+
+class TestFtpHook(unittest.TestCase):
+    """
+    Tests for the FtpHook class.
+
+    Note that the FTP session is mocked in most of these tests to avoid the
+    requirement of having a local FTP server for testing.
+    """
+
+    def setUp(self):
+        self._mock_fs = mock.Mock()
+
+        self._mocked_hook = FtpHook(conn_id='ftp_default')
+        self._mocked_hook._conn = self._mock_fs
+
+    @mock.patch.object(ftp_session, 'session_factory')
+    @mock.patch.object(ftputil, 'FTPHost')
+    @mock.patch.object(FtpHook, 'get_connection')
+    def test_get_conn(self, conn_mock, host_mock, session_mock):
+        """Tests get_conn call with unsecured connection."""
+
+        conn_mock.return_value = mock.Mock(
+            host='example',
+            login='user',
+            password='password',
+            port=2121,
+            extra_dejson={'tls': False})
+
+        with FtpHook(conn_id='ftp_default') as hook:
+            hook.get_conn()
+
+        conn_mock.assert_called_once_with('ftp_default')
+
+        host_mock.assert_called_once_with(
+            'example',
+            'user',
+            'password',
+            session_factory=mock.ANY)
+
+        session_mock.assert_called_once_with(
+            base_class=ftplib.FTP,
+            port=2121,
+            encrypt_data_channel=False)
+
+    @mock.patch.object(ftp_session, 'session_factory')
+    @mock.patch.object(ftputil, 'FTPHost')
+    @mock.patch.object(FtpHook, 'get_connection')
+    def test_get_conn_tls(self, conn_mock, host_mock, session_mock):
+        """Tests get_conn call with a secured connection."""
+
+        conn_mock.return_value = mock.Mock(
+            host='example',
+            login='user',
+            password='password',
+            port=2121,
+            extra_dejson={'tls': True})
+
+        with FtpHook(conn_id='ftp_default') as hook:
+            hook.get_conn()
+
+        conn_mock.assert_called_once_with('ftp_default')
+
+        host_mock.assert_called_once_with(
+            'example',
+            'user',
+            'password',
+            session_factory=mock.ANY)
+
+        session_mock.assert_called_once_with(
+            base_class=ftplib.FTP_TLS,
+            port=2121,
+            encrypt_data_channel=True)
+
+    def test_open(self):
+        """Tests the `open` method."""
+
+        with self._mocked_hook as hook:
+            hook.open('test.txt', mode='rb')
+
+        self._mock_fs.open.assert_called_once_with('test.txt', mode='rb')
+
+    def test_exists(self):
+        """Tests the `exists` method."""
+
+        with self._mocked_hook as hook:
+            hook.exists('test.txt')
+
+        self._mock_fs.path.exists.assert_called_once_with('test.txt')
+
+    def test_isdir(self):
+        """Tests the `isdir` method."""
+
+        with self._mocked_hook as hook:
+            hook.isdir('test.txt')
+
+        self._mock_fs.isdir.assert_called_once_with('test.txt')
+
+    def test_makedir(self):
+        """Tests the `makedir` method with a non-existing dir."""
+
+        self._mock_fs.exists.return_value = False
+
+        with self._mocked_hook as hook:
+            hook.makedir('path/to/dir', mode=0o755)
+
+        self._mock_fs.mkdir.assert_called_once_with('path/to/dir', mode=0o755)
+
+    def test_makedir_existing(self):
+        """Tests the `makedir` method with an existing dir
+           and exist_ok = False.
+        """
+
+        self._mock_fs.exists.return_value = True
+
+        with self._mocked_hook as hook:
+            with self.assertRaises(IOError):
+                hook.makedir('path/to/dir', mode=0o755, exist_ok=False)
+
+    def test_makedir_existing_ok(self):
+        """Tests the `makedir` method with an existing dir
+           and exist_ok = True.
+        """
+
+        self._mock_fs.exists.return_value = True
+
+        with self._mocked_hook as hook:
+            hook.makedir('path/to/dir', mode=0o755, exist_ok=True)
+
+        self._mock_fs.chmod.assert_not_called()
+
+    def test_makedirs(self):
+        """Tests the `makedirs` method with a non-existing dir."""
+
+        self._mock_fs.exists.return_value = False
+
+        with self._mocked_hook as hook:
+            hook.makedirs('path/to/dir', mode=0o755)
+
+        self._mock_fs.makedirs.assert_called_once_with(
+            'path/to/dir', mode=0o755)
+
+    def test_makedirs_existing(self):
+        """Tests the `makedirs` method with an existing dir
+           and exist_ok = False.
+        """
+
+        self._mock_fs.exists.return_value = True
+
+        with self._mocked_hook as hook:
+            with self.assertRaises(IOError):
+                hook.makedirs('path/to/dir', mode=0o755, exist_ok=False)
+
+        self._mock_fs.makedirs.assert_not_called()
+
+    def test_makedirs_existing_ok(self):
+        """Tests the `makedirs` method with an existing dir
+           and exist_ok = True.
+        """
+
+        self._mock_fs.exists.return_value = True
+
+        with self._mocked_hook as hook:
+            hook.makedirs('path/to/dir', mode=0o755, exist_ok=True)
+
+    def test_rm(self):
+        """Tests the `rm` method."""
+
+        with self._mocked_hook as hook:
+            hook.rm('test_dir')
+
+        self._mock_fs.remove.assert_called_once_with('test_dir')
+
+    def test_rmtree(self):
+        """Tests the `rmtree` method."""
+
+        with self._mocked_hook as hook:
+            hook.rmtree('test_dir')
+
+        self._mock_fs.rmtree.assert_called_once_with(
+            'test_dir', ignore_errors=False)
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/hooks/fs_hooks/test_hdfs3.py b/tests/hooks/fs_hooks/test_hdfs3.py
new file mode 100644
index 0000000000..3637ca0f1d
--- /dev/null
+++ b/tests/hooks/fs_hooks/test_hdfs3.py
@@ -0,0 +1,205 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+import mock
+from hdfs3.utils import MyNone
+
+from airflow.hooks.fs_hooks.hdfs3 import Hdfs3Hook, hdfs3
+
+
+class TestHdfs3Hook(unittest.TestCase):
+    """
+    Tests for the Hdfs3Hook class.
+
+    Note that the HDFileSystem class is mocked in most of these tests
+    to avoid the requirement of having a local HDFS instance for testing.
+    """
+
+    def setUp(self):
+        self._mock_fs = mock.Mock()
+
+        self._mocked_hook = Hdfs3Hook()
+        self._mocked_hook._conn = self._mock_fs
+
+    def test_open(self):
+        """Tests the `open` method."""
+
+        with self._mocked_hook as hook:
+            hook.open('test.txt', mode='rb')
+
+        self._mock_fs.open.assert_called_once_with('test.txt', mode='rb')
+
+    def test_exists(self):
+        """Tests the `exists` method."""
+
+        with self._mocked_hook as hook:
+            hook.exists('test.txt')
+
+        self._mock_fs.exists.assert_called_once_with('test.txt')
+
+    def test_isdir(self):
+        """Tests the `isdir` method."""
+
+        with self._mocked_hook as hook:
+            hook.isdir('test.txt')
+
+        self._mock_fs.isdir.assert_called_once_with('test.txt')
+
+    def test_makedir(self):
+        """Tests the `makedir` method with a non-existing dir."""
+
+        self._mock_fs.exists.return_value = False
+
+        with self._mocked_hook as hook:
+            hook.makedir('path/to/dir', mode=0o755)
+
+        self._mock_fs.mkdir.assert_called_once_with('path/to/dir')
+        self._mock_fs.chmod.assert_called_once_with('path/to/dir', mode=0o755)
+
+    def test_makedir_existing(self):
+        """Tests the `makedir` method with an existing dir
+           and exist_ok = False.
+        """
+
+        self._mock_fs.exists.return_value = True
+
+        with self._mocked_hook as hook:
+            with self.assertRaises(IOError):
+                hook.makedir('path/to/dir', mode=0o755, exist_ok=False)
+
+    def test_makedir_existing_ok(self):
+        """Tests the `makedir` method with an existing dir
+           and exist_ok = True.
+        """
+
+        self._mock_fs.exists.return_value = True
+
+        with self._mocked_hook as hook:
+            hook.makedir('path/to/dir', mode=0o755, exist_ok=True)
+
+        self._mock_fs.chmod.assert_not_called()
+
+    def test_makedirs(self):
+        """Tests the `makedirs` method with a non-existing dir."""
+
+        self._mock_fs.exists.return_value = False
+
+        with self._mocked_hook as hook:
+            hook.makedirs('path/to/dir', mode=0o755)
+
+        self._mock_fs.makedirs.assert_called_once_with(
+            'path/to/dir', mode=0o755)
+
+    def test_makedirs_existing(self):
+        """Tests the `makedirs` method with an existing dir
+           and exist_ok = False.
+        """
+
+        self._mock_fs.exists.return_value = True
+
+        with self._mocked_hook as hook:
+            with self.assertRaises(IOError):
+                hook.makedirs('path/to/dir', mode=0o755, exist_ok=False)
+
+        self._mock_fs.makedirs.assert_not_called()
+
+    def test_makedirs_existing_ok(self):
+        """Tests the `makedirs` method with an existing dir
+           and exist_ok = True.
+        """
+
+        self._mock_fs.exists.return_value = True
+
+        with self._mocked_hook as hook:
+            hook.makedirs('path/to/dir', mode=0o755, exist_ok=True)
+
+    def test_rm(self):
+        """Tests the `rm` method."""
+
+        with self._mocked_hook as hook:
+            hook.rm('test_dir')
+
+        self._mock_fs.rm.assert_called_once_with(
+            'test_dir', recursive=False)
+
+    def test_rmtree(self):
+        """Tests the `rmtree` method."""
+
+        with self._mocked_hook as hook:
+            hook.rmtree('test_dir')
+
+        self._mock_fs.rm.assert_called_once_with(
+            'test_dir', recursive=True)
+
+    @mock.patch.object(hdfs3, 'HDFileSystem')
+    @mock.patch.object(Hdfs3Hook, 'get_connection')
+    def test_get_conn(self, conn_mock, hdfs3_mock):
+        """Tests get_conn call without ID."""
+
+        with Hdfs3Hook() as hook:
+            hook.get_conn()
+
+        conn_mock.assert_not_called()
+        hdfs3_mock.assert_called_once_with()
+
+    @mock.patch.object(hdfs3, 'HDFileSystem')
+    @mock.patch.object(Hdfs3Hook, 'get_connection')
+    def test_get_conn_with_conn(self, conn_mock, hdfs3_mock):
+        """Tests get_conn call with ID."""
+
+        conn_mock.return_value = mock.Mock(
+            host='namenode',
+            login='hdfs_user',
+            port=8020,
+            extra_dejson={'pars': {'dfs.namenode.logging.level': 'info'}})
+
+        with Hdfs3Hook(conn_id='hdfs_default') as hook:
+            hook.get_conn()
+
+        conn_mock.assert_called_once_with('hdfs_default')
+
+        hdfs3_mock.assert_called_once_with(
+            host='namenode',
+            port=8020,
+            pars={'dfs.namenode.logging.level': 'info'},
+            user='hdfs_user')
+
+    @mock.patch.object(hdfs3, 'HDFileSystem')
+    @mock.patch.object(Hdfs3Hook, 'get_connection')
+    def test_get_conn_with_empty_conn(self, conn_mock, hdfs3_mock):
+        """Tests get_conn call with empty connection."""
+
+        conn_mock.return_value = mock.Mock(
+            host='',
+            login='',
+            port='',
+            extra_dejson={})
+
+        with Hdfs3Hook(conn_id='hdfs_default') as hook:
+            hook.get_conn()
+
+        conn_mock.assert_called_once_with('hdfs_default')
+        hdfs3_mock.assert_called_once_with(host=MyNone, port=MyNone, pars={})
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/hooks/fs_hooks/test_local.py b/tests/hooks/fs_hooks/test_local.py
new file mode 100644
index 0000000000..d0f9a58130
--- /dev/null
+++ b/tests/hooks/fs_hooks/test_local.py
@@ -0,0 +1,199 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from builtins import open
+
+import os
+from os import path
+import unittest
+import shutil
+import tempfile
+
+from airflow.hooks.fs_hooks.local import LocalFsHook
+
+
+class TestLocalFsHook(unittest.TestCase):
+    """Tests for the LocalFsHook."""
+
+    def setUp(self):
+        self._tmp_dir = tempfile.mkdtemp()
+
+        # Bootstrap some files.
+        with open(path.join(self._tmp_dir, 'hello.txt'), 'wb') as file_:
+            file_.write(b'Hello world!\n')
+
+        with open(path.join(self._tmp_dir, 'hello.csv'), 'wb') as file_:
+            file_.write(b'Hello world!\n')
+
+        nested_dir = path.join(self._tmp_dir, 'test')
+        os.mkdir(nested_dir)
+
+        with open(path.join(nested_dir, 'nested.txt'), 'wb') as file_:
+            file_.write(b'Nested\n')
+
+    def tearDown(self):
+        shutil.rmtree(self._tmp_dir)
+
+    def test_open(self):
+        """Tests the open method."""
+
+        with LocalFsHook() as hook:
+            # Try to write file.
+            new_path = path.join(self._tmp_dir, 'new.txt')
+            self.assertFalse(hook.exists(new_path))
+
+            with hook.open(new_path, 'wb') as file_:
+                file_.write(b'Hello world!')
+
+            # Check file exists.
+            self.assertTrue(hook.exists(new_path))
+
+            # Check reading file.
+            with hook.open(new_path, 'rb') as file_:
+                self.assertEqual(file_.read(), b'Hello world!')
+
+    def test_exists(self):
+        """Tests the exists method."""
+
+        with LocalFsHook() as hook:
+            self.assertTrue(
+                hook.exists(path.join(self._tmp_dir, 'hello.txt')))
+            self.assertFalse(
+                hook.exists(path.join(self._tmp_dir, 'random.txt')))
+
+    def test_isdir(self):
+        """Tests the isdir method."""
+
+        with LocalFsHook() as hook:
+            self.assertTrue(hook.isdir(path.join(self._tmp_dir, 'test')))
+            self.assertFalse(hook.isdir(path.join(self._tmp_dir, 'test.txt')))
+
+    def test_makedir(self):
+        """Tests the makedir method."""
+
+        with LocalFsHook() as hook:
+            # Test non-existing directory.
+            dir_path = path.join(self._tmp_dir, 'new')
+            self.assertFalse(hook.exists(dir_path))
+            hook.makedir(dir_path)
+            self.assertTrue(hook.exists(dir_path))
+
+            # Test existing directory with exist_ok = True.
+            dir_path = path.join(self._tmp_dir, 'test')
+            self.assertTrue(hook.exists(dir_path))
+            hook.makedir(dir_path, exist_ok=True)
+
+            # Test existing directory with exist_ok = False.
+            with self.assertRaises(IOError):
+                hook.makedir(dir_path, exist_ok=False)
+
+            # Test nested directory (should fail).
+            dir_path = path.join(self._tmp_dir, 'new2', 'nested')
+            with self.assertRaises(IOError):
+                hook.makedir(dir_path)
+
+    def test_makedirs(self):
+        """Tests the makedirs method."""
+
+        with LocalFsHook() as hook:
+            # Test non-existing directory.
+            dir_path = path.join(self._tmp_dir, 'new')
+            self.assertFalse(hook.exists(dir_path))
+            hook.makedirs(dir_path)
+            self.assertTrue(hook.exists(dir_path))
+
+            # Test existing directory with exist_ok = True.
+            dir_path = path.join(self._tmp_dir, 'test')
+            self.assertTrue(hook.exists(dir_path))
+            hook.makedirs(dir_path, exist_ok=True)
+
+            # Test existing directory with exist_ok = False.
+            with self.assertRaises(IOError):
+                hook.makedirs(dir_path, exist_ok=False)
+
+            # Test nested directory (should succeed, creating parents).
+            dir_path = path.join(self._tmp_dir, 'new2', 'nested')
+            hook.makedirs(dir_path)
+            self.assertTrue(hook.exists(dir_path))
+
+    def test_walk(self):
+        """Tests the walk method."""
+
+        expected = [(self._tmp_dir, ['test'], ['hello.csv', 'hello.txt']),
+                    (path.join(self._tmp_dir, 'test'), [], ['nested.txt'])]
+
+        with LocalFsHook() as hook:
+            result = list(hook.walk(self._tmp_dir))
+
+        for res_item, exp_item in zip(result, expected):
+            self.assertEqual(res_item[0], exp_item[0])
+            self.assertEqual(sorted(res_item[1]), sorted(exp_item[1]))
+            self.assertEqual(sorted(res_item[2]), sorted(exp_item[2]))
+
+    def test_glob(self):
+        """Tests glob method."""
+
+        with LocalFsHook() as hook:
+            self.assertEqual(
+                hook.glob(path.join(self._tmp_dir, '*.txt')),
+                [path.join(self._tmp_dir, 'hello.txt')])
+
+            self.assertEqual(
+                hook.glob(path.join(self._tmp_dir, '**/*.txt')),
+                [path.join(self._tmp_dir, 'test', 'nested.txt')])
+
+            self.assertEqual(hook.glob(path.join(self._tmp_dir, '*.xml')), [])
+
+    def test_rm(self):
+        """Tests rm method."""
+
+        with LocalFsHook() as hook:
+            file_path = path.join(self._tmp_dir, 'hello.txt')
+            self.assertTrue(hook.exists(file_path))
+
+            hook.rm(file_path)
+            self.assertFalse(hook.exists(file_path))
+
+    def test_rmtree(self):
+        """Tests the rmtree method."""
+
+        with LocalFsHook() as hook:
+            dir_path = path.join(self._tmp_dir, 'test')
+            self.assertTrue(hook.exists(dir_path))
+
+            hook.rmtree(dir_path)
+            self.assertFalse(hook.exists(dir_path))
+
+    def test_join(self):
+        """Tests the join method."""
+
+        self.assertEqual(LocalFsHook.join('test', 'example.csv'),
+                         os.path.join('test', 'example.csv'))
+
+    def test_split(self):
+        """Tests the split method."""
+
+        file_path = os.path.join('test', 'example.csv')
+        self.assertEqual(LocalFsHook.split(file_path),
+                         os.path.split(file_path))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/hooks/fs_hooks/test_s3.py b/tests/hooks/fs_hooks/test_s3.py
new file mode 100644
index 0000000000..94234e3081
--- /dev/null
+++ b/tests/hooks/fs_hooks/test_s3.py
@@ -0,0 +1,216 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import io
+import posixpath
+import unittest
+
+import mock
+
+import boto3
+from moto import mock_s3
+
+from airflow.hooks.fs_hooks.s3 import S3FsHook, s3fs
+
+
+class TestS3FsHook(unittest.TestCase):
+    """Tests for the S3FsHook."""
+
+    def setUp(self):
+        self._mock_s3 = mock_s3()
+        self._mock_s3.start()
+
+        # Create bucket.
+        conn = boto3.resource('s3')
+        self._bucket = conn.create_bucket(Bucket='test_bucket')
+
+        # Bootstrap some files.
+        buffer = io.BytesIO(b'Hello world!\n')
+        self._bucket.upload_fileobj(buffer, 'hello.txt')
+
+        buffer = io.BytesIO(b'Hello world!\n')
+        self._bucket.upload_fileobj(buffer, 'hello.csv')
+
+        buffer = io.BytesIO(b'Nested\n')
+        self._bucket.upload_fileobj(buffer, 'test/nested.txt')
+
+    def tearDown(self):
+        self._mock_s3.stop()
+
+    @mock.patch.object(s3fs, 'S3FileSystem')
+    @mock.patch.object(S3FsHook, 'get_connection')
+    def test_get_conn(self, conn_mock, s3fs_mock):
+        """Tests get_conn call without a connection."""
+
+        with S3FsHook() as hook:
+            hook.get_conn()
+
+        conn_mock.assert_not_called()
+        s3fs_mock.assert_called_once_with()
+
+    @mock.patch.object(s3fs, 'S3FileSystem')
+    @mock.patch.object(S3FsHook, 'get_connection')
+    def test_get_conn_with_conn(self, conn_mock, s3fs_mock):
+        """Tests get_conn call with a connection."""
+
+        conn_mock.return_value = mock.Mock(
+            login='s3_id',
+            password='s3_access_key',
+            extra_dejson={})
+
+        with S3FsHook(conn_id='s3_default') as hook:
+            hook.get_conn()
+
+        s3fs_mock.assert_called_once_with(
+            key='s3_id',
+            secret='s3_access_key',
+            s3_additional_kwargs={})
+
+    @mock.patch.object(s3fs, 'S3FileSystem')
+    @mock.patch.object(S3FsHook, 'get_connection')
+    def test_get_conn_with_encr(self, conn_mock, s3fs_mock):
+        """Tests get_conn call with an encrypted connection."""
+
+        conn_mock.return_value = mock.Mock(
+            login='s3_id',
+            password='s3_access_key',
+            extra_dejson={'encryption': 'AES256'})
+
+        with S3FsHook(conn_id='s3_default') as hook:
+            hook.get_conn()
+
+        conn_mock.assert_called_once_with('s3_default')
+
+        s3fs_mock.assert_called_once_with(
+            key='s3_id',
+            secret='s3_access_key',
+            s3_additional_kwargs={'ServerSideEncryption': 'AES256'})
+
+    def test_with(self):
+        """Tests if context manager closes the connection."""
+
+        with mock.patch.object(S3FsHook, 'disconnect') as mock_disconnect:
+            with S3FsHook() as hook:
+                pass
+
+        mock_disconnect.assert_called_once()
+        self.assertIsNone(hook._conn)
+
+    def test_open(self):
+        """Tests the open method."""
+
+        with S3FsHook() as hook:
+            # Try to write file.
+            with hook.open('s3://test_bucket/new.txt', 'wb') as file_:
+                file_.write(b'Hello world!')
+
+            # Check file exists.
+            self.assertTrue(hook.exists('s3://test_bucket/new.txt'))
+
+            # Check reading file.
+            with hook.open('s3://test_bucket/new.txt', 'rb') as file_:
+                self.assertEqual(file_.read(), b'Hello world!')
+
+    def test_exists(self):
+        """Tests the exists method."""
+
+        with S3FsHook() as hook:
+            self.assertTrue(hook.exists('s3://test_bucket/hello.txt'))
+            self.assertFalse(hook.exists('s3://test_bucket/random.txt'))
+
+    def test_isdir(self):
+        """Tests the isdir method."""
+
+        with S3FsHook() as hook:
+            self.assertTrue(hook.isdir('s3://test_bucket/test'))
+            self.assertFalse(hook.isdir('s3://test_bucket/hello.csv'))
+
+    def test_makedir(self):
+        """Tests the makedir method (effectively a no-op)."""
+
+        with S3FsHook() as hook:
+            hook.makedir('s3://test_bucket/test/nested')
+
+    def test_makedirs(self):
+        """Tests the makedirs method (effectively a no-op)."""
+
+        with S3FsHook() as hook:
+            hook.makedirs('s3://test_bucket/test/nested')
+
+    def test_walk(self):
+        """Tests the walk method."""
+
+        expected = [('test_bucket', ['test'], ['hello.csv', 'hello.txt']),
+                    ('test_bucket/test', [], ['nested.txt'])]
+
+        with S3FsHook() as hook:
+            result = list(hook.walk('s3://test_bucket'))
+
+        for res_item, exp_item in zip(result, expected):
+            self.assertEqual(res_item[0], exp_item[0])
+            self.assertEqual(sorted(res_item[1]), sorted(exp_item[1]))
+            self.assertEqual(sorted(res_item[2]), sorted(exp_item[2]))
+
+    def test_glob(self):
+        """Tests glob method."""
+
+        with S3FsHook() as hook:
+            self.assertEqual(
+                hook.glob('s3://test_bucket/*.txt'),
+                ['test_bucket/hello.txt'])
+
+            self.assertEqual(
+                hook.glob('s3://test_bucket/**/*.txt'),
+                ['test_bucket/test/nested.txt'])
+
+            self.assertEqual(hook.glob('s3://test_bucket/*.xml'), [])
+
+    def test_rm(self):
+        """Tests rm method."""
+
+        with S3FsHook() as hook:
+            self.assertTrue(hook.exists('s3://test_bucket/hello.txt'))
+            hook.rm('s3://test_bucket/hello.txt')
+            self.assertFalse(hook.exists('s3://test_bucket/hello.txt'))
+
+    def test_rmtree(self):
+        """Tests the rmtree method."""
+
+        with S3FsHook() as hook:
+            self.assertTrue(hook.exists('s3://test_bucket/test/nested.txt'))
+            hook.rmtree('s3://test_bucket/test')
+            self.assertFalse(hook.exists('s3://test_bucket/test/nested.txt'))
+
+    def test_join(self):
+        """Tests the join method."""
+
+        self.assertEqual(S3FsHook.join('test', 'example.csv'),
+                         posixpath.join('test', 'example.csv'))
+
+    def test_split(self):
+        """Tests the split method."""
+
+        file_path = posixpath.join('test', 'example.csv')
+        self.assertEqual(S3FsHook.split(file_path),
+                         posixpath.split(file_path))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/tests/hooks/fs_hooks/test_sftp.py b/tests/hooks/fs_hooks/test_sftp.py
new file mode 100644
index 0000000000..2db8d5763a
--- /dev/null
+++ b/tests/hooks/fs_hooks/test_sftp.py
@@ -0,0 +1,221 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import unittest
+
+import mock
+
+from airflow.hooks.fs_hooks.sftp import SftpHook, pysftp
+
+
+class TestFtpHook(unittest.TestCase):
+    """
+    Tests for the SftpHook class.
+
+    Note that the SFTP connection is mocked in most of these tests to avoid
+    the requirement of having a local SFTP server for testing.
+    """
+
+    def setUp(self):
+        self._mock_fs = mock.Mock()
+
+        self._mocked_hook = SftpHook(conn_id='sftp_default')
+        self._mocked_hook._conn = self._mock_fs
+
+    @mock.patch.object(pysftp, 'Connection')
+    @mock.patch.object(SftpHook, 'get_connection')
+    def test_get_conn_pass(self, conn_mock, pysftp_mock):
+        """Tests get_conn with a password."""
+
+        conn_mock.return_value = mock.Mock(
+            host='example',
+            login='user',
+            password='password',
+            extra_dejson={})
+
+        with SftpHook(conn_id='sftp_default') as hook:
+            hook.get_conn()
+
+        conn_mock.assert_called_once_with('sftp_default')
+
+        pysftp_mock.assert_called_once_with(
+            'example',
+            username='user',
+            password='password')
+
+    @mock.patch.object(pysftp, 'Connection')
+    @mock.patch.object(SftpHook, 'get_connection')
+    def test_get_conn_key(self, conn_mock, pysftp_mock):
+        """Tests get_conn with a private key."""
+
+        conn_mock.return_value = mock.Mock(
+            host='example',
+            login='user',
+            password=None,
+            extra_dejson={'private_key': 'id_rsa'})
+
+        with SftpHook(conn_id='sftp_default') as hook:
+            hook.get_conn()
+
+        conn_mock.assert_called_once_with('sftp_default')
+
+        pysftp_mock.assert_called_once_with(
+            'example',
+            username='user',
+            private_key='id_rsa')
+
+    @mock.patch.object(pysftp, 'Connection')
+    @mock.patch.object(SftpHook, 'get_connection')
+    def test_get_conn_key_pass(self, conn_mock, pysftp_mock):
+        """Tests get_conn with a private key + password."""
+
+        conn_mock.return_value = mock.Mock(
+            host='example',
+            login='user',
+            password='key_pass',
+            extra_dejson={'private_key': 'id_rsa'})
+
+        with SftpHook(conn_id='sftp_default') as hook:
+            hook.get_conn()
+
+        conn_mock.assert_called_once_with('sftp_default')
+
+        pysftp_mock.assert_called_once_with(
+            'example',
+            username='user',
+            private_key='id_rsa',
+            private_key_pass='key_pass')
+
+    def test_open(self):
+        """Tests the `open` method."""
+
+        with self._mocked_hook as hook:
+            hook.open('test.txt', mode='rb')
+
+        self._mock_fs.open.assert_called_once_with('test.txt', mode='rb')
+
+    def test_exists(self):
+        """Tests the `exists` method."""
+
+        with self._mocked_hook as hook:
+            hook.exists('test.txt')
+
+        self._mock_fs.exists.assert_called_once_with('test.txt')
+
+    def test_isdir(self):
+        """Tests the `isdir` method."""
+
+        with self._mocked_hook as hook:
+            hook.isdir('test.txt')
+
+        self._mock_fs.isdir.assert_called_once_with('test.txt')
+
+    def test_makedir(self):
+        """Tests the `makedir` method with a non-existing dir."""
+
+        self._mock_fs.exists.return_value = False
+
+        with self._mocked_hook as hook:
+            hook.makedir('path/to/dir', mode=0o755)
+
+        self._mock_fs.mkdir.assert_called_once_with('path/to/dir', mode=755)
+
+    def test_makedir_existing(self):
+        """Tests the `makedir` method with an existing dir
+           and exist_ok = False.
+        """
+
+        self._mock_fs.exists.return_value = True
+
+        with self._mocked_hook as hook:
+            with self.assertRaises(IOError):
+                hook.makedir('path/to/dir', mode=0o755, exist_ok=False)
+
+    def test_makedir_existing_ok(self):
+        """Tests the `makedir` method with an existing dir
+           and exist_ok = True.
+        """
+
+        self._mock_fs.exists.return_value = True
+
+        with self._mocked_hook as hook:
+            hook.makedir('path/to/dir', mode=0o755, exist_ok=True)
+
+    def test_makedirs(self):
+        """Tests the `makedirs` method with a non-existing dir."""
+
+        self._mock_fs.exists.return_value = False
+
+        with self._mocked_hook as hook:
+            hook.makedirs('path/to/dir', mode=0o755)
+
+        self._mock_fs.makedirs.assert_called_once_with(
+            'path/to/dir', mode=755)
+
+    def test_makedirs_existing(self):
+        """Tests the `makedirs` method with an existing dir
+           and exist_ok = False.
+        """
+
+        self._mock_fs.exists.return_value = True
+
+        with self._mocked_hook as hook:
+            with self.assertRaises(IOError):
+                hook.makedirs('path/to/dir', mode=0o755, exist_ok=False)
+
+    def test_makedirs_existing_ok(self):
+        """Tests the `makedirs` method with an existing dir
+           and exist_ok = True.
+        """
+
+        self._mock_fs.exists.return_value = True
+
+        with self._mocked_hook as hook:
+            hook.makedirs('path/to/dir', mode=0o755, exist_ok=True)
+
+    # def test_glob(self):
+    #     """Tests the `glob` method."""
+
+    #     with self._mocked_hook as hook:
+    #         hook.glob('*.txt')
+
+    #     self._mock_fs.glob.assert_called_once_with('*.txt')
+
+    def test_rm(self):
+        """Tests the `rm` method."""
+
+        with self._mocked_hook as hook:
+            hook.rm('test_file')
+
+        self._mock_fs.remove.assert_called_once_with('test_file')
+
+    def test_rmtree(self):
+        """Tests the `rmtree` method."""
+
+        self._mock_fs.execute.return_value = None
+
+        with self._mocked_hook as hook:
+            hook.rmtree('test_dir')
+
+        self._mock_fs.execute.assert_called_once_with("rm -r 'test_dir'")
+
+
+if __name__ == '__main__':
+    unittest.main()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services