You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/03/29 06:37:30 UTC

[impala] branch master updated (e9dd5d3 -> 2f53783)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from e9dd5d3  IMPALA-9560: Fix TestStatsExtrapolation for release versions
     new fc784f6  IMPALA-9466: impala-shell client retry for hs2-http protocol
     new 2f53783  IMPALA-9401: primitive include-what-you-use script and mappings

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 bin/iwyu/iwyu.sh                                 |  74 +++++
 bin/iwyu/iwyu_mappings.imp                       |  43 +++
 shell/impala_client.py                           | 221 +++++++++++----
 shell/impala_shell.py                            |   3 +
 shell/shell_exceptions.py                        |   3 +-
 tests/custom_cluster/test_hs2_fault_injection.py | 329 +++++++++++++++++++++++
 6 files changed, 622 insertions(+), 51 deletions(-)
 create mode 100755 bin/iwyu/iwyu.sh
 create mode 100644 bin/iwyu/iwyu_mappings.imp
 create mode 100644 tests/custom_cluster/test_hs2_fault_injection.py


[impala] 01/02: IMPALA-9466: impala-shell client retry for hs2-http protocol

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit fc784f6e95dc633fa9a3504914646b1f793a8b93
Author: Abhishek Rawat <ar...@cloudera.com>
AuthorDate: Mon Feb 24 11:01:41 2020 -0800

    IMPALA-9466: impala-shell client retry for hs2-http protocol
    
    Added retries for idempotent rpcs:
    OpenSession, PingImpalaHS2Service, GetResultSetMetadata,
    CloseImpalaOperation (non dmls), CancelOperation, GetOperationStatus,
    GetRuntimeProfile, GetExecSummary, GetLog
    
    Retries were also added to the 'set all' query execution and subsequent
    result fetch in the ImpalaHS2Client._open_session()
    
    The retries are only supported for the hs2-http protocol and are enabled
    by default. There are at most 3 retries for a failed rpc. There is a sleep
    duration of 'n' seconds after the nth retry.
    
    Only rpcs that failed due to an error in the http transport are retried.
    If an rpc failed because the server returned an error in the rpc
    response, it is not retried.
    
    Improved error diagnostics by dumping the stack trace when
    ImpalaShell._execute_stmt() gets an 'Unknown Exception'.
    
    Testing:
    - Added a custom_cluster test which injects faults into the http transport
    and checks the expected behavior of the various rpcs. Some of these tests
    leave the session in an open state, so they are not suitable for the e2e
    test framework, which has metric verifiers expecting the related metrics
    to be 0 at the end of the test.
    - Manually tested real world scenarios with impala-shell client
    communicating with an impala coordinator via a fault injecting istio mesh.
    - Manually tested dropping connections on an nginx ingress gateway by sending
    SIGTERM to all worker processes.
    
    Change-Id: I0da9e9e8d34a340eaf763397cc095ff6260d65d5
    Reviewed-on: http://gerrit.cloudera.org:8080/15378
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 shell/impala_client.py                           | 221 +++++++++++----
 shell/impala_shell.py                            |   3 +
 shell/shell_exceptions.py                        |   3 +-
 tests/custom_cluster/test_hs2_fault_injection.py | 329 +++++++++++++++++++++++
 4 files changed, 505 insertions(+), 51 deletions(-)

