You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/08/06 16:39:13 UTC
[impala] 01/02: IMPALA-8832: Fix HTTP client protocol to work with
Apache Knox
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit bcedd1572ee69bf5f5551af08f8fcb0ae0c48aea
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Thu Aug 1 04:29:52 2019 +0000
IMPALA-8832: Fix HTTP client protocol to work with Apache Knox
This patch fixes two bugs with Impala's HTTP client protocol:
- THttpServer transports are no longer wrapped in TBufferedTransports.
THttpServer already has its own support for buffering to process one
HTTP request at a time, and wrapping it in a TBufferedTransport
interferes with this, in some cases causing client requests to
either not be processed or to receive multiple responses.
- Fixes a bug in THttpTransport where, once a chunked HTTP request has
finished being processed, the 'readHeaders_' variable is never reset,
so further requests over the same connection are not processed.
Testing:
- Tested by proxying beeline connections to Impala through Apache Knox
Change-Id: I5c9d934a654a9e6aaf9207fa5856f956baaacf55
Reviewed-on: http://gerrit.cloudera.org:8080/14008
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/rpc/auth-provider.h | 24 +++++++++--------
be/src/rpc/authentication.cc | 52 +++++++++++++++++++++----------------
be/src/rpc/thrift-server.cc | 26 ++-----------------
be/src/transport/THttpTransport.cpp | 3 +++
4 files changed, 47 insertions(+), 58 deletions(-)
diff --git a/be/src/rpc/auth-provider.h b/be/src/rpc/auth-provider.h
index 095ecac..3b5dc29 100644
--- a/be/src/rpc/auth-provider.h
+++ b/be/src/rpc/auth-provider.h
@@ -40,10 +40,9 @@ class AuthProvider {
virtual Status Start() WARN_UNUSED_RESULT = 0;
/// Creates a new Thrift transport factory in the out parameter that performs
- /// authorisation per this provider's protocol. The top-level transport returned by
- /// 'factory' must always be a TBufferedTransport, but depending on the AuthProvider
- /// implementation and the value of 'underlying_transport_type', that may be wrapped
- /// around another transport type, eg. a TSaslServerTransport.
+ /// authorisation per this provider's protocol. The type of the transport returned is
+ /// determined by 'underlying_transport_type' and there may be multiple levels of
+ /// wrapped transports, eg. a TBufferedTransport around a TSaslServerTransport.
virtual Status GetServerTransportFactory(
ThriftServer::TransportType underlying_transport_type,
const std::string& server_name, MetricGroup* metrics,
@@ -60,12 +59,15 @@ class AuthProvider {
boost::shared_ptr<apache::thrift::transport::TTransport>* wrapped_transport)
WARN_UNUSED_RESULT = 0;
- /// Setup 'connection_ptr' to get its username with the given transports.
+ /// Setup 'connection_ptr' to get its username with the given transports and sets
+ /// 'network_address' based on the underlying socket. The transports should be generated
+ /// by the factory returned by GetServerTransportFactory() when called with the same
+ /// 'underlying_transport_type'.
virtual void SetupConnectionContext(
const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
ThriftServer::TransportType underlying_transport_type,
- apache::thrift::transport::TTransport* underlying_input_transport,
- apache::thrift::transport::TTransport* underlying_output_transport) = 0;
+ apache::thrift::transport::TTransport* input_transport,
+ apache::thrift::transport::TTransport* output_transport) = 0;
/// Returns true if this provider uses Sasl at the transport layer.
virtual bool is_secure() = 0;
@@ -113,8 +115,8 @@ class SecureAuthProvider : public AuthProvider {
virtual void SetupConnectionContext(
const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
ThriftServer::TransportType underlying_transport_type,
- apache::thrift::transport::TTransport* underlying_input_transport,
- apache::thrift::transport::TTransport* underlying_output_transport);
+ apache::thrift::transport::TTransport* input_transport,
+ apache::thrift::transport::TTransport* output_transport);
virtual bool is_secure() { return true; }
@@ -193,8 +195,8 @@ class NoAuthProvider : public AuthProvider {
virtual void SetupConnectionContext(
const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
ThriftServer::TransportType underlying_transport_type,
- apache::thrift::transport::TTransport* underlying_input_transport,
- apache::thrift::transport::TTransport* underlying_output_transport);
+ apache::thrift::transport::TTransport* input_transport,
+ apache::thrift::transport::TTransport* output_transport);
virtual bool is_secure() { return false; }
};
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index a7c04ee..1d253d8 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -954,9 +954,8 @@ Status SecureAuthProvider::GetServerTransportFactory(
if (underlying_transport_type == ThriftServer::HTTP) {
bool has_kerberos = !principal_.empty();
- factory->reset(new ThriftServer::BufferedTransportFactory(
- ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES,
- new THttpServerTransportFactory(server_name, metrics, has_ldap_, has_kerberos)));
+ factory->reset(
+ new THttpServerTransportFactory(server_name, metrics, has_ldap_, has_kerberos));
return Status::OK();
}
@@ -1027,22 +1026,23 @@ Status SecureAuthProvider::WrapClientTransport(const string& hostname,
void SecureAuthProvider::SetupConnectionContext(
const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
- ThriftServer::TransportType underlying_transport_type,
- TTransport* underlying_input_transport, TTransport* underlying_output_transport) {
+ ThriftServer::TransportType underlying_transport_type, TTransport* input_transport,
+ TTransport* output_transport) {
+ TSocket* socket = nullptr;
switch (underlying_transport_type) {
case ThriftServer::BINARY: {
- TSaslServerTransport* sasl_transport =
- down_cast<TSaslServerTransport*>(underlying_input_transport);
-
+ TBufferedTransport* buffered_transport =
+ down_cast<TBufferedTransport*>(input_transport);
+ TSaslServerTransport* sasl_transport = down_cast<TSaslServerTransport*>(
+ buffered_transport->getUnderlyingTransport().get());
+ socket = down_cast<TSocket*>(sasl_transport->getUnderlyingTransport().get());
// Get the username from the transport.
connection_ptr->username = sasl_transport->getUsername();
break;
}
case ThriftServer::HTTP: {
- THttpServer* http_input_transport =
- down_cast<THttpServer*>(underlying_input_transport);
- THttpServer* http_output_transport =
- down_cast<THttpServer*>(underlying_output_transport);
+ THttpServer* http_input_transport = down_cast<THttpServer*>(input_transport);
+ THttpServer* http_output_transport = down_cast<THttpServer*>(output_transport);
THttpServer::HttpCallbacks callbacks;
callbacks.path_fn = std::bind(
HttpPathFn, connection_ptr.get(), std::placeholders::_1, std::placeholders::_2);
@@ -1057,11 +1057,14 @@ void SecureAuthProvider::SetupConnectionContext(
}
http_input_transport->setCallbacks(callbacks);
http_output_transport->setCallbacks(callbacks);
+ socket = down_cast<TSocket*>(http_input_transport->getUnderlyingTransport().get());
break;
}
default:
LOG(FATAL) << Substitute("Bad transport type: $0", underlying_transport_type);
}
+ connection_ptr->network_address =
+ MakeNetworkAddress(socket->getPeerAddress(), socket->getPeerPort());
}
Status NoAuthProvider::GetServerTransportFactory(
@@ -1073,9 +1076,7 @@ Status NoAuthProvider::GetServerTransportFactory(
factory->reset(new ThriftServer::BufferedTransportFactory());
break;
case ThriftServer::HTTP:
- factory->reset(new ThriftServer::BufferedTransportFactory(
- ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES,
- new THttpServerTransportFactory()));
+ factory->reset(new THttpServerTransportFactory());
break;
default:
LOG(FATAL) << Substitute("Bad transport type: $0", underlying_transport_type);
@@ -1093,18 +1094,20 @@ Status NoAuthProvider::WrapClientTransport(const string& hostname,
void NoAuthProvider::SetupConnectionContext(
const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
- ThriftServer::TransportType underlying_transport_type,
- TTransport* underlying_input_transport, TTransport* underlying_output_transport) {
+ ThriftServer::TransportType underlying_transport_type, TTransport* input_transport,
+ TTransport* output_transport) {
connection_ptr->username = "";
+ TSocket* socket = nullptr;
switch (underlying_transport_type) {
- case ThriftServer::BINARY:
- // Intentionally blank - since there's no security, there's nothing to set up here.
+ case ThriftServer::BINARY: {
+ TBufferedTransport* buffered_transport =
+ down_cast<TBufferedTransport*>(input_transport);
+ socket = down_cast<TSocket*>(buffered_transport->getUnderlyingTransport().get());
break;
+ }
case ThriftServer::HTTP: {
- THttpServer* http_input_transport =
- down_cast<THttpServer*>(underlying_input_transport);
- THttpServer* http_output_transport =
- down_cast<THttpServer*>(underlying_input_transport);
+ THttpServer* http_input_transport = down_cast<THttpServer*>(input_transport);
+ THttpServer* http_output_transport = down_cast<THttpServer*>(input_transport);
THttpServer::HttpCallbacks callbacks;
// Even though there's no security, we set up some callbacks, eg. to allow
// impersonation over unsecured connections for testing purposes.
@@ -1113,11 +1116,14 @@ void NoAuthProvider::SetupConnectionContext(
callbacks.return_headers_fn = std::bind(ReturnHeaders, connection_ptr.get());
http_input_transport->setCallbacks(callbacks);
http_output_transport->setCallbacks(callbacks);
+ socket = down_cast<TSocket*>(http_input_transport->getUnderlyingTransport().get());
break;
}
default:
LOG(FATAL) << Substitute("Bad transport type: $0", underlying_transport_type);
}
+ connection_ptr->network_address =
+ MakeNetworkAddress(socket->getPeerAddress(), socket->getPeerPort());
}
Status AuthManager::Init() {
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index fe475a1..d4c8c86 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -201,36 +201,14 @@ const ThriftServer::ConnectionContext* ThriftServer::GetThreadConnectionContext(
void* ThriftServer::ThriftServerEventProcessor::createContext(
boost::shared_ptr<TProtocol> input, boost::shared_ptr<TProtocol> output) {
- TSocket* socket = NULL;
- TTransport* transport = input->getTransport().get();
boost::shared_ptr<ConnectionContext> connection_ptr =
boost::shared_ptr<ConnectionContext>(new ConnectionContext);
- TTransport* underlying_input_transport =
- (static_cast<TBufferedTransport*>(transport))->getUnderlyingTransport().get();
- TTransport* underlying_output_transport =
- (static_cast<TBufferedTransport*>(output->getTransport().get()))
- ->getUnderlyingTransport()
- .get();
-
thrift_server_->auth_provider_->SetupConnectionContext(connection_ptr,
- thrift_server_->transport_type_, underlying_input_transport,
- underlying_output_transport);
- if (thrift_server_->auth_provider_->is_secure()
- && thrift_server_->transport_type_ == ThriftServer::BINARY) {
- TSaslServerTransport* sasl_transport =
- static_cast<TSaslServerTransport*>(underlying_input_transport);
- socket = static_cast<TSocket*>(sasl_transport->getUnderlyingTransport().get());
- } else if (thrift_server_->transport_type_ == ThriftServer::HTTP) {
- THttpServer* http_transport = static_cast<THttpServer*>(underlying_input_transport);
- socket = static_cast<TSocket*>(http_transport->getUnderlyingTransport().get());
- } else {
- socket = static_cast<TSocket*>(underlying_input_transport);
- }
+ thrift_server_->transport_type_, input->getTransport().get(),
+ output->getTransport().get());
{
connection_ptr->server_name = thrift_server_->name_;
- connection_ptr->network_address =
- MakeNetworkAddress(socket->getPeerAddress(), socket->getPeerPort());
lock_guard<mutex> l(thrift_server_->connection_contexts_lock_);
uuid connection_uuid = thrift_server_->uuid_generator_();
diff --git a/be/src/transport/THttpTransport.cpp b/be/src/transport/THttpTransport.cpp
index 339e02e..b6c26dc 100644
--- a/be/src/transport/THttpTransport.cpp
+++ b/be/src/transport/THttpTransport.cpp
@@ -78,6 +78,7 @@ uint32_t THttpTransport::readEnd() {
readChunked();
}
}
+ readHeaders_ = true;
return 0;
}
@@ -93,6 +94,8 @@ uint32_t THttpTransport::readMoreData() {
if (chunked_) {
size = readChunked();
+ // If we read to the end of the chunked data, we should read headers again.
+ if (size == 0) readHeaders_ = true;
} else {
size = readContent(contentLength_);
readHeaders_ = true;