You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2018/10/18 21:32:41 UTC

[2/4] impala git commit: IMPALA-7555: Set socket timeout in impala-shell

IMPALA-7555: Set socket timeout in impala-shell

impala-shell does not set any socket timeout while connecting to the
impala server. This change sets a timeout on the socket before
connecting and clears it again once the connection succeeds. The default
timeout on this socket is 5 sec.
Usage: impala-shell --client_connect_timeout_ms=<value in ms>

Testing:
1. Added a test where I create a random listening socket.
impala-shell (with ssl enabled) connects to this socket and
times out after 2 sec.

2. Created a kerberized impala cluster with ssl enabled and
connected to the impalad using an openssl client (to block the
beeswax server thread from accepting new connections):

E.g. - openssl s_client -connect <IP Addr>:21000
Used impala-shell to connect to the same impalad later.
impala-shell timed out after the default of 5 sec. I verified
it manually.

Change-Id: I130fc47f7a83f591918d6842634b4e5787d00813
Reviewed-on: http://gerrit.cloudera.org:8080/11540
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2fb8ebae
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2fb8ebae
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2fb8ebae

Branch: refs/heads/master
Commit: 2fb8ebaef2beecd511e963fadbb41cbb11add138
Parents: 5af5456
Author: aphadke <ap...@cloudera.com>
Authored: Thu Sep 20 16:51:23 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Oct 18 01:41:42 2018 +0000

----------------------------------------------------------------------
 shell/impala_client.py                | 29 +++++++++++++++++++++++------
 shell/impala_shell.py                 | 17 +++++++++--------
 shell/impala_shell_config_defaults.py |  1 +
 shell/option_parser.py                |  5 ++++-
 tests/shell/test_shell_commandline.py | 15 +++++++++++++++
 5 files changed, 52 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/shell/impala_client.py
----------------------------------------------------------------------
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 2f2a5e9..e53637a 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -18,6 +18,7 @@
 # under the License.
 
 import sasl
+import sys
 import time
 
 from beeswaxd import BeeswaxService
@@ -57,11 +58,16 @@ class DisconnectedException(Exception):
 
 class QueryCancelledByShellException(Exception): pass
 
+
+def print_to_stderr(message):
+  print >> sys.stderr, message
+
 class ImpalaClient(object):
 
   def __init__(self, impalad, kerberos_host_fqdn, use_kerberos=False,
                kerberos_service_name="impala", use_ssl=False, ca_cert=None, user=None,
-               ldap_password=None, use_ldap=False):
+               ldap_password=None, use_ldap=False, client_connect_timeout_ms=5000,
+               verbose=True):
     self.connected = False
     self.impalad_host = impalad[0].encode('ascii', 'ignore')
     self.impalad_port = int(impalad[1])
@@ -74,6 +80,7 @@ class ImpalaClient(object):
     self.ca_cert = ca_cert
     self.user, self.ldap_password = user, ldap_password
     self.use_ldap = use_ldap
+    self.client_connect_timeout_ms = int(client_connect_timeout_ms)
     self.default_query_options = {}
     self.query_option_levels = {}
     self.query_state = QueryState._NAMES_TO_VALUES
@@ -82,6 +89,7 @@ class ImpalaClient(object):
     # from command line via CTRL+C. It is used to suppress error messages of
     # query cancellation.
     self.is_query_cancelled = False
+    self.verbose = verbose
 
   def _options_to_string_list(self, set_query_options):
     return ["%s=%s" % (k, v) for (k, v) in set_query_options.iteritems()]
@@ -250,8 +258,15 @@ class ImpalaClient(object):
       self.transport = None
 
     self.connected = False
-    self.transport = self._get_transport()
+    sock, self.transport = self._get_socket_and_transport()
+    if self.client_connect_timeout_ms > 0:
+      sock.setTimeout(self.client_connect_timeout_ms)
     self.transport.open()
+    if self.verbose:
+      print_to_stderr('Opened TCP connection to %s:%s' % (self.impalad_host,
+          self.impalad_port))
+    # Setting a timeout of None disables timeouts on sockets
+    sock.setTimeout(None)
     protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
     self.imp_service = ImpalaService.Client(protocol)
     result = self.ping_impala_service()
@@ -266,7 +281,7 @@ class ImpalaClient(object):
     if self.transport:
       self.transport.close()
 
-  def _get_transport(self):
+  def _get_socket_and_transport(self):
     """Create a Transport.
 
        A non-kerberized impalad just needs a simple buffered transport. For
@@ -274,6 +289,7 @@ class ImpalaClient(object):
 
        If SSL is enabled, a TSSLSocket underlies the transport stack; otherwise a TSocket
        is used.
+       This function returns the socket and the transport object.
     """
     if self.use_ssl:
       # TSSLSocket needs the ssl module, which may not be standard on all Operating
@@ -304,7 +320,8 @@ class ImpalaClient(object):
     else:
       sock = TSocket(sock_host, sock_port)
     if not (self.use_ldap or self.use_kerberos):
-      return TBufferedTransport(sock)
+      return sock, TBufferedTransport(sock)
+
     # Initializes a sasl client
     def sasl_factory():
       sasl_client = sasl.Client()
@@ -318,9 +335,9 @@ class ImpalaClient(object):
       return sasl_client
     # GSSASPI is the underlying mechanism used by kerberos to authenticate.
     if self.use_kerberos:
-      return TSaslClientTransport(sasl_factory, "GSSAPI", sock)
+      return sock, TSaslClientTransport(sasl_factory, "GSSAPI", sock)
     else:
