You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by bo...@apache.org on 2017/11/02 17:57:15 UTC

incubator-airflow git commit: [AIRFLOW-1779] Add keepalive packets to ssh hook

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 2fef9152b -> 1bde78338


[AIRFLOW-1779] Add keepalive packets to ssh hook

Make use of paramiko's set_keepalive method to
send keepalive packets every
keepalive_interval seconds.  This will prevent
long running queries with no terminal
output from being termanated as idle, for example
by an intermediate NAT.

Set on by default with a 30 second interval.

Closes #2749 from RJKeevil/add-sshhook-keepalive


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1bde7833
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1bde7833
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1bde7833

Branch: refs/heads/master
Commit: 1bde7833854210676dcd6e8da8b6d9567e12031a
Parents: 2fef915
Author: Rob Keevil <ro...@gmail.com>
Authored: Thu Nov 2 18:57:09 2017 +0100
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Thu Nov 2 18:57:09 2017 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/ssh_hook.py    | 9 ++++++++-
 tests/contrib/hooks/test_ssh_hook.py | 2 +-
 2 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1bde7833/airflow/contrib/hooks/ssh_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/ssh_hook.py b/airflow/contrib/hooks/ssh_hook.py
index b061fd7..a85911b 100755
--- a/airflow/contrib/hooks/ssh_hook.py
+++ b/airflow/contrib/hooks/ssh_hook.py
@@ -46,6 +46,8 @@ class SSHHook(BaseHook, LoggingMixin):
     :type key_file: str
     :param timeout: timeout for the attempt to connect to the remote_host.
     :type timeout: int
+    :param keepalive_interval: send a keepalive packet to remote host every keepalive_interval seconds
+    :type keepalive_interval: int
     """
 
     def __init__(self,
@@ -54,7 +56,8 @@ class SSHHook(BaseHook, LoggingMixin):
                  username=None,
                  password=None,
                  key_file=None,
-                 timeout=10
+                 timeout=10,
+                 keepalive_interval=30
                  ):
         super(SSHHook, self).__init__(ssh_conn_id)
         self.ssh_conn_id = ssh_conn_id
@@ -63,6 +66,7 @@ class SSHHook(BaseHook, LoggingMixin):
         self.password = password
         self.key_file = key_file
         self.timeout = timeout
+        self.keepalive_interval = keepalive_interval
         # Default values, overridable from Connection
         self.compress = True
         self.no_host_key_check = True
@@ -140,6 +144,9 @@ class SSHHook(BaseHook, LoggingMixin):
                                    compress=self.compress,
                                    sock=host_proxy)
 
+                if self.keepalive_interval:
+                    client.get_transport().set_keepalive(self.keepalive_interval)
+
                 self.client = client
             except paramiko.AuthenticationException as auth_error:
                 self.log.error(

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1bde7833/tests/contrib/hooks/test_ssh_hook.py
----------------------------------------------------------------------
diff --git a/tests/contrib/hooks/test_ssh_hook.py b/tests/contrib/hooks/test_ssh_hook.py
index a556332..6f35431 100644
--- a/tests/contrib/hooks/test_ssh_hook.py
+++ b/tests/contrib/hooks/test_ssh_hook.py
@@ -33,7 +33,7 @@ class SSHHookTest(unittest.TestCase):
     def setUp(self):
         configuration.load_test_config()
         from airflow.contrib.hooks.ssh_hook import SSHHook
-        self.hook = SSHHook(ssh_conn_id='ssh_default')
+        self.hook = SSHHook(ssh_conn_id='ssh_default', keepalive_interval=10)
         self.hook.no_host_key_check = True
 
     def test_ssh_connection(self):