You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/08/06 16:39:13 UTC

[impala] 01/02: IMPALA-8832: Fix HTTP client protocol to work with Apache Knox

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit bcedd1572ee69bf5f5551af08f8fcb0ae0c48aea
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Thu Aug 1 04:29:52 2019 +0000

    IMPALA-8832: Fix HTTP client protocol to work with Apache Knox
    
    This patch fixes two bugs with Impala's HTTP client protocol:
    - THttpServer transports are no longer wrapped in TBufferedTransports.
      THttpServer already has its own support for buffering to process one
      HTTP request at a time, and wrapping it in a TBufferedTransport
      interferes with this, in some cases causing client requests to
      either not be processed or to receive multiple responses.
    - Fixes a bug in THttpTransport where, once a chunked HTTP request has
      been fully processed, the 'readHeaders_' variable is never reset, so
      further requests over the same connection are not processed.
    
    Testing:
    - Tested by proxying beeline connections to Impala through Apache Knox
    
    Change-Id: I5c9d934a654a9e6aaf9207fa5856f956baaacf55
    Reviewed-on: http://gerrit.cloudera.org:8080/14008
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/auth-provider.h          | 24 +++++++++--------
 be/src/rpc/authentication.cc        | 52 +++++++++++++++++++++----------------
 be/src/rpc/thrift-server.cc         | 26 ++-----------------
 be/src/transport/THttpTransport.cpp |  3 +++
 4 files changed, 47 insertions(+), 58 deletions(-)

