You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/08/06 16:39:12 UTC

[impala] branch master updated (bbe064e -> 227b839)

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from bbe064e  IMPALA-8828: Support impersonation via http paths
     new bcedd15  IMPALA-8832: Fix HTTP client protocol to work with Apache Knox
     new 227b839  IMPALA-8771: Missing stats warning for complex type columns

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/rpc/auth-provider.h                         | 24 +++++-----
 be/src/rpc/authentication.cc                       | 52 ++++++++++++----------
 be/src/rpc/thrift-server.cc                        | 26 +----------
 be/src/transport/THttpTransport.cpp                |  3 ++
 .../java/org/apache/impala/planner/ScanNode.java   |  8 +++-
 .../queries/PlannerTest/resource-requirements.test | 24 ----------
 .../compute-stats-complextype-warning.test         | 17 +++++++
 .../queries/QueryTest/show-stats.test              |  5 ++-
 tests/metadata/test_compute_stats.py               |  5 +++
 9 files changed, 79 insertions(+), 85 deletions(-)
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/compute-stats-complextype-warning.test


[impala] 02/02: IMPALA-8771: Missing stats warning for complex type columns

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 227b839e4e71778b74b045331682317e29014c7c
Author: Tamas Mate <tm...@cloudera.com>
AuthorDate: Wed Jul 31 16:20:06 2019 +0200

    IMPALA-8771: Missing stats warning for complex type columns
    
    An extra condition is added to the table stats checking, so that the
    complex type columns are skipped and can not trigger missing stats
    warning.
    
    Change-Id: Ia1b5c14da0c7f6eab373d80b2dbf7c974b2eb567
    Reviewed-on: http://gerrit.cloudera.org:8080/13965
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Tim Armstrong <ta...@cloudera.com>
---
 .../java/org/apache/impala/planner/ScanNode.java   |  8 +++++++-
 .../queries/PlannerTest/resource-requirements.test | 24 ----------------------
 .../compute-stats-complextype-warning.test         | 17 +++++++++++++++
 .../queries/QueryTest/show-stats.test              |  5 +++--
 tests/metadata/test_compute_stats.py               |  5 +++++
 5 files changed, 32 insertions(+), 27 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/planner/ScanNode.java b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
index 78c24af..78f3510 100644
--- a/fe/src/main/java/org/apache/impala/planner/ScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/ScanNode.java
@@ -180,9 +180,15 @@ abstract public class ScanNode extends PlanNode {
     return false;
   }
 
+  /**
+   * Returns true if the column does not have stats, complex type columns are skipped.
+   */
   public boolean isTableMissingColumnStats() {
     for (SlotDescriptor slot: desc_.getSlots()) {
-      if (slot.getColumn() != null && !slot.getStats().hasStats()) return true;
+      if (slot.getColumn() != null && !slot.getStats().hasStats() &&
+          !slot.getColumn().getType().isComplexType()) {
+        return true;
+      }
     }
     return false;
   }
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 5faf6d3..25abffb 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -554,8 +554,6 @@ from tpch_nested_parquet.customer c, c.c_orders
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=32.00MB Threads=2
 Per-Host Resource Estimates: Memory=88MB
-WARNING: The following tables are missing relevant table and/or column statistics.
-tpch_nested_parquet.customer
 Analyzed query: SELECT c_custkey, o_orderkey, o_orderstatus, o_totalprice,
 o_orderdate, o_orderpriority, o_clerk FROM tpch_nested_parquet.customer c,
 c.c_orders
@@ -605,8 +603,6 @@ from tpch_nested_parquet.customer c, c.c_orders
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=8.00MB Threads=2
 Per-Host Resource Estimates: Memory=88MB
-WARNING: The following tables are missing relevant table and/or column statistics.
-tpch_nested_parquet.customer
 Analyzed query: SELECT c_custkey, o_orderkey, pos FROM
 tpch_nested_parquet.customer c, c.c_orders
 
@@ -656,8 +652,6 @@ from tpch_nested_parquet.customer c, c.c_orders
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=8.00MB Threads=2
 Per-Host Resource Estimates: Memory=88MB
