You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by he...@apache.org on 2016/10/08 06:05:08 UTC

[2/2] incubator-impala git commit: IMPALA-4135: Thrift threaded server times-out connections during high load

IMPALA-4135: Thrift threaded server times-out connections during high load

During times of high load, Thrift's TThreadedServer can't keep up with the
rate of new socket connections, causing some to time out.

This patch creates TAcceptQueueServer, which is a modified version of
TThreadedServer that calls accept() and then hands the returned TTransport
off to a thread pool to handle setting up the connection. This ensures that
accept() is called as quickly as possible, preventing connections from timing
out while waiting.

It also adds a metric, connection-setup-queue-size, to monitor the number
of accepted connections waiting to be processed.

A flag, --accepted_cnxn_queue_depth, controls the size of the accepted
connection buffer.

Testing:
- New test added to thrift-server-test. (Disabled by default, due to
  high ulimit -n requirement)
- Locally with the repro shown in IMPALA-4135.
- On the 16-node cluster with a real repro query.
- Ran the stress test for a while.

Change-Id: Ie50e728974ef31a9d49132a0b3f7cde2a4f3356d
Reviewed-on: http://gerrit.cloudera.org:8080/4519
Tested-by: Internal Jenkins
Reviewed-by: Henry Robinson <he...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/a9c40595
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/a9c40595
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/a9c40595

