Posted to commits@airflow.apache.org by bo...@apache.org on 2017/07/20 20:08:54 UTC
incubator-airflow git commit: [AIRFLOW-756][AIRFLOW-751] Replace ssh hook, operator & sftp operator with paramiko based
Repository: incubator-airflow
Updated Branches:
refs/heads/master 751e936ac -> fe0edeaab
[AIRFLOW-756][AIRFLOW-751] Replace ssh hook, operator & sftp operator with paramiko based
Closes #1999 from jhsenjaliya/AIRFLOW-756
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fe0edeaa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fe0edeaa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fe0edeaa
Branch: refs/heads/master
Commit: fe0edeaab5a23e6b0bcd67c22ed32e8303273840
Parents: 751e936
Author: Jay <jh...@gmail.com>
Authored: Thu Jul 20 22:07:30 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Jul 20 22:07:45 2017 +0200
----------------------------------------------------------------------
UPDATING.md | 7 +
airflow/contrib/hooks/ssh_hook.py | 268 +++++++++++--------
airflow/contrib/operators/__init__.py | 2 +-
airflow/contrib/operators/sftp_operator.py | 99 +++++++
.../contrib/operators/ssh_execute_operator.py | 159 -----------
airflow/contrib/operators/ssh_operator.py | 106 ++++++++
docs/code.rst | 2 +-
scripts/ci/requirements.txt | 1 +
setup.py | 7 +-
tests/contrib/hooks/test_ssh_hook.py | 70 +++++
tests/contrib/operators/test_sftp_operator.py | 158 +++++++++++
.../operators/test_ssh_execute_operator.py | 95 -------
tests/contrib/operators/test_ssh_operator.py | 89 ++++++
tests/core.py | 52 ----
14 files changed, 694 insertions(+), 421 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe0edeaa/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index c38c96d..aefed3e 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -117,6 +117,13 @@ If you experience problems connecting with your operator make sure you set the c
Also the old P12 key file type is not supported anymore and only the new JSON key files are supported as a service
account.
+### SSH Hook updates, along with new SSH Operator & SFTP Operator
+ SSH Hook now uses the Paramiko library to create an ssh client connection, instead of the sub-process based ssh command execution used previously (<1.9.0), so this change is backward incompatible.
+ - update the SSHHook constructor
+ - use the SSHOperator class in place of SSHExecuteOperator, which has been removed. Refer to test_ssh_operator.py for usage info.
+ - SFTPOperator has been added to perform secure file transfer from server A to server B. Refer to test_sftp_operator.py for usage info.
+ - No updates are required if you are using FTPHook; it will continue to work as before.
+
### Deprecated Features
These features are marked for deprecation. They may still work (and raise a `DeprecationWarning`), but are no longer
supported and will be removed entirely in Airflow 2.0
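For illustration, a minimal migration sketch based on the operators introduced in this commit (the `dag` object, connection id and command are placeholder assumptions, not part of the commit):

    # before (<1.9.0), using the now-removed operator:
    # from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
    # task = SSHExecuteOperator(task_id="run_cmd", ssh_hook=hook,
    #                           bash_command="echo airflow", dag=dag)

    # after, with the paramiko-based operator:
    from airflow.contrib.operators.ssh_operator import SSHOperator

    task = SSHOperator(
        task_id="run_cmd",
        ssh_conn_id="ssh_default",  # assumed airflow Connection id
        command="echo airflow",
        do_xcom_push=True,          # push stdout to XCom (replaces xcom_push)
        dag=dag)                    # assumes an existing DAG object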
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe0edeaa/airflow/contrib/hooks/ssh_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ssh_hook.py b/airflow/contrib/hooks/ssh_hook.py
index e63a65d..f1e25a6 100755
--- a/airflow/contrib/hooks/ssh_hook.py
+++ b/airflow/contrib/hooks/ssh_hook.py
@@ -14,125 +14,144 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
-# This is a port of Luigi's ssh implementation. All credits go there.
-import subprocess
-from contextlib import contextmanager
-
-from airflow.hooks.base_hook import BaseHook
-from airflow.exceptions import AirflowException
+import getpass
import logging
+import os
+import paramiko
-class SSHHook(BaseHook):
- """
- Light-weight remote execution library and utilities.
-
- Using this hook (which is just a convenience wrapper for subprocess),
- is created to let you stream data from a remotely stored file.
+from contextlib import contextmanager
+from airflow.exceptions import AirflowException
+from airflow.hooks.base_hook import BaseHook
- As a bonus, :class:`SSHHook` also provides a really cool feature that let's you
- set up ssh tunnels super easily using a python context manager (there is an example
- in the integration part of unittests).
- :param key_file: Typically the SSHHook uses the keys that are used by the user
- airflow is running under. This sets the behavior to use another file instead.
+class SSHHook(BaseHook):
+ """
+ Hook for ssh remote execution using Paramiko.
+ ref: https://github.com/paramiko/paramiko
+ This hook also lets you create an ssh tunnel and serves as a basis for SFTP file transfer
+
+ :param ssh_conn_id: connection id from airflow Connections from which all the required
+ parameters such as username, password or key_file can be fetched,
+ though priority is given to the params passed during init
+ :type ssh_conn_id: str
+ :param remote_host: remote host to connect to
+ :type remote_host: str
+ :param username: username to connect to the remote_host
+ :type username: str
+ :param password: password of the username to connect to the remote_host
+ :type password: str
+ :param key_file: key file to use to connect to the remote_host.
:type key_file: str
- :param connect_timeout: sets the connection timeout for this connection.
- :type connect_timeout: int
- :param no_host_key_check: whether to check to host key. If True host keys will not
- be checked, but are also not stored in the current users's known_hosts file.
- :type no_host_key_check: bool
- :param tty: allocate a tty.
- :type tty: bool
- :param sshpass: Use to non-interactively perform password authentication by using
- sshpass.
- :type sshpass: bool
+ :param timeout: timeout for the attempt to connect to the remote_host.
+ :type timeout: int
"""
- def __init__(self, conn_id='ssh_default'):
- conn = self.get_connection(conn_id)
- self.key_file = conn.extra_dejson.get('key_file', None)
- self.connect_timeout = conn.extra_dejson.get('connect_timeout', None)
- self.tcp_keepalive = conn.extra_dejson.get('tcp_keepalive', False)
- self.server_alive_interval = conn.extra_dejson.get('server_alive_interval', 60)
- self.no_host_key_check = conn.extra_dejson.get('no_host_key_check', False)
- self.tty = conn.extra_dejson.get('tty', False)
- self.sshpass = conn.extra_dejson.get('sshpass', False)
- self.conn = conn
-
- def get_conn(self):
- pass
-
- def _host_ref(self):
- if self.conn.login:
- return "{0}@{1}".format(self.conn.login, self.conn.host)
- else:
- return self.conn.host
-
- def _prepare_command(self, cmd):
- connection_cmd = ["ssh", self._host_ref(), "-o", "ControlMaster=no"]
- if self.sshpass:
- connection_cmd = ["sshpass", "-e"] + connection_cmd
- else:
- connection_cmd += ["-o", "BatchMode=yes"] # no password prompts
-
- if self.conn.port:
- connection_cmd += ["-p", str(self.conn.port)]
-
- if self.connect_timeout:
- connection_cmd += ["-o", "ConnectionTimeout={}".format(self.connect_timeout)]
-
- if self.tcp_keepalive:
- connection_cmd += ["-o", "TCPKeepAlive=yes"]
- connection_cmd += ["-o", "ServerAliveInterval={}".format(self.server_alive_interval)]
- if self.no_host_key_check:
- connection_cmd += ["-o", "UserKnownHostsFile=/dev/null",
- "-o", "StrictHostKeyChecking=no"]
+ def __init__(self,
+ ssh_conn_id=None,
+ remote_host=None,
+ username=None,
+ password=None,
+ key_file=None,
+ timeout=10
+ ):
+ super(SSHHook, self).__init__(ssh_conn_id)
+ self.ssh_conn_id = ssh_conn_id
+ self.remote_host = remote_host
+ self.username = username
+ self.password = password
+ self.key_file = key_file
+ self.timeout = timeout
+ # Default values, overridable from Connection
+ self.compress = True
+ self.no_host_key_check = True
+ self.client = None
- if self.key_file:
- connection_cmd += ["-i", self.key_file]
-
- if self.tty:
- connection_cmd += ["-t"]
-
- connection_cmd += cmd
- logging.debug("SSH cmd: {} ".format(connection_cmd))
-
- return connection_cmd
-
- def Popen(self, cmd, **kwargs):
- """
- Remote Popen
-
- :param cmd: command to remotely execute
- :param kwargs: extra arguments to Popen (see subprocess.Popen)
- :return: handle to subprocess
- """
- prefixed_cmd = self._prepare_command(cmd)
- return subprocess.Popen(prefixed_cmd, **kwargs)
-
- def check_output(self, cmd):
- """
- Executes a remote command and returns the stdout a remote process.
- Simplified version of Popen when you only want the output as a string and detect any errors.
-
- :param cmd: command to remotely execute
- :return: stdout
- """
- p = self.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- output, stderr = p.communicate()
-
- if p.returncode != 0:
- # I like this better: RemoteCalledProcessError(p.returncode, cmd, self.host, output=output)
- raise AirflowException("Cannot execute {} on {}. Error code is: {}. Output: {}, Stderr: {}".format(
- cmd, self.conn.host, p.returncode, output, stderr))
-
- return output
+ def get_conn(self):
+ if not self.client:
+ logging.debug('creating ssh client for conn_id: {0}'.format(self.ssh_conn_id))
+ if self.ssh_conn_id is not None:
+ conn = self.get_connection(self.ssh_conn_id)
+ if self.username is None:
+ self.username = conn.login
+ if self.password is None:
+ self.password = conn.password
+ if self.remote_host is None:
+ self.remote_host = conn.host
+ if conn.extra is not None:
+ extra_options = conn.extra_dejson
+ self.key_file = extra_options.get("key_file")
+
+ if "timeout" in extra_options:
+ self.timeout = int(extra_options["timeout"], 10)
+
+ if "compress" in extra_options \
+ and extra_options["compress"].lower() == 'false':
+ self.compress = False
+ if "no_host_key_check" in extra_options \
+ and extra_options["no_host_key_check"].lower() == 'false':
+ self.no_host_key_check = False
+
+ if not self.remote_host:
+ raise AirflowException("Missing required param: remote_host")
+
+ # Auto-detect the username from the system
+ if not self.username:
+ logging.debug("username to ssh to host: {0} is not specified for connection "
+ "id {1}, using system's default provided by getpass.getuser()"
+ .format(self.remote_host, self.ssh_conn_id))
+ self.username = getpass.getuser()
+
+ host_proxy = None
+ host_info = None
+ user_ssh_config_filename = os.path.expanduser('~/.ssh/config')
+ if os.path.isfile(user_ssh_config_filename):
+ ssh_conf = paramiko.SSHConfig()
+ ssh_conf.parse(open(user_ssh_config_filename))
+ host_info = ssh_conf.lookup(self.remote_host)
+ if host_info and host_info.get('proxycommand'):
+ host_proxy = paramiko.ProxyCommand(host_info.get('proxycommand'))
+
+ if not (self.password or self.key_file):
+ if host_info and host_info.get('identityfile'):
+ self.key_file = host_info.get('identityfile')[0]
+
+ try:
+ client = paramiko.SSHClient()
+ client.load_system_host_keys()
+ if self.no_host_key_check:
+ # Default is RejectPolicy
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+
+ if self.password and self.password.strip():
+ client.connect(hostname=self.remote_host,
+ username=self.username,
+ password=self.password,
+ timeout=self.timeout,
+ compress=self.compress,
+ sock=host_proxy)
+ else:
+ client.connect(hostname=self.remote_host,
+ username=self.username,
+ key_filename=self.key_file,
+ timeout=self.timeout,
+ compress=self.compress,
+ sock=host_proxy)
+
+ self.client = client
+ except paramiko.AuthenticationException as auth_error:
+ logging.error("Auth failed while connecting to host: {0}, error: {1}"
+ .format(self.remote_host, auth_error))
+ except paramiko.SSHException as ssh_error:
+ logging.error("Failed connecting to host: {0}, error: {1}"
+ .format(self.remote_host, ssh_error))
+ except Exception as error:
+ logging.error("Error connecting to host: {0}, error: {1}"
+ .format(self.remote_host, error))
+ return self.client
@contextmanager
- def tunnel(self, local_port, remote_port=None, remote_host="localhost"):
+ def create_tunnel(self, local_port, remote_port=None, remote_host="localhost"):
"""
Creates a tunnel between two hosts. Like ssh -L <LOCAL_PORT>:host:<REMOTE_PORT>.
Remember to close() the returned "tunnel" object in order to clean up
@@ -146,13 +165,40 @@ class SSHHook(BaseHook):
:type remote_host: str
:return:
"""
+
+ import subprocess
+ # this ensures a connection to the ssh remote_host, from which the tunnel
+ # is created
+ self.get_conn()
+
tunnel_host = "{0}:{1}:{2}".format(local_port, remote_host, remote_port)
- proc = self.Popen(["-L", tunnel_host, "echo -n ready && cat"],
- stdin=subprocess.PIPE, stdout=subprocess.PIPE)
+ ssh_cmd = ["ssh", "{0}@{1}".format(self.username, self.remote_host),
+ "-o", "ControlMaster=no",
+ "-o", "UserKnownHostsFile=/dev/null",
+ "-o", "StrictHostKeyChecking=no"]
+
+ ssh_tunnel_cmd = ["-L", tunnel_host,
+ "echo -n ready && cat"
+ ]
+
+ ssh_cmd += ssh_tunnel_cmd
+ logging.debug("creating tunnel with cmd: {0}".format(ssh_cmd))
+
+ proc = subprocess.Popen(ssh_cmd,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE)
ready = proc.stdout.read(5)
- assert ready == b"ready", "Did not get 'ready' from remote"
+ assert ready == b"ready", \
+ "Did not get 'ready' from remote, got '{0}' instead".format(ready)
yield
proc.communicate()
- assert proc.returncode == 0, "Tunnel process did unclean exit (returncode {}".format(proc.returncode)
+ assert proc.returncode == 0, \
+ "Tunnel process did unclean exit (returncode {}".format(proc.returncode)
+
+ def __enter__(self):
+ return self
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if self.client is not None:
+ self.client.close()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe0edeaa/airflow/contrib/operators/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/__init__.py b/airflow/contrib/operators/__init__.py
index 4ea6c17..b731373 100644
--- a/airflow/contrib/operators/__init__.py
+++ b/airflow/contrib/operators/__init__.py
@@ -32,7 +32,7 @@ import sys
#
# ------------------------------------------------------------------------
_operators = {
- 'ssh_execute_operator': ['SSHExecuteOperator'],
+ 'ssh_operator': ['SSHOperator'],
'vertica_operator': ['VerticaOperator'],
'vertica_to_hive': ['VerticaToHiveTransfer'],
'qubole_operator': ['QuboleOperator'],
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe0edeaa/airflow/contrib/operators/sftp_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/sftp_operator.py b/airflow/contrib/operators/sftp_operator.py
new file mode 100644
index 0000000..b9f07d5
--- /dev/null
+++ b/airflow/contrib/operators/sftp_operator.py
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.contrib.hooks.ssh_hook import SSHHook
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class SFTPOperation(object):
+ PUT = 'put'
+ GET = 'get'
+
+
+class SFTPOperator(BaseOperator):
+ """
+ SFTPOperator for transferring files from a remote host to local or vice versa.
+ This operator uses ssh_hook to open an sftp transport channel that serves as the
+ basis for file transfer.
+
+ :param ssh_hook: predefined ssh_hook to use for remote execution
+ :type ssh_hook: :class:`SSHHook`
+ :param ssh_conn_id: connection id from airflow Connections
+ :type ssh_conn_id: str
+ :param remote_host: remote host to connect
+ :type remote_host: str
+ :param local_filepath: local file path to get or put
+ :type local_filepath: str
+ :param remote_filepath: remote file path to get or put
+ :type remote_filepath: str
+ :param operation: specify operation 'get' or 'put', defaults to 'put'
+ :type operation: str
+ """
+ template_fields = ('local_filepath', 'remote_filepath')
+
+ @apply_defaults
+ def __init__(self,
+ ssh_hook=None,
+ ssh_conn_id=None,
+ remote_host=None,
+ local_filepath=None,
+ remote_filepath=None,
+ operation=SFTPOperation.PUT,
+ *args,
+ **kwargs):
+ super(SFTPOperator, self).__init__(*args, **kwargs)
+ self.ssh_hook = ssh_hook
+ self.ssh_conn_id = ssh_conn_id
+ self.remote_host = remote_host
+ self.local_filepath = local_filepath
+ self.remote_filepath = remote_filepath
+ self.operation = operation
+ if self.operation.lower() not in (SFTPOperation.GET, SFTPOperation.PUT):
+ raise TypeError("unsupported operation value {0}, expected {1} or {2}"
+ .format(self.operation, SFTPOperation.GET, SFTPOperation.PUT))
+
+ def execute(self, context):
+ file_msg = None
+ try:
+ if self.ssh_conn_id and not self.ssh_hook:
+ self.ssh_hook = SSHHook(ssh_conn_id=self.ssh_conn_id)
+
+ if not self.ssh_hook:
+ raise AirflowException("can not operate without ssh_hook or ssh_conn_id")
+
+ if self.remote_host is not None:
+ self.ssh_hook.remote_host = self.remote_host
+
+ ssh_client = self.ssh_hook.get_conn()
+ sftp_client = ssh_client.open_sftp()
+ if self.operation.lower() == SFTPOperation.GET:
+ file_msg = "from {0} to {1}".format(self.remote_filepath,
+ self.local_filepath)
+ logging.debug("Starting to transfer {0}".format(file_msg))
+ sftp_client.get(self.remote_filepath, self.local_filepath)
+ else:
+ file_msg = "from {0} to {1}".format(self.local_filepath,
+ self.remote_filepath)
+ logging.debug("Starting to transfer file {0}".format(file_msg))
+ sftp_client.put(self.local_filepath, self.remote_filepath)
+
+ except Exception as e:
+ raise AirflowException("Error while transferring {0}, error: {1}"
+ .format(file_msg, str(e)))
+
+ return None
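A short sketch of the new operator (paths, the connection id and the `dag` object are placeholder assumptions):

    from airflow.contrib.operators.sftp_operator import SFTPOperator, SFTPOperation

    upload = SFTPOperator(
        task_id="upload_file",
        ssh_conn_id="ssh_default",                # assumed airflow Connection id
        local_filepath="/tmp/test_local_file",
        remote_filepath="/tmp/test_remote_file",
        operation=SFTPOperation.PUT,              # SFTPOperation.GET to download instead
        dag=dag)                                  # assumes an existing DAG object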
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe0edeaa/airflow/contrib/operators/ssh_execute_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ssh_execute_operator.py b/airflow/contrib/operators/ssh_execute_operator.py
deleted file mode 100644
index 3bd8f09..0000000
--- a/airflow/contrib/operators/ssh_execute_operator.py
+++ /dev/null
@@ -1,159 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from builtins import bytes
-import logging
-import subprocess
-from subprocess import STDOUT
-
-from airflow.models import BaseOperator
-from airflow.utils.decorators import apply_defaults
-from airflow.exceptions import AirflowException
-
-
-class SSHTempFileContent(object):
- """This class prvides a functionality that creates tempfile
- with given content at remote host.
- Use like::
-
- with SSHTempFileContent(ssh_hook, content) as tempfile:
- ...
-
- In this case, a temporary file ``tempfile``
- with content ``content`` is created where ``ssh_hook`` designate.
-
- Note that this isn't safe because other processes
- at remote host can read and write that tempfile.
-
- :param ssh_hook: A SSHHook that indicates a remote host
- where you want to create tempfile
- :param content: Initial content of creating temporary file
- :type content: string
- :param prefix: The prefix string you want to use for the temporary file
- :type prefix: string
- """
-
- def __init__(self, ssh_hook, content, prefix="tmp"):
- self._ssh_hook = ssh_hook
- self._content = content
- self._prefix = prefix
-
- def __enter__(self):
- ssh_hook = self._ssh_hook
- string = self._content
- prefix = self._prefix
-
- pmktemp = ssh_hook.Popen(["-q",
- "mktemp", "-t", prefix + "_XXXXXX"],
- stdout=subprocess.PIPE,
- stderr=STDOUT)
- tempfile = pmktemp.communicate()[0].rstrip()
- pmktemp.wait()
- if pmktemp.returncode:
- raise AirflowException("Failed to create remote temp file")
-
- ptee = ssh_hook.Popen(["-q", "tee", tempfile],
- stdin=subprocess.PIPE,
- # discard stdout
- stderr=STDOUT)
- ptee.stdin.write(bytes(string, 'utf_8'))
- ptee.stdin.close()
- ptee.wait()
- if ptee.returncode:
- raise AirflowException("Failed to write to remote temp file")
-
- self._tempfile = tempfile
- return tempfile
-
- def __exit__(self, type, value, traceback):
- sp = self._ssh_hook.Popen(["-q", "rm", "-f", "--", self._tempfile])
- sp.communicate()
- sp.wait()
- if sp.returncode:
- raise AirflowException("Failed to remove to remote temp file")
- return False
-
-
-class SSHExecuteOperator(BaseOperator):
- """
- Execute a Bash script, command or set of commands at remote host.
-
- :param ssh_hook: A SSHHook that indicates the remote host
- you want to run the script
- :type ssh_hook: string
- :param bash_command: The command, set of commands or reference to a
- bash script (must be '.sh') to be executed.
- :type bash_command: string
- :param env: If env is not None, it must be a mapping that defines the
- environment variables for the new process; these are used instead
- of inheriting the current process environment, which is the default
- behavior.
- :type env: dict
- """
-
- template_fields = ("bash_command", "env",)
- template_ext = (".sh", ".bash",)
-
- @apply_defaults
- def __init__(self,
- ssh_hook,
- bash_command,
- xcom_push=False,
- env=None,
- *args, **kwargs):
- super(SSHExecuteOperator, self).__init__(*args, **kwargs)
- self.bash_command = bash_command
- self.env = env
- self.hook = ssh_hook
- self.xcom_push = xcom_push
-
- def execute(self, context):
- bash_command = self.bash_command
- hook = self.hook
- host = hook._host_ref()
-
- with SSHTempFileContent(self.hook,
- self.bash_command,
- self.task_id) as remote_file_path:
- logging.info("Temporary script "
- "location : {0}:{1}".format(host, remote_file_path))
- logging.info("Running command: " + bash_command)
- if self.env is not None:
- logging.info("env: " + str(self.env))
-
- sp = hook.Popen(
- ['-q', 'bash', remote_file_path],
- stdout=subprocess.PIPE, stderr=STDOUT,
- env=self.env)
-
- self.sp = sp
-
- logging.info("Output:")
- line = ''
- for line in iter(sp.stdout.readline, b''):
- line = line.decode('utf_8').strip()
- logging.info(line)
- sp.wait()
- logging.info("Command exited with "
- "return code {0}".format(sp.returncode))
- if sp.returncode:
- raise AirflowException("Bash command failed")
- if self.xcom_push:
- return line
-
- def on_kill(self):
- # TODO: Cleanup remote tempfile
- # TODO: kill `mktemp` or `tee` too when they are alive.
- logging.info('Sending SIGTERM signal to bash subprocess')
- self.sp.terminate()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe0edeaa/airflow/contrib/operators/ssh_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/ssh_operator.py b/airflow/contrib/operators/ssh_operator.py
new file mode 100644
index 0000000..ff874da
--- /dev/null
+++ b/airflow/contrib/operators/ssh_operator.py
@@ -0,0 +1,106 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from airflow.contrib.hooks.ssh_hook import SSHHook
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class SSHOperator(BaseOperator):
+
+ """
+ SSHOperator to execute commands on given remote host using the ssh_hook.
+
+ :param ssh_hook: predefined ssh_hook to use for remote execution
+ :type ssh_hook: :class:`SSHHook`
+ :param ssh_conn_id: connection id from airflow Connections
+ :type ssh_conn_id: str
+ :param remote_host: remote host to connect
+ :type remote_host: str
+ :param command: command to execute on remote host
+ :type command: str
+ :param timeout: timeout for executing the command.
+ :type timeout: int
+ :param do_xcom_push: return the stdout, which also gets set in xcom by the airflow platform
+ :type do_xcom_push: bool
+ """
+
+ template_fields = ('command',)
+
+ @apply_defaults
+ def __init__(self,
+ ssh_hook=None,
+ ssh_conn_id=None,
+ remote_host=None,
+ command=None,
+ timeout=10,
+ do_xcom_push=False,
+ *args,
+ **kwargs):
+ super(SSHOperator, self).__init__(*args, **kwargs)
+ self.ssh_hook = ssh_hook
+ self.ssh_conn_id = ssh_conn_id
+ self.remote_host = remote_host
+ self.command = command
+ self.timeout = timeout
+ self.do_xcom_push = do_xcom_push
+
+ def execute(self, context):
+ try:
+ if self.ssh_conn_id and not self.ssh_hook:
+ self.ssh_hook = SSHHook(ssh_conn_id=self.ssh_conn_id)
+
+ if not self.ssh_hook:
+ raise AirflowException("can not operate without ssh_hook or ssh_conn_id")
+
+ if self.remote_host is not None:
+ self.ssh_hook.remote_host = self.remote_host
+
+ ssh_client = self.ssh_hook.get_conn()
+
+ if not self.command:
+ raise AirflowException("no command specified so nothing to execute here.")
+
+ # Automatically request a tty when it is required, e.g. for sudo
+ get_pty = False
+ if self.command.startswith('sudo'):
+ get_pty = True
+
+ # use the timeout passed in as a param
+ stdin, stdout, stderr = ssh_client.exec_command(command=self.command,
+ get_pty=get_pty,
+ timeout=self.timeout
+ )
+ exit_status = stdout.channel.recv_exit_status()
+ if exit_status == 0:
+ # only return the output if do_xcom_push is set,
+ # otherwise it is not supposed to be disclosed
+ if self.do_xcom_push:
+ return stdout.read()
+ else:
+ error_msg = stderr.read()
+ raise AirflowException("error running cmd: {0}, error: {1}"
+ .format(self.command, error_msg))
+
+ except Exception as e:
+ raise AirflowException("SSH operator error: {0}".format(str(e)))
+
+ return True
+
+ def tunnel(self):
+ ssh_client = self.ssh_hook.get_conn()
+ ssh_client.get_transport()
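To illustrate the tty handling above, a hedged sketch (the connection id, command and `dag` object are assumptions): commands starting with 'sudo' automatically get a pty, and stdout only reaches XCom when do_xcom_push is True:

    from airflow.contrib.operators.ssh_operator import SSHOperator

    restart = SSHOperator(
        task_id="restart_service",
        ssh_conn_id="ssh_default",
        command="sudo service nginx restart",  # 'sudo' prefix triggers get_pty=True
        timeout=30,
        do_xcom_push=False,  # set True to push stdout to XCom
        dag=dag)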
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe0edeaa/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index b17c3fe..a1980f2 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -91,7 +91,7 @@ Community-contributed Operators
.. automodule:: airflow.contrib.operators
:show-inheritance:
:members:
- SSHExecuteOperator,
+ SSHOperator,
VerticaOperator,
VerticaToHiveTransfer
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe0edeaa/scripts/ci/requirements.txt
----------------------------------------------------------------------
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index 0e5dbaf..670335c 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -63,6 +63,7 @@ oauth2client>=2.0.2,<2.1.0
pandas
pandas-gbq
parameterized
+paramiko>=2.1.1
psutil>=4.2.0, <5.0.0
psycopg2
pydruid
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe0edeaa/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index e4689d2..dedcf76 100644
--- a/setup.py
+++ b/setup.py
@@ -160,6 +160,7 @@ mysql = ['mysqlclient>=1.3.6']
rabbitmq = ['librabbitmq>=1.6.1']
oracle = ['cx_Oracle>=5.1.2']
postgres = ['psycopg2>=2.7.1']
+ssh = ['paramiko>=2.1.1']
salesforce = ['simple-salesforce>=0.72']
s3 = [
'boto>=2.36.0',
@@ -196,11 +197,12 @@ devel = [
'nose-ignore-docstring==0.2',
'nose-timer',
'parameterized',
- 'rednose'
+ 'rednose',
+ 'paramiko'
]
devel_minreq = devel + mysql + doc + password + s3 + cgroups
devel_hadoop = devel_minreq + hive + hdfs + webhdfs + kerberos
-devel_all = devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker
+devel_all = devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docker + ssh
def do_setup():
@@ -284,6 +286,7 @@ def do_setup():
'salesforce': salesforce,
'samba': samba,
'slack': slack,
+ 'ssh': ssh,
'statsd': statsd,
'vertica': vertica,
'webhdfs': webhdfs,
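With the ssh extras group registered above, the paramiko dependency can be pulled in at install time via "pip install airflow[ssh]" (the exact package name depends on the distribution in use).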
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe0edeaa/tests/contrib/hooks/test_ssh_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_ssh_hook.py b/tests/contrib/hooks/test_ssh_hook.py
new file mode 100644
index 0000000..a556332
--- /dev/null
+++ b/tests/contrib/hooks/test_ssh_hook.py
@@ -0,0 +1,70 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from airflow import configuration
+
+
+HELLO_SERVER_CMD = """
+import socket, sys
+listener = socket.socket()
+listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+listener.bind(('localhost', 2134))
+listener.listen(1)
+sys.stdout.write('ready')
+sys.stdout.flush()
+conn = listener.accept()[0]
+conn.sendall(b'hello')
+"""
+
+
+class SSHHookTest(unittest.TestCase):
+ def setUp(self):
+ configuration.load_test_config()
+ from airflow.contrib.hooks.ssh_hook import SSHHook
+ self.hook = SSHHook(ssh_conn_id='ssh_default')
+ self.hook.no_host_key_check = True
+
+ def test_ssh_connection(self):
+ ssh_hook = self.hook.get_conn()
+ self.assertIsNotNone(ssh_hook)
+
+ def test_tunnel(self):
+ print("Setting up remote listener")
+ import subprocess
+ import socket
+
+ self.server_handle = subprocess.Popen(["python", "-c", HELLO_SERVER_CMD],
+ stdout=subprocess.PIPE)
+ print("Setting up tunnel")
+ with self.hook.create_tunnel(2135, 2134):
+ print("Tunnel up")
+ server_output = self.server_handle.stdout.read(5)
+ self.assertEqual(server_output, b"ready")
+ print("Connecting to server via tunnel")
+ s = socket.socket()
+ s.connect(("localhost", 2135))
+ print("Receiving...", )
+ response = s.recv(5)
+ self.assertEqual(response, b"hello")
+ print("Closing connection")
+ s.close()
+ print("Waiting for listener...")
+ output, _ = self.server_handle.communicate()
+ self.assertEqual(self.server_handle.returncode, 0)
+ print("Closing tunnel")
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe0edeaa/tests/contrib/operators/test_sftp_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_sftp_operator.py b/tests/contrib/operators/test_sftp_operator.py
new file mode 100644
index 0000000..3d31414
--- /dev/null
+++ b/tests/contrib/operators/test_sftp_operator.py
@@ -0,0 +1,158 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import unittest
+from datetime import datetime
+
+from airflow import configuration
+from airflow import models
+from airflow.contrib.operators.sftp_operator import SFTPOperator, SFTPOperation
+from airflow.contrib.operators.ssh_operator import SSHOperator
+from airflow.models import DAG, TaskInstance
+from airflow.settings import Session
+
+TEST_DAG_ID = 'unit_tests'
+DEFAULT_DATE = datetime(2017, 1, 1)
+
+
+def reset(dag_id=TEST_DAG_ID):
+ session = Session()
+ tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id)
+ tis.delete()
+ session.commit()
+ session.close()
+
+reset()
+
+
+class SFTPOperatorTest(unittest.TestCase):
+ def setUp(self):
+ configuration.load_test_config()
+ from airflow.contrib.hooks.ssh_hook import SSHHook
+ hook = SSHHook(ssh_conn_id='ssh_default')
+ hook.no_host_key_check = True
+ args = {
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE,
+ 'provide_context': True
+ }
+ dag = DAG(TEST_DAG_ID + 'test_schedule_dag_once', default_args=args)
+ dag.schedule_interval = '@once'
+ self.hook = hook
+ self.dag = dag
+ self.test_dir = "/tmp"
+ self.test_local_filename = 'test_local_file'
+ self.test_remote_filename = 'test_remote_file'
+ self.test_local_filepath = '{0}/{1}'.format(self.test_dir,
+ self.test_local_filename)
+ self.test_remote_filepath = '{0}/{1}'.format(self.test_dir,
+ self.test_remote_filename)
+
+ def test_file_transfer_put(self):
+ test_local_file_content = \
+ b"This is local file content \n which is multiline " \
+ b"continuing....with other character\nanother line here \n this is last line"
+ # create a test file locally
+ with open(self.test_local_filepath, 'wb') as f:
+ f.write(test_local_file_content)
+
+ # put test file to remote
+ put_test_task = SFTPOperator(
+ task_id="test_sftp",
+ ssh_hook=self.hook,
+ local_filepath=self.test_local_filepath,
+ remote_filepath=self.test_remote_filepath,
+ operation=SFTPOperation.PUT,
+ dag=self.dag
+ )
+ self.assertIsNotNone(put_test_task)
+ ti2 = TaskInstance(task=put_test_task, execution_date=datetime.now())
+ ti2.run()
+
+ # check the remote file content
+ check_file_task = SSHOperator(
+ task_id="test_check_file",
+ ssh_hook=self.hook,
+ command="cat {0}".format(self.test_remote_filepath),
+ do_xcom_push=True,
+ dag=self.dag
+ )
+ self.assertIsNotNone(check_file_task)
+ ti3 = TaskInstance(task=check_file_task, execution_date=datetime.now())
+ ti3.run()
+ self.assertEqual(
+ ti3.xcom_pull(task_ids='test_check_file', key='return_value').strip(),
+ test_local_file_content)
+
+ def test_file_transfer_get(self):
+ test_remote_file_content = \
+ "This is remote file content \n which is also multiline " \
+ "another line here \n this is last line. EOF"
+
+ # create a test file remotely
+ create_file_task = SSHOperator(
+ task_id="test_create_file",
+ ssh_hook=self.hook,
+ command="echo '{0}' > {1}".format(test_remote_file_content,
+ self.test_remote_filepath),
+ do_xcom_push=True,
+ dag=self.dag
+ )
+ self.assertIsNotNone(create_file_task)
+ ti1 = TaskInstance(task=create_file_task, execution_date=datetime.now())
+ ti1.run()
+
+ # get remote file to local
+ get_test_task = SFTPOperator(
+ task_id="test_sftp",
+ ssh_hook=self.hook,
+ local_filepath=self.test_local_filepath,
+ remote_filepath=self.test_remote_filepath,
+ operation=SFTPOperation.GET,
+ dag=self.dag
+ )
+ self.assertIsNotNone(get_test_task)
+ ti2 = TaskInstance(task=get_test_task, execution_date=datetime.now())
+ ti2.run()
+
+ # test the received content
+ content_received = None
+ with open(self.test_local_filepath, 'r') as f:
+ content_received = f.read()
+ self.assertEqual(content_received.strip(), test_remote_file_content)
+
+ def delete_local_resource(self):
+ if os.path.exists(self.test_local_filepath):
+ os.remove(self.test_local_filepath)
+
+ def delete_remote_resource(self):
+ # remove the remote file to clean up
+ remove_file_task = SSHOperator(
+ task_id="test_check_file",
+ ssh_hook=self.hook,
+ command="rm {0}".format(self.test_remote_filepath),
+ do_xcom_push=True,
+ dag=self.dag
+ )
+ self.assertIsNotNone(remove_file_task)
+ ti3 = TaskInstance(task=remove_file_task, execution_date=datetime.now())
+ ti3.run()
+
+ def tearDown(self):
+ self.delete_local_resource()
+ self.delete_remote_resource()
+
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe0edeaa/tests/contrib/operators/test_ssh_execute_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_ssh_execute_operator.py b/tests/contrib/operators/test_ssh_execute_operator.py
deleted file mode 100644
index 0c2b9f2..0000000
--- a/tests/contrib/operators/test_ssh_execute_operator.py
+++ /dev/null
@@ -1,95 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import unittest
-import os
-import sys
-from datetime import datetime
-from io import StringIO
-
-import mock
-
-from airflow import configuration
-from airflow.settings import Session
-from airflow import models, DAG
-from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
-
-
-TEST_DAG_ID = 'unit_tests'
-DEFAULT_DATE = datetime(2015, 1, 1)
-configuration.load_test_config()
-
-
-def reset(dag_id=TEST_DAG_ID):
- session = Session()
- tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id)
- tis.delete()
- session.commit()
- session.close()
-
-reset()
-
-
-class SSHExecuteOperatorTest(unittest.TestCase):
-
- def setUp(self):
-
- if sys.version_info[0] == 3:
- raise unittest.SkipTest('SSHExecuteOperatorTest won\'t work with '
- 'python3. No need to test anything here')
-
- configuration.load_test_config()
- from airflow.contrib.hooks.ssh_hook import SSHHook
- hook = mock.MagicMock(spec=SSHHook)
- hook.no_host_key_check = True
- hook.Popen.return_value.stdout = StringIO(u'stdout')
- hook.Popen.return_value.returncode = False
- args = {
- 'owner': 'airflow',
- 'start_date': DEFAULT_DATE,
- 'provide_context': True
- }
- dag = DAG(TEST_DAG_ID+'test_schedule_dag_once', default_args=args)
- dag.schedule_interval = '@once'
- self.hook = hook
- self.dag = dag
-
- @mock.patch('airflow.contrib.operators.ssh_execute_operator.SSHTempFileContent')
- def test_simple(self, temp_file):
- temp_file.return_value.__enter__ = lambda x: 'filepath'
- task = SSHExecuteOperator(
- task_id="test",
- bash_command="echo airflow",
- ssh_hook=self.hook,
- dag=self.dag,
- )
- task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
- @mock.patch('airflow.contrib.operators.ssh_execute_operator.SSHTempFileContent')
- def test_with_env(self, temp_file):
- temp_file.return_value.__enter__ = lambda x: 'filepath'
- test_env = os.environ.copy()
- test_env['AIRFLOW_test'] = "test"
- task = SSHExecuteOperator(
- task_id="test",
- bash_command="echo $AIRFLOW_HOME",
- ssh_hook=self.hook,
- env=test_env['AIRFLOW_test'],
- dag=self.dag,
- )
- task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
-
-
-if __name__ == '__main__':
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe0edeaa/tests/contrib/operators/test_ssh_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_ssh_operator.py b/tests/contrib/operators/test_ssh_operator.py
new file mode 100644
index 0000000..21433d3
--- /dev/null
+++ b/tests/contrib/operators/test_ssh_operator.py
@@ -0,0 +1,89 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from datetime import datetime
+
+from airflow import configuration
+from airflow import models
+from airflow.contrib.operators.ssh_operator import SSHOperator
+from airflow.models import DAG, TaskInstance
+from airflow.settings import Session
+
+TEST_DAG_ID = 'unit_tests'
+DEFAULT_DATE = datetime(2017, 1, 1)
+
+
+def reset(dag_id=TEST_DAG_ID):
+ session = Session()
+ tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id)
+ tis.delete()
+ session.commit()
+ session.close()
+
+reset()
+
+
+class SSHOperatorTest(unittest.TestCase):
+ def setUp(self):
+ configuration.load_test_config()
+ from airflow.contrib.hooks.ssh_hook import SSHHook
+ hook = SSHHook(ssh_conn_id='ssh_default')
+ hook.no_host_key_check = True
+ args = {
+ 'owner': 'airflow',
+ 'start_date': DEFAULT_DATE,
+ 'provide_context': True
+ }
+ dag = DAG(TEST_DAG_ID + 'test_schedule_dag_once', default_args=args)
+ dag.schedule_interval = '@once'
+ self.hook = hook
+ self.dag = dag
+
+ def test_command_execution(self):
+ task = SSHOperator(
+ task_id="test",
+ ssh_hook=self.hook,
+ command="echo -n airflow",
+ do_xcom_push=True,
+ dag=self.dag,
+ )
+
+ self.assertIsNotNone(task)
+
+ ti = TaskInstance(
+ task=task, execution_date=datetime.now())
+ ti.run()
+ self.assertIsNotNone(ti.duration)
+ self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), b'airflow')
+
+ def test_command_execution_with_env(self):
+ task = SSHOperator(
+ task_id="test",
+ ssh_hook=self.hook,
+ command="echo -n airflow",
+ do_xcom_push=True,
+ dag=self.dag,
+ )
+
+ self.assertIsNotNone(task)
+
+ ti = TaskInstance(
+ task=task, execution_date=datetime.now())
+ ti.run()
+ self.assertIsNotNone(ti.duration)
+ self.assertEqual(ti.xcom_pull(task_ids='test', key='return_value'), b'airflow')
+
+if __name__ == '__main__':
+ unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fe0edeaa/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 259b61d..923e0c3 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -2381,58 +2381,6 @@ class S3HookTest(unittest.TestCase):
"Incorrect parsing of the s3 url")
-HELLO_SERVER_CMD = """
-import socket, sys
-listener = socket.socket()
-listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-listener.bind(('localhost', 2134))
-listener.listen(1)
-sys.stdout.write('ready')
-sys.stdout.flush()
-conn = listener.accept()[0]
-conn.sendall(b'hello')
-"""
-
-
-class SSHHookTest(unittest.TestCase):
- def setUp(self):
- configuration.load_test_config()
- from airflow.contrib.hooks.ssh_hook import SSHHook
- self.hook = SSHHook()
- self.hook.no_host_key_check = True
-
- def test_remote_cmd(self):
- output = self.hook.check_output(["echo", "-n", "airflow"])
- self.assertEqual(output, b"airflow")
-
- def test_tunnel(self):
- print("Setting up remote listener")
- import subprocess
- import socket
-
- self.handle = self.hook.Popen([
- "python", "-c", '"{0}"'.format(HELLO_SERVER_CMD)
- ], stdout=subprocess.PIPE)
-
- print("Setting up tunnel")
- with self.hook.tunnel(2135, 2134):
- print("Tunnel up")
- server_output = self.handle.stdout.read(5)
- self.assertEqual(server_output, b"ready")
- print("Connecting to server via tunnel")
- s = socket.socket()
- s.connect(("localhost", 2135))
- print("Receiving...", )
- response = s.recv(5)
- self.assertEqual(response, b"hello")
- print("Closing connection")
- s.close()
- print("Waiting for listener...")
- output, _ = self.handle.communicate()
- self.assertEqual(self.handle.returncode, 0)
- print("Closing tunnel")
-
-
send_email_test = mock.Mock()