You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2017/10/05 05:38:56 UTC

[1/3] incubator-impala git commit: IMPALA-5394: Change ThriftServer() to always use TAcceptQueueServer

Repository: incubator-impala
Updated Branches:
  refs/heads/master 72072f6e8 -> c9740b43d


IMPALA-5394: Change ThriftServer() to always use TAcceptQueueServer

- Previously, TThreadPoolServer called getTransport() on a client from
  the server thread (the thread that accepts connections).
  - TSaslServerTransport->getTransport() called TSaslTransport->open()
  - TSaslServerTransport->open() tried to negotiate SASL which calls
    read/write
    - If read/write blocks indefinitely, the TThreadPoolServer could
      not accept new connections until tcp_keepalive kicked in.
- Set the underlying TSocket's recvTimeout and sendTimeout before the
  TSaslServerTransport->open() and reset them to 0 after open()
  completes.
- Added sasl_connect_tcp_timeout_ms flag that defaults to 300000
  milliseconds (5 minutes)
- Added the ability for TAcceptQueueServer to limit the maximum
  number of concurrent tasks.
- Added a test case to thrift-server-test to test
  max_concurrent_connections enforcement
- Changed the remaining Thrift servers to use TAcceptQueueServer.
  (hs2/beeswax/network-perf-benchmark)
  - The timeout is still needed in TAcceptQueueServer since
    SetupConnection follows a pattern similar to that of
    TThreadPoolServer.
- Removed support for TThreadPoolServer from ThriftServer() since it is
  no longer used anywhere. ThriftServer() now always uses
  TAcceptQueueServer.
- Deprecated enable_accept_queue_server flag and removed supporting
  code.

Change-Id: I56a5f3d9cf931cff14eae7f236fea018236a6255
Reviewed-on: http://gerrit.cloudera.org:8080/7061
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4dd0f1b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4dd0f1b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4dd0f1b3

Branch: refs/heads/master
Commit: 4dd0f1b3d84f67eb40bf671160b057be9bbdb921
Parents: 72072f6
Author: John Sherman <jf...@arcadiadata.com>
Authored: Thu Jun 1 18:49:53 2017 +0000
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Oct 5 02:26:01 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/network-perf-benchmark.cc |  5 ++-
 be/src/common/global-flags.cc               |  6 +--
 be/src/rpc/TAcceptQueueServer.cpp           |  7 ++--
 be/src/rpc/TAcceptQueueServer.h             | 28 ++++++++++---
 be/src/rpc/thrift-server-test.cc            | 48 ++++++++++++++++++++++
 be/src/rpc/thrift-server.cc                 | 46 ++++-----------------
 be/src/rpc/thrift-server.h                  | 52 ++++++++----------------
 be/src/service/impala-server.cc             |  4 +-
 be/src/transport/TSaslServerTransport.cpp   | 51 +++++++++++++++--------
 common/thrift/metrics.json                  | 20 +++++++++
 10 files changed, 157 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/benchmarks/network-perf-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/network-perf-benchmark.cc b/be/src/benchmarks/network-perf-benchmark.cc
index 1a0de24..251399e 100644
--- a/be/src/benchmarks/network-perf-benchmark.cc
+++ b/be/src/benchmarks/network-perf-benchmark.cc
@@ -27,6 +27,7 @@
 #include "gen-cpp/NetworkTest_types.h"
 #include "gen-cpp/NetworkTestService.h"
 
+#include "common/init.h"
 #include "common/logging.h"
 #include "util/cpu-info.h"
 #include "util/stopwatch.h"