Branch: refs/heads/master
Commit: a9c40595549bfb74d2b9796a0a7098361793492e
Parents: b0f5e0a
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Thu Sep 22 16:40:31 2016 -0700
Committer: Henry Robinson <he...@cloudera.com>
Committed: Fri Oct 7 22:27:57 2016 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc     |   6 +
 be/src/rpc/CMakeLists.txt         |   1 +
 be/src/rpc/TAcceptQueueServer.cpp | 285 +++++++++++++++++++++++++++++++++
 be/src/rpc/TAcceptQueueServer.h   | 168 +++++++++++++++++++
 be/src/rpc/thrift-server-test.cc  |  23 +++
 be/src/rpc/thrift-server.cc       |  43 +++--
 be/src/rpc/thrift-server.h        |   3 +
 common/thrift/metrics.json        |  30 ++++
 8 files changed, 545 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9c40595/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index da87aa2..c67ec2c 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -109,3 +109,9 @@ DEFINE_int32(fault_injection_rpc_type, 0, "A fault injection option that specifi
 #endif
 
 DEFINE_bool(disable_kudu, false, "If true, Kudu features will be disabled.");
+
+// Read by ThriftServer::Start(): when set, 'Threaded' servers are built on
+// TAcceptQueueServer rather than Thrift's stock TThreadedServer (IMPALA-4135).
+DEFINE_bool(enable_accept_queue_server, true,
+    "If true, uses a modified version of "
+    "TThreadedServer that accepts connections as quickly as possible and hands them off "
+    "to a thread pool to finish setup, reducing the chances that connections time out "
+    "waiting to be accepted.");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9c40595/be/src/rpc/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt
index ade39a7..2386a85 100644
--- a/be/src/rpc/CMakeLists.txt
+++ b/be/src/rpc/CMakeLists.txt
@@ -24,6 +24,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/rpc")
 add_library(Rpc
   authentication.cc
   rpc-trace.cc
+  TAcceptQueueServer.cpp
   thrift-util.cc
   thrift-client.cc
   thrift-server.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9c40595/be/src/rpc/TAcceptQueueServer.cpp
----------------------------------------------------------------------
diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
new file mode 100644
index 0000000..58d90f1
--- /dev/null
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// This file was copied from apache::thrift::server::TThreadedServer.cpp v0.9.0, with the
+// significant changes noted inline below.
+
+#include "rpc/TAcceptQueueServer.h"
+
+#include <thrift/concurrency/PlatformThreadFactory.h>
+#include <thrift/transport/TTransportException.h>
+
+#include <iostream>
+#include <string>
+
+#ifdef HAVE_UNISTD_H
+#include <unistd.h>
+#endif
+
+#include "util/thread-pool.h"
+
+// Handed to the connection-setup ThreadPool created in TAcceptQueueServer::serve(); it
+// bounds how many accepted connections may wait for setup at once.
+DEFINE_int32(accepted_cnxn_queue_depth, 10000,
+    "(Advanced) The size of the post-accept, pre-setup connection queue for Impala "
+    "internal connections");
+
+namespace apache {
+namespace thrift {
+namespace server {
+
+using boost::shared_ptr;
+using namespace std;
+using namespace apache::thrift;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using namespace apache::thrift::concurrency;
+using namespace impala;
+
+// Per-connection work item. One Task is created for every client connection that
+// SetupConnection() finishes setting up, and it runs on its own thread. The Task holds
+// the processor, the input/output protocols and the raw client transport for that
+// connection, plus a reference back to the owning server for bookkeeping.
+class TAcceptQueueServer::Task : public Runnable {
+ public:
+  Task(TAcceptQueueServer& server, shared_ptr<TProcessor> processor,
+      shared_ptr<TProtocol> input, shared_ptr<TProtocol> output,
+      shared_ptr<TTransport> transport)
+    : server_(server),
+      processor_(processor),
+      input_(input),
+      output_(output),
+      transport_(transport) {}
+
+  ~Task() {}
+
+  // Serves requests on this connection until the processor reports it is done, the
+  // input transport has nothing more to read, or an exception is thrown. Afterwards the
+  // connection context is deleted, both transports are closed and the task removes
+  // itself from the server's task set.
+  void run() {
+    boost::shared_ptr<TServerEventHandler> eventHandler = server_.getEventHandler();
+    void* connectionContext = NULL;
+    if (eventHandler != NULL) {
+      connectionContext = eventHandler->createContext(input_, output_);
+    }
+    try {
+      for (;;) {
+        if (eventHandler != NULL) {
+          eventHandler->processContext(connectionContext, transport_);
+        }
+        // Stop when process() returns false or peek() finds no further input.
+        if (!processor_->process(input_, output_, connectionContext)
+            || !input_->getTransport()->peek()) {
+          break;
+        }
+      }
+    } catch (const TTransportException& ttx) {
+      // END_OF_FILE is not logged; every other transport error is reported.
+      if (ttx.getType() != TTransportException::END_OF_FILE) {
+        string errStr = string("TAcceptQueueServer client died: ") + ttx.what();
+        GlobalOutput(errStr.c_str());
+      }
+    } catch (const std::exception& x) {
+      GlobalOutput.printf(
+          "TAcceptQueueServer exception: %s: %s", typeid(x).name(), x.what());
+    } catch (...) {
+      GlobalOutput("TAcceptQueueServer uncaught exception.");
+    }
+    if (eventHandler != NULL) {
+      eventHandler->deleteContext(connectionContext, input_, output_);
+    }
+
+    // Close each side separately so a failure on one does not skip the other.
+    try {
+      input_->getTransport()->close();
+    } catch (TTransportException& ttx) {
+      string errStr = string("TAcceptQueueServer input close failed: ") + ttx.what();
+      GlobalOutput(errStr.c_str());
+    }
+    try {
+      output_->getTransport()->close();
+    } catch (TTransportException& ttx) {
+      string errStr = string("TAcceptQueueServer output close failed: ") + ttx.what();
+      GlobalOutput(errStr.c_str());
+    }
+
+    // Remove this task from parent bookkeeping. serve() waits on tasksMonitor_ until
+    // tasks_ is empty, so the last task to finish wakes it up.
+    {
+      Synchronized s(server_.tasksMonitor_);
+      server_.tasks_.erase(this);
+      if (server_.tasks_.empty()) {
+        server_.tasksMonitor_.notify();
+      }
+    }
+  }
+
+ private:
+  // Owning server; used for the event handler and for the tasks_ bookkeeping.
+  TAcceptQueueServer& server_;
+  friend class TAcceptQueueServer;
+
+  shared_ptr<TProcessor> processor_;
+  shared_ptr<TProtocol> input_;
+  shared_ptr<TProtocol> output_;
+  // The client transport passed to the event handler's processContext().
+  shared_ptr<TTransport> transport_;
+};
+
+// Common constructor tail: resets the run/metrics state and makes sure a thread factory
+// is available.
+void TAcceptQueueServer::init() {
+  // Metrics stay disabled until InitMetrics() is called.
+  queue_size_metric_ = NULL;
+  metrics_enabled_ = false;
+  stop_ = false;
+
+  // Keep a caller-supplied factory; otherwise fall back to the platform default.
+  if (threadFactory_) return;
+  threadFactory_.reset(new PlatformThreadFactory);
+}
+
+// No explicit teardown is performed here.
+TAcceptQueueServer::~TAcceptQueueServer() {}
+
+// New - work function for the connection-setup thread pool. Wraps 'client' in the
+// configured transports and protocols, registers a Task for it in tasks_ and starts a
+// thread that runs the Task. If any step throws a TException or a string, the transports
+// opened so far are closed and the error is reported through GlobalOutput.
+void TAcceptQueueServer::SetupConnection(boost::shared_ptr<TTransport> client) {
+  // The connection has left the setup queue, whether or not setup succeeds.
+  if (metrics_enabled_) queue_size_metric_->Increment(-1);
+  shared_ptr<TTransport> inputTransport;
+  shared_ptr<TTransport> outputTransport;
+
+  // Shared failure path: close whatever was opened so far, then report 'errStr'.
+  auto abandonConnection = [&](const string& errStr) {
+    if (inputTransport != NULL) inputTransport->close();
+    if (outputTransport != NULL) outputTransport->close();
+    if (client != NULL) client->close();
+    GlobalOutput(errStr.c_str());
+  };
+
+  try {
+    inputTransport = inputTransportFactory_->getTransport(client);
+    outputTransport = outputTransportFactory_->getTransport(client);
+    shared_ptr<TProtocol> inputProtocol =
+        inputProtocolFactory_->getProtocol(inputTransport);
+    shared_ptr<TProtocol> outputProtocol =
+        outputProtocolFactory_->getProtocol(outputTransport);
+
+    shared_ptr<TProcessor> processor =
+        getProcessor(inputProtocol, outputProtocol, client);
+
+    TAcceptQueueServer::Task* task = new TAcceptQueueServer::Task(
+        *this, processor, inputProtocol, outputProtocol, client);
+
+    // Create a task
+    shared_ptr<Runnable> runnable = shared_ptr<Runnable>(task);
+
+    // Create a thread for this task
+    shared_ptr<Thread> thread = shared_ptr<Thread>(threadFactory_->newThread(runnable));
+
+    // Insert the task into the set of tasks before its thread starts, so that the
+    // erase in Task::run() always finds it.
+    {
+      Synchronized s(tasksMonitor_);
+      tasks_.insert(task);
+    }
+
+    // Start the thread!
+    try {
+      thread->start();
+    } catch (...) {
+      // start() failed, so Task::run() is not expected to run and remove the entry.
+      // Drop it here; otherwise serve() would wait forever for tasks_ to drain on
+      // shutdown.
+      Synchronized s(tasksMonitor_);
+      tasks_.erase(task);
+      if (tasks_.empty()) tasksMonitor_.notify();
+      throw;
+    }
+  } catch (const TException& tx) {
+    abandonConnection(string("TAcceptQueueServer: Caught TException: ") + tx.what());
+  } catch (const string& s) {
+    abandonConnection("TAcceptQueueServer: Unknown exception: " + s);
+  }
+}
+
+// Accept loop. Starts listening on the server transport and, until stop() is called,
+// hands every accepted client transport to a setup thread pool that runs
+// SetupConnection(). Once stopped, closes the server transport, shuts the pool down and
+// waits for all outstanding Tasks to finish before returning.
+void TAcceptQueueServer::serve() {
+  // Start the server listening
+  serverTransport_->listen();
+
+  // Run the preServe event
+  if (eventHandler_ != NULL) {
+    eventHandler_->preServe();
+  }
+
+  // Only using one thread here is sufficient for performance, and it avoids potential
+  // thread safety issues with the thrift code called in SetupConnection.
+  constexpr int CONNECTION_SETUP_POOL_SIZE = 1;
+
+  // New - this is the thread pool used to process the internal accept queue.
+  ThreadPool<shared_ptr<TTransport>> connection_setup_pool("setup-server", "setup-worker",
+      CONNECTION_SETUP_POOL_SIZE, FLAGS_accepted_cnxn_queue_depth,
+      [this](int tid, const shared_ptr<TTransport>& item) {
+        this->SetupConnection(item);
+      });
+
+  while (!stop_) {
+    try {
+      // Fetch client from server
+      shared_ptr<TTransport> client = serverTransport_->accept();
+
+      // Count the connection before the setup thread can see it. SetupConnection()
+      // decrements the gauge, so incrementing after Offer() could let the decrement
+      // land first and briefly report a negative queue size.
+      if (metrics_enabled_) queue_size_metric_->Increment(1);
+
+      // New - the work done to setup the connection has been moved to SetupConnection.
+      if (!connection_setup_pool.Offer(client)) {
+        // The connection was never queued, so take it back out of the gauge.
+        if (metrics_enabled_) queue_size_metric_->Increment(-1);
+        string errStr = string("TAcceptQueueServer: thread pool unexpectedly shut down.");
+        GlobalOutput(errStr.c_str());
+        stop_ = true;
+        break;
+      }
+    } catch (const TTransportException& ttx) {
+      // An INTERRUPTED error while stopping is the expected result of stop(); anything
+      // else is logged. Either way the loop condition decides whether to go on.
+      if (!stop_ || ttx.getType() != TTransportException::INTERRUPTED) {
+        string errStr =
+            string("TAcceptQueueServer: TServerTransport died on accept: ") + ttx.what();
+        GlobalOutput(errStr.c_str());
+      }
+      continue;
+    } catch (const TException& tx) {
+      string errStr = string("TAcceptQueueServer: Caught TException: ") + tx.what();
+      GlobalOutput(errStr.c_str());
+      continue;
+    } catch (const string& s) {
+      string errStr = "TAcceptQueueServer: Unknown exception: " + s;
+      GlobalOutput(errStr.c_str());
+      break;
+    }
+  }
+
+  // If stopped manually, make sure to close server transport
+  if (stop_) {
+    try {
+      serverTransport_->close();
+      connection_setup_pool.Shutdown();
+    } catch (const TException& tx) {
+      string errStr = string("TAcceptQueueServer: Exception shutting down: ") + tx.what();
+      GlobalOutput(errStr.c_str());
+    }
+    // Wait for every running Task to remove itself from tasks_; the last one notifies
+    // tasksMonitor_.
+    try {
+      Synchronized s(tasksMonitor_);
+      while (!tasks_.empty()) {
+        tasksMonitor_.wait();
+      }
+    } catch (const TException& tx) {
+      string errStr =
+          string("TAcceptQueueServer: Exception joining workers: ") + tx.what();
+      GlobalOutput(errStr.c_str());
+    }
+    stop_ = false;
+  }
+}
+
+// Registers the "<key_prefix>.connection-setup-queue-size" gauge (initial value 0) in
+// 'metrics' and turns on queue-size accounting. 'metrics' must not be NULL.
+void TAcceptQueueServer::InitMetrics(MetricGroup* metrics, const string& key_prefix) {
+  DCHECK(metrics != NULL);
+  const string gauge_key = key_prefix + ".connection-setup-queue-size";
+  queue_size_metric_ = metrics->AddGauge<int64_t>(gauge_key, 0);
+  metrics_enabled_ = true;
+}
+
+} // namespace server
+} // namespace thrift
+} // namespace apache

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9c40595/be/src/rpc/TAcceptQueueServer.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/TAcceptQueueServer.h b/be/src/rpc/TAcceptQueueServer.h
new file mode 100644
index 0000000..9a7a78b
--- /dev/null
+++ b/be/src/rpc/TAcceptQueueServer.h
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// This file was copied from apache::thrift::server::TThreadedServer.cpp v0.9.0, with the
+// significant changes noted inline below.
+
+#ifndef _THRIFT_SERVER_TACCEPTQUEUESERVER_H_
+#define _THRIFT_SERVER_TACCEPTQUEUESERVER_H_ 1
+
+#include <thrift/concurrency/Monitor.h>
+#include <thrift/concurrency/Thread.h>
+#include <thrift/server/TServer.h>
+#include <thrift/transport/TServerTransport.h>
+
+#include <boost/shared_ptr.hpp>
+
+#include "util/metrics.h"
+
+namespace apache {
+namespace thrift {
+namespace server {
+
+using apache::thrift::TProcessor;
+using apache::thrift::transport::TServerTransport;
+using apache::thrift::transport::TTransportFactory;
+using apache::thrift::concurrency::Monitor;
+using apache::thrift::concurrency::ThreadFactory;
+
+/**
+ * In TAcceptQueueServer, the main server thread calls accept() and then immediately
+ * places the returned TTransport on a queue to be processed by a separate thread,
+ * asynchronously.
+ *
+ * This helps solve IMPALA-4135, where connections were timing out while waiting in the
+ * OS accept queue, by ensuring that accept() is called as quickly as possible.
+ */
+class TAcceptQueueServer : public TServer {
+ public:
+  // Per-connection Runnable; defined in TAcceptQueueServer.cpp.
+  class Task;
+
+  // The four constructors mirror each other: the server is built either from a
+  // processor factory or from a single processor, with or without a caller-supplied
+  // thread factory. All of them finish by calling init().
+  template <typename ProcessorFactory>
+  TAcceptQueueServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
+      const boost::shared_ptr<TServerTransport>& serverTransport,
+      const boost::shared_ptr<TTransportFactory>& transportFactory,
+      const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+      THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory));
+
+  template <typename ProcessorFactory>
+  TAcceptQueueServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
+      const boost::shared_ptr<TServerTransport>& serverTransport,
+      const boost::shared_ptr<TTransportFactory>& transportFactory,
+      const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+      const boost::shared_ptr<ThreadFactory>& threadFactory,
+      THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory));
+
+  template <typename Processor>
+  TAcceptQueueServer(const boost::shared_ptr<Processor>& processor,
+      const boost::shared_ptr<TServerTransport>& serverTransport,
+      const boost::shared_ptr<TTransportFactory>& transportFactory,
+      const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+      THRIFT_OVERLOAD_IF(Processor, TProcessor));
+
+  template <typename Processor>
+  TAcceptQueueServer(const boost::shared_ptr<Processor>& processor,
+      const boost::shared_ptr<TServerTransport>& serverTransport,
+      const boost::shared_ptr<TTransportFactory>& transportFactory,
+      const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+      const boost::shared_ptr<ThreadFactory>& threadFactory,
+      THRIFT_OVERLOAD_IF(Processor, TProcessor));
+
+  virtual ~TAcceptQueueServer();
+
+  // Runs the accept loop on the calling thread; returns after the loop exits and, if
+  // the server was stopped, after all connection tasks have finished.
+  virtual void serve();
+
+  // Asks serve() to exit: sets the stop flag and interrupts the server transport so a
+  // pending accept() returns.
+  void stop() {
+    stop_ = true;
+    serverTransport_->interrupt();
+  }
+
+  // New - Adds a metric for the size of the queue of connections waiting to be setup to
+  // the provided MetricGroup, prefixing its key with key_prefix.
+  void InitMetrics(impala::MetricGroup* metrics, const std::string& key_prefix);
+
+ protected:
+  // Shared constructor tail: clears the stop/metrics state and installs a default
+  // thread factory when none was supplied.
+  void init();
+
+  // New - this is the work function for the thread pool, which does the work of setting
+  // up the connection and starting a thread to handle it.
+  void SetupConnection(boost::shared_ptr<TTransport> client);
+
+  // Creates the thread that runs each connection's Task.
+  boost::shared_ptr<ThreadFactory> threadFactory_;
+
+  // Set by stop() and polled by the accept loop in serve().
+  // NOTE(review): 'volatile' gives no inter-thread ordering guarantees in C++; consider
+  // std::atomic<bool> if stop() is called from another thread - confirm with callers.
+  volatile bool stop_;
+
+  // Guards tasks_; notified when tasks_ becomes empty so serve() can finish shutdown.
+  Monitor tasksMonitor_;
+  // Tasks that have been registered and have not yet finished running.
+  std::set<Task*> tasks_;
+
+  /// New - True if metrics are enabled
+  bool metrics_enabled_;
+
+  /// New - Number of connections that have been accepted and are waiting to be setup.
+  impala::IntGauge* queue_size_metric_;
+};
+
+// Processor-factory constructor without a thread factory: threadFactory_ starts out
+// empty, so init() installs a PlatformThreadFactory.
+template <typename ProcessorFactory>
+TAcceptQueueServer::TAcceptQueueServer(
+    const boost::shared_ptr<ProcessorFactory>& processorFactory,
+    const boost::shared_ptr<TServerTransport>& serverTransport,
+    const boost::shared_ptr<TTransportFactory>& transportFactory,
+    const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+    THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory))
+  : TServer(processorFactory, serverTransport, transportFactory, protocolFactory) {
+  init();
+}
+
+// Processor-factory constructor with a caller-supplied thread factory. init() only
+// replaces the factory if 'threadFactory' is empty.
+template <typename ProcessorFactory>
+TAcceptQueueServer::TAcceptQueueServer(
+    const boost::shared_ptr<ProcessorFactory>& processorFactory,
+    const boost::shared_ptr<TServerTransport>& serverTransport,
+    const boost::shared_ptr<TTransportFactory>& transportFactory,
+    const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+    const boost::shared_ptr<ThreadFactory>& threadFactory,
+    THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory))
+  : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
+    threadFactory_(threadFactory) {
+  init();
+}
+
+// Single-processor constructor without a thread factory: threadFactory_ starts out
+// empty, so init() installs a PlatformThreadFactory.
+template <typename Processor>
+TAcceptQueueServer::TAcceptQueueServer(const boost::shared_ptr<Processor>& processor,
+    const boost::shared_ptr<TServerTransport>& serverTransport,
+    const boost::shared_ptr<TTransportFactory>& transportFactory,
+    const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+    THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor))
+  : TServer(processor, serverTransport, transportFactory, protocolFactory) {
+  init();
+}
+
+// Single-processor constructor with a caller-supplied thread factory. init() only
+// replaces the factory if 'threadFactory' is empty.
+template <typename Processor>
+TAcceptQueueServer::TAcceptQueueServer(const boost::shared_ptr<Processor>& processor,
+    const boost::shared_ptr<TServerTransport>& serverTransport,
+    const boost::shared_ptr<TTransportFactory>& transportFactory,
+    const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+    const boost::shared_ptr<ThreadFactory>& threadFactory,
+    THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor))
+  : TServer(processor, serverTransport, transportFactory, protocolFactory),
+    threadFactory_(threadFactory) {
+  init();
+}
+} // namespace server
+} // namespace thrift
+} // namespace apache
+
+#endif // #ifndef _THRIFT_SERVER_TACCEPTQUEUESERVER_H_

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9c40595/be/src/rpc/thrift-server-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc
index efd35cb..244097d 100644
--- a/be/src/rpc/thrift-server-test.cc
+++ b/be/src/rpc/thrift-server-test.cc
@@ -170,4 +170,27 @@ TEST(SslTest, ClientBeforeServer) {
     ssl_client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest());
 }
 