-      return TSaslClientTransport(sasl_factory, "PLAIN", sock)
+      return sock, TSaslClientTransport(sasl_factory, "PLAIN", sock)
 
   def create_beeswax_query(self, query_str, set_query_options):
     """Create a beeswax query object from a query string"""

http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index a7b1ac8..9277071 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -168,7 +168,7 @@ class ImpalaShell(object, cmd.Cmd):
     self.ldap_password = options.ldap_password
     self.ldap_password_cmd = options.ldap_password_cmd
     self.use_ldap = options.use_ldap
-
+    self.client_connect_timeout_ms = options.client_connect_timeout_ms
     self.verbose = options.verbose
     self.prompt = ImpalaShell.DISCONNECTED_PROMPT
     self.server_version = ImpalaShell.UNKNOWN_SERVER_VERSION
@@ -518,7 +518,7 @@ class ImpalaShell(object, cmd.Cmd):
     return ImpalaClient(self.impalad, self.kerberos_host_fqdn, self.use_kerberos,
                         self.kerberos_service_name, self.use_ssl,
                         self.ca_cert, self.user, self.ldap_password,
-                        self.use_ldap)
+                        self.use_ldap, self.client_connect_timeout_ms, self.verbose)
 
   def _signal_handler(self, signal, frame):
     """Handles query cancellation on a Ctrl+C event"""
@@ -818,12 +818,13 @@ class ImpalaShell(object, cmd.Cmd):
       print_to_stderr("Unable to import the python 'ssl' module. It is"
       " required for an SSL-secured connection.")
       sys.exit(1)
-    except socket.error, (code, e):
+    except socket.error, e:
       # if the socket was interrupted, reconnect the connection with the client
-      if code == errno.EINTR:
+      if e.errno == errno.EINTR:
         self._reconnect_cancellation()
       else:
-        print_to_stderr("Socket error %s: %s" % (code, e))
+        print_to_stderr("Socket error %s: %s" % (e.errno, e))
+        self.imp_client.close_connection()
         self.prompt = self.DISCONNECTED_PROMPT
     except Exception, e:
       if self.ldap_password_cmd and \
@@ -1507,7 +1508,7 @@ def parse_variables(keyvals):
 
 def replace_variables(set_variables, string):
   """Replaces variable within the string with their corresponding values using the
-  given set_variables."""
+     given set_variables."""
   errors = False
   matches = set(map(lambda v: v.upper(), re.findall(r'(?<!\\)\${([^}]+)}', string)))
   for name in matches:
@@ -1537,8 +1538,8 @@ def replace_variables(set_variables, string):
 
 
 def get_var_name(name):
-  """Look for a namespace:var_name pattern in an option name.
-     Return the variable name if it's a match or None otherwise.
+  """Looks for a namespace:var_name pattern in an option name.
+     Returns the variable name if it's a match or None otherwise.
   """
   ns_match = re.match(r'^([^:]*):(.*)', name)
   if ns_match is not None:

http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/shell/impala_shell_config_defaults.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell_config_defaults.py b/shell/impala_shell_config_defaults.py
index 260e93e..be7685b 100644
--- a/shell/impala_shell_config_defaults.py
+++ b/shell/impala_shell_config_defaults.py
@@ -51,4 +51,5 @@ impala_shell_defaults = {
             'verbose': True,
             'version': False,
             'write_delimited': False,
+            'client_connect_timeout_ms': 5000,
             }

http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/shell/option_parser.py
----------------------------------------------------------------------
diff --git a/shell/option_parser.py b/shell/option_parser.py
index 000e319..ccae53b 100755
--- a/shell/option_parser.py
+++ b/shell/option_parser.py
@@ -221,7 +221,10 @@ def get_option_parser(defaults):
                          " It must follow the pattern \"KEY=VALUE\","
                          " KEY must be a valid query option. Valid query options "
                          " can be listed by command 'set'.")
-
+  parser.add_option("-t", "--client_connect_timeout_ms",
+                    help="Timeout in milliseconds after which impala-shell will time out"
+                    " if it fails to connect to Impala server. Set to 0 to disable any"
+                    " timeout.")
   # add default values to the help text
   for option in parser.option_list:
     # since the quiet flag is the same as the verbose flag

http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/tests/shell/test_shell_commandline.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index b74fbc2..fd18230 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -32,6 +32,7 @@ from tests.common.skip import SkipIf
 from time import sleep, time
 from util import IMPALAD, SHELL_CMD
 from util import assert_var_substitution, run_impala_shell_cmd, ImpalaShell
+from contextlib import closing
 
 DEFAULT_QUERY = 'select 1'
 QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell')
@@ -757,3 +758,17 @@ class TestImpalaShell(ImpalaTestSuite):
       find_query_option("duplicate", test_input)
     with pytest.raises(AssertionError):
       find_query_option("not_an_option", test_input)
+
+  def test_impala_shell_timeout(self):
+    """Tests that impala shell times out during connect.
+       This creates a random listening socket and we try to connect to this
+       socket through the impala-shell. The impala-shell should timeout and not hang
+       indefinitely while connecting
+    """
+    with closing(socket.socket()) as s:
+      s.bind(("", 0))
+      # maximum number of queued connections on this socket is 1.
+      s.listen(1)
+      test_port = s.getsockname()[1]
+      args = '-q "select foo; select bar;" --ssl -t 2000 -i localhost:%d' % (test_port)
+      run_impala_shell_cmd(args, expect_success=False)