-WARNING: The following tables are missing relevant table and/or column statistics.
-tpch_nested_parquet.customer
 Analyzed query: SELECT c_custkey, pos FROM tpch_nested_parquet.customer c,
 c.c_orders
 
@@ -707,8 +701,6 @@ from tpch_nested_parquet.customer c, c.c_orders
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=8.00MB Threads=2
 Per-Host Resource Estimates: Memory=88MB
-WARNING: The following tables are missing relevant table and/or column statistics.
-tpch_nested_parquet.customer
 Analyzed query: SELECT c_custkey FROM tpch_nested_parquet.customer c, c.c_orders
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -756,8 +748,6 @@ from tpch_nested_parquet.customer c, c.c_orders
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=4.00MB Threads=2
 Per-Host Resource Estimates: Memory=88MB
-WARNING: The following tables are missing relevant table and/or column statistics.
-tpch_nested_parquet.customer
 Analyzed query: SELECT o_orderkey FROM tpch_nested_parquet.customer c,
 c.c_orders
 
@@ -806,8 +796,6 @@ from tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=16.00MB Threads=2
 Per-Host Resource Estimates: Memory=88MB
-WARNING: The following tables are missing relevant table and/or column statistics.
-tpch_nested_parquet.customer
 Analyzed query: SELECT c_custkey, o_orderkey, l_comment FROM
 tpch_nested_parquet.customer c, c.c_orders o, o.o_lineitems
 
@@ -4521,8 +4509,6 @@ from tpch_nested_parquet.customer c,
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=85.94MB Threads=2
 Per-Host Resource Estimates: Memory=346MB