+/// Test disabled because requires a high ulimit -n on build machines. Since the test does
+/// not always fail, we don't lose much coverage by disabling it until we fix the build
+/// infra issue.
+TEST(ConcurrencyTest, DISABLED_ManyConcurrentConnections) {
+  // Test that a large number of concurrent connections will all succeed and not time out
+  // waiting to be accepted. (IMPALA-4135)
+  // Note that without the fix for IMPALA-4135, this test won't always fail, depending on
+  // the hardware that it is run on.
+  int port = GetServerPort();
+  // NOTE(review): the server is never deleted or stopped here - presumably intentional
+  // for the lifetime of the test binary; confirm.
+  ThriftServer* server = new ThriftServer("DummyServer", MakeProcessor(), port);
+  ASSERT_OK(server->Start());
+
+  // 256 worker threads each open one client connection per queued item. The item value
+  // itself is unused; it only drives the number of connections.
+  ThreadPool<int64_t> pool(
+      "group", "test", 256, 10000, [port](int tid, const int64_t& item) {
+        using Client = ThriftClient<ImpalaInternalServiceClient>;
+        // NOTE(review): clients are never deleted - presumably so that every connection
+        // stays open for the rest of the test (hence the high ulimit -n); confirm.
+        Client* client = new Client("127.0.0.1", port, "", NULL, false);
+        Status status = client->Open();
+        ASSERT_OK(status);
+      });
+  // Queue 16384 connection attempts, then wait for all of them to be processed.
+  for (int i = 0; i < 1024 * 16; ++i) pool.Offer(i);
+  pool.DrainAndShutdown();
+}
+
 IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9c40595/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index 18a159a..a0a86a2 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -33,7 +33,9 @@
 #include <thrift/transport/TServerSocket.h>
 #include <gflags/gflags.h>
 