diff --git a/shell/impala_client.py b/shell/impala_client.py
index 6feaa8a..1cf3995 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -84,6 +84,11 @@ HS2_VALUE_CONVERTERS = {
 # the Impala server.
 HS2_LOG_PROGRESS_REGEX = re.compile("Query.*Complete \([0-9]* out of [0-9]*\)\n")
 
+# Exception types to differentiate between the different RPCExceptions.
+# RPCException raised when TApplicationException is caught.
+RPC_EXCEPTION_TAPPLICATION = "TAPPLICATION_EXCEPTION"
+# RPCException raised when impala server sends a TStatusCode.ERROR_STATUS status code.
+RPC_EXCEPTION_SERVER = "SERVER_ERROR"
 
 class QueryOptionLevels:
   """These are the levels used when displaying query options.
@@ -197,7 +202,7 @@ class ImpalaClient(object):
   def _close_transport(self):
     """Closes transport if not closed and set self.connected to False. This is the last
     step of close_connection()."""
-    if self.transport and not self.transport.isOpen():
+    if self.transport and self.transport.isOpen():
       self.transport.close()
     self.connected = False
 
@@ -600,14 +605,33 @@ class ImpalaHS2Client(ImpalaClient):
     # If connected, this is the handle returned by the OpenSession RPC that needs
     # to be passed into most HS2 RPCs.
     self.session_handle = None
+    # Enable retries only for hs2-http protocol.
+    if self.use_http_base_transport:
+      # Maximum number of tries for idempotent rpcs.
+      self.max_tries = 4
+    else:
+      self.max_tries = 1
+    # Minimum sleep interval between retry attempts.
+    self.min_sleep_interval = 1
 
   def _get_thrift_client(self, protocol):
     return ImpalaHiveServer2Service.Client(protocol)
 
+  def _get_sleep_interval_for_retries(self, num_tries):
+    """Returns the sleep interval in seconds for the 'num_tries' retry attempt."""
+    assert num_tries > 0 and num_tries < self.max_tries
+    return self.min_sleep_interval * (num_tries - 1)
+
   def _open_session(self):
     open_session_req = TOpenSessionReq(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6,
         username=self.user)
-    resp = self._do_hs2_rpc(lambda: self.imp_service.OpenSession(open_session_req))
+
+    def OpenSession():
+      return self.imp_service.OpenSession(open_session_req)
+    # OpenSession rpcs are idempotent and so ok to retry. If the client gets disconnected
+    # and the server successfully opened a session, the client will retry and rely on
+    # server to clean up the session.
+    resp = self._do_hs2_rpc(OpenSession, retry_on_error=True)
     self._check_hs2_rpc_status(resp.status)
     assert (resp.serverProtocolVersion ==
             TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6), resp.serverProtocolVersion
@@ -615,18 +639,43 @@ class ImpalaHS2Client(ImpalaClient):
     self.session_handle = resp.sessionHandle
 
     # List all of the query options and their levels.
-    set_all_handle = self.execute_query("set all", {})
-    self.default_query_options = {}
-    self.query_option_levels = {}
-    for rows in self.fetch(set_all_handle):
-      for name, value, level in rows:
-        self.default_query_options[name.upper()] = value
-        self.query_option_levels[name.upper()] = QueryOptionLevels.from_string(level)
-    try:
-      self.close_query(set_all_handle)
-    except Exception as e:
-      print('{0} {1}'.format(str(e), type(e)), file=sys.stderr)
-      raise
+    # Retrying "set all" should be idempotent
+    num_tries = 1
+    while num_tries <= self.max_tries:
+      raise_error = (num_tries == self.max_tries)
+      set_all_handle = None
+      if self.max_tries > 1:
+        retry_msg = 'Num remaining tries: {0}'.format(self.max_tries - num_tries)
+      try:
+        set_all_handle = self.execute_query("set all", {})
+        self.default_query_options = {}
+        self.query_option_levels = {}
+        for rows in self.fetch(set_all_handle):
+          for name, value, level in rows:
+            self.default_query_options[name.upper()] = value
+            self.query_option_levels[name.upper()] = QueryOptionLevels.from_string(level)
+        break
+      except (QueryCancelledByShellException, MissingThriftMethodException,
+        QueryStateException):
+        raise
+      except RPCException as r:
+        if (r.exception_type == RPC_EXCEPTION_TAPPLICATION or
+          r.exception_type == RPC_EXCEPTION_SERVER):
+          raise
+        print('Caught exception {0}, type={1} when listing query options. {2}'
+          .format(str(r), type(r), retry_msg), file=sys.stderr)
+        if raise_error:
+          raise
+      except Exception as e:
+        print('Caught exception {0}, type={1} when listing query options. {2}'
+          .format(str(e), type(e), retry_msg), file=sys.stderr)
+        if raise_error:
+          raise
+      finally:
+        if set_all_handle is not None:
+          self.close_query(set_all_handle)
+      time.sleep(self._get_sleep_interval_for_retries(num_tries))
+      num_tries += 1
 
   def close_connection(self):
     if self.session_handle is not None:
@@ -635,7 +684,12 @@ class ImpalaHS2Client(ImpalaClient):
       # server to clean up the session.
       try:
         req = TCloseSessionReq(self.session_handle)
-        resp = self._do_hs2_rpc(lambda: self.imp_service.CloseSession(req))
+
+        def CloseSession():
+          return self.imp_service.CloseSession(req)
+        # CloseSession rpcs don't need retries since we catch all exceptions and close
+        # transport.
+        resp = self._do_hs2_rpc(CloseSession)
         self._check_hs2_rpc_status(resp.status)
       except Exception as e:
         print("Warning: close session RPC failed: {0}, {1}".format(str(e), type(e)))
@@ -644,12 +698,11 @@ class ImpalaHS2Client(ImpalaClient):
 
   def _ping_impala_service(self):
     req = TPingImpalaHS2ServiceReq(self.session_handle)
-    try:
-      resp = self.imp_service.PingImpalaHS2Service(req)
-    except TApplicationException as t:
-      if t.type == TApplicationException.UNKNOWN_METHOD:
-        raise MissingThriftMethodException(t.message)
-      raise
+
+    def PingImpalaHS2Service():
+      return self.imp_service.PingImpalaHS2Service(req)
+    # PingImpalaHS2Service rpc is idempotent and so safe to retry.
+    resp = self._do_hs2_rpc(PingImpalaHS2Service, retry_on_error=True)
     self._check_hs2_rpc_status(resp.status)
     return (resp.version, resp.webserver_address)
 
@@ -664,13 +717,23 @@ class ImpalaHS2Client(ImpalaClient):
     ImpalaClient method calls for the query."""
     query = self._create_query_req(query_str, set_query_options)
     self.is_query_cancelled = False
-    resp = self._do_hs2_rpc(lambda: self.imp_service.ExecuteStatement(query))
+
+    def ExecuteStatement():
+      return self.imp_service.ExecuteStatement(query)
+    # Read queries should be idempotent but most dml queries are not. Also retrying
+    # query execution from client could be expensive and so likely makes sense to do
+    # it if server is also aware of the retries.
+    resp = self._do_hs2_rpc(ExecuteStatement)
     if resp.status.statusCode != TStatusCode.SUCCESS_STATUS:
       raise QueryStateException("ERROR: {0}".format(resp.status.errorMessage))
     handle = resp.operationHandle
     if handle.hasResultSet:
       req = TGetResultSetMetadataReq(handle)
-      resp = self._do_hs2_rpc(lambda: self.imp_service.GetResultSetMetadata(req))
+
+      def GetResultSetMetadata():
+        return self.imp_service.GetResultSetMetadata(req)
+      # GetResultSetMetadata rpc is idempotent and should be safe to retry.
+      resp = self._do_hs2_rpc(GetResultSetMetadata, retry_on_error=True)
       self._check_hs2_rpc_status(resp.status)
       assert resp.schema is not None, resp
       # Attach the schema to the handle for convenience.
@@ -706,7 +769,12 @@ class ImpalaHS2Client(ImpalaClient):
     while True:
       req = TFetchResultsReq(query_handle, TFetchOrientation.FETCH_NEXT,
           self.fetch_batch_size)
-      resp = self._do_hs2_rpc(lambda: self.imp_service.FetchResults(req))
+
+      def FetchResults():
+        return self.imp_service.FetchResults(req)
+      # FetchResults rpc is not idempotent unless the client and server communicate and
+      # results are kept around for retry to be successful.
+      resp = self._do_hs2_rpc(FetchResults)
       self._check_hs2_rpc_status(resp.status)
 
       # Transpose the columns into a row-based format for more convenient processing
@@ -741,7 +809,11 @@ class ImpalaHS2Client(ImpalaClient):
 
   def close_dml(self, last_query_handle):
     req = TCloseImpalaOperationReq(last_query_handle)
-    resp = self._do_hs2_rpc(lambda: self.imp_service.CloseImpalaOperation(req))
+
+    def CloseImpalaOperation():
+      return self.imp_service.CloseImpalaOperation(req)
+    # CloseImpalaOperation rpc is not idempotent for dmls.
+    resp = self._do_hs2_rpc(CloseImpalaOperation)
     self._check_hs2_rpc_status(resp.status)
     if not resp.dml_result:
       raise RPCException("Impala DML operation did not return DML statistics.")
@@ -755,7 +827,11 @@ class ImpalaHS2Client(ImpalaClient):
     if last_query_handle.is_closed:
       return True
     req = TCloseImpalaOperationReq(last_query_handle)
-    resp = self._do_hs2_rpc(lambda: self.imp_service.CloseImpalaOperation(req))
+
+    def CloseImpalaOperation():
+      return self.imp_service.CloseImpalaOperation(req)
+    # CloseImpalaOperation rpc is idempotent for non dml queries and so safe to retry.
+    resp = self._do_hs2_rpc(CloseImpalaOperation, retry_on_error=True)
     last_query_handle.is_closed = True
     return self._is_hs2_nonerror_status(resp.status.statusCode)
 
@@ -765,26 +841,40 @@ class ImpalaHS2Client(ImpalaClient):
     if last_query_handle.is_closed:
       return True
     req = TCancelOperationReq(last_query_handle)
-    resp = self._do_hs2_rpc(lambda: self.imp_service.CancelOperation(req),
-        suppress_error_on_cancel=False)
+
+    def CancelOperation():
+      return self.imp_service.CancelOperation(req)
+    # CancelOperation rpc is idempotent and so safe to retry.
+    resp = self._do_hs2_rpc(CancelOperation, retry_on_error=True)
     return self._is_hs2_nonerror_status(resp.status.statusCode)
 
   def get_query_state(self, last_query_handle):
-    resp = self._do_hs2_rpc(
-        lambda: self.imp_service.GetOperationStatus(
-          TGetOperationStatusReq(last_query_handle)))
+    req = TGetOperationStatusReq(last_query_handle)
+
+    def GetOperationStatus():
+      return self.imp_service.GetOperationStatus(req)
+    # GetOperationStatus rpc is idempotent and so safe to retry.
+    resp = self._do_hs2_rpc(GetOperationStatus, retry_on_error=True)
     self._check_hs2_rpc_status(resp.status)
     return resp.operationState
 
   def get_runtime_profile(self, last_query_handle):
     req = TGetRuntimeProfileReq(last_query_handle, self.session_handle)
-    resp = self._do_hs2_rpc(lambda: self.imp_service.GetRuntimeProfile(req))
+
+    def GetRuntimeProfile():
+      return self.imp_service.GetRuntimeProfile(req)
+    # GetRuntimeProfile rpc is idempotent and so safe to retry.
+    resp = self._do_hs2_rpc(GetRuntimeProfile, retry_on_error=True)
     self._check_hs2_rpc_status(resp.status)
     return resp.profile
 
   def get_summary(self, last_query_handle):
     req = TGetExecSummaryReq(last_query_handle, self.session_handle)
-    resp = self._do_hs2_rpc(lambda: self.imp_service.GetExecSummary(req))
+
+    def GetExecSummary():
+      return self.imp_service.GetExecSummary(req)
+    # GetExecSummary rpc is idempotent and so safe to retry.
+    resp = self._do_hs2_rpc(GetExecSummary, retry_on_error=True)
     self._check_hs2_rpc_status(resp.status)
     return resp.summary
 
@@ -804,7 +894,11 @@ class ImpalaHS2Client(ImpalaClient):
     if last_query_handle is None:
       return "Query could not be executed"
     req = TGetLogReq(last_query_handle)
-    resp = self._do_hs2_rpc(lambda: self.imp_service.GetLog(req))
+
+    def GetLog():
+      return self.imp_service.GetLog(req)
+    # GetLog rpc is idempotent and so safe to retry.
+    resp = self._do_hs2_rpc(GetLog, retry_on_error=True)
     self._check_hs2_rpc_status(resp.status)
 
     log = resp.log
@@ -815,27 +909,53 @@ class ImpalaHS2Client(ImpalaClient):
       return "%s: %s" % (type_str, log)
     return ""
 
-  def _do_hs2_rpc(self, rpc, suppress_error_on_cancel=True):
+  def _do_hs2_rpc(self, rpc, suppress_error_on_cancel=True, retry_on_error=False):
     """Executes the provided 'rpc' callable and tranlates any exceptions in the
-    appropriate exception for the shell. Exceptions raised include:
+    appropriate exception for the shell. The input 'rpc' must be a python function
+    with the __name__ attribute and not a lambda function. Exceptions raised include:
     * DisconnectedException if the client cannot communicate with the server.
     * QueryCancelledByShellException if 'suppress_error_on_cancel' is true, the RPC
       fails and the query was cancelled from the shell (i.e. self.is_query_cancelled).
     * MissingThriftMethodException if the thrift method is not implemented on the server.
-    Does not validate any status embedded in the returned RPC message."""
+    Does not validate any status embedded in the returned RPC message.
+    If 'retry_on_error' is true, the rpc is retried if an exception is raised. The maximum
+    number of tries is determined by 'self.max_tries'. Retries, if enabled, are attempted
+    for all exceptions other than TApplicationException."""
     self._check_connected()
-    try:
-      return rpc()
-    except TTransportException as e:
-      # issue with the connection with the impalad
-      raise DisconnectedException("Error communicating with impalad: %s" % e)
-    except TApplicationException as t:
-      # Suppress the errors from cancelling a query that is in waiting_to_finish state
-      if suppress_error_on_cancel and self.is_query_cancelled:
-        raise QueryCancelledByShellException()
-      if t.type == TApplicationException.UNKNOWN_METHOD:
-        raise MissingThriftMethodException(t.message)
-      raise RPCException("Application Exception : {0}".format(t))
+    num_tries = 1
+    max_tries = num_tries
+    if retry_on_error:
+      max_tries = self.max_tries
+    while num_tries <= max_tries:
+      raise_error = (num_tries == max_tries)
+      # Generate a retry message, only if retries and supported.
+      if retry_on_error and self.max_tries > 1:
+        retry_msg = 'Num remaining tries: {0}'.format(max_tries - num_tries)
+      else:
+        retry_msg = ''
+      try:
+        return rpc()
+      except TTransportException as e:
+        # issue with the connection with the impalad
+        print('Caught exception {0}, type={1} in {2}. {3}'
+          .format(str(e), type(e), rpc.__name__, retry_msg), file=sys.stderr)
+        if raise_error:
+          raise DisconnectedException("Error communicating with impalad: %s" % e)
+      except TApplicationException as t:
+        # Suppress the errors from cancelling a query that is in waiting_to_finish state
+        if suppress_error_on_cancel and self.is_query_cancelled:
+          raise QueryCancelledByShellException()
+        if t.type == TApplicationException.UNKNOWN_METHOD:
+          raise MissingThriftMethodException(t.message)
+        raise RPCException("Application Exception : {0}".format(t),
+          RPC_EXCEPTION_TAPPLICATION)
+      except Exception as e:
+        print('Caught exception {0}, type={1} in {2}. {3}'
+          .format(str(e), type(e), rpc.__name__, retry_msg), file=sys.stderr)
+        if raise_error:
+          raise
+      time.sleep(self._get_sleep_interval_for_retries(num_tries))
+      num_tries += 1
 
   def _check_hs2_rpc_status(self, status):
     """If the TCLIService.TStatus 'status' is an error status the raise an exception
@@ -848,7 +968,8 @@ class ImpalaHS2Client(ImpalaClient):
       # Suppress the errors from cancelling a query that is in fetch state
       if self.is_query_cancelled:
         raise QueryCancelledByShellException()
-      raise RPCException("ERROR: {0}".format(status.errorMessage))
+      raise RPCException("ERROR: {0}".format(status.errorMessage),
+        RPC_EXCEPTION_SERVER)
     elif status.statusCode == TStatusCode.INVALID_HANDLE_STATUS:
       if self.is_query_cancelled:
         raise QueryCancelledByShellException()
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 942ba41..353db6b 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -36,6 +36,7 @@ import subprocess
 import sys
 import textwrap
 import time
+import traceback
 
 from impala_client import ImpalaHS2Client, ImpalaBeeswaxClient, QueryOptionLevels
 from impala_shell_config_defaults import impala_shell_defaults
@@ -1220,6 +1221,8 @@ class ImpalaShell(object, cmd.Cmd):
       # if the exception is unknown, there was possibly an issue with the connection
       # set the shell as disconnected
       print('Unknown Exception : %s' % (e,), file=sys.stderr)
+      # Print the stack trace for the exception.
+      traceback.print_exc()
       self.close_connection()
       self.prompt = ImpalaShell.DISCONNECTED_PROMPT
     return CmdStatus.ERROR
diff --git a/shell/shell_exceptions.py b/shell/shell_exceptions.py
index efff4f7..800004b 100644
--- a/shell/shell_exceptions.py
+++ b/shell/shell_exceptions.py
@@ -17,8 +17,9 @@
 
 
 class RPCException(Exception):
-  def __init__(self, value=""):
+  def __init__(self, value="", exception_type=""):
     self.value = value
+    self.exception_type = exception_type
 
   def __str__(self):
     return self.value
diff --git a/tests/custom_cluster/test_hs2_fault_injection.py b/tests/custom_cluster/test_hs2_fault_injection.py
new file mode 100644
index 0000000..2393cf4
--- /dev/null
+++ b/tests/custom_cluster/test_hs2_fault_injection.py
@@ -0,0 +1,329 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import requests
+
+from shell.ImpalaHttpClient import ImpalaHttpClient
+from shell.impala_client import ImpalaHS2Client
+from tests.common.impala_test_suite import IMPALAD_HS2_HTTP_HOST_PORT
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from time import sleep
+
+
+class FaultInjectingHttpClient(ImpalaHttpClient, object):
+  """Class for injecting faults in the ImpalaHttpClient. Faults are injected by using the
+  'enable_fault' method. The 'flush' method is overridden to check for injected faults
+  and raise exceptions, if needed."""
+  def __init__(self, *args, **kwargs):
+    super(FaultInjectingHttpClient, self).__init__(*args, **kwargs)
+    self.fault_code = None
+    self.fault_message = None
+    self.fault_enabled = False
+    self.num_requests = 0
+    self.fault_frequency = 0
+    self.fault_enabled = False
+
+  def enable_fault(self, http_code, http_message, fault_frequency):
+    """Inject fault with given code and message at the given frequency.
+    As an example, if frequency is 20% then inject fault for 1 out of every 5
+    requests."""
+    self.fault_enabled = True
+    self.fault_code = http_code
+    self.fault_message = http_message
+    self.fault_frequency = fault_frequency
+    assert fault_frequency > 0 and fault_frequency <= 1
+    self.num_requests = 0
+
+  def disable_fault(self):
+    self.fault_enabled = False
+
+  def _check_code(self):
+    if self.code >= 300:
+      # Report any http response code that is not 1XX (informational response) or
+      # 2XX (successful).
+      raise Exception("HTTP code {}: {}".format(self.code, self.message))
+
+  def _inject_fault(self):
+    if not self.fault_enabled:
+      return False
+    if self.fault_frequency == 1:
+      return True
+    if round(self.num_requests % (1 / self.fault_frequency)) == 1:
+      return True
+    return False
+
+  def flush(self):
+    ImpalaHttpClient.flush(self)
+    self.num_requests += 1
+    # Override code and message with the injected fault
+    if self.fault_code is not None and self._inject_fault():
+      self.code = self.fault_code
+      self.message = self.fault_message
+      self._check_code()
+
+
+class FaultInjectingImpalaHS2Client(ImpalaHS2Client):
+  """Fault injecting ImpalaHS2Client class using FaultInjectingHttpClient as the
+  transport"""
+  def __init__(self, *args, **kwargs):
+    """Creates a transport with HTTP as the base."""
+    super(FaultInjectingImpalaHS2Client, self).__init__(*args, **kwargs)
+    assert self.use_http_base_transport
+    host_and_port = "{0}:{1}".format(self.impalad_host, self.impalad_port)
+    assert self.http_path
+    transport = FaultInjectingHttpClient("http://{0}/{1}".
+        format(host_and_port, self.http_path))
+    transport.open()
+    self.transport = transport
+
+  def _get_http_transport(self, connect_timeout_ms):
+    self.transport.open()
+    return self.transport
+
+  def ping(self):
+    return self._ping_impala_service()
+
+
+class TestHS2FaultInjection(CustomClusterTestSuite):
+  """Class for testing the http fault injection in various rpcs used by the
+  impala-shell client"""
+  def setup(self):
+    impalad = IMPALAD_HS2_HTTP_HOST_PORT.split(":")
+    self.custom_hs2_http_client = FaultInjectingImpalaHS2Client(impalad,
+        kerberos_host_fqdn=None, use_http_base_transport=True, http_path='cliservice')
+    self.transport = self.custom_hs2_http_client.transport
+
+  def teardown(self):
+    self.transport.disable_fault()
+    self.custom_hs2_http_client.close_connection()
+
+  def connect(self):
+    self.custom_hs2_http_client.connect()
+    assert self.custom_hs2_http_client.connected
+
+  def close_query(self, query_handle):
+    self.transport.disable_fault()
+    self.custom_hs2_http_client.close_query(query_handle)
+
+  def __expect_msg_retry(self, impala_rpc_name):
+    """Returns expected log message for rpcs which can be retried"""
+    return ("Caught exception HTTP code 502: Injected Fault, "
+      "type=<type 'exceptions.Exception'> in {0}. "
+      "Num remaining tries: 3".format(impala_rpc_name))
+
+  def __expect_msg_no_retry(self, impala_rpc_name):
+    """Returns expected log message for rpcs which can not be retried"""
+    return ("Caught exception HTTP code 502: Injected Fault, "
+      "type=<type 'exceptions.Exception'> in {0}. ".format(impala_rpc_name))
+
+  @pytest.mark.execute_serially
+  def test_connect(self, capsys):
+    """Tests fault injection in ImpalaHS2Client's connect().
+    OpenSession and CloseImpalaOperation rpcs fail.
+    Retries results in a successfull connection."""
+    self.transport.enable_fault(502, "Injected Fault", 0.20)
+    self.connect()
+    output = capsys.readouterr()[1].splitlines()
+    assert output[1] == self.__expect_msg_retry("OpenSession")
+    assert output[2] == self.__expect_msg_retry("CloseImpalaOperation")
+
+  @pytest.mark.execute_serially
+  def test_close_connection(self, capsys):
+    """Tests fault injection in ImpalaHS2Client's close_connection().
+    CloseSession rpc fails due to the fault, but succeeds anyways since exceptions
+    are ignored."""
+    self.connect()
+    self.transport.enable_fault(502, "Injected Fault", 0.50)
+    self.custom_hs2_http_client.close_connection()
+    output = capsys.readouterr()[1].splitlines()
+    assert output[1] == self.__expect_msg_no_retry("CloseSession")
+
+  @pytest.mark.execute_serially
+  def test_ping(self, capsys):
+    """Tests fault injection in ImpalaHS2Client's _ping_impala_service().
+    PingImpalaHS2Service rpc fails due to the fault, but suceeds after retry"""
+    self.connect()
+    self.transport.enable_fault(502, "Injected Fault", 0.50)
+    version, webserver_address = None, None
+    version, webserver_address = self.custom_hs2_http_client.ping()
+    assert version is not None
+    assert webserver_address is not None
+    page = requests.get('{0}/{1}'.format(webserver_address, 'healthz'))
+    assert page.status_code == requests.codes.ok
+    output = capsys.readouterr()[1].splitlines()
+    assert output[1] == self.__expect_msg_retry("PingImpalaHS2Service")
+
+  @pytest.mark.execute_serially
+  def test_execute_query(self, capsys):
+    """Tests fault injection in ImpalaHS2Client's execute_query().
+    ExecuteStatement rpc fails and results in error since retries are not supported."""
+    self.connect()
+    self.transport.enable_fault(502, "Injected Fault", 0.50)
+    query_handle = None
+    try:
+      query_handle = self.custom_hs2_http_client.execute_query('select 1', {})
+    except Exception, e:
+      assert str(e) == 'HTTP code 502: Injected Fault'
+    assert query_handle is None
+    output = capsys.readouterr()[1].splitlines()
+    assert output[1] == self.__expect_msg_no_retry("ExecuteStatement")
+
+  @pytest.mark.execute_serially
+  def test_fetch(self, capsys):
+    """Tests fault injection in ImpalaHS2Client's fetch().
+    FetchResults rpc fails and results in error since retries are not supported."""
+    self.connect()
+    query_handle = self.custom_hs2_http_client.execute_query('select 1', {})
+    rows_fetched = None
+    self.transport.enable_fault(502, "Injected Fault", 0.50)
+    num_rows = None
+    rows_fetched = self.custom_hs2_http_client.fetch(query_handle)
+    try:
+      for rows in rows_fetched:
+        num_rows += 1
+    except Exception, e:
+      assert str(e) == 'HTTP code 502: Injected Fault'
+    assert num_rows is None
+    self.close_query(query_handle)
+    output = capsys.readouterr()[1].splitlines()
+    assert output[1] == self.__expect_msg_no_retry("FetchResults")
+
+  @pytest.mark.execute_serially
+  def test_close_dml(self, unique_database, capsys):
+    """Tests fault injection in ImpalaHS2Client's close_dml().
+    CloseImpalaOperation rpc fails and results in error since retries are not
+    supported."""
+    table = '{0}.{1}'.format(unique_database, 'tbl')
+    self.client.execute('create table {0} (c1 int, c2 int)'.format(table))
+    self.connect()
+    dml = 'insert into {0} values (1,1)'.format(table)
+    query_handle = self.custom_hs2_http_client.execute_query(dml, {})
+    self.custom_hs2_http_client.wait_to_finish(query_handle)
+    self.transport.enable_fault(502, "Injected Fault", 0.50)
+    (num_rows, num_row_errors) = None, None
+    try:
+      (num_rows, num_row_errors) = self.custom_hs2_http_client.close_dml(query_handle)
+    except Exception, e:
+      assert str(e) == 'HTTP code 502: Injected Fault'
+    assert num_rows is None
+    assert num_row_errors is None
+    output = capsys.readouterr()[1].splitlines()
+    assert output[1] == self.__expect_msg_no_retry("CloseImpalaOperation")
+
+  @pytest.mark.execute_serially
+  def test_close_query(self, capsys):
+    """Tests fault injection in ImpalaHS2Client's close_query().
+    CloseImpalaOperation rpc fails due to fault, but succeeds after a retry"""
+    self.connect()
+    query_handle = self.custom_hs2_http_client.execute_query('select 1', {})
+    self.custom_hs2_http_client.wait_to_finish(query_handle)
+    self.transport.enable_fault(502, "Injected Fault", 0.50)
+    self.custom_hs2_http_client.close_query(query_handle)
+    output = capsys.readouterr()[1].splitlines()
+    assert output[1] == self.__expect_msg_retry("CloseImpalaOperation")
+
+  @pytest.mark.execute_serially
+  def test_cancel_query(self, capsys):
+    """Tests fault injection in ImpalaHS2Client's cancel_query().
+    CancelOperation rpc fails due to fault, but succeeds after a retry"""
+    self.connect()
+    query_handle = self.custom_hs2_http_client.execute_query('select sleep(50000)', {})
+    sleep(5)
+    self.transport.enable_fault(502, "Injected Fault", 0.50)
+    success = self.custom_hs2_http_client.cancel_query(query_handle)
+    assert success
+    self.close_query(query_handle)
+    output = capsys.readouterr()[1].splitlines()
+    assert output[1] == self.__expect_msg_retry("CancelOperation")
+
+  @pytest.mark.execute_serially
+  def test_get_query_state(self, capsys):
+    """Tests fault injection in ImpalaHS2Client's get_query_state().
+    GetOperationStatus rpc fails due to fault, but succeeds after a retry"""
+    self.connect()
+    query_handle = self.custom_hs2_http_client.execute_query('select 1', {})
+    self.transport.enable_fault(502, "Injected Fault", 0.50)
+    self.custom_hs2_http_client.wait_to_finish(query_handle)
+    query_state = self.custom_hs2_http_client.get_query_state(query_handle)
+    assert query_state == self.custom_hs2_http_client.FINISHED_STATE
+    self.close_query(query_handle)
+    output = capsys.readouterr()[1].splitlines()
+    assert output[1] == self.__expect_msg_retry("GetOperationStatus")
+
+  @pytest.mark.execute_serially
+  def test_get_runtime_profile_summary(self, capsys):
+    """Tests fault injection in ImpalaHS2Client's get_runtime_profile().
+    GetRuntimeProfile and GetExecSummary rpc fails due to fault, but succeeds
+    after a retry"""
+    self.connect()
+    query_handle = self.custom_hs2_http_client.execute_query('select 1', {})
+    self.custom_hs2_http_client.wait_to_finish(query_handle)
+    self.custom_hs2_http_client.close_query(query_handle)
+    self.transport.enable_fault(502, "Injected Fault", 0.50)
+    profile = self.custom_hs2_http_client.get_runtime_profile(query_handle)
+    assert profile is not None
+    summary = self.custom_hs2_http_client.get_summary(query_handle)
+    assert summary is not None
+    output = capsys.readouterr()[1].splitlines()
+    assert output[1] == self.__expect_msg_retry("GetRuntimeProfile")
+    assert output[2] == self.__expect_msg_retry("GetExecSummary")
+
+  @pytest.mark.execute_serially
+  def test_get_warning_log(self, capsys):
+    """Tests fault injection in ImpalaHS2Client's get_warning_log().
+    GetLog rpc fails due to fault, but succeeds after a retry"""
+    self.connect()
+    sql = ("select * from functional.alltypes a inner join /* +foo */ "
+        "functional.alltypes b on a.id=b.id")
+    query_handle = self.custom_hs2_http_client.execute_query(sql, {})
+    self.custom_hs2_http_client.wait_to_finish(query_handle)
+    self.transport.enable_fault(502, "Injected Fault", 0.50)
+    warning_log = self.custom_hs2_http_client.get_warning_log(query_handle)
+    assert warning_log == 'WARNINGS: JOIN hint not recognized: foo'
+    self.close_query(query_handle)
+    output = capsys.readouterr()[1].splitlines()
+    assert output[1] == self.__expect_msg_retry("GetLog")
+
+  @pytest.mark.execute_serially
+  def test_connection_drop(self):
+    """Tests connection drop between rpcs. Each rpc starts a new connection so dropping
+    connections between rpcs should have no effect"""
+    self.connect()
+    self.transport.close()
+    query_handle = self.custom_hs2_http_client.execute_query('select 1', {})
+    self.transport.close()
+    self.custom_hs2_http_client.wait_to_finish(query_handle)
+    self.transport.close()
+    query_state = self.custom_hs2_http_client.get_query_state(query_handle)
+    assert query_state == self.custom_hs2_http_client.FINISHED_STATE
+    num_rows = 0
+    self.transport.close()
+    rows_fetched = self.custom_hs2_http_client.fetch(query_handle)
+    for rows in rows_fetched:
+      num_rows += 1
+    assert num_rows == 1
+    self.transport.close()
+    self.close_query(query_handle)
+    self.transport.close()
+    profile = self.custom_hs2_http_client.get_runtime_profile(query_handle)
+    assert profile is not None
+    self.transport.close()
+    summary = self.custom_hs2_http_client.get_summary(query_handle)
+    assert summary is not None


[impala] 02/02: IMPALA-9401: primitive include-what-you-use script and mappings

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2f53783b14ddf215a4c96bc52a511d3a83896128
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Wed Feb 26 19:46:18 2020 -0800

    IMPALA-9401: primitive include-what-you-use script and mappings
    
    This is a cleaned up version of the script I used to run
    include-what-you-use on the Impala codebase.
    
    The helper script assumes you have built IWYU and
    have a Kudu source checkout, and can then run IWYU
    on the entire codebase.
    
    Some mappings files are used to improve the quality of
    the IWYU output. There are still incorrect recommendations
    made, but this is sufficient to fix many common issues.
    
    Change-Id: Iaf5f9ba865313afb0c581e6482514ef7f1c65367
    Reviewed-on: http://gerrit.cloudera.org:8080/15552
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 bin/iwyu/iwyu.sh           | 74 ++++++++++++++++++++++++++++++++++++++++++++++
 bin/iwyu/iwyu_mappings.imp | 43 +++++++++++++++++++++++++++
 2 files changed, 117 insertions(+)

diff --git a/bin/iwyu/iwyu.sh b/bin/iwyu/iwyu.sh
new file mode 100755
index 0000000..f5cd4ea
--- /dev/null
+++ b/bin/iwyu/iwyu.sh
@@ -0,0 +1,74 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Runs include-what-you-use over the Impala codebase using the compilation database
+# in $IMPALA_HOME. See usage() below for the required environment variables.
+
+set -euo pipefail
+
+# This script lives in $IMPALA_HOME/bin/iwyu, so the root is two directories up.
+IMPALA_HOME=$(cd "$(dirname "$0")/../.." && pwd)
+
+usage() {
+  echo "Usage: iwyu.sh"
+  echo "This will run include-what-you-use on the Impala codebase."
+  echo "Results are printed to stdout"
+  echo "The following environment variables must be set:"
+  echo "  KUDU_SOURCE: the root directory of a Kudu source tree"
+  echo "  IWYU_BUILD_DIR: the root directory of a IWYU build tree"
+  echo "  IWYU_SOURCE: the root directory of a IWYU source tree"
+  echo ""
+  echo "See IMPALA-9371 for one way to build IWYU for toolchain clang."
+  echo ""
+  echo "Example Invocation:"
+  echo "    KUDU_SOURCE=~/kudu IWYU_SOURCE=~/include-what-you-use \\"
+  echo "    IWYU_BUILD_DIR=~/include-what-you-use/build \\"
+  echo "    \$IMPALA_HOME/bin/iwyu/iwyu.sh"
+}
+
+# '-v' is used so that an unset variable is reported here instead of tripping
+# 'set -u' with a less helpful message.
+if [[ ! -v KUDU_SOURCE || ! -d "$KUDU_SOURCE" ]]; then
+  echo "KUDU_SOURCE must be set to a Kudu source directory"
+  usage
+  exit 1
+fi
+
+if [[ ! -v IWYU_BUILD_DIR || ! -d "$IWYU_BUILD_DIR" ]]; then
+  echo "IWYU_BUILD_DIR must be set to a IWYU build directory"
+  usage
+  exit 1
+fi
+
+if [[ ! -v IWYU_SOURCE || ! -d "$IWYU_SOURCE" ]]; then
+  echo "IWYU_SOURCE must be set to a IWYU source directory"
+  usage
+  exit 1
+fi
+
+if [[ ! -f "$IMPALA_HOME/compile_commands.json" ]]; then
+  echo "$IMPALA_HOME/compile_commands.json is required for IWYU."
+  echo "Please run buildall.sh (or CMake directly) to generate it"
+  exit 1
+fi
+
+# Collect the arguments in an array so that paths containing whitespace survive.
+# The mappings file is referenced by absolute path so that it resolves regardless
+# of the working directory include-what-you-use is started from.
+IWYU_ARGS=("--mapping_file=$IMPALA_HOME/bin/iwyu/iwyu_mappings.imp")
+
+# Make use of Kudu's pre-existing mappings files that are relevant.
+# TODO: consider importing into Impala codebase.
+for FILE in gflags.imp gtest.imp kudu.imp libstdcpp.imp libunwind.imp system-linux.imp; do
+  IWYU_ARGS+=("--mapping_file=$KUDU_SOURCE/build-support/iwyu/mappings/${FILE}")
+done
+
+cd "$IMPALA_HOME"
+# Prepend the IWYU build directory to PATH for the iwyu_tool.py invocation only.
+PATH="$IWYU_BUILD_DIR:$PATH" "$IWYU_SOURCE/iwyu_tool.py" -p . -j "$(nproc)" -- \
+    "${IWYU_ARGS[@]}"
diff --git a/bin/iwyu/iwyu_mappings.imp b/bin/iwyu/iwyu_mappings.imp
new file mode 100644
index 0000000..e44d958
--- /dev/null
+++ b/bin/iwyu/iwyu_mappings.imp
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Custom IWYU rules for the Impala codebase to reflect the codebase's preferred headers
+# and to work around any quirks of IWYU's recommendations.
+#
+# Each 'include' entry maps a private header to the public header that should be
+# included instead; each 'symbol' entry maps a symbol to the header that should be
+# included to obtain it.
+[
+    { include: ["<boost/smart_ptr/scoped_ptr.hpp>", private, "<boost/scoped_ptr.hpp>", public ] },
+    { include: ["<boost/smart_ptr/shared_ptr.hpp>", private, "<boost/shared_ptr.hpp>", public ] },
+    { include: ["<boost/unordered/unordered_set.hpp>", private, "<boost/unordered_set.hpp>", public ] },
+    { include: ["<boost/unordered/unordered_map.hpp>", private, "<boost/unordered_map.hpp>", public ] },
+    { include: ["<boost/bind/bind.hpp>", private, "<boost/bind.hpp>", public ] },
+    { include: ["<boost/bind/placeholders.hpp>", private, "<boost/bind.hpp>", public ] },
+    { include: ["<bits/stdint-intn.h>", private, "<cstdint>", public ] },
+    { include: ["<bits/stdint-uintn.h>", private, "<cstdint>", public ] },
+    { include: ["<glog/logging.h>", private, "\"common/logging.h\"", public ] },
+    { include: ["<hdfs.h>", private, "\"common/hdfs.h\"", public ] },
+    { symbol: ["string", private, "<string>", public] },
+    { symbol: ["ostream", private, "<ostream>", public] },
+    { symbol: [ "LOG", private, "\"common/logging.h\"", public ] },
+    { symbol: [ "VLOG", private, "\"common/logging.h\"", public ] },
+    { symbol: [ "CHECK_EQ", private, "\"common/logging.h\"", public ] },
+    { symbol: [ "CHECK_NE", private, "\"common/logging.h\"", public ] },
+    { symbol: [ "CHECK_LT", private, "\"common/logging.h\"", public ] },
+    { symbol: [ "CHECK_LE", private, "\"common/logging.h\"", public ] },
+    { symbol: [ "CHECK_GE", private, "\"common/logging.h\"", public ] },
+    { symbol: [ "CHECK_GT", private, "\"common/logging.h\"", public ] },
+    { symbol: [ "ErrnoLogMessage", private, "\"common/logging.h\"", public ] },
+    { symbol: [ "COMPACT_GOOGLE_LOG_0", private, "\"common/logging.h\"", public ] }
+]
+