You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/01 17:24:29 UTC

[4/6] impala git commit: IMPALA-6490: Reconnect shell when remote restarts

IMPALA-6490: Reconnect shell when remote restarts

If the remote impalad died while the shell waited for a
command to complete, the shell disconnected. Previously,
after restarting the remote impalad, we needed to run
"connect;" to reconnect; now the shell reconnects
automatically.

Testing:
Added test_auto_reconnect_after_impalad_died in
test_shell_interactive_reconnect.py

Change-Id: Ia13365a9696886f01294e98054cf4e7cd66ab712
Reviewed-on: http://gerrit.cloudera.org:8080/10992
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/72db58ac
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/72db58ac
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/72db58ac

Branch: refs/heads/master
Commit: 72db58acd054785fa6f57ef775e13e07ea0920a6
Parents: 672a271
Author: Nghia Le <mi...@gmail.com>
Authored: Wed Jul 18 18:22:08 2018 +0200
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 31 21:50:33 2018 +0000

----------------------------------------------------------------------
 shell/impala_client.py                          | 14 +++++---
 shell/impala_shell.py                           | 22 ++++++------
 .../test_shell_interactive_reconnect.py         | 37 ++++++++++++++++++--
 3 files changed, 56 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/72db58ac/shell/impala_client.py
----------------------------------------------------------------------
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 69c7699..2f2a5e9 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -30,7 +30,7 @@ from thrift.protocol import TBinaryProtocol
 from thrift_sasl import TSaslClientTransport
 from thrift.transport.TSocket import TSocket
 from thrift.transport.TTransport import TBufferedTransport, TTransportException
-from thrift.Thrift import TApplicationException
+from thrift.Thrift import TApplicationException, TException
 
 class RpcStatus:
   """Convenience enum to describe Rpc return statuses"""
@@ -229,11 +229,15 @@ class ImpalaClient(object):
       output += first_child_output
     return idx
 
-  def test_connection(self):
-    """Checks to see if the current Impala connection is still alive. If not, an exception
-    will be raised."""
+  def is_connected(self):
+    """Returns True if the current Impala connection is alive and False otherwise."""
     if self.connected:
-      self.imp_service.PingImpalaService()
+      try:
+        return self.imp_service.PingImpalaService()
+      except TException:
+        return False
+    else:
+      return False
 
   def connect(self):
     """Creates a connection to an Impalad instance

http://git-wip-us.apache.org/repos/asf/impala/blob/72db58ac/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index aab478d..4348a4e 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -43,7 +43,6 @@ from option_parser import get_option_parser, get_config_from_file
 from shell_output import DelimitedOutputFormatter, OutputStream, PrettyOutputFormatter
 from shell_output import OverwritingStdErrOutputStream
 from subprocess import call
-from thrift.Thrift import TException
 
 VERSION_FORMAT = "Impala Shell v%(version)s (%(git_hash)s) built on %(build_date)s"
 VERSION_STRING = "build version not available"
@@ -231,6 +230,8 @@ class ImpalaShell(object, cmd.Cmd):
 
     if options.impalad is not None:
       self.do_connect(options.impalad)
+      # Check if the database in shell option exists
+      self._validate_database(immediately=True)
 
     # We handle Ctrl-C ourselves, using an Event object to signal cancellation
     # requests between the handler and the main shell thread.
@@ -568,6 +569,10 @@ class ImpalaShell(object, cmd.Cmd):
     else:
       return query
 
+  def set_prompt(self, db):
+    self.prompt = ImpalaShell.PROMPT_FORMAT.format(
+        host=self.impalad[0], port=self.impalad[1], db=db)
+
   def precmd(self, args):
     args = self.sanitise_input(args)
     if not args: return args
@@ -581,9 +586,7 @@ class ImpalaShell(object, cmd.Cmd):
       # If cmdqueue is populated, then commands are executed from the cmdqueue, and user
       # input is ignored. Send an empty string as the user input just to be safe.
       return str()
-    try:
-      self.imp_client.test_connection()
-    except TException:
+    if not self.imp_client.is_connected():
       print_to_stderr("Connection lost, reconnecting...")
       self._connect()
       self._validate_database(immediately=True)
@@ -812,8 +815,7 @@ class ImpalaShell(object, cmd.Cmd):
     if self.imp_client.connected:
       self._print_if_verbose('Connected to %s:%s' % self.impalad)
       self._print_if_verbose('Server version: %s' % self.server_version)
-      self.prompt = ImpalaShell.PROMPT_FORMAT.format(
-        host=self.impalad[0], port=self.impalad[1], db=ImpalaShell.DEFAULT_DB)
+      self.set_prompt(ImpalaShell.DEFAULT_DB)
       self._validate_database()
     try:
       self.imp_client.build_default_query_options_dict()
@@ -883,10 +885,12 @@ class ImpalaShell(object, cmd.Cmd):
     If immediately is False, it appends the USE command to self.cmdqueue.
     If immediately is True, it executes the USE command right away.
     """
