You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2018/10/18 21:32:41 UTC
[2/4] impala git commit: IMPALA-7555: Set socket timeout in
impala-shell
IMPALA-7555: Set socket timeout in impala-shell
impala-shell does not set any socket timeout while connecting to the
Impala server, so a connection attempt can hang indefinitely. This change
sets a timeout on the socket before connecting and clears it again once
the connection has been opened. The default connect timeout is 5 sec.
Usage: impala-shell --client_connect_timeout_ms=<value in ms> (short form: -t <value in ms>)
Testing:
1. Added a test that creates a listening socket on a random port.
impala-shell (with ssl enabled) connects to this socket with a
2 sec timeout and is expected to fail instead of hanging.
2. Created a kerberized impala cluster with ssl enabled and
connected to the impalad using an openssl client (to block the
beeswax server thread from accepting new connections) -
E.g. - openssl s_client -connect <IP Addr>:21000
Then used impala-shell to connect to the same impalad.
impala-shell timed out after the default of 5 sec. I verified
this manually.
Change-Id: I130fc47f7a83f591918d6842634b4e5787d00813
Reviewed-on: http://gerrit.cloudera.org:8080/11540
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2fb8ebae
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2fb8ebae
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2fb8ebae
Branch: refs/heads/master
Commit: 2fb8ebaef2beecd511e963fadbb41cbb11add138
Parents: 5af5456
Author: aphadke <ap...@cloudera.com>
Authored: Thu Sep 20 16:51:23 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Oct 18 01:41:42 2018 +0000
----------------------------------------------------------------------
shell/impala_client.py | 29 +++++++++++++++++++++++------
shell/impala_shell.py | 17 +++++++++--------
shell/impala_shell_config_defaults.py | 1 +
shell/option_parser.py | 5 ++++-
tests/shell/test_shell_commandline.py | 15 +++++++++++++++
5 files changed, 52 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/shell/impala_client.py
----------------------------------------------------------------------
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 2f2a5e9..e53637a 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -18,6 +18,7 @@
# under the License.
import sasl
+import sys
import time
from beeswaxd import BeeswaxService
@@ -57,11 +58,16 @@ class DisconnectedException(Exception):
class QueryCancelledByShellException(Exception): pass
+
+def print_to_stderr(message):
+ print >> sys.stderr, message
+
class ImpalaClient(object):
def __init__(self, impalad, kerberos_host_fqdn, use_kerberos=False,
kerberos_service_name="impala", use_ssl=False, ca_cert=None, user=None,
- ldap_password=None, use_ldap=False):
+ ldap_password=None, use_ldap=False, client_connect_timeout_ms=5000,
+ verbose=True):
self.connected = False
self.impalad_host = impalad[0].encode('ascii', 'ignore')
self.impalad_port = int(impalad[1])
@@ -74,6 +80,7 @@ class ImpalaClient(object):
self.ca_cert = ca_cert
self.user, self.ldap_password = user, ldap_password
self.use_ldap = use_ldap
+ self.client_connect_timeout_ms = int(client_connect_timeout_ms)
self.default_query_options = {}
self.query_option_levels = {}
self.query_state = QueryState._NAMES_TO_VALUES
@@ -82,6 +89,7 @@ class ImpalaClient(object):
# from command line via CTRL+C. It is used to suppress error messages of
# query cancellation.
self.is_query_cancelled = False
+ self.verbose = verbose
def _options_to_string_list(self, set_query_options):
return ["%s=%s" % (k, v) for (k, v) in set_query_options.iteritems()]
@@ -250,8 +258,15 @@ class ImpalaClient(object):
self.transport = None
self.connected = False
- self.transport = self._get_transport()
+ sock, self.transport = self._get_socket_and_transport()
+ if self.client_connect_timeout_ms > 0:
+ sock.setTimeout(self.client_connect_timeout_ms)
self.transport.open()
+ if self.verbose:
+ print_to_stderr('Opened TCP connection to %s:%s' % (self.impalad_host,
+ self.impalad_port))
+ # Setting a timeout of None disables timeouts on sockets
+ sock.setTimeout(None)
protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
self.imp_service = ImpalaService.Client(protocol)
result = self.ping_impala_service()
@@ -266,7 +281,7 @@ class ImpalaClient(object):
if self.transport:
self.transport.close()
- def _get_transport(self):
+ def _get_socket_and_transport(self):
"""Create a Transport.
A non-kerberized impalad just needs a simple buffered transport. For
@@ -274,6 +289,7 @@ class ImpalaClient(object):
If SSL is enabled, a TSSLSocket underlies the transport stack; otherwise a TSocket
is used.
+ This function returns the socket and the transport object.
"""
if self.use_ssl:
# TSSLSocket needs the ssl module, which may not be standard on all Operating
@@ -304,7 +320,8 @@ class ImpalaClient(object):
else:
sock = TSocket(sock_host, sock_port)
if not (self.use_ldap or self.use_kerberos):
- return TBufferedTransport(sock)
+ return sock, TBufferedTransport(sock)
+
# Initializes a sasl client
def sasl_factory():
sasl_client = sasl.Client()
@@ -318,9 +335,9 @@ class ImpalaClient(object):
return sasl_client
# GSSASPI is the underlying mechanism used by kerberos to authenticate.
if self.use_kerberos:
- return TSaslClientTransport(sasl_factory, "GSSAPI", sock)
+ return sock, TSaslClientTransport(sasl_factory, "GSSAPI", sock)
else:
- return TSaslClientTransport(sasl_factory, "PLAIN", sock)
+ return sock, TSaslClientTransport(sasl_factory, "PLAIN", sock)
def create_beeswax_query(self, query_str, set_query_options):
"""Create a beeswax query object from a query string"""
http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index a7b1ac8..9277071 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -168,7 +168,7 @@ class ImpalaShell(object, cmd.Cmd):
self.ldap_password = options.ldap_password
self.ldap_password_cmd = options.ldap_password_cmd
self.use_ldap = options.use_ldap
-
+ self.client_connect_timeout_ms = options.client_connect_timeout_ms
self.verbose = options.verbose
self.prompt = ImpalaShell.DISCONNECTED_PROMPT
self.server_version = ImpalaShell.UNKNOWN_SERVER_VERSION
@@ -518,7 +518,7 @@ class ImpalaShell(object, cmd.Cmd):
return ImpalaClient(self.impalad, self.kerberos_host_fqdn, self.use_kerberos,
self.kerberos_service_name, self.use_ssl,
self.ca_cert, self.user, self.ldap_password,
- self.use_ldap)
+ self.use_ldap, self.client_connect_timeout_ms, self.verbose)
def _signal_handler(self, signal, frame):
"""Handles query cancellation on a Ctrl+C event"""
@@ -818,12 +818,13 @@ class ImpalaShell(object, cmd.Cmd):
print_to_stderr("Unable to import the python 'ssl' module. It is"
" required for an SSL-secured connection.")
sys.exit(1)
- except socket.error, (code, e):
+ except socket.error, e:
# if the socket was interrupted, reconnect the connection with the client
- if code == errno.EINTR:
+ if e.errno == errno.EINTR:
self._reconnect_cancellation()
else:
- print_to_stderr("Socket error %s: %s" % (code, e))
+ print_to_stderr("Socket error %s: %s" % (e.errno, e))
+ self.imp_client.close_connection()
self.prompt = self.DISCONNECTED_PROMPT
except Exception, e:
if self.ldap_password_cmd and \
@@ -1507,7 +1508,7 @@ def parse_variables(keyvals):
def replace_variables(set_variables, string):
"""Replaces variable within the string with their corresponding values using the
- given set_variables."""
+ given set_variables."""
errors = False
matches = set(map(lambda v: v.upper(), re.findall(r'(?<!\\)\${([^}]+)}', string)))
for name in matches:
@@ -1537,8 +1538,8 @@ def replace_variables(set_variables, string):
def get_var_name(name):
- """Look for a namespace:var_name pattern in an option name.
- Return the variable name if it's a match or None otherwise.
+ """Looks for a namespace:var_name pattern in an option name.
+ Returns the variable name if it's a match or None otherwise.
"""
ns_match = re.match(r'^([^:]*):(.*)', name)
if ns_match is not None:
http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/shell/impala_shell_config_defaults.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell_config_defaults.py b/shell/impala_shell_config_defaults.py
index 260e93e..be7685b 100644
--- a/shell/impala_shell_config_defaults.py
+++ b/shell/impala_shell_config_defaults.py
@@ -51,4 +51,5 @@ impala_shell_defaults = {
'verbose': True,
'version': False,
'write_delimited': False,
+ 'client_connect_timeout_ms': 5000,
}
http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/shell/option_parser.py
----------------------------------------------------------------------
diff --git a/shell/option_parser.py b/shell/option_parser.py
index 000e319..ccae53b 100755
--- a/shell/option_parser.py
+++ b/shell/option_parser.py
@@ -221,7 +221,10 @@ def get_option_parser(defaults):
" It must follow the pattern \"KEY=VALUE\","
" KEY must be a valid query option. Valid query options "
" can be listed by command 'set'.")
-
+ parser.add_option("-t", "--client_connect_timeout_ms",
+ help="Timeout in milliseconds after which impala-shell will time out"
+ " if it fails to connect to Impala server. Set to 0 to disable any"
+ " timeout.")
# add default values to the help text
for option in parser.option_list:
# since the quiet flag is the same as the verbose flag
http://git-wip-us.apache.org/repos/asf/impala/blob/2fb8ebae/tests/shell/test_shell_commandline.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py
index b74fbc2..fd18230 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -32,6 +32,7 @@ from tests.common.skip import SkipIf
from time import sleep, time
from util import IMPALAD, SHELL_CMD
from util import assert_var_substitution, run_impala_shell_cmd, ImpalaShell
+from contextlib import closing
DEFAULT_QUERY = 'select 1'
QUERY_FILE_PATH = os.path.join(os.environ['IMPALA_HOME'], 'tests', 'shell')
@@ -757,3 +758,17 @@ class TestImpalaShell(ImpalaTestSuite):
find_query_option("duplicate", test_input)
with pytest.raises(AssertionError):
find_query_option("not_an_option", test_input)
+
+ def test_impala_shell_timeout(self):
+ """Tests that impala shell times out during connect.
+ This creates a random listening socket and we try to connect to this
+ socket through the impala-shell. The impala-shell should timeout and not hang
+ indefinitely while connecting
+ """
+ with closing(socket.socket()) as s:
+ s.bind(("", 0))
+ # maximum number of queued connections on this socket is 1.
+ s.listen(1)
+ test_port = s.getsockname()[1]
+ args = '-q "select foo; select bar;" --ssl -t 2000 -i localhost:%d' % (test_port)
+ run_impala_shell_cmd(args, expect_success=False)