You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/01 17:24:29 UTC
[4/6] impala git commit: IMPALA-6490: Reconnect shell when remote
restarts
IMPALA-6490: Reconnect shell when remote restarts
If the remote impalad died while the shell was waiting for a
command to complete, the shell disconnected. Previously,
after restarting the remote impalad, we needed to run
"connect;" to reconnect; now the shell reconnects
automatically.
Testing:
Added test_auto_reconnect_after_impalad_died in
test_shell_interactive_reconnect.py
Change-Id: Ia13365a9696886f01294e98054cf4e7cd66ab712
Reviewed-on: http://gerrit.cloudera.org:8080/10992
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/72db58ac
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/72db58ac
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/72db58ac
Branch: refs/heads/master
Commit: 72db58acd054785fa6f57ef775e13e07ea0920a6
Parents: 672a271
Author: Nghia Le <mi...@gmail.com>
Authored: Wed Jul 18 18:22:08 2018 +0200
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 31 21:50:33 2018 +0000
----------------------------------------------------------------------
shell/impala_client.py | 14 +++++---
shell/impala_shell.py | 22 ++++++------
.../test_shell_interactive_reconnect.py | 37 ++++++++++++++++++--
3 files changed, 56 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/72db58ac/shell/impala_client.py
----------------------------------------------------------------------
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 69c7699..2f2a5e9 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -30,7 +30,7 @@ from thrift.protocol import TBinaryProtocol
from thrift_sasl import TSaslClientTransport
from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport, TTransportException
-from thrift.Thrift import TApplicationException
+from thrift.Thrift import TApplicationException, TException
class RpcStatus:
"""Convenience enum to describe Rpc return statuses"""
@@ -229,11 +229,15 @@ class ImpalaClient(object):
output += first_child_output
return idx
- def test_connection(self):
- """Checks to see if the current Impala connection is still alive. If not, an exception
- will be raised."""
+ def is_connected(self):
+ """Returns True if the current Impala connection is alive and False otherwise."""
if self.connected:
- self.imp_service.PingImpalaService()
+ try:
+ return self.imp_service.PingImpalaService()
+ except TException:
+ return False
+ else:
+ return False
def connect(self):
"""Creates a connection to an Impalad instance
http://git-wip-us.apache.org/repos/asf/impala/blob/72db58ac/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index aab478d..4348a4e 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -43,7 +43,6 @@ from option_parser import get_option_parser, get_config_from_file
from shell_output import DelimitedOutputFormatter, OutputStream, PrettyOutputFormatter
from shell_output import OverwritingStdErrOutputStream
from subprocess import call
-from thrift.Thrift import TException
VERSION_FORMAT = "Impala Shell v%(version)s (%(git_hash)s) built on %(build_date)s"
VERSION_STRING = "build version not available"
@@ -231,6 +230,8 @@ class ImpalaShell(object, cmd.Cmd):
if options.impalad is not None:
self.do_connect(options.impalad)
+ # Check if the database in shell option exists
+ self._validate_database(immediately=True)
# We handle Ctrl-C ourselves, using an Event object to signal cancellation
# requests between the handler and the main shell thread.
@@ -568,6 +569,10 @@ class ImpalaShell(object, cmd.Cmd):
else:
return query
+ def set_prompt(self, db):
+ self.prompt = ImpalaShell.PROMPT_FORMAT.format(
+ host=self.impalad[0], port=self.impalad[1], db=db)
+
def precmd(self, args):
args = self.sanitise_input(args)
if not args: return args
@@ -581,9 +586,7 @@ class ImpalaShell(object, cmd.Cmd):
# If cmdqueue is populated, then commands are executed from the cmdqueue, and user
# input is ignored. Send an empty string as the user input just to be safe.
return str()
- try:
- self.imp_client.test_connection()
- except TException:
+ if not self.imp_client.is_connected():
print_to_stderr("Connection lost, reconnecting...")
self._connect()
self._validate_database(immediately=True)
@@ -812,8 +815,7 @@ class ImpalaShell(object, cmd.Cmd):
if self.imp_client.connected:
self._print_if_verbose('Connected to %s:%s' % self.impalad)
self._print_if_verbose('Server version: %s' % self.server_version)
- self.prompt = ImpalaShell.PROMPT_FORMAT.format(
- host=self.impalad[0], port=self.impalad[1], db=ImpalaShell.DEFAULT_DB)
+ self.set_prompt(ImpalaShell.DEFAULT_DB)
self._validate_database()
try:
self.imp_client.build_default_query_options_dict()
@@ -883,10 +885,12 @@ class ImpalaShell(object, cmd.Cmd):
If immediately is False, it appends the USE command to self.cmdqueue.
If immediately is True, it executes the USE command right away.
"""
+ if not self.imp_client.connected:
+ return
+ # Should only check if successfully connected.
if self.current_db:
self.current_db = self.current_db.strip('`')
use_current_db = ('use `%s`' % self.current_db)
-
if immediately:
self.onecmd(use_current_db)
else:
@@ -1185,9 +1189,7 @@ class ImpalaShell(object, cmd.Cmd):
query = self._create_beeswax_query(args)
if self._execute_stmt(query) is CmdStatus.SUCCESS:
self.current_db = args.strip('`').strip()
- self.prompt = ImpalaShell.PROMPT_FORMAT.format(host=self.impalad[0],
- port=self.impalad[1],
- db=self.current_db)
+ self.set_prompt(self.current_db)
elif args.strip('`') == self.current_db:
# args == current_db means -d option was passed but the "use [db]" operation failed.
# We need to set the current_db to None so that it does not show a database, which
http://git-wip-us.apache.org/repos/asf/impala/blob/72db58ac/tests/custom_cluster/test_shell_interactive_reconnect.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_shell_interactive_reconnect.py b/tests/custom_cluster/test_shell_interactive_reconnect.py
index 1f82468..c747139 100644
--- a/tests/custom_cluster/test_shell_interactive_reconnect.py
+++ b/tests/custom_cluster/test_shell_interactive_reconnect.py
@@ -18,11 +18,18 @@
import pytest
import tempfile
import socket
+import pexpect
+import os
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.impala_service import ImpaladService
from tests.common.skip import SkipIfBuildType
from tests.shell.util import ImpalaShell, move_shell_history, restore_shell_history
+# Follow tests/shell/test_shell_interactive.py naming.
+from shell.impala_shell import ImpalaShell as ImpalaShellClass
+
+SHELL_CMD = "%s/bin/impala-shell.sh" % os.environ['IMPALA_HOME']
+NUM_QUERIES = 'impala-server.num-queries'
class TestShellInteractiveReconnect(CustomClusterTestSuite):
""" Check if interactive shell is using the current DB after reconnecting """
@@ -54,8 +61,6 @@ class TestShellInteractiveReconnect(CustomClusterTestSuite):
@pytest.mark.execute_serially
def test_auto_reconnect(self):
- NUM_QUERIES = 'impala-server.num-queries'
-
impalad = ImpaladService(socket.getfqdn())
start_num_queries = impalad.get_metric_value(NUM_QUERIES)
@@ -72,3 +77,31 @@ class TestShellInteractiveReconnect(CustomClusterTestSuite):
result = p.get_result()
assert "alltypesaggmultifilesnopart" in result.stdout
+ @pytest.mark.execute_serially
+ def test_auto_reconnect_after_impalad_died(self):
+ """Test reconnect after restarting the remote impalad without using connect;"""
+ # Use pexpect instead of ImpalaShell() since after using get_result() in ImpalaShell()
+ # to check Disconnect, send_cmd() will no longer have any effect so we can not check
+ # reconnect.
+ impalad = ImpaladService(socket.getfqdn())
+ start_num_queries = impalad.get_metric_value(NUM_QUERIES)
+
+ proc = pexpect.spawn(' '.join([SHELL_CMD, "-i localhost:21000"]))
+ proc.expect("21000] default>")
+ proc.sendline("use tpch;")
+
+ # wait for the USE command to finish
+ impalad.wait_for_metric_value(NUM_QUERIES, start_num_queries + 1)
+ impalad.wait_for_num_in_flight_queries(0)
+
+ # Disconnect
+ self.cluster.impalads[0].kill()
+ proc.sendline("show tables;")
+ # Search from [1:] since the square brackets "[]" are special characters in regex
+ proc.expect(ImpalaShellClass.DISCONNECTED_PROMPT[1:])
+ # Restarting Impalad
+ self.cluster.impalads[0].start()
+ # Check reconnect
+ proc.sendline("show tables;")
+ proc.expect("nation")
+ proc.expect("21000] tpch>")