You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2022/10/24 13:07:55 UTC

[impala] 01/03: IMPALA-11674: Fix timeout detection for TSSLSocket

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f917dc111c65dfe7142361660e877d20c08f875e
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Wed Oct 19 16:16:02 2022 +0700

    IMPALA-11674: Fix timeout detection for TSSLSocket
    
    Functions IsPeekTimeoutTException() and IsReadTimeoutTException() in
    be/src/rpc/thrift-util.cc make assumptions about the implementation of
    read(), peek(), write() and write_partial() in TSocket.cpp and
    TSSLSocket.cpp. The functions read() and peek() in TSSLSocket.cpp were
    changed in versions 0.11.0 and 0.16.0 to throw a different exception on
    timeout. This causes IsPeekTimeoutTException() and
    IsReadTimeoutTException() to return the wrong value after the Thrift
    upgrade, which in turn causes TAcceptQueueServer::Peek() to rethrow the
    exception to its caller TAcceptQueueServer::run(), making
    TAcceptQueueServer::run() close the connection and ignore the
    idle_session_timeout query option.
    
    The issue was reproducible through the following scenario:
    
    1. From the local development environment, start the Impala cluster with
    SSL enabled and idle_client_poll_period_s set to 5 seconds.
    
    export CERT_DIR="$IMPALA_HOME/be/src/testutil"
    export SSL_ARGS="--ssl_client_ca_certificate=$CERT_DIR/server-cert.pem
      --ssl_server_certificate=$CERT_DIR/server-cert.pem
      --ssl_private_key=$CERT_DIR/server-key.pem
      --hostname=localhost"
    ./bin/start-impala-cluster.py --state_store_args="$SSL_ARGS" \
      --catalogd_args="$SSL_ARGS" \
      --impalad_args="$SSL_ARGS --idle_client_poll_period_s=5"
    
    2. Run impala-shell with a higher idle_session_timeout query option
    
    impala-shell.sh --ssl -Q idle_session_timeout=100
    
    3. Run a simple query like "show databases" and rerun it after 15
       seconds pass.
    
    The second query run will fail with the following error message in impala-shell:
    [localhost:21050] default> show databases;
    Caught exception TLS/SSL connection has been closed (EOF) (_ssl.c:1829), type=<class 'ssl.SSLZeroReturnError'> in CloseSession.
    Warning: close session RPC failed: TLS/SSL connection has been closed (EOF) (_ssl.c:1829), <class 'ssl.SSLZeroReturnError'>
    
    This patch fixes the expected error messages in IsReadTimeoutTException
    and IsPeekTimeoutTException to correctly detect timeout errors from
    TSSLSocket. Additionally, this patch fixes a typo in
    NEW_THRIFT_VERSION_MSG.
    
    Testing:
    - Redo the scenario manually, with and without SSL, and confirm that
      the second query completes without error.
    - Add test_thrift_socket.py to begin verifying the IsPeekTimeoutTException
      function.
    
    Change-Id: I6ad168a1c96d751a3c50d924e6ecaf6404e589ab
    Reviewed-on: http://gerrit.cloudera.org:8080/19157
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Zoltan Borok-Nagy <bo...@cloudera.com>
---
 be/src/rpc/TAcceptQueueServer.cpp          |   3 +
 be/src/rpc/thrift-util.cc                  |  18 ++---
 tests/custom_cluster/test_thrift_socket.py | 104 +++++++++++++++++++++++++++++
 3 files changed, 116 insertions(+), 9 deletions(-)

diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index ce21da936..45b5c0374 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -155,6 +155,9 @@ class TAcceptQueueServer::Task : public Runnable {
         // read() or peek() of the socket.
         if (eventHandler != nullptr && server_.idle_poll_period_ms_ > 0 &&
             (IsReadTimeoutTException(ttx) || IsPeekTimeoutTException(ttx))) {
+          VLOG(2) << Substitute("Socket read or peek timeout encountered "
+                                "(idle_poll_period_ms_=$0). $1",
+              server_.idle_poll_period_ms_, ttx.what());
           ThriftServer::ThriftServerEventProcessor* thriftServerHandler =
               static_cast<ThriftServer::ThriftServerEventProcessor*>(eventHandler.get());
           if (thriftServerHandler->IsIdleContext(connectionContext)) {
diff --git a/be/src/rpc/thrift-util.cc b/be/src/rpc/thrift-util.cc
index bef44402d..55fbb43b6 100644
--- a/be/src/rpc/thrift-util.cc
+++ b/be/src/rpc/thrift-util.cc
@@ -65,7 +65,7 @@ using namespace apache::thrift::concurrency;
 // TSocket.cpp and TSSLSocket.cpp. Those functions may change between different versions
 // of Thrift.
 #define NEW_THRIFT_VERSION_MSG \
-    "Thrift 0.11.0 is expected. Please check Thrift error codes during Thrift upgrade."
+  "Thrift 0.16.0 is expected. Please check Thrift error codes during Thrift upgrade."
 static_assert(PACKAGE_VERSION[0] == '0', NEW_THRIFT_VERSION_MSG);
 static_assert(PACKAGE_VERSION[1] == '.', NEW_THRIFT_VERSION_MSG);
 static_assert(PACKAGE_VERSION[2] == '1', NEW_THRIFT_VERSION_MSG);
@@ -253,18 +253,18 @@ bool TNetworkAddressComparator(const TNetworkAddress& a, const TNetworkAddress&
 
 bool IsReadTimeoutTException(const TTransportException& e) {
   // String taken from TSocket::read() Thrift's TSocket.cpp and TSSLSocket.cpp.
-  return (e.getType() == TTransportException::TIMED_OUT &&
-             strstr(e.what(), "EAGAIN (timed out)") != nullptr) ||
-         (e.getType() == TTransportException::INTERNAL_ERROR &&
-             strstr(e.what(), "SSL_read: Resource temporarily unavailable") != nullptr);
+  // Specifically, "THRIFT_EAGAIN (timed out)" from TSocket.cpp,
+  // and "THRIFT_POLL (timed out)" from TSSLSocket.cpp.
+  return (e.getType() == TTransportException::TIMED_OUT
+      && strstr(e.what(), "(timed out)") != nullptr);
 }
 
 bool IsPeekTimeoutTException(const TTransportException& e) {
   // String taken from TSocket::peek() Thrift's TSocket.cpp and TSSLSocket.cpp.
-  return (e.getType() == TTransportException::UNKNOWN &&
-             strstr(e.what(), "recv(): Resource temporarily unavailable") != nullptr) ||
-         (e.getType() == TTransportException::INTERNAL_ERROR &&
-             strstr(e.what(), "SSL_peek: Resource temporarily unavailable") != nullptr);
+  return (e.getType() == TTransportException::UNKNOWN
+             && strstr(e.what(), "recv(): Resource temporarily unavailable") != nullptr)
+      || (e.getType() == TTransportException::TIMED_OUT
+             && strstr(e.what(), "THRIFT_POLL (timed out)") != nullptr);
 }
 
 bool IsConnResetTException(const TTransportException& e) {
diff --git a/tests/custom_cluster/test_thrift_socket.py b/tests/custom_cluster/test_thrift_socket.py
new file mode 100644
index 000000000..42d5fa142
--- /dev/null
+++ b/tests/custom_cluster/test_thrift_socket.py
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pytest
+import ssl
+import sys
+import time
+
+from tests.common.environ import IS_REDHAT_DERIVATIVE
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.test_vector import ImpalaTestVector
+from tests.common.test_dimensions import create_client_protocol_dimension
+from tests.shell.util import ImpalaShell
+
+REQUIRED_MIN_OPENSSL_VERSION = 0x10001000L
+# Python supports TLSv1.2 from 2.7.9 officially but on Red Hat/CentOS Python2.7.5
+# with newer python-libs (eg python-libs-2.7.5-77) supports TLSv1.2 already
+if IS_REDHAT_DERIVATIVE:
+  REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 5)
+else:
+  REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12 = (2, 7, 9)
+_openssl_version_number = getattr(ssl, "OPENSSL_VERSION_NUMBER", None)
+if _openssl_version_number is None:
+  SKIP_SSL_MSG = "Legacy OpenSSL module detected"
+elif _openssl_version_number < REQUIRED_MIN_OPENSSL_VERSION:
+  SKIP_SSL_MSG = "Only have OpenSSL version %X, but test requires %X" % (
+    ssl.OPENSSL_VERSION_NUMBER, REQUIRED_MIN_OPENSSL_VERSION)
+else:
+  SKIP_SSL_MSG = None
+CERT_DIR = "%s/be/src/testutil" % os.environ['IMPALA_HOME']
+
+SSL_ARGS = ("--ssl_client_ca_certificate=%s/server-cert.pem "
+            "--ssl_server_certificate=%s/server-cert.pem "
+            "--ssl_private_key=%s/server-key.pem "
+            "--hostname=localhost "  # Required to match hostname in certificate
+            % (CERT_DIR, CERT_DIR, CERT_DIR))
+
+IDLE_ARGS = " --idle_client_poll_period_s=3 -v=2"
+
+
+class TestThriftSocket(CustomClusterTestSuite):
+  """ Check if thrift timeout errors are detected properly """
+
+  @classmethod
+  def setup_class(cls):
+    if sys.version_info < REQUIRED_MIN_PYTHON_VERSION_FOR_TLSV12:
+      pytest.skip("Python version does not support tls 1.2")
+    super(TestThriftSocket, cls).setup_class()
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args=IDLE_ARGS, cluster_size=1)
+  def test_peek_timeout_no_ssl(self):
+    # Iterate over test vector within test function to avoid restarting cluster.
+    for protocol_dim in create_client_protocol_dimension():
+      for vector in [ImpalaTestVector([protocol_dim])]:
+        shell_args = ["-Q", "idle_session_timeout=1800"]
+        self._run_idle_shell(vector, shell_args, 6)
+    self.assert_impalad_log_contains('INFO',
+        r'Socket read or peek timeout encountered.*THRIFT_EAGAIN \(timed out\)',
+        expected_count=-1)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(statestored_args=SSL_ARGS,
+                                    catalogd_args=SSL_ARGS,
+                                    impalad_args=(SSL_ARGS + IDLE_ARGS),
+                                    cluster_size=1)
+  def test_peek_timeout_ssl(self):
+    # Iterate over test vector within test function to avoid restarting cluster.
+    for protocol_dim in create_client_protocol_dimension():
+      for vector in [ImpalaTestVector([protocol_dim])]:
+        shell_args = ["-Q", "idle_session_timeout=1800", "--ssl"]
+        self._run_idle_shell(vector, shell_args, 6)
+    self.assert_impalad_log_contains('INFO',
+        r'Socket read or peek timeout encountered.*THRIFT_POLL \(timed out\)',
+        expected_count=-1)
+
+  def _run_idle_shell(self, vector, args, idle_time):
+    p = ImpalaShell(vector, args)
+    p.send_cmd("USE functional")
+    p.send_cmd("SHOW TABLES")
+    time.sleep(idle_time)
+    p.send_cmd("SHOW TABLES")
+
+    result = p.get_result()
+    assert "alltypesaggmultifilesnopart" in result.stdout, result.stdout