+#include <sstream>
 #include "gen-cpp/Types_types.h"
+#include "rpc/TAcceptQueueServer.h"
 #include "rpc/authentication.h"
 #include "rpc/thrift-server.h"
 #include "rpc/thrift-thread.h"
@@ -41,7 +43,6 @@
 #include "util/network-util.h"
 #include "util/os-util.h"
 #include "util/uid-util.h"
-#include <sstream>
 
 #include "common/names.h"
 
@@ -60,6 +61,7 @@ using namespace apache::thrift;
 DEFINE_int32(rpc_cnxn_attempts, 10, "Deprecated");
 DEFINE_int32(rpc_cnxn_retry_interval_ms, 2000, "Deprecated");
 
+DECLARE_bool(enable_accept_queue_server);
 DECLARE_string(principal);
 DECLARE_string(keytab_file);
 DECLARE_string(ssl_client_ca_certificate);
@@ -296,17 +298,18 @@ void ThriftServer::ThriftServerEventProcessor::deleteContext(void* serverContext
 ThriftServer::ThriftServer(const string& name,
     const boost::shared_ptr<TProcessor>& processor, int port, AuthProvider* auth_provider,
     MetricGroup* metrics, int num_worker_threads, ServerType server_type)
-    : started_(false),
-      port_(port),
-      ssl_enabled_(false),
-      num_worker_threads_(num_worker_threads),
-      server_type_(server_type),
-      name_(name),
-      server_thread_(NULL),
-      server_(NULL),
-      processor_(processor),
-      connection_handler_(NULL),
-      auth_provider_(auth_provider) {
+  : started_(false),
+    port_(port),
+    ssl_enabled_(false),
+    num_worker_threads_(num_worker_threads),
+    server_type_(server_type),
+    name_(name),
+    server_thread_(NULL),
+    server_(NULL),
+    processor_(processor),
+    connection_handler_(NULL),
+    metrics_(NULL),
+    auth_provider_(auth_provider) {
   if (auth_provider_ == NULL) {
     auth_provider_ = AuthManager::GetInstance()->GetInternalAuthProvider();
   }
@@ -318,6 +321,7 @@ ThriftServer::ThriftServer(const string& name,
     stringstream max_ss;
     max_ss << "impala.thrift-server." << name << ".total-connections";
     total_connections_metric_ = metrics->AddCounter<int64_t>(max_ss.str(), 0);
+    metrics_ = metrics;
   } else {
     metrics_enabled_ = false;
   }
@@ -416,8 +420,19 @@ Status ThriftServer::Start() {
       }
       break;
     case Threaded:
-      server_.reset(new TThreadedServer(processor_, server_socket,
-          transport_factory, protocol_factory, thread_factory));
+      if (FLAGS_enable_accept_queue_server) {
+        server_.reset(new TAcceptQueueServer(processor_, server_socket, transport_factory,
+            protocol_factory, thread_factory));
+        if (metrics_ != NULL) {
+          stringstream key_prefix_ss;
+          key_prefix_ss << "impala.thrift-server." << name_;
+          (static_cast<TAcceptQueueServer*>(server_.get()))
+              ->InitMetrics(metrics_, key_prefix_ss.str());
+        }
+      } else {
+        server_.reset(new TThreadedServer(processor_, server_socket, transport_factory,
+            protocol_factory, thread_factory));
+      }
       break;
     default:
       stringstream error_msg;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9c40595/be/src/rpc/thrift-server.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index dd788ec..43485f5 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -211,6 +211,9 @@ class ThriftServer {
       ConnectionContextSet;
   ConnectionContextSet connection_contexts_;
 
+  /// Metrics subsystem access
+  MetricGroup* metrics_;
+
   /// True if metrics are enabled
   bool metrics_enabled_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a9c40595/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 70dbb12..c61c7fe 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -610,6 +610,16 @@
     "key": "impala.thrift-server.CatalogService.total-connections"
   },
   {
+    "description": "The number of connections to the Catalog Service that have been accepted and are waiting to be setup.",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "Catalog Service Connections Queued for Setup",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.CatalogService.connection-setup-queue-size"
+  },
+  {
     "description": "The number of active connections to this StateStore's StateStore service.",
     "contexts": [
       "STATESTORE"
@@ -630,6 +640,16 @@
     "key": "impala.thrift-server.StatestoreService.total-connections"
   },
   {
+    "description": "The number of connections to the Statestore Service that have been accepted and are waiting to be setup.",
+    "contexts": [
+      "STATESTORE"
+    ],
+    "label": "Statestore Service Connections Queued for Setup",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.StatestoreService.connection-setup-queue-size"
+  },
+  {
     "description": "The number of active Impala Backend client connections to this Impala Daemon.",
     "contexts": [
       "IMPALAD"
@@ -650,6 +670,16 @@
     "key": "impala.thrift-server.backend.total-connections"
   },
   {
+    "description": "The number of connections to the Impala Backend Server that have been accepted and are waiting to be setup.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Backend Server Connections Queued for Setup",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.backend.connection-setup-queue-size"
+  },
+  {
     "description": "The number of active Beeswax API connections to this Impala Daemon.",
     "contexts": [
       "IMPALAD"