diff --git a/be/src/rpc/auth-provider.h b/be/src/rpc/auth-provider.h
index 095ecac..3b5dc29 100644
--- a/be/src/rpc/auth-provider.h
+++ b/be/src/rpc/auth-provider.h
@@ -40,10 +40,9 @@ class AuthProvider {
   virtual Status Start() WARN_UNUSED_RESULT = 0;
 
   /// Creates a new Thrift transport factory in the out parameter that performs
-  /// authorisation per this provider's protocol. The top-level transport returned by
-  /// 'factory' must always be a TBufferedTransport, but depending on the AuthProvider
-  /// implementation and the value of 'underlying_transport_type', that may be wrapped
-  /// around another transport type, eg. a TSaslServerTransport.
+  /// authorisation per this provider's protocol. The type of the transport returned is
+  /// determined by 'underlying_transport_type' and there may be multiple levels of
+  /// wrapped transports, eg. a TBufferedTransport around a TSaslServerTransport.
   virtual Status GetServerTransportFactory(
       ThriftServer::TransportType underlying_transport_type,
       const std::string& server_name, MetricGroup* metrics,
@@ -60,12 +59,15 @@ class AuthProvider {
       boost::shared_ptr<apache::thrift::transport::TTransport>* wrapped_transport)
       WARN_UNUSED_RESULT = 0;
 
-  /// Setup 'connection_ptr' to get its username with the given transports.
+  /// Setup 'connection_ptr' to get its username with the given transports and sets
+  /// 'network_address' based on the underlying socket. The transports should be generated
+  /// by the factory returned by GetServerTransportFactory() when called with the same
+  /// 'underlying_transport_type'.
   virtual void SetupConnectionContext(
       const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
       ThriftServer::TransportType underlying_transport_type,
-      apache::thrift::transport::TTransport* underlying_input_transport,
-      apache::thrift::transport::TTransport* underlying_output_transport) = 0;
+      apache::thrift::transport::TTransport* input_transport,
+      apache::thrift::transport::TTransport* output_transport) = 0;
 
   /// Returns true if this provider uses Sasl at the transport layer.
   virtual bool is_secure() = 0;
@@ -113,8 +115,8 @@ class SecureAuthProvider : public AuthProvider {
   virtual void SetupConnectionContext(
       const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
       ThriftServer::TransportType underlying_transport_type,
-      apache::thrift::transport::TTransport* underlying_input_transport,
-      apache::thrift::transport::TTransport* underlying_output_transport);
+      apache::thrift::transport::TTransport* input_transport,
+      apache::thrift::transport::TTransport* output_transport);
 
   virtual bool is_secure() { return true; }
 
@@ -193,8 +195,8 @@ class NoAuthProvider : public AuthProvider {
   virtual void SetupConnectionContext(
       const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
       ThriftServer::TransportType underlying_transport_type,
-      apache::thrift::transport::TTransport* underlying_input_transport,
-      apache::thrift::transport::TTransport* underlying_output_transport);
+      apache::thrift::transport::TTransport* input_transport,
+      apache::thrift::transport::TTransport* output_transport);
 
   virtual bool is_secure() { return false; }
 };
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index a7c04ee..1d253d8 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -954,9 +954,8 @@ Status SecureAuthProvider::GetServerTransportFactory(
 
   if (underlying_transport_type == ThriftServer::HTTP) {
     bool has_kerberos = !principal_.empty();
-    factory->reset(new ThriftServer::BufferedTransportFactory(
-        ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES,
-        new THttpServerTransportFactory(server_name, metrics, has_ldap_, has_kerberos)));
+    factory->reset(
+        new THttpServerTransportFactory(server_name, metrics, has_ldap_, has_kerberos));
     return Status::OK();
   }
 
@@ -1027,22 +1026,23 @@ Status SecureAuthProvider::WrapClientTransport(const string& hostname,
 
 void SecureAuthProvider::SetupConnectionContext(
     const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
-    ThriftServer::TransportType underlying_transport_type,
-    TTransport* underlying_input_transport, TTransport* underlying_output_transport) {
+    ThriftServer::TransportType underlying_transport_type, TTransport* input_transport,
+    TTransport* output_transport) {
+  TSocket* socket = nullptr;
   switch (underlying_transport_type) {
     case ThriftServer::BINARY: {
-      TSaslServerTransport* sasl_transport =
-          down_cast<TSaslServerTransport*>(underlying_input_transport);
-
+      TBufferedTransport* buffered_transport =
+          down_cast<TBufferedTransport*>(input_transport);
+      TSaslServerTransport* sasl_transport = down_cast<TSaslServerTransport*>(
+          buffered_transport->getUnderlyingTransport().get());
+      socket = down_cast<TSocket*>(sasl_transport->getUnderlyingTransport().get());
       // Get the username from the transport.
       connection_ptr->username = sasl_transport->getUsername();
       break;
     }
     case ThriftServer::HTTP: {
-      THttpServer* http_input_transport =
-          down_cast<THttpServer*>(underlying_input_transport);
-      THttpServer* http_output_transport =
-          down_cast<THttpServer*>(underlying_output_transport);
+      THttpServer* http_input_transport = down_cast<THttpServer*>(input_transport);
+      THttpServer* http_output_transport = down_cast<THttpServer*>(output_transport);
       THttpServer::HttpCallbacks callbacks;
       callbacks.path_fn = std::bind(
           HttpPathFn, connection_ptr.get(), std::placeholders::_1, std::placeholders::_2);
@@ -1057,11 +1057,14 @@ void SecureAuthProvider::SetupConnectionContext(
       }
       http_input_transport->setCallbacks(callbacks);
       http_output_transport->setCallbacks(callbacks);
+      socket = down_cast<TSocket*>(http_input_transport->getUnderlyingTransport().get());
       break;
     }
     default:
       LOG(FATAL) << Substitute("Bad transport type: $0", underlying_transport_type);
   }
+  connection_ptr->network_address =
+      MakeNetworkAddress(socket->getPeerAddress(), socket->getPeerPort());
 }
 
 Status NoAuthProvider::GetServerTransportFactory(
@@ -1073,9 +1076,7 @@ Status NoAuthProvider::GetServerTransportFactory(
       factory->reset(new ThriftServer::BufferedTransportFactory());
       break;
     case ThriftServer::HTTP:
-      factory->reset(new ThriftServer::BufferedTransportFactory(
-          ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES,
-          new THttpServerTransportFactory()));
+      factory->reset(new THttpServerTransportFactory());
       break;
     default:
       LOG(FATAL) << Substitute("Bad transport type: $0", underlying_transport_type);
@@ -1093,18 +1094,20 @@ Status NoAuthProvider::WrapClientTransport(const string& hostname,

 void NoAuthProvider::SetupConnectionContext(
     const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
-    ThriftServer::TransportType underlying_transport_type,
-    TTransport* underlying_input_transport, TTransport* underlying_output_transport) {
+    ThriftServer::TransportType underlying_transport_type, TTransport* input_transport,
+    TTransport* output_transport) {
   connection_ptr->username = "";
+  TSocket* socket = nullptr;
   switch (underlying_transport_type) {
-    case ThriftServer::BINARY:
-      // Intentionally blank - since there's no security, there's nothing to set up here.
+    case ThriftServer::BINARY: {
+      TBufferedTransport* buffered_transport =
+          down_cast<TBufferedTransport*>(input_transport);
+      socket = down_cast<TSocket*>(buffered_transport->getUnderlyingTransport().get());
       break;
+    }
     case ThriftServer::HTTP: {
-      THttpServer* http_input_transport =
-          down_cast<THttpServer*>(underlying_input_transport);
-      THttpServer* http_output_transport =
-          down_cast<THttpServer*>(underlying_input_transport);
+      THttpServer* http_input_transport = down_cast<THttpServer*>(input_transport);
+      THttpServer* http_output_transport = down_cast<THttpServer*>(output_transport);
       THttpServer::HttpCallbacks callbacks;
       // Even though there's no security, we set up some callbacks, eg. to allow
       // impersonation over unsecured connections for testing purposes.
@@ -1113,11 +1116,14 @@ void NoAuthProvider::SetupConnectionContext(
       callbacks.return_headers_fn = std::bind(ReturnHeaders, connection_ptr.get());
       http_input_transport->setCallbacks(callbacks);
       http_output_transport->setCallbacks(callbacks);
+      socket = down_cast<TSocket*>(http_input_transport->getUnderlyingTransport().get());
       break;
     }
     default:
       LOG(FATAL) << Substitute("Bad transport type: $0", underlying_transport_type);
   }
+  connection_ptr->network_address =
+      MakeNetworkAddress(socket->getPeerAddress(), socket->getPeerPort());
 }
 
 Status AuthManager::Init() {
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index fe475a1..d4c8c86 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -201,36 +201,14 @@ const ThriftServer::ConnectionContext* ThriftServer::GetThreadConnectionContext(
 
 void* ThriftServer::ThriftServerEventProcessor::createContext(
     boost::shared_ptr<TProtocol> input, boost::shared_ptr<TProtocol> output) {
-  TSocket* socket = NULL;
-  TTransport* transport = input->getTransport().get();
   boost::shared_ptr<ConnectionContext> connection_ptr =
       boost::shared_ptr<ConnectionContext>(new ConnectionContext);
-  TTransport* underlying_input_transport =
-      (static_cast<TBufferedTransport*>(transport))->getUnderlyingTransport().get();
-  TTransport* underlying_output_transport =
-      (static_cast<TBufferedTransport*>(output->getTransport().get()))
-          ->getUnderlyingTransport()
-          .get();
-
   thrift_server_->auth_provider_->SetupConnectionContext(connection_ptr,
-      thrift_server_->transport_type_, underlying_input_transport,
-      underlying_output_transport);
-  if (thrift_server_->auth_provider_->is_secure()
-      && thrift_server_->transport_type_ == ThriftServer::BINARY) {
-    TSaslServerTransport* sasl_transport =
-        static_cast<TSaslServerTransport*>(underlying_input_transport);
-    socket = static_cast<TSocket*>(sasl_transport->getUnderlyingTransport().get());
-  } else if (thrift_server_->transport_type_ == ThriftServer::HTTP) {
-    THttpServer* http_transport = static_cast<THttpServer*>(underlying_input_transport);
-    socket = static_cast<TSocket*>(http_transport->getUnderlyingTransport().get());
-  } else {
-    socket = static_cast<TSocket*>(underlying_input_transport);
-  }
+      thrift_server_->transport_type_, input->getTransport().get(),
+      output->getTransport().get());
 
   {
     connection_ptr->server_name = thrift_server_->name_;
-    connection_ptr->network_address =
-        MakeNetworkAddress(socket->getPeerAddress(), socket->getPeerPort());
 
     lock_guard<mutex> l(thrift_server_->connection_contexts_lock_);
     uuid connection_uuid = thrift_server_->uuid_generator_();
diff --git a/be/src/transport/THttpTransport.cpp b/be/src/transport/THttpTransport.cpp
index 339e02e..b6c26dc 100644
--- a/be/src/transport/THttpTransport.cpp
+++ b/be/src/transport/THttpTransport.cpp
@@ -78,6 +78,7 @@ uint32_t THttpTransport::readEnd() {
       readChunked();
     }
   }
+  readHeaders_ = true;
   return 0;
 }
 
@@ -93,6 +94,8 @@ uint32_t THttpTransport::readMoreData() {
 
   if (chunked_) {
     size = readChunked();
+    // If we read to the end of the chunked data, we should read headers again.
+    if (size == 0) readHeaders_ = true;
   } else {
     size = readContent(contentLength_);
     readHeaders_ = true;