-WARNING: The following tables are missing relevant table and/or column statistics.
-tpch_nested_parquet.customer
 Analyzed query: SELECT DISTINCT c_name, v.o_orderkey, v.o_orderstatus FROM
 tpch_nested_parquet.customer c, (SELECT DISTINCT o1.o_orderkey, o2.o_orderstatus
 FROM c.c_orders o1 INNER JOIN c.c_orders o2 ON o1.o_orderkey = o2.o_orderkey
@@ -4598,8 +4584,6 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=119.94MB Threads=4
 Per-Host Resource Estimates: Memory=494MB
-WARNING: The following tables are missing relevant table and/or column statistics.
-tpch_nested_parquet.customer
 Analyzed query: SELECT DISTINCT c_name, v.o_orderkey, v.o_orderstatus FROM
 tpch_nested_parquet.customer c, (SELECT DISTINCT o1.o_orderkey, o2.o_orderstatus
 FROM c.c_orders o1 INNER JOIN c.c_orders o2 ON o1.o_orderkey = o2.o_orderkey
@@ -4695,8 +4679,6 @@ Per-Host Resources: mem-estimate=345.94MB mem-reservation=85.94MB thread-reserva
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=239.88MB Threads=5
 Per-Host Resource Estimates: Memory=979MB
-WARNING: The following tables are missing relevant table and/or column statistics.
-tpch_nested_parquet.customer
 Analyzed query: SELECT DISTINCT c_name, v.o_orderkey, v.o_orderstatus FROM
 tpch_nested_parquet.customer c, (SELECT DISTINCT o1.o_orderkey, o2.o_orderstatus
 FROM c.c_orders o1 INNER JOIN c.c_orders o2 ON o1.o_orderkey = o2.o_orderkey
@@ -4801,8 +4783,6 @@ from tpch_nested_parquet.customer c,
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=104.00MB Threads=2
 Per-Host Resource Estimates: Memory=136MB
-WARNING: The following tables are missing relevant table and/or column statistics.
-tpch_nested_parquet.customer
 Analyzed query: SELECT * FROM tpch_nested_parquet.customer c, (SELECT *,
 row_number() OVER (ORDER BY o_totalprice ASC) rnum_price, row_number() OVER
 (ORDER BY o_orderdate ASC) rnum_date, row_number() OVER (ORDER BY
@@ -4889,8 +4869,6 @@ PLAN-ROOT SINK
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=104.00MB Threads=3
 Per-Host Resource Estimates: Memory=147MB
-WARNING: The following tables are missing relevant table and/or column statistics.
-tpch_nested_parquet.customer
 Analyzed query: SELECT * FROM tpch_nested_parquet.customer c, (SELECT *,
 row_number() OVER (ORDER BY o_totalprice ASC) rnum_price, row_number() OVER
 (ORDER BY o_orderdate ASC) rnum_date, row_number() OVER (ORDER BY
@@ -4984,8 +4962,6 @@ Per-Host Resources: mem-estimate=136.00MB mem-reservation=104.00MB thread-reserv
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=208.00MB Threads=3
 Per-Host Resource Estimates: Memory=284MB
-WARNING: The following tables are missing relevant table and/or column statistics.
-tpch_nested_parquet.customer
 Analyzed query: SELECT * FROM tpch_nested_parquet.customer c, (SELECT *,
 row_number() OVER (ORDER BY o_totalprice ASC) rnum_price, row_number() OVER
 (ORDER BY o_orderdate ASC) rnum_date, row_number() OVER (ORDER BY
diff --git a/testdata/workloads/functional-query/queries/QueryTest/compute-stats-complextype-warning.test b/testdata/workloads/functional-query/queries/QueryTest/compute-stats-complextype-warning.test
new file mode 100644
index 0000000..468000f
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/compute-stats-complextype-warning.test
@@ -0,0 +1,17 @@
+====
+---- QUERY
+# The missing stats warning should appear when the stats are not available,
+# in this case it is due to missing table stats.
+create table if not exists complex_collection (list ARRAY < STRING >);
+explain select count(*) from complex_collection c, c.list;
+---- RESULTS: VERIFY_IS_SUBSET
+'WARNING: The following tables are missing relevant table and/or column statistics.'
+====
+---- QUERY
+# Although stats are not available for complex types the missing stats warning
+# should not appear when stats are available.
+compute stats complex_collection;
+explain select count(*) from complex_collection c, c.list;
+---- RESULTS: VERIFY_IS_NOT_IN
+'WARNING: The following tables are missing relevant table and/or column statistics.'
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/show-stats.test b/testdata/workloads/functional-query/queries/QueryTest/show-stats.test
index d820a8c..73cdfb1 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/show-stats.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/show-stats.test
@@ -148,7 +148,8 @@ COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE
 STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE
 ====
 ---- QUERY
-# Column column stats for a table with complex types.
+# Column stats for a table with complex types.
+# TODO: when complex type stats are supported revisit: compute-stats-complextype-warning.test
 show column stats functional.allcomplextypes
 ---- LABELS
 COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE
@@ -170,4 +171,4 @@ COLUMN, TYPE, #DISTINCT VALUES, #NULLS, MAX SIZE, AVG SIZE
 'month','INT',0,0,4,4
 ---- TYPES
 STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE
-====
\ No newline at end of file
+====
diff --git a/tests/metadata/test_compute_stats.py b/tests/metadata/test_compute_stats.py
index ec81909..70e99f5 100644
--- a/tests/metadata/test_compute_stats.py
+++ b/tests/metadata/test_compute_stats.py
@@ -81,6 +81,11 @@ class TestComputeStats(ImpalaTestSuite):
   def test_compute_stats_incremental(self, vector, unique_database):
     self.run_test_case('QueryTest/compute-stats-incremental', vector, unique_database)
 
+  @SkipIfS3.eventually_consistent
+  def test_compute_stats_complextype_warning(self, vector, unique_database):
+    self.run_test_case('QueryTest/compute-stats-complextype-warning', vector,
+        unique_database)
+
   @pytest.mark.execute_serially
   @SkipIfS3.eventually_consistent
   def test_compute_stats_many_partitions(self, vector):


[impala] 01/02: IMPALA-8832: Fix HTTP client protocol to work with Apache Knox

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit bcedd1572ee69bf5f5551af08f8fcb0ae0c48aea
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
AuthorDate: Thu Aug 1 04:29:52 2019 +0000

    IMPALA-8832: Fix HTTP client protocol to work with Apache Knox
    
    This patch fixes two bugs with Impala's HTTP client protocol:
    - THttpServer transports are no longer wrapped in TBufferedTransports.
      THttpServer already has its own support for buffering to process one
      HTTP request at a time, and wrapping it in a TBufferedTransport
      interferes with this, in some cases causing client requests to
      either not be processed or to receive multiple responses.
    - Fixes a bug in THttpTransport where when a chunked HTTP request is
      finished being processed, the 'readHeaders_' variable is never reset
      and further requests over the connection are not processed.
    
    Testing:
    - Tested by proxying beeline connections to Impala through Apache Knox
    
    Change-Id: I5c9d934a654a9e6aaf9207fa5856f956baaacf55
    Reviewed-on: http://gerrit.cloudera.org:8080/14008
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/auth-provider.h          | 24 +++++++++--------
 be/src/rpc/authentication.cc        | 52 +++++++++++++++++++++----------------
 be/src/rpc/thrift-server.cc         | 26 ++-----------------
 be/src/transport/THttpTransport.cpp |  3 +++
 4 files changed, 47 insertions(+), 58 deletions(-)

diff --git a/be/src/rpc/auth-provider.h b/be/src/rpc/auth-provider.h
index 095ecac..3b5dc29 100644
--- a/be/src/rpc/auth-provider.h
+++ b/be/src/rpc/auth-provider.h
@@ -40,10 +40,9 @@ class AuthProvider {
   virtual Status Start() WARN_UNUSED_RESULT = 0;
 
   /// Creates a new Thrift transport factory in the out parameter that performs
-  /// authorisation per this provider's protocol. The top-level transport returned by
-  /// 'factory' must always be a TBufferedTransport, but depending on the AuthProvider
-  /// implementation and the value of 'underlying_transport_type', that may be wrapped
-  /// around another transport type, eg. a TSaslServerTransport.
+  /// authorisation per this provider's protocol. The type of the transport returned is
+  /// determined by 'underlying_transport_type' and there may be multiple levels of
+  /// wrapped transports, eg. a TBufferedTransport around a TSaslServerTransport.
   virtual Status GetServerTransportFactory(
       ThriftServer::TransportType underlying_transport_type,
       const std::string& server_name, MetricGroup* metrics,
@@ -60,12 +59,15 @@ class AuthProvider {
       boost::shared_ptr<apache::thrift::transport::TTransport>* wrapped_transport)
       WARN_UNUSED_RESULT = 0;
 
-  /// Setup 'connection_ptr' to get its username with the given transports.
+  /// Setup 'connection_ptr' to get its username with the given transports and sets
+  /// 'network_address' based on the underlying socket. The transports should be generated
+  /// by the factory returned by GetServerTransportFactory() when called with the same
+  /// 'underlying_transport_type'.
   virtual void SetupConnectionContext(
       const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
       ThriftServer::TransportType underlying_transport_type,
-      apache::thrift::transport::TTransport* underlying_input_transport,
-      apache::thrift::transport::TTransport* underlying_output_transport) = 0;
+      apache::thrift::transport::TTransport* input_transport,
+      apache::thrift::transport::TTransport* output_transport) = 0;
 
   /// Returns true if this provider uses Sasl at the transport layer.
   virtual bool is_secure() = 0;
@@ -113,8 +115,8 @@ class SecureAuthProvider : public AuthProvider {
   virtual void SetupConnectionContext(
       const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
       ThriftServer::TransportType underlying_transport_type,
-      apache::thrift::transport::TTransport* underlying_input_transport,
-      apache::thrift::transport::TTransport* underlying_output_transport);
+      apache::thrift::transport::TTransport* input_transport,
+      apache::thrift::transport::TTransport* output_transport);
 
   virtual bool is_secure() { return true; }
 
@@ -193,8 +195,8 @@ class NoAuthProvider : public AuthProvider {
   virtual void SetupConnectionContext(
       const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
       ThriftServer::TransportType underlying_transport_type,
-      apache::thrift::transport::TTransport* underlying_input_transport,
-      apache::thrift::transport::TTransport* underlying_output_transport);
+      apache::thrift::transport::TTransport* input_transport,
+      apache::thrift::transport::TTransport* output_transport);
 
   virtual bool is_secure() { return false; }
 };
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index a7c04ee..1d253d8 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -954,9 +954,8 @@ Status SecureAuthProvider::GetServerTransportFactory(
 
   if (underlying_transport_type == ThriftServer::HTTP) {
     bool has_kerberos = !principal_.empty();
-    factory->reset(new ThriftServer::BufferedTransportFactory(
-        ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES,
-        new THttpServerTransportFactory(server_name, metrics, has_ldap_, has_kerberos)));
+    factory->reset(
+        new THttpServerTransportFactory(server_name, metrics, has_ldap_, has_kerberos));
     return Status::OK();
   }
 
@@ -1027,22 +1026,23 @@ Status SecureAuthProvider::WrapClientTransport(const string& hostname,
 
 void SecureAuthProvider::SetupConnectionContext(
     const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
-    ThriftServer::TransportType underlying_transport_type,
-    TTransport* underlying_input_transport, TTransport* underlying_output_transport) {
+    ThriftServer::TransportType underlying_transport_type, TTransport* input_transport,
+    TTransport* output_transport) {
+  TSocket* socket = nullptr;
   switch (underlying_transport_type) {
     case ThriftServer::BINARY: {
-      TSaslServerTransport* sasl_transport =
-          down_cast<TSaslServerTransport*>(underlying_input_transport);
-
+      TBufferedTransport* buffered_transport =
+          down_cast<TBufferedTransport*>(input_transport);
+      TSaslServerTransport* sasl_transport = down_cast<TSaslServerTransport*>(
+          buffered_transport->getUnderlyingTransport().get());
+      socket = down_cast<TSocket*>(sasl_transport->getUnderlyingTransport().get());
       // Get the username from the transport.
       connection_ptr->username = sasl_transport->getUsername();
       break;
     }
     case ThriftServer::HTTP: {
-      THttpServer* http_input_transport =
-          down_cast<THttpServer*>(underlying_input_transport);
-      THttpServer* http_output_transport =
-          down_cast<THttpServer*>(underlying_output_transport);
+      THttpServer* http_input_transport = down_cast<THttpServer*>(input_transport);
+      THttpServer* http_output_transport = down_cast<THttpServer*>(output_transport);
       THttpServer::HttpCallbacks callbacks;
       callbacks.path_fn = std::bind(
           HttpPathFn, connection_ptr.get(), std::placeholders::_1, std::placeholders::_2);
@@ -1057,11 +1057,14 @@ void SecureAuthProvider::SetupConnectionContext(
       }
       http_input_transport->setCallbacks(callbacks);
       http_output_transport->setCallbacks(callbacks);
+      socket = down_cast<TSocket*>(http_input_transport->getUnderlyingTransport().get());
       break;
     }
     default:
       LOG(FATAL) << Substitute("Bad transport type: $0", underlying_transport_type);
   }
+  connection_ptr->network_address =
+      MakeNetworkAddress(socket->getPeerAddress(), socket->getPeerPort());
 }
 
 Status NoAuthProvider::GetServerTransportFactory(
@@ -1073,9 +1076,7 @@ Status NoAuthProvider::GetServerTransportFactory(
       factory->reset(new ThriftServer::BufferedTransportFactory());
       break;
     case ThriftServer::HTTP:
-      factory->reset(new ThriftServer::BufferedTransportFactory(
-          ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES,
-          new THttpServerTransportFactory()));
+      factory->reset(new THttpServerTransportFactory());
       break;
     default:
       LOG(FATAL) << Substitute("Bad transport type: $0", underlying_transport_type);
@@ -1093,18 +1094,20 @@ Status NoAuthProvider::WrapClientTransport(const string& hostname,
 
 void NoAuthProvider::SetupConnectionContext(
     const boost::shared_ptr<ThriftServer::ConnectionContext>& connection_ptr,
-    ThriftServer::TransportType underlying_transport_type,
-    TTransport* underlying_input_transport, TTransport* underlying_output_transport) {
+    ThriftServer::TransportType underlying_transport_type, TTransport* input_transport,
+    TTransport* output_transport) {
   connection_ptr->username = "";
+  TSocket* socket = nullptr;
   switch (underlying_transport_type) {
-    case ThriftServer::BINARY:
-      // Intentionally blank - since there's no security, there's nothing to set up here.
+    case ThriftServer::BINARY: {
+      TBufferedTransport* buffered_transport =
+          down_cast<TBufferedTransport*>(input_transport);
+      socket = down_cast<TSocket*>(buffered_transport->getUnderlyingTransport().get());
       break;
+    }
     case ThriftServer::HTTP: {
-      THttpServer* http_input_transport =
-          down_cast<THttpServer*>(underlying_input_transport);
-      THttpServer* http_output_transport =
-          down_cast<THttpServer*>(underlying_input_transport);
+      THttpServer* http_input_transport = down_cast<THttpServer*>(input_transport);
+      THttpServer* http_output_transport = down_cast<THttpServer*>(input_transport);
       THttpServer::HttpCallbacks callbacks;
       // Even though there's no security, we set up some callbacks, eg. to allow
       // impersonation over unsecured connections for testing purposes.
@@ -1113,11 +1116,14 @@ void NoAuthProvider::SetupConnectionContext(
       callbacks.return_headers_fn = std::bind(ReturnHeaders, connection_ptr.get());
       http_input_transport->setCallbacks(callbacks);
       http_output_transport->setCallbacks(callbacks);
+      socket = down_cast<TSocket*>(http_input_transport->getUnderlyingTransport().get());
       break;
     }
     default:
       LOG(FATAL) << Substitute("Bad transport type: $0", underlying_transport_type);
   }
+  connection_ptr->network_address =
+      MakeNetworkAddress(socket->getPeerAddress(), socket->getPeerPort());
 }
 
 Status AuthManager::Init() {
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index fe475a1..d4c8c86 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -201,36 +201,14 @@ const ThriftServer::ConnectionContext* ThriftServer::GetThreadConnectionContext(
 
 void* ThriftServer::ThriftServerEventProcessor::createContext(
     boost::shared_ptr<TProtocol> input, boost::shared_ptr<TProtocol> output) {
-  TSocket* socket = NULL;
-  TTransport* transport = input->getTransport().get();
   boost::shared_ptr<ConnectionContext> connection_ptr =
       boost::shared_ptr<ConnectionContext>(new ConnectionContext);
-  TTransport* underlying_input_transport =
-      (static_cast<TBufferedTransport*>(transport))->getUnderlyingTransport().get();
-  TTransport* underlying_output_transport =
-      (static_cast<TBufferedTransport*>(output->getTransport().get()))
-          ->getUnderlyingTransport()
-          .get();
-
   thrift_server_->auth_provider_->SetupConnectionContext(connection_ptr,
-      thrift_server_->transport_type_, underlying_input_transport,
-      underlying_output_transport);
-  if (thrift_server_->auth_provider_->is_secure()
-      && thrift_server_->transport_type_ == ThriftServer::BINARY) {
-    TSaslServerTransport* sasl_transport =
-        static_cast<TSaslServerTransport*>(underlying_input_transport);
-    socket = static_cast<TSocket*>(sasl_transport->getUnderlyingTransport().get());
-  } else if (thrift_server_->transport_type_ == ThriftServer::HTTP) {
-    THttpServer* http_transport = static_cast<THttpServer*>(underlying_input_transport);
-    socket = static_cast<TSocket*>(http_transport->getUnderlyingTransport().get());
-  } else {
-    socket = static_cast<TSocket*>(underlying_input_transport);
-  }
+      thrift_server_->transport_type_, input->getTransport().get(),
+      output->getTransport().get());
 
   {
     connection_ptr->server_name = thrift_server_->name_;
-    connection_ptr->network_address =
-        MakeNetworkAddress(socket->getPeerAddress(), socket->getPeerPort());
 
     lock_guard<mutex> l(thrift_server_->connection_contexts_lock_);
     uuid connection_uuid = thrift_server_->uuid_generator_();
diff --git a/be/src/transport/THttpTransport.cpp b/be/src/transport/THttpTransport.cpp
index 339e02e..b6c26dc 100644
--- a/be/src/transport/THttpTransport.cpp
+++ b/be/src/transport/THttpTransport.cpp
@@ -78,6 +78,7 @@ uint32_t THttpTransport::readEnd() {
       readChunked();
     }
   }
+  readHeaders_ = true;
   return 0;
 }
 
@@ -93,6 +94,8 @@ uint32_t THttpTransport::readMoreData() {
 
   if (chunked_) {
     size = readChunked();
+    // If we read to the end of the chunked data, we should read headers again.
+    if (size == 0) readHeaders_ = true;
   } else {
     size = readContent(contentLength_);
     readHeaders_ = true;