@@ -203,7 +204,7 @@ bool ProcessCommand(const vector<string>& tokens) {
 
 int main(int argc, char** argv) {
   google::ParseCommandLineFlags(&argc, &argv, true);
-  CpuInfo::Init();
+  impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);
 
   if (argc != 1) {
     // Just run client from command line args
@@ -223,7 +224,7 @@ int main(int argc, char** argv) {
   boost::shared_ptr<TProcessor> processor(new NetworkTestServiceProcessor(handler));
   ThriftServer* server;
   ABORT_IF_ERROR(ThriftServerBuilder("Network Test Server", processor, FLAGS_port)
-                     .thread_pool(100)
+                     .max_concurrent_connections(100)
                      .Build(&server));
   thread* server_thread = new thread(&TestServer::Server, handler.get(), server);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index e0a3384..1a8b027 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -153,11 +153,7 @@ DEFINE_int32(kudu_operation_timeout_ms, 3 * 60 * 1000, "Timeout (milliseconds) s
     "all Kudu operations. This must be a positive value, and there is no way to disable "
     "timeouts.");
 
-DEFINE_bool(enable_accept_queue_server, true,
-    "If true, uses a modified version of "
-    "TThreadedServer that accepts connections as quickly as possible and hands them off "
-    "to a thread pool to finish setup, reducing the chances that connections time out "
-    "waiting to be accepted.");
+DEFINE_bool_hidden(enable_accept_queue_server, true, "Deprecated");
 
 DEFINE_int64(inc_stats_size_limit_bytes, 200 * (1LL<<20), "Maximum size of "
     "incremental stats the catalog is allowed to serialize per table. "

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/rpc/TAcceptQueueServer.cpp
----------------------------------------------------------------------
diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index 030d714..8a398a2 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -111,9 +111,7 @@ class TAcceptQueueServer::Task : public Runnable {
     {
       Synchronized s(server_.tasksMonitor_);
       server_.tasks_.erase(this);
-      if (server_.tasks_.empty()) {
-        server_.tasksMonitor_.notify();
-      }
+      server_.tasksMonitor_.notify();
     }
   }
 
@@ -167,6 +165,9 @@ void TAcceptQueueServer::SetupConnection(boost::shared_ptr<TTransport> client) {
     // Insert thread into the set of threads
     {
       Synchronized s(tasksMonitor_);
+      while (maxTasks_ > 0 && tasks_.size() >= maxTasks_) {
+        tasksMonitor_.wait();
+      }
       tasks_.insert(task);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/rpc/TAcceptQueueServer.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/TAcceptQueueServer.h b/be/src/rpc/TAcceptQueueServer.h
index 3f5530a..61335f9 100644
--- a/be/src/rpc/TAcceptQueueServer.h
+++ b/be/src/rpc/TAcceptQueueServer.h
@@ -53,11 +53,13 @@ class TAcceptQueueServer : public TServer {
  public:
   class Task;
 
+  // TODO: Determine which c'tors are used and remove unused ones.
   template <typename ProcessorFactory>
   TAcceptQueueServer(const boost::shared_ptr<ProcessorFactory>& processorFactory,
       const boost::shared_ptr<TServerTransport>& serverTransport,
       const boost::shared_ptr<TTransportFactory>& transportFactory,
       const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+      int32_t maxTasks = 0,
       THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory));
 
   template <typename ProcessorFactory>
@@ -66,6 +68,7 @@ class TAcceptQueueServer : public TServer {
       const boost::shared_ptr<TTransportFactory>& transportFactory,
       const boost::shared_ptr<TProtocolFactory>& protocolFactory,
       const boost::shared_ptr<ThreadFactory>& threadFactory,
+      int32_t maxTasks = 0,
       THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory));
 
   template <typename Processor>
@@ -73,6 +76,7 @@ class TAcceptQueueServer : public TServer {
       const boost::shared_ptr<TServerTransport>& serverTransport,
       const boost::shared_ptr<TTransportFactory>& transportFactory,
       const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+      int32_t maxTasks = 0,
       THRIFT_OVERLOAD_IF(Processor, TProcessor));
 
   template <typename Processor>
@@ -81,6 +85,7 @@ class TAcceptQueueServer : public TServer {
       const boost::shared_ptr<TTransportFactory>& transportFactory,
       const boost::shared_ptr<TProtocolFactory>& protocolFactory,
       const boost::shared_ptr<ThreadFactory>& threadFactory,
+      int32_t maxTasks = 0,
       THRIFT_OVERLOAD_IF(Processor, TProcessor));
 
   virtual ~TAcceptQueueServer();
@@ -99,16 +104,21 @@ class TAcceptQueueServer : public TServer {
  protected:
   void init();
 
-  // New - this is the work function for the thread pool, which does the work of setting
-  // up the connection and starting a thread to handle it.
+  // This is the work function for the thread pool, which does the work of setting up the
+  // connection and starting a thread to handle it. Will block if there are currently
+  // maxTasks_ connections and maxTasks_ is non-zero.
   void SetupConnection(boost::shared_ptr<TTransport> client);
 
   boost::shared_ptr<ThreadFactory> threadFactory_;
   volatile bool stop_;
 
+  // Monitor protecting tasks_, notified on removal.
   Monitor tasksMonitor_;
   std::set<Task*> tasks_;
 
+  // The maximum number of running tasks allowed at a time.
+  const int32_t maxTasks_;
+
   /// New - True if metrics are enabled
   bool metrics_enabled_;
 
@@ -122,8 +132,10 @@ TAcceptQueueServer::TAcceptQueueServer(
     const boost::shared_ptr<TServerTransport>& serverTransport,
     const boost::shared_ptr<TTransportFactory>& transportFactory,
     const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+    int32_t maxTasks,
     THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory))
-  : TServer(processorFactory, serverTransport, transportFactory, protocolFactory) {
+  : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
+    maxTasks_(maxTasks) {
   init();
 }
 
@@ -134,9 +146,10 @@ TAcceptQueueServer::TAcceptQueueServer(
     const boost::shared_ptr<TTransportFactory>& transportFactory,
     const boost::shared_ptr<TProtocolFactory>& protocolFactory,
     const boost::shared_ptr<ThreadFactory>& threadFactory,
+    int32_t maxTasks,
     THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory))
   : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
-    threadFactory_(threadFactory) {
+    threadFactory_(threadFactory), maxTasks_(maxTasks) {
   init();
 }
 
@@ -145,8 +158,10 @@ TAcceptQueueServer::TAcceptQueueServer(const boost::shared_ptr<Processor>& proce
     const boost::shared_ptr<TServerTransport>& serverTransport,
     const boost::shared_ptr<TTransportFactory>& transportFactory,
     const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+    int32_t maxTasks,
     THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor))
-  : TServer(processor, serverTransport, transportFactory, protocolFactory) {
+  : TServer(processor, serverTransport, transportFactory, protocolFactory),
+    maxTasks_(maxTasks) {
   init();
 }
 
@@ -156,9 +171,10 @@ TAcceptQueueServer::TAcceptQueueServer(const boost::shared_ptr<Processor>& proce
     const boost::shared_ptr<TTransportFactory>& transportFactory,
     const boost::shared_ptr<TProtocolFactory>& protocolFactory,
     const boost::shared_ptr<ThreadFactory>& threadFactory,
+    int32_t maxTasks,
     THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor))
   : TServer(processor, serverTransport, transportFactory, protocolFactory),
-    threadFactory_(threadFactory) {
+    threadFactory_(threadFactory), maxTasks_(maxTasks) {
   init();
 }
 } // namespace server

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/rpc/thrift-server-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc
index ef50160..fbb00ef 100644
--- a/be/src/rpc/thrift-server-test.cc
+++ b/be/src/rpc/thrift-server-test.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <atomic>
 #include <string>
 
 #include "gen-cpp/StatestoreService.h"
@@ -395,6 +396,53 @@ TEST(SslTest, OverlappingMatchedCiphers) {
       });
 }
 
+TEST(ConcurrencyTest, MaxConcurrentConnections) {
+  // Tests if max concurrent connections is being enforced by the ThriftServer
+  // implementation. It creates a ThriftServer with max_concurrent_connections set to 2
+  // and a ThreadPool of clients that attempt to connect concurrently and sleep for a
+  // small amount of time. The test fails if the number of concurrently connected clients
+  // exceeds the requested max_concurrent_connections limit. The test will also fail if
+  // the number of concurrently connected clients never reaches the limit of
+  // max_concurrent_connections.
+  int port = GetServerPort();
+  int max_connections = 2;
+  ThriftServer* server;
+  std::atomic<int> num_concurrent_connections{0};
+  std::atomic<bool> did_reach_max{false};
+  EXPECT_OK(ThriftServerBuilder("DummyStatestore", MakeProcessor(), port)
+      .max_concurrent_connections(max_connections)
+      .Build(&server));
+  EXPECT_OK(server->Start());
+
+  ThreadPool<int> pool("ConcurrentTest", "MaxConcurrentConnections", 10, 10,
+      [&num_concurrent_connections, &did_reach_max, max_connections, port](int tid,
+            const int& item) {
+        ThriftClient<StatestoreServiceClientWrapper> client("localhost", port, "",
+            nullptr, false);
+        EXPECT_OK(client.Open());
+        bool send_done = false;
+        TRegisterSubscriberResponse resp;
+        EXPECT_NO_THROW({
+            client.iface()->RegisterSubscriber(resp, TRegisterSubscriberRequest(),
+                &send_done);
+          });
+        int connection_count = ++num_concurrent_connections;
+        // Check that we have not exceeded the expected limit
+        EXPECT_TRUE(connection_count <= max_connections);
+        if (connection_count == max_connections) did_reach_max = true;
+        SleepForMs(100);
+        --num_concurrent_connections;
+  });
+  ASSERT_OK(pool.Init());
+
+  for (int i = 0; i < 10; ++i) pool.Offer(i);
+  pool.DrainAndShutdown();
+
+  // If we did not reach the maximum number of concurrent connections, the test was not
+  // effective.
+  EXPECT_TRUE(did_reach_max);
+}
+
 /// Test disabled because requires a high ulimit -n on build machines. Since the test does
 /// not always fail, we don't lose much coverage by disabling it until we fix the build
 /// infra issue.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index c385a66..5bf47b2 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -24,12 +24,9 @@
 #include <thrift/concurrency/Thread.h>
 #include <thrift/concurrency/ThreadManager.h>
 #include <thrift/protocol/TBinaryProtocol.h>
-#include <thrift/server/TThreadPoolServer.h>
-#include <thrift/server/TThreadedServer.h>
 #include <thrift/transport/TSocket.h>
 #include <thrift/transport/TSSLServerSocket.h>
 #include <thrift/transport/TSSLSocket.h>
-#include <thrift/server/TThreadPoolServer.h>
 #include <thrift/transport/TServerSocket.h>
 #include <gflags/gflags.h>
 
@@ -63,7 +60,6 @@ using namespace apache::thrift;
 DEFINE_int32_hidden(rpc_cnxn_attempts, 10, "Deprecated");
 DEFINE_int32_hidden(rpc_cnxn_retry_interval_ms, 2000, "Deprecated");
 
-DECLARE_bool(enable_accept_queue_server);
 DECLARE_string(principal);
 DECLARE_string(keytab_file);
 DECLARE_string(ssl_client_ca_certificate);
@@ -328,12 +324,11 @@ void ThriftServer::ThriftServerEventProcessor::deleteContext(void* serverContext
 
 ThriftServer::ThriftServer(const string& name,
     const boost::shared_ptr<TProcessor>& processor, int port, AuthProvider* auth_provider,
-    MetricGroup* metrics, int num_worker_threads, ServerType server_type)
+    MetricGroup* metrics, int max_concurrent_connections)
   : started_(false),
     port_(port),
     ssl_enabled_(false),
-    num_worker_threads_(num_worker_threads),
-    server_type_(server_type),
+    max_concurrent_connections_(max_concurrent_connections),
     name_(name),
     server_(NULL),
     processor_(processor),
@@ -451,37 +446,11 @@ Status ThriftServer::Start() {
   boost::shared_ptr<TTransportFactory> transport_factory;
   RETURN_IF_ERROR(CreateSocket(&server_socket));
   RETURN_IF_ERROR(auth_provider_->GetServerTransportFactory(&transport_factory));
-  switch (server_type_) {
-    case ThreadPool:
-      {
-        boost::shared_ptr<ThreadManager> thread_mgr(
-            ThreadManager::newSimpleThreadManager(num_worker_threads_));
-        thread_mgr->threadFactory(thread_factory);
-        thread_mgr->start();
-        server_.reset(new TThreadPoolServer(processor_, server_socket,
-                transport_factory, protocol_factory, thread_mgr));
-      }
-      break;
-    case Threaded:
-      if (FLAGS_enable_accept_queue_server) {
-        server_.reset(new TAcceptQueueServer(processor_, server_socket, transport_factory,
-            protocol_factory, thread_factory));
-        if (metrics_ != NULL) {
-          stringstream key_prefix_ss;
-          key_prefix_ss << "impala.thrift-server." << name_;
-          (static_cast<TAcceptQueueServer*>(server_.get()))
-              ->InitMetrics(metrics_, key_prefix_ss.str());
-        }
-      } else {
-        server_.reset(new TThreadedServer(processor_, server_socket, transport_factory,
-            protocol_factory, thread_factory));
-      }
-      break;
-    default:
-      stringstream error_msg;
-      error_msg << "Unsupported server type: " << server_type_;
-      LOG(ERROR) << error_msg.str();
-      return Status(error_msg.str());
+  server_.reset(new TAcceptQueueServer(processor_, server_socket, transport_factory,
+        protocol_factory, thread_factory, max_concurrent_connections_));
+  if (metrics_ != NULL) {
+    (static_cast<TAcceptQueueServer*>(server_.get()))->InitMetrics(metrics_,
+        Substitute("impala.thrift-server.$0", name_));
   }
   boost::shared_ptr<ThriftServer::ThriftServerEventProcessor> event_processor(
       new ThriftServer::ThriftServerEventProcessor(this));
@@ -504,7 +473,6 @@ void ThriftServer::Join() {
 void ThriftServer::StopForTesting() {
   DCHECK(server_thread_ != NULL);
   DCHECK(server_);
-  DCHECK_EQ(server_type_, Threaded);
   server_->stop();
   if (started_) Join();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/rpc/thrift-server.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index f889a4e..588904f 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -35,14 +35,12 @@ namespace impala {
 
 class AuthProvider;
 
-/// Utility class for all Thrift servers. Runs a threaded server by default, or a
-/// TThreadPoolServer with, by default, 2 worker threads, that exposes the interface
+/// Utility class for all Thrift servers. Runs a TAcceptQueueServer server with, by
+/// default, no enforced concurrent connection limit, that exposes the interface
 /// described by a user-supplied TProcessor object.
 ///
 /// Use a ThriftServerBuilder to construct a ThriftServer. ThriftServer's c'tors are
 /// private.
-///
-/// If TThreadPoolServer is used, client must use TSocket as transport.
 /// TODO: shutdown is buggy (which only harms tests)
 class ThriftServer {
  public:
@@ -91,14 +89,6 @@ class ThriftServer {
     virtual ~ConnectionHandlerIf() = default;
   };
 
-  static const int DEFAULT_WORKER_THREADS = 2;
-
-  /// There are 2 servers supported by Thrift with different threading models.
-  /// ThreadPool  -- Allocates a fixed number of threads. A thread is used by a
-  ///                connection until it closes.
-  /// Threaded    -- Allocates 1 thread per connection, as needed.
-  enum ServerType { ThreadPool = 0, Threaded };
-
   int port() const { return port_; }
 
   bool ssl_enabled() const { return ssl_enabled_; }
@@ -106,8 +96,7 @@ class ThriftServer {
   /// Blocks until the server stops and exits its main thread.
   void Join();
 
-  /// FOR TESTING ONLY; stop the server and block until the server is stopped; use it
-  /// only if it is a Threaded server.
+  /// FOR TESTING ONLY; stop the server and block until the server is stopped
   void StopForTesting();
 
   /// Starts the main server thread. Once this call returns, clients
@@ -151,12 +140,12 @@ class ThriftServer {
   ///  - auth_provider: Authentication scheme to use. If nullptr, use the global default
   ///    demon<->demon provider.
   ///  - metrics: if not nullptr, the server will register metrics on this object
-  ///  - num_worker_threads: the number of worker threads to use in any thread pool
-  ///  - server_type: the type of IO strategy this server should employ
+  ///  - max_concurrent_connections: The maximum number of concurrent connections allowed.
+  ///    If 0, there will be no enforced limit on the number of concurrent connections.
   ThriftServer(const std::string& name,
       const boost::shared_ptr<apache::thrift::TProcessor>& processor, int port,
       AuthProvider* auth_provider = nullptr, MetricGroup* metrics = nullptr,
-      int num_worker_threads = DEFAULT_WORKER_THREADS, ServerType server_type = Threaded);
+      int max_concurrent_connections = 0);
 
   /// Enables secure access over SSL. Must be called before Start(). The first three
   /// arguments are the minimum SSL/TLS version, and paths to certificate and private key
@@ -198,12 +187,10 @@ class ThriftServer {
   /// The SSL/TLS protocol client versions that this server will allow to connect.
   apache::thrift::transport::SSLProtocol version_;
 
-  /// How many worker threads to use to serve incoming requests
-  /// (requests are queued if no thread is immediately available)
-  int num_worker_threads_;
-
-  /// ThreadPool or Threaded server
-  ServerType server_type_;
+  /// Maximum number of concurrent connections (connections will block until fewer than
+  /// max_concurrent_connections_ are concurrently active). If 0, there is no enforced
+  /// limit.
+  int max_concurrent_connections_;
 
   /// User-specified identifier that shows up in logs
   const std::string name_;
@@ -271,16 +258,10 @@ class ThriftServerBuilder {
     return *this;
   }
 
-  /// Make this server a thread-pool server with 'num_worker_threads' threads.
-  ThriftServerBuilder& thread_pool(int num_worker_threads) {
-    server_type_ = ThriftServer::ServerType::ThreadPool;
-    num_worker_threads_ = num_worker_threads;
-    return *this;
-  }
-
-  /// Make this server a threaded server (i.e. one thread per connection).
-  ThriftServerBuilder& threaded() {
-    server_type_ = ThriftServer::ServerType::Threaded;
+  /// Sets the maximum concurrent thread count for this server. Default is 0, which means
+  /// there is no enforced limit.
+  ThriftServerBuilder& max_concurrent_connections(int max_concurrent_connections) {
+    max_concurrent_connections_ = max_concurrent_connections;
     return *this;
   }
 
@@ -319,7 +300,7 @@ class ThriftServerBuilder {
   /// '*server'.
   Status Build(ThriftServer** server) {
     std::unique_ptr<ThriftServer> ptr(new ThriftServer(name_, processor_, port_,
-        auth_provider_, metrics_, num_worker_threads_, server_type_));
+        auth_provider_, metrics_, max_concurrent_connections_));
     if (enable_ssl_) {
       RETURN_IF_ERROR(ptr->EnableSsl(
           version_, certificate_, private_key_, pem_password_cmd_, ciphers_));
@@ -329,8 +310,7 @@ class ThriftServerBuilder {
   }
 
  private:
-  ThriftServer::ServerType server_type_ = ThriftServer::ServerType::Threaded;
-  int num_worker_threads_ = ThriftServer::DEFAULT_WORKER_THREADS;
+  int max_concurrent_connections_ = 0;
   std::string name_;
   boost::shared_ptr<apache::thrift::TProcessor> processor_;
   int port_ = 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index ac5b3a9..6ce20e9 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1993,7 +1993,7 @@ Status ImpalaServer::Init(int32_t thrift_be_port, int32_t beeswax_port, int32_t
     RETURN_IF_ERROR(
         builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
             .metrics(exec_env_->metrics())
-            .thread_pool(FLAGS_fe_service_threads)
+            .max_concurrent_connections(FLAGS_fe_service_threads)
             .Build(&server));
     beeswax_server_.reset(server);
     beeswax_server_->SetConnectionHandler(this);
@@ -2020,7 +2020,7 @@ Status ImpalaServer::Init(int32_t thrift_be_port, int32_t beeswax_port, int32_t
     RETURN_IF_ERROR(
         builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
             .metrics(exec_env_->metrics())
-            .thread_pool(FLAGS_fe_service_threads)
+            .max_concurrent_connections(FLAGS_fe_service_threads)
             .Build(&server));
     hs2_server_.reset(server);
     hs2_server_->SetConnectionHandler(this);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/transport/TSaslServerTransport.cpp
----------------------------------------------------------------------
diff --git a/be/src/transport/TSaslServerTransport.cpp b/be/src/transport/TSaslServerTransport.cpp
index a8000b1..15d548e 100644
--- a/be/src/transport/TSaslServerTransport.cpp
+++ b/be/src/transport/TSaslServerTransport.cpp
@@ -28,6 +28,7 @@
 #include <boost/thread/thread.hpp>
 
 #include <thrift/transport/TBufferTransports.h>
+#include <thrift/transport/TSocket.h>
 #include "rpc/thrift-server.h"
 #include "transport/TSaslTransport.h"
 #include "transport/TSaslServerTransport.h"
@@ -36,6 +37,9 @@
 
 #include "common/names.h"
 
+DEFINE_int32(sasl_connect_tcp_timeout_ms, 300000, "(Advanced) The underlying TSocket "
+    "send/recv timeout in milliseconds for the initial SASL handeshake.");
+
 using namespace sasl;
 
 namespace apache { namespace thrift { namespace transport {
@@ -126,7 +130,6 @@ void TSaslServerTransport::handleSaslStartMessage() {
 
 boost::shared_ptr<TTransport> TSaslServerTransport::Factory::getTransport(
     boost::shared_ptr<TTransport> trans) {
-  lock_guard<mutex> l(transportMap_mutex_);
   // Thrift servers use both an input and an output transport to communicate with
   // clients. In principal, these can be different, but for SASL clients we require them
   // to be the same so that the authentication state is identical for communication in
@@ -138,29 +141,43 @@ boost::shared_ptr<TTransport> TSaslServerTransport::Factory::getTransport(
   // However, the cache map would retain references to all the transports it ever
   // created. Instead, we remove an entry in the map after it has been found for the first
   // time, that is, after the second call to getTransport() with the same argument. That
-  // matches the calling pattern in TThreadedServer and TThreadPoolServer, which both call
-  // getTransport() twice in succession when a connection is established, and then never
-  // again. This is obviously brittle (what if for some reason getTransport() is called a
-  // third time?) but for our usage of Thrift it's a tolerable band-aid.
+  // matches the calling pattern in TAcceptQueueServer which calls getTransport() twice in
+  // succession when a connection is established, and then never again. This is obviously
+  // brittle (what if for some reason getTransport() is called a third time?) but for our
+  // usage of Thrift it's a tolerable band-aid.
   //
   // An alternative approach is to use the 'custom deleter' feature of shared_ptr to
   // ensure that when ret_transport is eventually deleted, its corresponding map entry is
   // removed. That is likely to be error prone given the locking involved; for now we go
   // with the simple solution.
-  TransportMap::iterator trans_map = transportMap_.find(trans);
-  VLOG_EVERY_N(2, 100) << "getTransport(): transportMap_ size is: "
-                       << transportMap_.size();
   boost::shared_ptr<TBufferedTransport> ret_transport;
-  if (trans_map == transportMap_.end()) {
-    boost::shared_ptr<TTransport> wrapped(
-        new TSaslServerTransport(serverDefinitionMap_, trans));
-    ret_transport.reset(new TBufferedTransport(wrapped,
-            impala::ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES));
-    ret_transport.get()->open();
+  {
+    lock_guard<mutex> l(transportMap_mutex_);
+    TransportMap::iterator trans_map = transportMap_.find(trans);
+    if (trans_map != transportMap_.end()) {
+      ret_transport = trans_map->second;
+      transportMap_.erase(trans_map);
+      return ret_transport;
+    }
+    // This method should never be called concurrently with the same 'trans' object.
+    // Therefore, it is safe to drop the transportMap_mutex_ here.
+  }
+  boost::shared_ptr<TTransport> wrapped(
+      new TSaslServerTransport(serverDefinitionMap_, trans));
+  // Set socket timeouts to prevent TSaslServerTransport->open from blocking the server
+  // from accepting new connections if a read/write blocks during the handshake
+  TSocket* socket = static_cast<TSocket*>(trans.get());
+  socket->setRecvTimeout(FLAGS_sasl_connect_tcp_timeout_ms);
+  socket->setSendTimeout(FLAGS_sasl_connect_tcp_timeout_ms);
+  ret_transport.reset(new TBufferedTransport(wrapped,
+        impala::ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES));
+  ret_transport.get()->open();
+  // Reset socket timeout back to zero, so idle clients do not timeout
+  socket->setRecvTimeout(0);
+  socket->setSendTimeout(0);
+  {
+    lock_guard<mutex> l(transportMap_mutex_);
     transportMap_[trans] = ret_transport;
-  } else {
-    ret_transport = trans_map->second;
-    transportMap_.erase(trans_map);
   }
   return ret_transport;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 4ba94be..4e67eae 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -702,6 +702,16 @@
     "key": "impala.thrift-server.beeswax-frontend.total-connections"
   },
   {
+    "description": "The number of Beeswax API connections to this Impala Daemon that have been accepted and are waiting to be setup.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Beeswax API Connections Queued for Setup",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.beeswax-frontend.connection-setup-queue-size"
+  },
+  {
     "description": "The number of active HiveServer2 API connections to this Impala Daemon.",
     "contexts": [
       "IMPALAD"
@@ -722,6 +732,16 @@
     "key": "impala.thrift-server.hiveserver2-frontend.total-connections"
   },
   {
+    "description": "The number of HiveServer2 API connections to this Impala Daemon that have been accepted and are waiting to be setup.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 API Connections Queued for Setup",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "impala.thrift-server.hiveserver2-frontend.connection-setup-queue-size"
+  },
+  {
     "description": "The amount of memory freed by the last memory tracker garbage collection.",
     "contexts": [
       "IMPALAD"


[3/3] incubator-impala git commit: IMPALA-5908: Allow SET to unset modified query options.

Posted by sa...@apache.org.
IMPALA-5908: Allow SET to unset modified query options.

The query 'SET <option>=""' will now unset an option within the session,
reverting it to its default state.

This change became necessary when "SET" started returning an empty
string for unset options which don't have a default. The test
infrastructure (impala_test_suite.py) resets options to what it thinks
are their defaults, and, when this broke, some ASAN builds started to fail,
presumably due to a timing issue with how we re-use connections between
tests.

Previously, SessionState copied over the default options from the server
when the session was created and then mutated that. To support unsetting
options at the session layer, this change keeps a pointer to the default
server settings, keeps the mutations separately, and overlays the
options each time they're requested. Similarly, for configuration
overlays that happen per-query, the overlay is now done explicitly,
because empty per-query overlay values (key=..., value="") now have no effect.

Because "set key=''" is ambiguous between "set to the empty string" and
"unset", it's now impossible, at the session layer, to set an option that
is configured at a previous layer to the empty string. In practice, this
only affects debug_action and request_pool. debug_action is essentially
an internal tool. For request_pool, this means that setting the default
request_pool via the impalad command line is now a bad idea, as it can't
be cleared at a per-session level. For request_pool, the correct
course of action for users is to use placement rules, and to have a
default placement rule.

Testing:
* Added a simple test that triggers this side-effect without this change.
  Specifically, running "impala-python infra/python/env/bin/py.test tests/metadata/test_set.py -s"
  with the modified set.test triggers it.
* Amended tests/custom_cluster/test_admission_controller.py; it was
  useful for testing these code paths.
* Added cases to query-options-test to check behavior for both
  defaulted and non-defaulted values.
* Added a custom cluster test (test_set_and_unset.py) that checks that
  session and request overlays work against impalad-level default query options.
* Ran an ASAN build where this was triggering previously.

Change-Id: Ia8c383e68064f839cb5000118901dff77b4e5cb9
Reviewed-on: http://gerrit.cloudera.org:8080/8070
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/c9740b43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/c9740b43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/c9740b43

Branch: refs/heads/master
Commit: c9740b43d1493a8249ad7497430e5bfbcc6ebf64
Parents: 226c99e
Author: Philip Zeyliger <ph...@cloudera.com>
Authored: Mon Sep 11 15:50:16 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Oct 5 03:04:38 2017 +0000

----------------------------------------------------------------------
 be/src/service/client-request-state.cc          |  4 +-
 be/src/service/impala-beeswax-server.cc         |  9 +-
 be/src/service/impala-hs2-server.cc             | 14 ++-
 be/src/service/impala-server.cc                 |  8 +-
 be/src/service/impala-server.h                  | 16 +++-
 be/src/service/query-options-test.cc            | 67 +++++++++++++
 be/src/service/query-options.cc                 | 25 ++++-
 be/src/service/query-options.h                  |  2 +-
 common/thrift/ImpalaInternalService.thrift      | 19 +++-
 .../functional-query/queries/QueryTest/set.test |  6 ++
 tests/common/impala_test_suite.py               |  2 +-
 .../custom_cluster/test_admission_controller.py | 18 +++-
 tests/custom_cluster/test_set_and_unset.py      | 98 ++++++++++++++++++++
 tests/hs2/hs2_test_suite.py                     |  4 +-
 14 files changed, 266 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c9740b43/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index ef6a69d..cb542d0 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -199,14 +199,14 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) {
         RETURN_IF_ERROR(SetQueryOption(
             exec_request_.set_query_option_request.key,
             exec_request_.set_query_option_request.value,
-            &session_->default_query_options,
+            &session_->set_query_options,
             &session_->set_query_options_mask));
         SetResultSet({}, {});
       } else {
         // "SET" returns a table of all query options.
         map<string, string> config;
         TQueryOptionsToMap(
-            session_->default_query_options, &config);
+            session_->QueryOptions(), &config);
         vector<string> keys, values;
         map<string, string>::const_iterator itr = config.begin();
         for (; itr != config.end(); ++itr) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c9740b43/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index bcf76b6..eb011a3 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -436,7 +436,7 @@ Status ImpalaServer::QueryToTQueryContext(const Query& query,
       // set yet, set it now.
       lock_guard<mutex> l(session->lock);
       if (session->connected_user.empty()) session->connected_user = query.hadoop_user;
-      query_ctx->client_request.query_options = session->default_query_options;
+      query_ctx->client_request.query_options = session->QueryOptions();
       set_query_options_mask = session->set_query_options_mask;
     }
     session->ToThrift(session_id, &query_ctx->session);
@@ -444,10 +444,13 @@ Status ImpalaServer::QueryToTQueryContext(const Query& query,
 
   // Override default query options with Query.Configuration
   if (query.__isset.configuration) {
+    TQueryOptions overlay;
+    QueryOptionsMask overlay_mask;
     for (const string& option: query.configuration) {
-      RETURN_IF_ERROR(ParseQueryOptions(option, &query_ctx->client_request.query_options,
-          &set_query_options_mask));
+      RETURN_IF_ERROR(ParseQueryOptions(option, &overlay, &overlay_mask));
     }
+    OverlayQueryOptions(overlay, overlay_mask, &query_ctx->client_request.query_options);
+    set_query_options_mask |= overlay_mask;
   }
 
   // Only query options not set in the session or confOverlay can be overridden by the

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c9740b43/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index da8d606..20608b7 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -241,11 +241,13 @@ Status ImpalaServer::TExecuteStatementReqToTQueryContext(
     RETURN_IF_ERROR(GetSessionState(session_id, &session_state));
     session_state->ToThrift(session_id, &query_ctx->session);
     lock_guard<mutex> l(session_state->lock);
-    query_ctx->client_request.query_options = session_state->default_query_options;
+    query_ctx->client_request.query_options = session_state->QueryOptions();
     set_query_options_mask = session_state->set_query_options_mask;
   }
 
   if (execute_request.__isset.confOverlay) {
+    TQueryOptions overlay;
+    QueryOptionsMask overlay_mask;
     map<string, string>::const_iterator conf_itr = execute_request.confOverlay.begin();
     for (; conf_itr != execute_request.confOverlay.end(); ++conf_itr) {
       if (conf_itr->first == IMPALA_RESULT_CACHING_OPT) continue;
@@ -256,8 +258,10 @@ Status ImpalaServer::TExecuteStatementReqToTQueryContext(
         continue;
       }
       RETURN_IF_ERROR(SetQueryOption(conf_itr->first, conf_itr->second,
-          &query_ctx->client_request.query_options, &set_query_options_mask));
+          &overlay, &overlay_mask));
     }
+    OverlayQueryOptions(overlay, overlay_mask, &query_ctx->client_request.query_options);
+    set_query_options_mask |= overlay_mask;
   }
   // Only query options not set in the session or confOverlay can be overridden by the
   // pool options.
@@ -313,7 +317,7 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
   // Process the supplied configuration map.
   state->database = "default";
   state->session_timeout = FLAGS_idle_session_timeout;
-  state->default_query_options = default_query_options_;
+  state->server_default_query_options = &default_query_options_;
   if (request.__isset.configuration) {
     typedef map<string, string> ConfigurationMap;
     for (const ConfigurationMap::value_type& v: request.configuration) {
@@ -340,13 +344,13 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
       } else {
         // Normal configuration key. Use it to set session default query options.
         // Ignore failure (failures will be logged in SetQueryOption()).
-        discard_result(SetQueryOption(v.first, v.second, &state->default_query_options,
+        discard_result(SetQueryOption(v.first, v.second, &state->set_query_options,
             &state->set_query_options_mask));
       }
     }
   }
   RegisterSessionTimeout(state->session_timeout);
-  TQueryOptionsToMap(state->default_query_options, &return_val.configuration);
+  TQueryOptionsToMap(state->QueryOptions(), &return_val.configuration);
 
   // OpenSession() should return the coordinator's HTTP server address.
   const string& http_addr = lexical_cast<string>(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c9740b43/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 6ce20e9..d9d2629 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1241,6 +1241,12 @@ void ImpalaServer::SessionState::ToThrift(const TUniqueId& session_id,
   state->__set_kudu_latest_observed_ts(kudu_latest_observed_ts);
 }
 
+TQueryOptions ImpalaServer::SessionState::QueryOptions() {
+  TQueryOptions ret = *server_default_query_options;
+  OverlayQueryOptions(set_query_options, set_query_options_mask, &ret);
+  return ret;
+}
+
 void ImpalaServer::CancelFromThreadPool(uint32_t thread_id,
     const CancellationWork& cancellation_work) {
   if (cancellation_work.unregister()) {
@@ -1730,7 +1736,7 @@ void ImpalaServer::ConnectionStart(
     session_state->session_timeout = FLAGS_idle_session_timeout;
     session_state->session_type = TSessionType::BEESWAX;
     session_state->network_address = connection_context.network_address;
-    session_state->default_query_options = default_query_options_;
+    session_state->server_default_query_options = &default_query_options_;
     session_state->kudu_latest_observed_ts = 0;
 
     // If the username was set by a lower-level transport, use it.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c9740b43/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index baca128..dbd2877 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -360,13 +360,17 @@ class ImpalaServer : public ImpalaServiceIf,
     /// The default database (changed as a result of 'use' query execution).
     std::string database;
 
-    /// The default query options of this session. When the session is created, the
-    /// session inherits the global defaults from ImpalaServer::default_query_options_.
-    TQueryOptions default_query_options;
+    /// Reference to the ImpalaServer's query options
+    TQueryOptions* server_default_query_options;
 
-    /// BitSet indicating which query options in default_query_options have been
+    /// Query options that have been explicitly set in this session.
+    TQueryOptions set_query_options;
+
+    /// BitSet indicating which query options in set_query_options have been
     /// explicitly set in the session. Updated when a query option is specified using a
     /// SET command: the bit corresponding to the TImpalaQueryOptions enum is set.
+    /// If the option is subsequently reset via a SET with an empty value, the bit
+    /// is cleared.
     QueryOptionsMask set_query_options_mask;
 
     /// For HS2 only, the protocol version this session is expecting.
@@ -399,6 +403,10 @@ class ImpalaServer : public ImpalaServiceIf,
     /// Builds a Thrift representation of this SessionState for serialisation to
     /// the frontend.
     void ToThrift(const TUniqueId& session_id, TSessionState* session_state);
+
+    /// Builds the overlay of the default server query options and the options
+    /// explicitly set in this session.
+    TQueryOptions QueryOptions();
   };
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c9740b43/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 1bec770..75b1567 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -147,4 +147,71 @@ TEST(QueryOptions, MapOptionalDefaultlessToEmptyString) {
   EXPECT_EQ(map["EXPLAIN_LEVEL"], "1");
 }
 
+/// Overlay a with b. batch_size is set in both places.
+/// num_nodes is set in a and is missing from the bit
+/// mask. mt_dop is only set in b.
+TEST(QueryOptions, TestOverlay) {
+  TQueryOptions a;
+  TQueryOptions b;
+  QueryOptionsMask mask;
+
+  // overlay
+  a.__set_batch_size(1);
+  b.__set_batch_size(2);
+  mask.set(TImpalaQueryOptions::BATCH_SIZE);
+
+  // no overlay
+  a.__set_num_nodes(3);
+
+  // missing mask on overlay
+  b.__set_debug_action("ignored"); // no mask set
+
+  // overlay; no original value
+  b.__set_mt_dop(4);
+  mask.set(TImpalaQueryOptions::MT_DOP);
+
+  TQueryOptions dst = a;
+  OverlayQueryOptions(b, mask, &dst);
+
+  EXPECT_EQ(2, dst.batch_size);
+  EXPECT_TRUE(dst.__isset.batch_size);
+  EXPECT_EQ(3, dst.num_nodes);
+  EXPECT_EQ(a.debug_action, dst.debug_action);
+  EXPECT_EQ(4, dst.mt_dop);
+  EXPECT_TRUE(dst.__isset.mt_dop);
+}
+
+TEST(QueryOptions, ResetToDefaultViaEmptyString) {
+  // MT_DOP has no default; resetting should do nothing.
+  {
+    TQueryOptions options;
+    EXPECT_TRUE(SetQueryOption("MT_DOP", "", &options, NULL).ok());
+    EXPECT_FALSE(options.__isset.mt_dop);
+  }
+
+  // Set and then reset; check mask too.
+  {
+    TQueryOptions options;
+    QueryOptionsMask mask;
+
+    EXPECT_FALSE(options.__isset.mt_dop);
+    EXPECT_TRUE(SetQueryOption("MT_DOP", "3", &options, &mask).ok());
+    EXPECT_TRUE(mask[TImpalaQueryOptions::MT_DOP]);
+    EXPECT_TRUE(SetQueryOption("MT_DOP", "", &options, &mask).ok());
+    EXPECT_FALSE(options.__isset.mt_dop);
+    // After reset, mask should be clear.
+    EXPECT_FALSE(mask[TImpalaQueryOptions::MT_DOP]);
+  }
+
+  // Reset should reset to defaults for something that has set defaults.
+  {
+    TQueryOptions options;
+
+    EXPECT_TRUE(SetQueryOption("EXPLAIN_LEVEL", "3", &options, NULL).ok());
+    EXPECT_TRUE(SetQueryOption("EXPLAIN_LEVEL", "", &options, NULL).ok());
+    EXPECT_TRUE(options.__isset.explain_level);
+    EXPECT_EQ(options.explain_level, 1);
+  }
+}
+
 IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c9740b43/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index e950d31..d4f1f73 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -58,7 +58,7 @@ void impala::OverlayQueryOptions(const TQueryOptions& src, const QueryOptionsMas
   DCHECK_GT(mask.size(), _TImpalaQueryOptions_VALUES_TO_NAMES.size()) <<
       "Size of QueryOptionsMask must be increased.";
 #define QUERY_OPT_FN(NAME, ENUM)\
-  if (src.__isset.NAME && mask[TImpalaQueryOptions::ENUM]) dst->NAME = src.NAME;
+  if (src.__isset.NAME && mask[TImpalaQueryOptions::ENUM]) dst->__set_##NAME(src.NAME);
   QUERY_OPTS_TABLE
 #undef QUERY_OPT_FN
 }
@@ -79,6 +79,20 @@ void impala::TQueryOptionsToMap(const TQueryOptions& query_options,
 #undef QUERY_OPT_FN
 }
 
+// Resets query_options->option to its default value.
+static void ResetQueryOption(const int option, TQueryOptions* query_options) {
+  const static TQueryOptions defaults;
+  switch (option) {
+#define QUERY_OPT_FN(NAME, ENUM)\
+    case TImpalaQueryOptions::ENUM:\
+      query_options->__isset.NAME = defaults.__isset.NAME;\
+      query_options->NAME = defaults.NAME;\
+      break;
+  QUERY_OPTS_TABLE
+#undef QUERY_OPT_FN
+  }
+}
+
 string impala::DebugQueryOptions(const TQueryOptions& query_options) {
   const static TQueryOptions defaults;
   int i = 0;
@@ -96,7 +110,7 @@ string impala::DebugQueryOptions(const TQueryOptions& query_options) {
 
 // Returns the TImpalaQueryOptions enum for the given "key". Input is case insensitive.
 // Return -1 if the input is an invalid option.
-int GetQueryOptionForKey(const string& key) {
+static int GetQueryOptionForKey(const string& key) {
   map<int, const char*>::const_iterator itr =
       _TImpalaQueryOptions_VALUES_TO_NAMES.begin();
   for (; itr != _TImpalaQueryOptions_VALUES_TO_NAMES.end(); ++itr) {
@@ -115,6 +129,12 @@ Status impala::SetQueryOption(const string& key, const string& value,
   int option = GetQueryOptionForKey(key);
   if (option < 0) {
     return Status(Substitute("Invalid query option: $0", key));
+  } else if (value == "") {
+    ResetQueryOption(option, query_options);
+    if (set_query_options_mask != nullptr) {
+      DCHECK_LT(option, set_query_options_mask->size());
+      set_query_options_mask->reset(option);
+    }
   } else {
     switch (option) {
       case TImpalaQueryOptions::ABORT_ON_ERROR:
@@ -176,7 +196,6 @@ Status impala::SetQueryOption(const string& key, const string& value,
         break;
       }
       case TImpalaQueryOptions::COMPRESSION_CODEC: {
-        if (value.empty()) break;
         if (iequals(value, "none")) {
           query_options->__set_compression_codec(THdfsCompression::NONE);
         } else if (iequals(value, "gzip")) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c9740b43/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 3dada7d..5ca8c5f 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -124,7 +124,7 @@ void OverlayQueryOptions(const TQueryOptions& src, const QueryOptionsMask& mask,
 
 /// Set the key/value pair in TQueryOptions. It will override existing setting in
 /// query_options. The bit corresponding to query option 'key' in set_query_options_mask
-/// is set.
+/// is set. An empty string value will reset the key to its default value.
 Status SetQueryOption(const std::string& key, const std::string& value,
     TQueryOptions* query_options, QueryOptionsMask* set_query_options_mask);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c9740b43/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index db6ffbb..cda5083 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -72,9 +72,22 @@ enum TJoinDistributionMode {
 //
 // (1) and (2) are set by administrators and provide the default query options for a
 // session, in that order, so options set in (2) override those in (1). The user
-// can specify query options with (3) to override the defaults, which are stored in the
-// SessionState. Finally, the client can pass a config 'overlay' (4) in the request
-// metadata which overrides everything else.
+// can specify query options with (3) to override the preceding layers; these
+// overrides are stored in SessionState. Finally, the client can pass a config
+// 'overlay' (4) in the request metadata which overrides everything else.
+//
+// Session options (level 3, above) can be set by the user with SET <key>=<value>
+// or in the OpenSession RPC. They can be unset with SET <key>="". When unset,
+// it's unset in that level, and the values as specified by the defaults,
+// and levels 1 and 2 above take hold.
+//
+// Because of the ambiguity between null and the empty string here, string-typed
+// options where the empty string is a valid value can cause problems as follows:
+// * If their default is not the empty string, a user can't set it to the
+//   empty string with SET.
+// * Even if their default is the empty string, they may be set to something
+//   else via process defaults or resource pool defaults, and the user
+//   may not be able to override them back to the empty string.
 struct TQueryOptions {
   1: optional bool abort_on_error = 0
   2: optional i32 max_errors = 100

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c9740b43/testdata/workloads/functional-query/queries/QueryTest/set.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test b/testdata/workloads/functional-query/queries/QueryTest/set.test
index 45c8343..364a01e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/set.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/set.test
@@ -1,11 +1,17 @@
 ====
 ---- QUERY
+# Set an option explicitly; the test infrastructure should clear it before the next test.
+# The next test tests that buffer_pool_limit is unset ("").
+set buffer_pool_limit=7;
+====
+---- QUERY
 set
 ---- RESULTS: VERIFY_IS_SUBSET
 'ABORT_ON_DEFAULT_LIMIT_EXCEEDED','0'
 'ABORT_ON_ERROR','0'
 'ALLOW_UNSUPPORTED_FORMATS','0'
 'BATCH_SIZE','0'
+'BUFFER_POOL_LIMIT',''
 'DEBUG_ACTION',''
 'DEFAULT_ORDER_BY_LIMIT','-1'
 'DISABLE_CACHED_READS','0'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c9740b43/tests/common/impala_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 0732695..6c23aa8 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -211,7 +211,7 @@ class ImpalaTestSuite(BaseTestSuite):
       if not query_option in self.default_query_options:
         continue
       default_val = self.default_query_options[query_option]
-      query_str = 'SET '+ query_option + '="' + default_val + '"'
+      query_str = 'SET ' + query_option + '="' + default_val + '"'
       try:
         impalad_client.execute(query_str)
       except Exception as e:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c9740b43/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index de97e7c..712ce57 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -156,8 +156,10 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
     assert len(confs) == len(expected_query_options)
     confs = map(str.lower, confs)
     for expected in expected_query_options:
-      assert expected.lower() in confs,\
-          "Expected query options '%s' to be set" % (",".join(expected_query_options))
+      if expected.lower() not in confs:
+        expected = ",".join(sorted(expected_query_options))
+        actual = ",".join(sorted(confs))
+        assert False, "Expected query options %s, got %s." % (expected, actual)
 
   def __check_hs2_query_opts(self, pool_name, mem_limit=None, expected_options=None):
     """ Submits a query via HS2 (optionally with a mem_limit in the confOverlay)
@@ -254,6 +256,18 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       self.__check_query_options(result.runtime_profile,\
           ['MEM_LIMIT=12345', 'QUERY_TIMEOUT_S=5', 'REQUEST_POOL=root.queueA',\
            'ABORT_ON_ERROR=1', 'MAX_IO_BUFFERS=100'])
+
+      # Once options are reset to their defaults, the queue
+      # configuration should kick back in. We'll see the
+      # queue-configured mem_limit, and we won't see
+      # abort on error, because it's back to being the default.
+      client.execute('set mem_limit=""')
+      client.execute('set abort_on_error=""')
+      client.set_configuration({ 'request_pool': 'root.queueA' })
+      result = client.execute("select 1")
+      self.__check_query_options(result.runtime_profile,
+            [queueA_mem_limit, 'REQUEST_POOL=root.queueA', 'QUERY_TIMEOUT_S=5'])
+
     finally:
       client.close()
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c9740b43/tests/custom_cluster/test_set_and_unset.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_set_and_unset.py b/tests/custom_cluster/test_set_and_unset.py
new file mode 100644
index 0000000..3347ca7
--- /dev/null
+++ b/tests/custom_cluster/test_set_and_unset.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
+from TCLIService import TCLIService
+from ImpalaService import ImpalaHiveServer2Service
+
+class TestSetAndUnset(CustomClusterTestSuite, HS2TestSuite):
+  """
+  Test behavior of SET and UNSET within a session, and how
+  SET/UNSET override options configured at the impalad level.
+  """
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args="--default_query_options=debug_action=custom")
+  @needs_session(TCLIService.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6)
+  def test_set_and_unset(self):
+    """
+    Starts Impala cluster with a custom query option, and checks that option
+    overlaying works correctly.
+
+    The Beeswax API and the HiveServer2 implementations are slightly different,
+    so the same test is run in both contexts.
+    """
+    # Beeswax API:
+    result = self.execute_query_expect_success(self.client, "set")
+    assert "DEBUG_ACTION\tcustom" in result.data, "baseline"
+    self.execute_query_expect_success(self.client, "set debug_action=hey")
+    assert "DEBUG_ACTION\they" in \
+        self.execute_query_expect_success(self.client, "set").data, "session override"
+    self.execute_query_expect_success(self.client, 'set debug_action=""')
+    assert "DEBUG_ACTION\tcustom" in \
+        self.execute_query_expect_success(self.client, "set").data, "reset"
+    self.execute_query_expect_success(self.client, 'set batch_size=123')
+    # Use a "request overlay" to change the option for a specific
+    # request within a session. We run a real query and check its
+    # runtime profile, as SET shows session options without applying
+    # the request overlays to them.
+    assert "BATCH_SIZE=100" in self.execute_query_expect_success(self.client, 'select 1',
+            query_options=dict(batch_size="100")).runtime_profile, "request overlay"
+
+    # Overlaying an empty string (unset) has no effect; the session option
+    # takes hold and the "request overlay" is considered blank.
+    assert "BATCH_SIZE=123" in self.execute_query_expect_success(self.client, 'select 1',
+            query_options=dict(batch_size="")).runtime_profile, "null request overlay"
+
+    # Same dance, but with HS2:
+    assert ("DEBUG_ACTION", "custom") in self.get_set_results(), "baseline"
+    self.execute_statement("set debug_action='hey'")
+    assert ("DEBUG_ACTION", "hey") in self.get_set_results(), "session override"
+    self.execute_statement("set debug_action=''")
+    assert ("DEBUG_ACTION", "custom") in self.get_set_results(), "reset"
+
+    # Request Overlay
+    self.execute_statement("set batch_size=123")
+    execute_statement_resp = self.execute_statement("select 1", conf_overlay=dict(batch_size="100"))
+    get_profile_req = ImpalaHiveServer2Service.TGetRuntimeProfileReq()
+    get_profile_req.operationHandle = execute_statement_resp.operationHandle
+    get_profile_req.sessionHandle = self.session_handle
+    assert "BATCH_SIZE=100" in self.hs2_client.GetRuntimeProfile(get_profile_req).profile
+
+    # Null request overlay
+    self.execute_statement("set batch_size=999")
+    execute_statement_resp = self.execute_statement("select 1", conf_overlay=dict(batch_size=""))
+    get_profile_req = ImpalaHiveServer2Service.TGetRuntimeProfileReq()
+    get_profile_req.operationHandle = execute_statement_resp.operationHandle
+    get_profile_req.sessionHandle = self.session_handle
+    assert "BATCH_SIZE=999" in self.hs2_client.GetRuntimeProfile(get_profile_req).profile
+
+  def get_set_results(self):
+    """
+    Executes a "SET" HiveServer2 query and returns a list
+    of key-value tuples.
+    """
+    execute_statement_resp = self.execute_statement("set")
+    fetch_results_req = TCLIService.TFetchResultsReq()
+    fetch_results_req.operationHandle = execute_statement_resp.operationHandle
+    fetch_results_req.maxRows = 100
+    fetch_results_resp = self.hs2_client.FetchResults(fetch_results_req)
+    return zip(fetch_results_resp.results.columns[0].stringVal.values,
+            fetch_results_resp.results.columns[1].stringVal.values)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/c9740b43/tests/hs2/hs2_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/hs2/hs2_test_suite.py b/tests/hs2/hs2_test_suite.py
index 1b2f89f..da9f68b 100644
--- a/tests/hs2/hs2_test_suite.py
+++ b/tests/hs2/hs2_test_suite.py
@@ -228,11 +228,13 @@ class HS2TestSuite(ImpalaTestSuite):
     assert False, 'Did not reach expected operation state %s in time, actual state was ' \
         '%s' % (expected_state, get_operation_status_resp.operationState)
 
-  def execute_statement(self, statement):
+  def execute_statement(self, statement, conf_overlay=None):
     """Executes statement and returns response, which is checked."""
     execute_statement_req = TCLIService.TExecuteStatementReq()
     execute_statement_req.sessionHandle = self.session_handle
     execute_statement_req.statement = statement
+    if conf_overlay:
+      execute_statement_req.confOverlay = conf_overlay
     execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
     HS2TestSuite.check_response(execute_statement_resp)
     return execute_statement_resp


[2/3] incubator-impala git commit: IMPALA-5988: optimise MemPool::TryAllocate()

Posted by sa...@apache.org.
IMPALA-5988: optimise MemPool::TryAllocate()

Testing:
Ran core tests.

Perf:
Experiments using this on top of a WIP Avro patch for IMPALA-5307
showed noticeable improvements in CPU efficiency (up to 10%).

Change-Id: I088012084fe535a67a3cd1103ced5a02027f961a
Reviewed-on: http://gerrit.cloudera.org:8080/8145
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/226c99e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/226c99e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/226c99e0

Branch: refs/heads/master
Commit: 226c99e01c49d3d25840c5982474740a7dcb9c62
Parents: 4dd0f1b
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Sep 21 16:01:15 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Oct 5 02:50:36 2017 +0000

----------------------------------------------------------------------
 be/src/exec/analytic-eval-node.cc     |  3 ++-
 be/src/exec/data-source-scan-node.cc  |  3 ++-
 be/src/exec/hdfs-text-scanner.cc      |  2 +-
 be/src/exec/text-converter.inline.h   |  2 +-
 be/src/exprs/scalar-expr-evaluator.cc |  2 +-
 be/src/runtime/mem-pool.h             | 41 ++++++++++++++++++------------
 6 files changed, 32 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/226c99e0/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index af4f866..a3551f3 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -388,7 +388,8 @@ Status AnalyticEvalNode::AddResultTuple(int64_t stream_idx) {
     StringValue* sv = reinterpret_cast<StringValue*>(
         result_tuple->GetSlot(slot_desc->tuple_offset()));
     if (sv == nullptr || sv->len == 0) continue;
-    char* new_ptr = reinterpret_cast<char*>(cur_tuple_pool->TryAllocate(sv->len));
+    char* new_ptr = reinterpret_cast<char*>(
+        cur_tuple_pool->TryAllocateUnaligned(sv->len));
     if (UNLIKELY(new_ptr == nullptr)) {
       return cur_tuple_pool->mem_tracker()->MemLimitExceeded(nullptr,
           "Failed to allocate memory for analytic function's result.", sv->len);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/226c99e0/be/src/exec/data-source-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index 78ba492..2639510 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -222,7 +222,8 @@ Status DataSourceScanNode::MaterializeNextRow(MemPool* tuple_pool, Tuple* tuple)
           }
           const string& val = col.string_vals[val_idx];
           size_t val_size = val.size();
-          char* buffer = reinterpret_cast<char*>(tuple_pool->TryAllocate(val_size));
+          char* buffer = reinterpret_cast<char*>(
+              tuple_pool->TryAllocateUnaligned(val_size));
           if (UNLIKELY(buffer == NULL)) {
             string details = Substitute(ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow",
                 val_size, "string slot");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/226c99e0/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index eea4e80..a548ca1 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -870,7 +870,7 @@ Status HdfsTextScanner::CopyBoundaryField(FieldLocation* data, MemPool* pool) {
   bool needs_escape = data->len < 0;
   int copy_len = needs_escape ? -data->len : data->len;
   int64_t total_len = copy_len + boundary_column_.len();
-  char* str_data = reinterpret_cast<char*>(pool->TryAllocate(total_len));
+  char* str_data = reinterpret_cast<char*>(pool->TryAllocateUnaligned(total_len));
   if (UNLIKELY(str_data == nullptr)) {
     string details = Substitute("HdfsTextScanner::CopyBoundaryField() failed to allocate "
         "$0 bytes.", total_len);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/226c99e0/be/src/exec/text-converter.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/text-converter.inline.h b/be/src/exec/text-converter.inline.h
index d8dea91..49f4e8a 100644
--- a/be/src/exec/text-converter.inline.h
+++ b/be/src/exec/text-converter.inline.h
@@ -79,7 +79,7 @@ inline bool TextConverter::WriteSlot(const SlotDescriptor* slot_desc, Tuple* tup
         // 3. HdfsScanner::WriteCompleteTuple() always calls this function with
         //    'copy_string' == false.
         str.ptr = type.IsVarLenStringType() ?
-            reinterpret_cast<char*>(pool->TryAllocate(buffer_len)) :
+            reinterpret_cast<char*>(pool->TryAllocateUnaligned(buffer_len)) :
             reinterpret_cast<char*>(slot);
         if (UNLIKELY(str.ptr == NULL)) return false;
         if (need_escape) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/226c99e0/be/src/exprs/scalar-expr-evaluator.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/scalar-expr-evaluator.cc b/be/src/exprs/scalar-expr-evaluator.cc
index b2e0aa6..a327dd4 100644
--- a/be/src/exprs/scalar-expr-evaluator.cc
+++ b/be/src/exprs/scalar-expr-evaluator.cc
@@ -252,7 +252,7 @@ Status ScalarExprEvaluator::GetConstValue(RuntimeState* state, const ScalarExpr&
     StringVal* sv = reinterpret_cast<StringVal*>(*const_val);
     if (!sv->is_null && sv->len > 0) {
       // Make sure the memory is owned by this evaluator.
-      char* ptr_copy = reinterpret_cast<char*>(mem_pool_->TryAllocate(sv->len));
+      char* ptr_copy = reinterpret_cast<char*>(mem_pool_->TryAllocateUnaligned(sv->len));
       if (ptr_copy == nullptr) {
         return mem_pool_->mem_tracker()->MemLimitExceeded(
             state, "Could not allocate constant string value", sv->len);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/226c99e0/be/src/runtime/mem-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool.h b/be/src/runtime/mem-pool.h
index 648e382..b748f7f 100644
--- a/be/src/runtime/mem-pool.h
+++ b/be/src/runtime/mem-pool.h
@@ -118,6 +118,13 @@ class MemPool {
     return Allocate<true>(size, alignment);
   }
 
+  /// Same as TryAllocate() except returned memory is not aligned at all.
+  uint8_t* TryAllocateUnaligned(int64_t size) noexcept {
+    // Call templated implementation directly so that it is inlined here and the
+    // alignment logic can be optimised out.
+    return Allocate<true>(size, 1);
+  }
+
   /// Returns 'byte_size' to the current chunk back to the mem pool. This can
   /// only be used to return either all or part of the previous allocation returned
   /// by Allocate().
@@ -234,31 +241,33 @@ class MemPool {
   }
 
   template <bool CHECK_LIMIT_FIRST>
-  uint8_t* Allocate(int64_t size, int alignment) noexcept {
+  uint8_t* ALWAYS_INLINE Allocate(int64_t size, int alignment) noexcept {
     DCHECK_GE(size, 0);
     if (UNLIKELY(size == 0)) return reinterpret_cast<uint8_t*>(&zero_length_region_);
 
-    bool fits_in_chunk = false;
     if (current_chunk_idx_ != -1) {
+      ChunkInfo& info = chunks_[current_chunk_idx_];
       int64_t aligned_allocated_bytes = BitUtil::RoundUpToPowerOf2(
-          chunks_[current_chunk_idx_].allocated_bytes, alignment);
-      if (aligned_allocated_bytes + size <= chunks_[current_chunk_idx_].size) {
+          info.allocated_bytes, alignment);
+      if (aligned_allocated_bytes + size <= info.size) {
         // Ensure the requested alignment is respected.
-        total_allocated_bytes_ +=
-            aligned_allocated_bytes - chunks_[current_chunk_idx_].allocated_bytes;
-        chunks_[current_chunk_idx_].allocated_bytes = aligned_allocated_bytes;
-        fits_in_chunk = true;
+        int64_t padding = aligned_allocated_bytes - info.allocated_bytes;
+        uint8_t* result = info.data + aligned_allocated_bytes;
+        ASAN_UNPOISON_MEMORY_REGION(result, size);
+        DCHECK_LE(info.allocated_bytes + size, info.size);
+        info.allocated_bytes += padding + size;
+        total_allocated_bytes_ += padding + size;
+        DCHECK_LE(current_chunk_idx_, chunks_.size() - 1);
+        return result;
       }
     }
 
-    if (!fits_in_chunk) {
-      // If we couldn't allocate a new chunk, return NULL. malloc() guarantees alignment
-      // of alignof(std::max_align_t), so we do not need to do anything additional to
-      // guarantee alignment.
-      static_assert(
-          INITIAL_CHUNK_SIZE >= alignof(std::max_align_t), "Min chunk size too low");
-      if (UNLIKELY(!FindChunk(size, CHECK_LIMIT_FIRST))) return NULL;
-    }
+    // If we couldn't allocate a new chunk, return NULL. malloc() guarantees alignment
+    // of alignof(std::max_align_t), so we do not need to do anything additional to
+    // guarantee alignment.
+    static_assert(
+        INITIAL_CHUNK_SIZE >= alignof(std::max_align_t), "Min chunk size too low");
+    if (UNLIKELY(!FindChunk(size, CHECK_LIMIT_FIRST))) return NULL;
 
     ChunkInfo& info = chunks_[current_chunk_idx_];
     uint8_t* result = info.data + info.allocated_bytes;