+    if not self.imp_client.connected:
+      return
+    # Should only check if successfully connected.
     if self.current_db:
       self.current_db = self.current_db.strip('`')
       use_current_db = ('use `%s`' % self.current_db)
-
       if immediately:
         self.onecmd(use_current_db)
       else:
@@ -1185,9 +1189,7 @@ class ImpalaShell(object, cmd.Cmd):
     query = self._create_beeswax_query(args)
     if self._execute_stmt(query) is CmdStatus.SUCCESS:
       self.current_db = args.strip('`').strip()
-      self.prompt = ImpalaShell.PROMPT_FORMAT.format(host=self.impalad[0],
-                                                     port=self.impalad[1],
-                                                     db=self.current_db)
+      self.set_prompt(self.current_db)
     elif args.strip('`') == self.current_db:
       # args == current_db means -d option was passed but the "use [db]" operation failed.
       # We need to set the current_db to None so that it does not show a database, which

http://git-wip-us.apache.org/repos/asf/impala/blob/72db58ac/tests/custom_cluster/test_shell_interactive_reconnect.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_shell_interactive_reconnect.py b/tests/custom_cluster/test_shell_interactive_reconnect.py
index 1f82468..c747139 100644
--- a/tests/custom_cluster/test_shell_interactive_reconnect.py
+++ b/tests/custom_cluster/test_shell_interactive_reconnect.py
@@ -18,11 +18,18 @@
 import pytest
 import tempfile
 import socket
+import pexpect
+import os
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_service import ImpaladService
 from tests.common.skip import SkipIfBuildType
 from tests.shell.util import ImpalaShell, move_shell_history, restore_shell_history
+# Follow tests/shell/test_shell_interactive.py naming.
+from shell.impala_shell import ImpalaShell as ImpalaShellClass
+
+SHELL_CMD = "%s/bin/impala-shell.sh" % os.environ['IMPALA_HOME']
+NUM_QUERIES = 'impala-server.num-queries'
 
 class TestShellInteractiveReconnect(CustomClusterTestSuite):
   """ Check if interactive shell is using the current DB after reconnecting """
@@ -54,8 +61,6 @@ class TestShellInteractiveReconnect(CustomClusterTestSuite):
 
   @pytest.mark.execute_serially
   def test_auto_reconnect(self):
-    NUM_QUERIES = 'impala-server.num-queries'
-
     impalad = ImpaladService(socket.getfqdn())
     start_num_queries = impalad.get_metric_value(NUM_QUERIES)
 
@@ -72,3 +77,31 @@ class TestShellInteractiveReconnect(CustomClusterTestSuite):
     result = p.get_result()
     assert "alltypesaggmultifilesnopart" in result.stdout
 
+  @pytest.mark.execute_serially
+  def test_auto_reconnect_after_impalad_died(self):
+    """Test reconnect after restarting the remote impalad without using connect;"""
+    # Use pexpect instead of ImpalaShell() since after using get_result() in ImpalaShell()
+    # to check Disconnect, send_cmd() will no longer have any effect so we can not check
+    # reconnect.
+    impalad = ImpaladService(socket.getfqdn())
+    start_num_queries = impalad.get_metric_value(NUM_QUERIES)
+
+    proc = pexpect.spawn(' '.join([SHELL_CMD, "-i localhost:21000"]))
+    proc.expect("21000] default>")
+    proc.sendline("use tpch;")
+
+    # wait for the USE command to finish
+    impalad.wait_for_metric_value(NUM_QUERIES, start_num_queries + 1)
+    impalad.wait_for_num_in_flight_queries(0)
+
+    # Disconnect
+    self.cluster.impalads[0].kill()
+    proc.sendline("show tables;")
+    # Search from [1:] since the square brackets "[]" are special characters in regex
+    proc.expect(ImpalaShellClass.DISCONNECTED_PROMPT[1:])
+    # Restarting Impalad
+    self.cluster.impalads[0].start()
+    # Check reconnect
+    proc.sendline("show tables;")
+    proc.expect("nation")
+    proc.expect("21000] tpch>")