Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/16 01:53:04 UTC

[1/4] impala git commit: IMPALA-4456: Address scalability issues of qs_map_lock_ and client_request_state_map_lock_

Repository: impala
Updated Branches:
  refs/heads/2.x 26309141d -> 354293d8c


IMPALA-4456: Address scalability issues of qs_map_lock_ and client_request_state_map_lock_

The following 2 locks have been shown to be frequent points of contention
on recent perf runs:

- qs_map_lock_
- client_request_state_map_lock_

Since these are process-wide locks, any thread waiting on them
potentially slows down the runtime of a query.

I previously tried to address this by converting client_request_state_map_lock_
to a reader-writer lock. This showed great perf improvements in the general
case; however, there were also edge cases with big regressions.
In the general case, threads that only read the map got through so quickly
that we saw a reduction in the number of client connections created, since
this lock is also contended for in the context of Thrift threads.
The bad case was when writers trying to register a new query were starved
because there were so many readers. Changing the starvation option resulted in
worse read performance all round.
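
For reference, the rejected reader-writer approach looks roughly like the
sketch below. It is an illustration only: the commit does not include that
code, and std::shared_mutex (C++17), the class name RwLockedMap and the
uint64_t key are assumptions standing in for the real lock and TUniqueId. The
C++ standard does not specify whether std::shared_mutex prefers readers or
writers, which is exactly the policy question behind the regressions above.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>

// Hypothetical single-lock map guarded by a reader-writer lock: lookups take a
// shared lock and can run concurrently, registration takes an exclusive lock.
class RwLockedMap {
 public:
  bool Find(uint64_t id, std::string* out) const {
    std::shared_lock<std::shared_mutex> l(lock_);  // many readers at once
    auto it = map_.find(id);
    if (it == map_.end()) return false;
    *out = it->second;
    return true;
  }

  // Returns false if 'id' is already registered.
  bool Register(uint64_t id, const std::string& value) {
    // A writer must wait until no reader holds the lock. With a steady stream of
    // readers this is where writer starvation can occur, depending on the
    // lock's fairness policy.
    std::unique_lock<std::shared_mutex> l(lock_);
    return map_.emplace(id, value).second;
  }

 private:
  mutable std::shared_mutex lock_;
  std::unordered_map<uint64_t, std::string> map_;
};
```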

Another, relatively simpler approach is to shard the locks, which
proved to be very effective with no regressions. The maps and locks are
initially sharded into a default of 4 buckets.
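
A minimal sketch of lock sharding using only the standard library. QueryId,
QueryIdHash and ShardedMap are hypothetical names, not the Impala classes, and
std::mutex stands in for the per-shard lock. Each operation locks only the
shard that owns the key, so lookups for queries in different shards never
contend with each other.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for TUniqueId: a 128-bit ID split into two halves.
struct QueryId {
  int64_t hi;
  int64_t lo;
  bool operator==(const QueryId& o) const { return hi == o.hi && lo == o.lo; }
};

struct QueryIdHash {
  std::size_t operator()(const QueryId& id) const {
    return std::hash<int64_t>()(id.hi) ^ (std::hash<int64_t>()(id.lo) << 1);
  }
};

// One mutex per shard instead of a single process-wide mutex.
template <typename T, std::size_t NumShards = 4>
class ShardedMap {
 public:
  void Insert(const QueryId& id, const T& value) {
    Shard& s = ShardFor(id);
    std::lock_guard<std::mutex> l(s.lock);  // only this shard is locked
    s.map[id] = value;
  }

  bool Find(const QueryId& id, T* out) {
    Shard& s = ShardFor(id);
    std::lock_guard<std::mutex> l(s.lock);
    auto it = s.map.find(id);
    if (it == s.map.end()) return false;
    *out = it->second;
    return true;
  }

  bool Erase(const QueryId& id) {
    Shard& s = ShardFor(id);
    std::lock_guard<std::mutex> l(s.lock);
    return s.map.erase(id) > 0;
  }

 private:
  struct Shard {
    std::mutex lock;
    std::unordered_map<QueryId, T, QueryIdHash> map;
  };

  // Picks the shard from the high bits; converting to unsigned keeps the
  // index non-negative even when 'hi' is negative.
  Shard& ShardFor(const QueryId& id) {
    return shards_[static_cast<uint64_t>(id.hi) % NumShards];
  }

  std::array<Shard, NumShards> shards_;
};
```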

Query IDs are created using boost::uuids::random_generator. We use the
high bits of a query ID to assign queries to buckets. I verified on my local
machine that the high bits of query IDs are distributed evenly across
buckets:

For 10,000 queries sharded across 4 buckets, the distribution was:
bucket[0]: 2500
bucket[1]: 2489
bucket[2]: 2566
bucket[3]: 2445
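
The bucket assignment and the distribution check can be reproduced with a short
simulation. This is a sketch under assumptions: std::mt19937_64 replaces
boost::uuids::random_generator as the source of random high bits, and
HighBitsToBucket() and BucketHistogram() are hypothetical helpers, not Impala
functions.

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <random>

// Number of shards, matching the default of 4 described above.
constexpr uint32_t kNumBuckets = 4;

// Maps the high 64 bits of a query ID to a bucket. Converting to unsigned first
// keeps the result in [0, kNumBuckets) even when 'hi' is negative.
inline uint32_t HighBitsToBucket(int64_t hi) {
  return static_cast<uint32_t>(static_cast<uint64_t>(hi) % kNumBuckets);
}

// Simulates 'num_ids' random IDs and counts how many land in each bucket.
std::array<int, kNumBuckets> BucketHistogram(int num_ids, uint64_t seed) {
  std::mt19937_64 gen(seed);
  std::array<int, kNumBuckets> counts{};  // zero-initialized
  for (int i = 0; i < num_ids; ++i) {
    int64_t hi = static_cast<int64_t>(gen());
    ++counts[HighBitsToBucket(hi)];
  }
  return counts;
}
```

With a uniform source, each of the 4 buckets should receive close to a quarter
of the IDs, which is what the counts above show.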

A micro-benchmark is added to measure the performance improvement. This
benchmark creates multiple threads, each of which creates a QueryState and
accesses it multiple times. We see improvements in the range of 2x to 3.5x.

BEFORE:
------------------Benchmark 1: Create and access Query States.
Total Time (#Queries: 5 #Accesses: 100) : 1ms
Total Time (#Queries: 50 #Accesses: 100) : 8ms
Total Time (#Queries: 50 #Accesses: 1000) : 54ms
Total Time (#Queries: 500 #Accesses: 100) : 76ms
Total Time (#Queries: 500 #Accesses: 1000) : 543ms

AFTER:
------------------Benchmark 1: Create and access Query States.
Total Time (#Queries: 5 #Accesses: 100) : 2173.59K clock cycles
Total Time (#Queries: 50 #Accesses: 100) : 4ms
Total Time (#Queries: 50 #Accesses: 1000) : 15ms
Total Time (#Queries: 500 #Accesses: 100) : 46ms
Total Time (#Queries: 500 #Accesses: 1000) : 151ms

This change introduces a ShardedQueryMap, which replaces
QueryExecMgr::qs_map_ and ImpalaServer::client_request_state_map_
along with their corresponding locks, thereby abstracting away access to
the maps' locks.

For operations that need to run on every entry of a ShardedQueryMap,
a new function, ShardedQueryMap::DoFuncForAllEntries(), is introduced.
It takes a user-supplied lambda and executes it on each individual
map entry.
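
A sketch of the visit-all-entries pattern, assuming a simplified map keyed by
uint64_t rather than TUniqueId. VisitableShardedMap is a hypothetical name, and
std::mutex stands in for Impala's SpinLock.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <unordered_map>

template <typename T, std::size_t NumShards = 4>
class VisitableShardedMap {
 public:
  void Insert(uint64_t id, const T& value) {
    Shard& s = shards_[id % NumShards];
    std::lock_guard<std::mutex> l(s.lock);
    s.map[id] = value;
  }

  // Runs 'call' on every value. Each shard's lock is held only while that shard
  // is visited, so the other shards remain available to concurrent callers.
  void DoFuncForAllEntries(const std::function<void(const T&)>& call) {
    for (Shard& s : shards_) {
      std::lock_guard<std::mutex> l(s.lock);
      for (const auto& entry : s.map) call(entry.second);
    }
  }

 private:
  struct Shard {
    std::mutex lock;
    std::unordered_map<uint64_t, T> map;
  };
  std::array<Shard, NumShards> shards_;
};
```

Because shards are locked one at a time, the callback sees a consistent view of
one shard at a time rather than an atomic snapshot of the whole map. In this
sketch the callback must also not call back into the same map, since the shard
lock it would need is already held and std::mutex is not recursive.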

NOTE: This micro-benchmark showed that SpinLock performs better
than boost::mutex for the qs_map_lock_ shards, so that change has been made
too.
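
For readers unfamiliar with spinlocks, the following is a minimal
test-and-set spinlock built on std::atomic<bool>. It is not Impala's SpinLock
implementation; SimpleSpinLock and CountWithSpinLock() are hypothetical names.
Because it exposes lock(), try_lock() and unlock(), it works with
std::lock_guard, in the same way the patch uses boost::lock_guard<SpinLock>.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>
#include <vector>

class SimpleSpinLock {
 public:
  void lock() {
    // exchange(true) is the test-and-set: it returns the previous value, so we
    // own the lock once we observe 'false'.
    while (locked_.exchange(true, std::memory_order_acquire)) {
      std::this_thread::yield();  // back off a little under contention
    }
  }
  bool try_lock() { return !locked_.exchange(true, std::memory_order_acquire); }
  void unlock() { locked_.store(false, std::memory_order_release); }

 private:
  std::atomic<bool> locked_{false};
};

// Increments a shared counter from several threads under the spinlock.
long CountWithSpinLock(int num_threads, int increments_per_thread) {
  SimpleSpinLock lock;
  long counter = 0;
  std::vector<std::thread> threads;
  for (int t = 0; t < num_threads; ++t) {
    threads.emplace_back([&] {
      for (int i = 0; i < increments_per_thread; ++i) {
        std::lock_guard<SimpleSpinLock> l(lock);
        ++counter;
      }
    });
  }
  for (std::thread& th : threads) th.join();
  return counter;
}
```

A spinlock avoids putting the waiting thread to sleep, which tends to pay off
when the critical section is as short as a single hash-map lookup; for long
critical sections a blocking mutex is usually the better choice.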

TODO: Add a benchmark for client_request_state_map_lock_ too. The APIs
around it are more complicated, so this patch only includes
the benchmarking of qs_map_lock_.

TODO 2: Consider adopting the ShardedQueryMapTemplate for the SessionStateMap.

Change-Id: I61089090e1095da45a8a64ed3ccc78bd310807f1
Reviewed-on: http://gerrit.cloudera.org:8080/8363
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/774656a7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/774656a7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/774656a7

Branch: refs/heads/2.x
Commit: 774656a736fa0761b395b2c8dce28d894e028d8f
Parents: 5558216
Author: Sailesh Mukil <sa...@apache.org>
Authored: Fri Oct 6 16:27:47 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 15 06:19:23 2018 +0000

----------------------------------------------------------------------
 be/src/benchmarks/CMakeLists.txt                |   1 +
 .../benchmarks/process-wide-locks-benchmark.cc  | 174 +++++++++++++++++++
 be/src/runtime/query-exec-mgr.cc                |  32 ++--
 be/src/runtime/query-exec-mgr.h                 |  10 +-
 be/src/service/impala-http-handler.cc           |  23 ++-
 be/src/service/impala-server.cc                 |  34 ++--
 be/src/service/impala-server.h                  |  29 ++--
 be/src/util/sharded-query-map-util.h            | 135 ++++++++++++++
 8 files changed, 379 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/774656a7/be/src/benchmarks/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/CMakeLists.txt b/be/src/benchmarks/CMakeLists.txt
index a569a66..02fbaad 100644
--- a/be/src/benchmarks/CMakeLists.txt
+++ b/be/src/benchmarks/CMakeLists.txt
@@ -48,6 +48,7 @@ ADD_BE_BENCHMARK(multiint-benchmark)
 ADD_BE_BENCHMARK(network-perf-benchmark)
 ADD_BE_BENCHMARK(overflow-benchmark)
 ADD_BE_BENCHMARK(parse-timestamp-benchmark)
+ADD_BE_BENCHMARK(process-wide-locks-benchmark)
 ADD_BE_BENCHMARK(row-batch-serialize-benchmark)
 ADD_BE_BENCHMARK(scheduler-benchmark)
 ADD_BE_BENCHMARK(status-benchmark)

http://git-wip-us.apache.org/repos/asf/impala/blob/774656a7/be/src/benchmarks/process-wide-locks-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/process-wide-locks-benchmark.cc b/be/src/benchmarks/process-wide-locks-benchmark.cc
new file mode 100644
index 0000000..ffe4268
--- /dev/null
+++ b/be/src/benchmarks/process-wide-locks-benchmark.cc
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <iostream>
+
+#include <boost/bind.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/uuid/uuid.hpp>
+#include <boost/uuid/uuid_generators.hpp>
+#include <boost/uuid/uuid_io.hpp>
+
+#include "runtime/query-exec-mgr.h"
+#include "runtime/query-state.h"
+#include "runtime/test-env.h"
+#include "scheduling/request-pool-service.h"
+#include "service/fe-support.h"
+#include "util/benchmark.h"
+#include "util/cpu-info.h"
+#include "util/debug-util.h"
+#include "util/metrics.h"
+#include "util/pretty-printer.h"
+#include "util/stopwatch.h"
+#include "util/thread.h"
+#include "util/uid-util.h"
+
+#include "common/init.h"
+#include "common/names.h"
+
+/// This tests the performance of the following process wide locks:
+//
+/// 1. qs_map_lock_ (Sharded)
+/// TODO: client_request_state_map_lock_ (Sharded)
+//
+/// A reasonable number of queries are created and accessed multiple times via the
+/// QueryExecMgr's APIs to benchmark the time taken to acquire the lock and retrieve
+/// the QueryState.
+//
+/// ------------------Benchmark 1: Create and access Query States.
+/// Total Time (#Queries: 5 #Accesses: 100) : 2202.44K clock cycles
+/// Total Time (#Queries: 50 #Accesses: 100) : 4ms
+/// Total Time (#Queries: 50 #Accesses: 1000) : 16ms
+/// Total Time (#Queries: 500 #Accesses: 100) : 46ms
+/// Total Time (#Queries: 500 #Accesses: 1000) : 129ms
+/// Total Time (#Queries: 500 #Accesses: 5000) : 518ms
+/// Total Time (#Queries: 1000 #Accesses: 1000) : 246ms
+/// Total Time (#Queries: 1000 #Accesses: 5000) : 1s018ms
+//
+/// This was created to test improvements for IMPALA-4456.
+
+using boost::uuids::random_generator;
+
+using namespace impala;
+
+boost::scoped_ptr<TestEnv> test_env_;
+vector<TUniqueId> query_ids;
+
+// This function creates a QueryState and accesses it 'num_accesses' times, via the
+// QueryExecMgr APIs.
+// TODO: Add a similar function for ClientRequestStates.
+void CreateAndAccessQueryStates(const TUniqueId& query_id, int num_accesses) {
+  TQueryCtx query_ctx;
+  query_ctx.query_id = query_id;
+
+  string resolved_pool;
+  Status s = ExecEnv::GetInstance()->request_pool_service()->ResolveRequestPool(
+      query_ctx, &resolved_pool);
+
+  query_ctx.__set_request_pool(resolved_pool);
+
+  QueryState *query_state;
+  query_state = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx);
+  DCHECK(query_state != nullptr);
+  query_state->AcquireExecResourceRefcount();
+
+  for (int i=0; i < num_accesses ; ++i) {
+    QueryState* qs;
+    qs = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id);
+    DCHECK(qs != nullptr);
+    ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(qs);
+  }
+
+  query_state->ReleaseExecResourceRefcount();
+  // This should drop the last reference count to the QueryState and destroy it.
+  ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state);
+  // Make sure that the query doesn't exist in the map any longer.
+  DCHECK(ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id) == nullptr);
+
+}
+
+// Runs 'num_threads' Impala Threads and has each of them execute func().
+void ImpalaThreadStarter(void (*func) (const TUniqueId&, int), int num_threads,
+    int func_arg) {
+  vector<unique_ptr<Thread>> threads;
+  threads.reserve(num_threads);
+
+  for (int i=0; i < num_threads; ++i) {
+    unique_ptr<Thread> thread;
+    function<void ()> f =
+        bind(func, query_ids[i], func_arg);
+    Status s =
+        Thread::Create("mythreadgroup", "thread", f, &thread);
+    DCHECK(s.ok());
+    threads.push_back(move(thread));
+  }
+  for (unique_ptr<Thread>& thread: threads) {
+    thread->Join();
+  }
+}
+
+void RunBenchmark(int num_queries, int num_accesses) {
+  StopWatch total_time;
+  total_time.Start();
+  ImpalaThreadStarter(CreateAndAccessQueryStates, num_queries, num_accesses);
+  total_time.Stop();
+
+  cout << "Total Time " << "(#Queries: " << num_queries << " #Accesses: "
+       << num_accesses << ") : "
+       << PrettyPrinter::Print(total_time.ElapsedTime(), TUnit::CPU_TICKS) << endl;
+}
+
+// Create and store 'num_queries' Query IDs into 'query_ids'.
+void CreateQueryIds(int num_queries) {
+  for (int i=0; i < num_queries; ++i) {
+    query_ids.push_back(UuidToQueryId(random_generator()()));
+  }
+}
+
+int main(int argc, char **argv) {
+  // Though we don't use the JVM or require FeSupport, the TestEnv class requires it,
+  // so we start them up.
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
+
+  const int MAX_QUERIES = 1000;
+
+  query_ids.reserve(MAX_QUERIES);
+
+  test_env_.reset(new TestEnv());
+  ABORT_IF_ERROR(test_env_->Init());
+
+  CreateQueryIds(MAX_QUERIES);
+
+  cout << "------------------Benchmark 1: Create and access Query States." << endl;
+  RunBenchmark(5, 100);
+  RunBenchmark(50, 100);
+  RunBenchmark(50, 1000);
+  RunBenchmark(500, 100);
+  RunBenchmark(500, 1000);
+  RunBenchmark(500, 5000);
+  RunBenchmark(1000, 1000);
+  RunBenchmark(1000, 5000);
+
+  cout << endl;
+
+  // TODO: Benchmark lock of ClientRequestStates too.
+
+  return 0;
+}
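
The benchmark's structure (one thread per query, join them all, report the
wall-clock time) can be expressed with the standard library alone. This is a
sketch: TimeThreads() is a hypothetical helper, 'work' stands in for
CreateAndAccessQueryStates(), and std::chrono::steady_clock is used because it
is monotonic.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>
#include <vector>

// Starts 'num_threads' threads, each running work(thread_index, num_accesses),
// joins them all and returns the elapsed wall-clock time in microseconds.
long long TimeThreads(int num_threads, int num_accesses,
                      const std::function<void(int, int)>& work) {
  auto start = std::chrono::steady_clock::now();
  std::vector<std::thread> threads;
  threads.reserve(num_threads);
  for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back(work, i, num_accesses);  // 'work' is copied per thread
  }
  for (std::thread& t : threads) t.join();
  auto elapsed = std::chrono::steady_clock::now() - start;
  return std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
}
```

Any per-thread input that 'work' reads by index (such as an array of query IDs)
should be fully populated before the threads start.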

http://git-wip-us.apache.org/repos/asf/impala/blob/774656a7/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 316b712..967dc4b 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -82,9 +82,12 @@ QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) {
   QueryState* qs = nullptr;
   int refcnt;
   {
-    lock_guard<mutex> l(qs_map_lock_);
-    auto it = qs_map_.find(query_id);
-    if (it == qs_map_.end()) return nullptr;
+    ScopedShardedMapRef<QueryState*> map_ref(query_id,
+        &ExecEnv::GetInstance()->query_exec_mgr()->qs_map_);
+    DCHECK(map_ref.get() != nullptr);
+
+    auto it = map_ref->find(query_id);
+    if (it == map_ref->end()) return nullptr;
     qs = it->second;
     refcnt = qs->refcnt_.Add(1);
   }
@@ -98,12 +101,15 @@ QueryState* QueryExecMgr::GetOrCreateQueryState(
   QueryState* qs = nullptr;
   int refcnt;
   {
-    lock_guard<mutex> l(qs_map_lock_);
-    auto it = qs_map_.find(query_ctx.query_id);
-    if (it == qs_map_.end()) {
+    ScopedShardedMapRef<QueryState*> map_ref(query_ctx.query_id,
+        &ExecEnv::GetInstance()->query_exec_mgr()->qs_map_);
+    DCHECK(map_ref.get() != nullptr);
+
+    auto it = map_ref->find(query_ctx.query_id);
+    if (it == map_ref->end()) {
       // register new QueryState
       qs = new QueryState(query_ctx);
-      qs_map_.insert(make_pair(query_ctx.query_id, qs));
+      map_ref->insert(make_pair(query_ctx.query_id, qs));
       *created = true;
     } else {
       qs = it->second;
@@ -153,18 +159,20 @@ void QueryExecMgr::ReleaseQueryState(QueryState* qs) {
 
   QueryState* qs_from_map = nullptr;
   {
-    // for now, gc right away
-    lock_guard<mutex> l(qs_map_lock_);
-    auto it = qs_map_.find(query_id);
+    ScopedShardedMapRef<QueryState*> map_ref(query_id,
+        &ExecEnv::GetInstance()->query_exec_mgr()->qs_map_);
+    DCHECK(map_ref.get() != nullptr);
+
+    auto it = map_ref->find(query_id);
     // someone else might have gc'd the entry
-    if (it == qs_map_.end()) return;
+    if (it == map_ref->end()) return;
     qs_from_map = it->second;
     DCHECK_EQ(qs_from_map->query_ctx().query_id, query_id);
     int32_t cnt = qs_from_map->refcnt_.Load();
     DCHECK_GE(cnt, 0);
     // someone else might have increased the refcnt in the meantime
     if (cnt > 0) return;
-    qs_map_.erase(it);
+    map_ref->erase(it);
   }
   // TODO: send final status report during gc, but do this from a different thread
   delete qs_from_map;
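
The ScopedShardedMapRef usage in these hunks relies on RAII: the shard lock is
taken in the constructor and released in the destructor, so early returns such
as 'if (it == map_ref->end()) return nullptr;' cannot leak the lock. A reduced
sketch of that idea follows; Shard, ScopedShardRef and LookupOrMinusOne() are
hypothetical names, and std::mutex replaces SpinLock.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <unordered_map>

struct Shard {
  std::mutex lock;
  std::unordered_map<uint64_t, int> map;
};

// Picks the shard for 'id', locks it in the constructor and unlocks it in the
// destructor, so every exit path from the enclosing scope releases the lock.
class ScopedShardRef {
 public:
  ScopedShardRef(uint64_t id, Shard* shards, std::size_t num_shards)
      : shard_(&shards[id % num_shards]) {
    shard_->lock.lock();
  }
  ~ScopedShardRef() { shard_->lock.unlock(); }
  ScopedShardRef(const ScopedShardRef&) = delete;
  ScopedShardRef& operator=(const ScopedShardRef&) = delete;

  std::unordered_map<uint64_t, int>* operator->() { return &shard_->map; }

 private:
  Shard* shard_;
};

// Early-return lookup, mirroring the GetQueryState() pattern.
int LookupOrMinusOne(Shard* shards, std::size_t num_shards, uint64_t id) {
  ScopedShardRef ref(id, shards, num_shards);
  auto it = ref->find(id);
  if (it == ref->end()) return -1;  // lock released by ~ScopedShardRef()
  return it->second;
}
```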

http://git-wip-us.apache.org/repos/asf/impala/blob/774656a7/be/src/runtime/query-exec-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h
index 8a0c884..bddd731 100644
--- a/be/src/runtime/query-exec-mgr.h
+++ b/be/src/runtime/query-exec-mgr.h
@@ -23,8 +23,8 @@
 #include <unordered_map>
 
 #include "common/status.h"
-#include "util/uid-util.h"
 #include "gen-cpp/Types_types.h"
+#include "util/sharded-query-map-util.h"
 
 namespace impala {
 
@@ -39,7 +39,7 @@ class FragmentInstanceState;
 /// entry point for gaining refcounted access to a QueryState. It also initiates
 /// query execution.
 /// Thread-safe.
-class QueryExecMgr {
+class QueryExecMgr : public CacheLineAligned {
  public:
   /// Creates QueryState if it doesn't exist and initiates execution of all fragment
   /// instance for this query. All fragment instances hold a reference to their
@@ -64,11 +64,9 @@ class QueryExecMgr {
   void ReleaseQueryState(QueryState* qs);
 
  private:
-  /// protects qs_map_
-  boost::mutex qs_map_lock_;
 
-  /// map from query id to QueryState (owned by us)
-  std::unordered_map<TUniqueId, QueryState*> qs_map_;
+  typedef ShardedQueryMap<QueryState*> QueryStateMap;
+  QueryStateMap qs_map_;
 
   /// Gets the existing QueryState or creates a new one if not present.
   /// 'created' is set to true if it was created, false otherwise.

http://git-wip-us.apache.org/repos/asf/impala/blob/774656a7/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index b633f2a..0156023 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -37,6 +37,7 @@
 #include "util/redactor.h"
 #include "util/summary-util.h"
 #include "util/time.h"
+#include "util/uid-util.h"
 #include "util/webserver.h"
 
 #include "common/names.h"
@@ -243,12 +244,11 @@ void ImpalaHttpHandler::QueryProfileEncodedHandler(const Webserver::ArgumentMap&
 
 void ImpalaHttpHandler::InflightQueryIdsHandler(const Webserver::ArgumentMap& args,
     Document* document) {
-  lock_guard<mutex> l(server_->client_request_state_map_lock_);
   stringstream ss;
-  for (const ImpalaServer::ClientRequestStateMap::value_type& request_state:
-       server_->client_request_state_map_) {
-    ss << request_state.second->query_id() << "\n";
-  }
+  server_->client_request_state_map_.DoFuncForAllEntries(
+      [&](const std::shared_ptr<ClientRequestState>& request_state) {
+          ss << request_state->query_id() << "\n";
+      });
   document->AddMember(Webserver::ENABLE_RAW_JSON_KEY, true, document->GetAllocator());
   Value query_ids(ss.str().c_str(), document->GetAllocator());
   document->AddMember("contents", query_ids, document->GetAllocator());
@@ -366,14 +366,11 @@ void ImpalaHttpHandler::QueryStateHandler(const Webserver::ArgumentMap& args,
     Document* document) {
   set<ImpalaServer::QueryStateRecord, ImpalaServer::QueryStateRecordLessThan>
       sorted_query_records;
-  {
-    lock_guard<mutex> l(server_->client_request_state_map_lock_);
-    for (const ImpalaServer::ClientRequestStateMap::value_type& request_state:
-         server_->client_request_state_map_) {
-      // TODO: Do this in the browser so that sorts on other keys are possible.
-      sorted_query_records.insert(ImpalaServer::QueryStateRecord(*request_state.second));
-    }
-  }
+
+  server_->client_request_state_map_.DoFuncForAllEntries(
+      [&](const std::shared_ptr<ClientRequestState>& request_state) {
+          sorted_query_records.insert(ImpalaServer::QueryStateRecord(*request_state));
+      });
 
   Value in_flight_queries(kArrayType);
   int64_t num_waiting_queries = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/774656a7/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 7cbbd16..f20d96b 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -926,6 +926,7 @@ void ImpalaServer::PrepareQueryContext(TQueryCtx* query_ctx) {
   // single generator under a lock (since random_generator is not
   // thread-safe).
   query_ctx->query_id = UuidToQueryId(random_generator()());
+
 }
 
 Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state,
@@ -938,16 +939,19 @@ Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state,
   if (session_state->closed) return Status("Session has been closed, ignoring query.");
   const TUniqueId& query_id = request_state->query_id();
   {
-    lock_guard<mutex> l(client_request_state_map_lock_);
-    ClientRequestStateMap::iterator entry = client_request_state_map_.find(query_id);
-    if (entry != client_request_state_map_.end()) {
+    ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id,
+        &ExecEnv::GetInstance()->impala_server()->client_request_state_map_);
+    DCHECK(map_ref.get() != nullptr);
+
+    auto entry = map_ref->find(query_id);
+    if (entry != map_ref->end()) {
       // There shouldn't be an active query with that same id.
       // (query_id is globally unique)
       stringstream ss;
       ss << "query id " << PrintId(query_id) << " already exists";
       return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, ss.str()));
     }
-    client_request_state_map_.insert(make_pair(query_id, request_state));
+    map_ref->insert(make_pair(query_id, request_state));
   }
   // Metric is decremented in UnregisterQuery().
   ImpaladMetrics::NUM_QUERIES_REGISTERED->Increment(1L);
@@ -994,14 +998,17 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
 
   shared_ptr<ClientRequestState> request_state;
   {
-    lock_guard<mutex> l(client_request_state_map_lock_);
-    ClientRequestStateMap::iterator entry = client_request_state_map_.find(query_id);
-    if (entry == client_request_state_map_.end()) {
+    ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id,
+        &ExecEnv::GetInstance()->impala_server()->client_request_state_map_);
+    DCHECK(map_ref.get() != nullptr);
+
+    auto entry = map_ref->find(query_id);
+    if (entry == map_ref->end()) {
       return Status("Invalid or unknown query handle");
     } else {
       request_state = entry->second;
     }
-    client_request_state_map_.erase(entry);
+    map_ref->erase(entry);
   }
 
   request_state->Done();
@@ -2051,12 +2058,15 @@ void ImpalaServer::Join() {
 
 shared_ptr<ClientRequestState> ImpalaServer::GetClientRequestState(
     const TUniqueId& query_id) {
-  lock_guard<mutex> l(client_request_state_map_lock_);
-  ClientRequestStateMap::iterator i = client_request_state_map_.find(query_id);
-  if (i == client_request_state_map_.end()) {
+  ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id,
+      &ExecEnv::GetInstance()->impala_server()->client_request_state_map_);
+  DCHECK(map_ref.get() != nullptr);
+
+  auto entry = map_ref->find(query_id);
+  if (entry == map_ref->end()) {
     return shared_ptr<ClientRequestState>();
   } else {
-    return i->second;
+    return entry->second;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/774656a7/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 237b0cb..abe8694 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -22,11 +22,11 @@
 #include <boost/thread/mutex.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/scoped_ptr.hpp>
-#include <boost/unordered_map.hpp>
 #include <boost/unordered_set.hpp>
 #include <boost/uuid/uuid.hpp>
 #include <boost/uuid/uuid_generators.hpp>
 #include <boost/uuid/uuid_io.hpp>
+#include <unordered_map>
 
 #include "gen-cpp/ImpalaService.h"
 #include "gen-cpp/ImpalaHiveServer2Service.h"
@@ -39,10 +39,10 @@
 #include "util/condition-variable.h"
 #include "util/metrics.h"
 #include "util/runtime-profile.h"
+#include "util/sharded-query-map-util.h"
 #include "util/simple-logger.h"
 #include "util/thread-pool.h"
 #include "util/time.h"
-#include "util/uid-util.h"
 #include "runtime/coordinator.h"
 #include "runtime/runtime-state.h"
 #include "runtime/timestamp-value.h"
@@ -126,7 +126,6 @@ class ClientRequestState;
 /// * uuid_lock_
 /// * catalog_version_lock_
 /// * connection_to_sessions_map_lock_
-/// * client_request_state_map_lock_
 ///
 /// TODO: The state of a running query is currently not cleaned up if the
 /// query doesn't experience any errors at runtime and close() doesn't get called.
@@ -138,8 +137,10 @@ class ClientRequestState;
 class ImpalaServer : public ImpalaServiceIf,
                      public ImpalaHiveServer2ServiceIf,
                      public ThriftServer::ConnectionHandlerIf,
-                     public boost::enable_shared_from_this<ImpalaServer> {
+                     public boost::enable_shared_from_this<ImpalaServer>,
+                     public CacheLineAligned {
  public:
+
   ImpalaServer(ExecEnv* exec_env);
   ~ImpalaServer();
 
@@ -502,8 +503,8 @@ class ImpalaServer : public ImpalaServiceIf,
       std::shared_ptr<SessionState> session_state, bool* registered_exec_state,
       std::shared_ptr<ClientRequestState>* exec_state) WARN_UNUSED_RESULT;
 
-  /// Registers the query exec state with client_request_state_map_ using the globally
-  /// unique query_id and add the query id to session state's open query list.
+  /// Registers the query exec state with client_request_state_map_ using the
+  /// globally unique query_id and add the query id to session state's open query list.
   /// The caller must have checked out the session state.
   Status RegisterQuery(std::shared_ptr<SessionState> session_state,
       const std::shared_ptr<ClientRequestState>& exec_state) WARN_UNUSED_RESULT;
@@ -521,9 +522,9 @@ class ImpalaServer : public ImpalaServiceIf,
       const std::shared_ptr<ClientRequestState>& exec_state) WARN_UNUSED_RESULT;
 
   /// Unregister the query by cancelling it, removing exec_state from
-  /// client_request_state_map_, and removing the query id from session state's in-flight
-  /// query list.  If check_inflight is true, then return an error if the query is not
-  /// yet in-flight.  Otherwise, proceed even if the query isn't yet in-flight (for
+  /// client_request_state_map_, and removing the query id from session state's
+  /// in-flight query list.  If check_inflight is true, then return an error if the query
+  /// is not yet in-flight.  Otherwise, proceed even if the query isn't yet in-flight (for
   /// cleaning up after an error on the query issuing path).
   Status UnregisterQuery(const TUniqueId& query_id, bool check_inflight,
       const Status* cause = NULL) WARN_UNUSED_RESULT;
@@ -623,7 +624,7 @@ class ImpalaServer : public ImpalaServiceIf,
 
   /// Copies a query's state into the query log. Called immediately prior to a
   /// ClientRequestState's deletion. Also writes the query profile to the profile log
-  /// on disk. Must be called with client_request_state_map_lock_ held
+  /// on disk.
   void ArchiveQuery(const ClientRequestState& query);
 
   /// Checks whether the given user is allowed to delegate as the specified do_as_user.
@@ -868,16 +869,12 @@ class ImpalaServer : public ImpalaServiceIf,
   /// when there are sessions that have a timeout.
   ConditionVariable session_timeout_cv_;
 
-  /// map from query id to exec state; ClientRequestState is owned by us and referenced
+  /// maps from query id to exec state; ClientRequestState is owned by us and referenced
   /// as a shared_ptr to allow asynchronous deletion
-  typedef boost::unordered_map<TUniqueId, std::shared_ptr<ClientRequestState>>
+  typedef class ShardedQueryMap<std::shared_ptr<ClientRequestState>>
       ClientRequestStateMap;
   ClientRequestStateMap client_request_state_map_;
 
-  /// Protects client_request_state_map_. See "Locking" in the class comment for lock
-  /// acquisition order.
-  boost::mutex client_request_state_map_lock_;
-
   /// Default query options in the form of TQueryOptions and beeswax::ConfigVariable
   TQueryOptions default_query_options_;
   std::vector<beeswax::ConfigVariable> default_configs_;

http://git-wip-us.apache.org/repos/asf/impala/blob/774656a7/be/src/util/sharded-query-map-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/sharded-query-map-util.h b/be/src/util/sharded-query-map-util.h
new file mode 100644
index 0000000..59dc2d5
--- /dev/null
+++ b/be/src/util/sharded-query-map-util.h
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef SHARDED_QUERY_MAP_UTIL_H
+#define SHARDED_QUERY_MAP_UTIL_H
+
+#include <boost/thread/lock_guard.hpp>
+#include <unordered_map>
+
+#include "gen-cpp/Types_types.h"
+#include "util/aligned-new.h"
+#include "util/spinlock.h"
+#include "util/uid-util.h"
+
+namespace impala {
+
+/// This is a template that can be used for any map that maps from a query ID (TUniqueId)
+/// to some object, and that needs to be sharded. It provides a SpinLock per shard to
+/// synchronize access to each shard of the map. The underlying shard is locked and
+/// accessed by instantiating a ScopedShardedMapRef.
+//
+/// Usage pattern:
+//
+///   typedef ShardedQueryMap<QueryState*> QueryStateMap;
+///   QueryStateMap qs_map_;
+//
+template<typename T>
+class ShardedQueryMap {
+ public:
+
+  // Takes a lambda that accepts a 'const T&' and runs it on every entry (value)
+  // in the map, locking one shard at a time. The lambda must have a return type
+  // of 'void'.
+  // TODO: If necessary, refactor the lambda signature to allow returning Status objects.
+  void DoFuncForAllEntries(const std::function<void(const T&)>& call) {
+    for (int i = 0; i < NUM_QUERY_BUCKETS; ++i) {
+      boost::lock_guard<SpinLock> l(shards_[i].map_lock_);
+      for (const auto& map_value_ref: shards_[i].map_) {
+        call(map_value_ref.second);
+      }
+    }
+  }
+
+ private:
+  template <typename T2>
+  friend class ScopedShardedMapRef;
+
+  // Number of buckets to split the containers of query IDs into.
+  static constexpr uint32_t NUM_QUERY_BUCKETS = 4;
+
+  // We group the map and its corresponding lock together to avoid false sharing. Since
+  // we will always access a map and its corresponding lock together, it's better if
+  // they can be allocated on the same cache line.
+  struct MapShard : public CacheLineAligned {
+    std::unordered_map<TUniqueId, T> map_;
+    SpinLock map_lock_;
+  };
+  struct MapShard shards_[NUM_QUERY_BUCKETS];
+};
+
+/// Use this class to obtain a locked reference to the underlying map shard
+/// of a ShardedQueryMap, corresponding to the 'query_id'.
+//
+/// Pattern:
+/// {
+///   ScopedShardedMapRef<T> map_ref(qid, &sharded_map);
+///   DCHECK(map_ref.get() != nullptr);  <nullptr should never be returned>
+///   ...
+/// }
+//
+/// The caller should ensure that the ShardedQueryMap outlives this
+/// scoped class.
+template <typename T>
+class ScopedShardedMapRef {
+ public:
+
+  // Finds the appropriate map that could/should contain 'query_id' and locks it.
+  ScopedShardedMapRef(
+      const TUniqueId& query_id, class ShardedQueryMap<T>* sharded_map) {
+    DCHECK(sharded_map != nullptr);
+    int qs_map_bucket = QueryIdToBucket(query_id);
+    shard_ = &sharded_map->shards_[qs_map_bucket];
+
+    // Lock the corresponding shard.
+    shard_->map_lock_.lock();
+  }
+
+  ~ScopedShardedMapRef() {
+    shard_->map_lock_.DCheckLocked();
+    shard_->map_lock_.unlock();
+  }
+
+  // Returns the shard (map) for the 'query_id' passed to the constructor.
+  // Should never return nullptr.
+  std::unordered_map<TUniqueId, T>* get() {
+    shard_->map_lock_.DCheckLocked();
+    return &shard_->map_;
+  }
+
+  std::unordered_map<TUniqueId, T>* operator->() {
+    shard_->map_lock_.DCheckLocked();
+    return get();
+  }
+
+ private:
+
+  // Return the correct bucket that a query ID would belong to.
+  inline int QueryIdToBucket(const TUniqueId& query_id) {
+    int bucket =
+        static_cast<int>(query_id.hi) % ShardedQueryMap<T>::NUM_QUERY_BUCKETS;
+    DCHECK(bucket < ShardedQueryMap<T>::NUM_QUERY_BUCKETS && bucket >= 0);
+    return bucket;
+  }
+
+  typename ShardedQueryMap<T>::MapShard* shard_;
+  DISALLOW_COPY_AND_ASSIGN(ScopedShardedMapRef);
+};
+
+} // namespace impala
+
+#endif /* SHARDED_QUERY_MAP_UTIL_H */
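ShardedQueryMap above splits one process-wide map and lock into NUM_QUERY_BUCKETS independently locked shards, and routes each query by the high bits of its ID. The following is a minimal standalone sketch of that idea under stated assumptions: std::mutex replaces Impala's SpinLock, a hypothetical QueryId struct replaces TUniqueId, 64 bytes is assumed to be the cache-line size, and the callback-style WithShard() is an illustrative substitute for ScopedShardedMapRef, not Impala's API.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for TUniqueId; only 'hi' is used for bucketing.
struct QueryId {
  int64_t hi;
  int64_t lo;
  bool operator==(const QueryId& o) const { return hi == o.hi && lo == o.lo; }
};

struct QueryIdHash {
  size_t operator()(const QueryId& id) const {
    return std::hash<int64_t>()(id.hi) ^ (std::hash<int64_t>()(id.lo) << 1);
  }
};

// Sketch of a lock-sharded map: each shard owns its own map and mutex.
template <typename T, uint32_t kNumShards = 4>
class ShardedMapSketch {
 public:
  // Same expression as QueryIdToBucket(): '%' against a uint32_t converts the
  // int to unsigned, so the result is always in [0, kNumShards).
  static uint32_t BucketFor(const QueryId& id) {
    return static_cast<int>(id.hi) % kNumShards;
  }

  // Runs 'fn' on the shard that owns 'id' while holding only that shard's lock.
  template <typename Fn>
  auto WithShard(const QueryId& id, Fn fn) {
    Shard& shard = shards_[BucketFor(id)];
    std::lock_guard<std::mutex> l(shard.lock);
    return fn(shard.map);
  }

 private:
  // alignas(64) keeps each map and its lock together and apart from other shards
  // (64-byte cache lines are an assumption about the target CPU).
  struct alignas(64) Shard {
    std::unordered_map<QueryId, T, QueryIdHash> map;
    std::mutex lock;
  };
  std::array<Shard, kNumShards> shards_;
};
```

Threads that touch queries in different buckets take different mutexes, so query registration and lookup no longer serialize on a single lock. The unsigned conversion also explains why the DCHECK in QueryIdToBucket() holds for negative 'hi' values.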


[2/4] impala git commit: IMPALA-6519: API to allocate unreserved buffer

Posted by ta...@apache.org.
IMPALA-6519: API to allocate unreserved buffer

The motivation is to allow allocation of buffers without reservation in
ExchangeNode. Currently this is not possible because
IncreaseReservationToFit() followed by AllocateBuffer() is non-atomic.
We need to handle concurrent allocations in ExchangeNode because there
may be multiple batches being received at a given time.

This is a temporary solution until we can implement proper reservations
in ExchangeNode (IMPALA-6524).

Testing:
Added basic unit test.

Change-Id: Ia4d17b3db25491f796484de22405fbdee7a0f983
Reviewed-on: http://gerrit.cloudera.org:8080/9250
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/55582168
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/55582168
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/55582168

Branch: refs/heads/2.x
Commit: 55582168e84bfe7a5c0ea4f9126c0815b817ee36
Parents: 2630914
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Feb 7 17:39:11 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 15 06:19:23 2018 +0000

----------------------------------------------------------------------
 .../runtime/bufferpool/buffer-pool-internal.h   | 18 +++++--
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 50 ++++++++++++-----
 be/src/runtime/bufferpool/buffer-pool.cc        | 57 +++++++++++++++-----
 be/src/runtime/bufferpool/buffer-pool.h         | 15 ++++++
 .../runtime/bufferpool/reservation-tracker.cc   | 12 +++++
 be/src/runtime/bufferpool/reservation-tracker.h | 12 ++++-
 6 files changed, 131 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/55582168/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index 7094942..dee8e4f 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -238,11 +238,19 @@ class BufferPool::Client {
   /// page->pin_in_flight was set to true by StartMoveToPinned().
   Status FinishMoveEvictedToPinned(Page* page) WARN_UNUSED_RESULT;
 
-  /// Must be called once before allocating a buffer of 'len' via the AllocateBuffer()
-  /// API to deduct from the client's reservation and update internal accounting. Cleans
-  /// dirty pages if needed to satisfy the buffer pool's internal invariants. No page or
-  /// client locks should be held by the caller.
-  Status PrepareToAllocateBuffer(int64_t len) WARN_UNUSED_RESULT;
+  /// Must be called once before allocating a buffer of 'len' via the AllocateBuffer() or
+  /// AllocateUnreservedBuffer() APIs. Deducts from the client's reservation and updates
+  /// internal accounting. Cleans dirty pages if needed to satisfy the buffer pool's
+  /// internal invariants. No page or client locks should be held by the caller.
+  /// If 'reserved' is true, we assume that the memory is already reserved. If it is
+  /// false, tries to increase the reservation if needed.
+  ///
+  /// On success, returns OK and sets 'success' to true if non-NULL. If an error is
+  /// encountered, e.g. while cleaning pages, returns an error status. If the reservation
+  /// could not be increased for an unreserved allocation, returns OK and sets 'success'
+  /// to false (for unreserved allocations, 'success' must be non-NULL).
+  Status PrepareToAllocateBuffer(
+      int64_t len, bool reserved, bool* success) WARN_UNUSED_RESULT;
 
   /// Implementation of ClientHandle::DecreaseReservationTo().
   Status DecreaseReservationTo(int64_t target_bytes) WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/impala/blob/55582168/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 0138a08..d6547d2 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -367,6 +367,7 @@ class BufferPoolTest : public ::testing::Test {
   }
 
   /// Parameterised test implementations.
+  void TestBufferAllocation(bool reserved);
   void TestMemoryReclamation(BufferPool* pool, int src_core, int dst_core);
   void TestEvictionPolicy(int64_t page_size);
   void TestCleanPageLimit(int max_clean_pages, bool randomize_core);
@@ -550,27 +551,43 @@ TEST_F(BufferPoolTest, PageCreation) {
   global_reservations_.Close();
 }
 
-TEST_F(BufferPoolTest, BufferAllocation) {
+TEST_F(BufferPoolTest, ReservedBufferAllocation) {
+  TestBufferAllocation(true);
+}
+
+TEST_F(BufferPoolTest, UnreservedBufferAllocation) {
+  TestBufferAllocation(false);
+}
+
+void BufferPoolTest::TestBufferAllocation(bool reserved) {
   // Allocate many buffers, each a power-of-two multiple of the minimum buffer length.
-  int num_buffers = 16;
-  int64_t max_buffer_len = TEST_BUFFER_LEN << (num_buffers - 1);
-  int64_t total_mem = 2 * 2 * max_buffer_len;
-  global_reservations_.InitRootTracker(NULL, total_mem);
-  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
+  const int NUM_BUFFERS = 16;
+  const int64_t MAX_BUFFER_LEN = TEST_BUFFER_LEN << (NUM_BUFFERS - 1);
+
+  // Total memory required to allocate TEST_BUFFER_LEN, 2*TEST_BUFFER_LEN, ...,
+  // MAX_BUFFER_LEN.
+  const int64_t TOTAL_MEM = 2 * MAX_BUFFER_LEN - TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NULL, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
-      total_mem, NewProfile(), &client));
-  ASSERT_TRUE(client.IncreaseReservationToFit(total_mem));
+      TOTAL_MEM, NewProfile(), &client));
+  if (reserved) ASSERT_TRUE(client.IncreaseReservationToFit(TOTAL_MEM));
 
-  vector<BufferPool::BufferHandle> handles(num_buffers);
+  vector<BufferPool::BufferHandle> handles(NUM_BUFFERS);
 
   // Create buffers of various valid sizes.
   int64_t total_allocated = 0;
-  for (int i = 0; i < num_buffers; ++i) {
+  for (int i = 0; i < NUM_BUFFERS; ++i) {
     int size_multiple = 1 << i;
     int64_t buffer_len = TEST_BUFFER_LEN * size_multiple;
     int64_t used_before = client.GetUsedReservation();
-    ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i]));
+    if (reserved) {
+      ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i]));
+    } else {
+      // Reservation should be automatically increased.
+      ASSERT_OK(pool.AllocateUnreservedBuffer(&client, buffer_len, &handles[i]));
+    }
     total_allocated += buffer_len;
     ASSERT_TRUE(handles[i].is_open());
     ASSERT_TRUE(handles[i].data() != NULL);
@@ -583,8 +600,15 @@ TEST_F(BufferPoolTest, BufferAllocation) {
     EXPECT_EQ(0, pool.GetFreeBufferBytes());
   }
 
+  if (!reserved) {
+    // Allocate all of the memory and test the failure path for unreserved allocations.
+    BufferPool::BufferHandle tmp_handle;
+    ASSERT_OK(pool.AllocateUnreservedBuffer(&client, TEST_BUFFER_LEN, &tmp_handle));
+    ASSERT_FALSE(tmp_handle.is_open()) << "No reservation for buffer";
+  }
+
   // Close the handles and check memory consumption.
-  for (int i = 0; i < num_buffers; ++i) {
+  for (int i = 0; i < NUM_BUFFERS; ++i) {
     int64_t used_before = client.GetUsedReservation();
     int buffer_len = handles[i].len();
     pool.FreeBuffer(&client, &handles[i]);
@@ -597,7 +621,7 @@ TEST_F(BufferPoolTest, BufferAllocation) {
   ASSERT_EQ(global_reservations_.GetReservation(), 0);
   // But freed memory is not released to the system immediately.
   EXPECT_EQ(total_allocated, pool.GetSystemBytesAllocated());
-  EXPECT_EQ(num_buffers, pool.GetNumFreeBuffers());
+  EXPECT_EQ(NUM_BUFFERS, pool.GetNumFreeBuffers());
   EXPECT_EQ(total_allocated, pool.GetFreeBufferBytes());
   global_reservations_.Close();
 }
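The new TOTAL_MEM is exactly the sum of the buffer sizes allocated in the loop (a geometric series), which appears to be what lets the unreserved variant exhaust the client's memory and then exercise the failure path. The identity can be checked at compile time; kTestBufferLen = 1024 is an assumed value for illustration, and the identity holds for any positive length that does not overflow.

```cpp
#include <cassert>
#include <cstdint>

constexpr int64_t kTestBufferLen = 1024;  // Assumed; stands in for TEST_BUFFER_LEN.
constexpr int kNumBuffers = 16;            // NUM_BUFFERS in the test.
constexpr int64_t kMaxBufferLen = kTestBufferLen << (kNumBuffers - 1);

// Sums kTestBufferLen * 2^i for i = 0 .. kNumBuffers - 1, as the test's loop allocates.
constexpr int64_t SumOfBufferSizes() {
  int64_t total = 0;
  for (int i = 0; i < kNumBuffers; ++i) total += kTestBufferLen << i;
  return total;
}

// L * (2^N - 1) = 2 * (L * 2^(N-1)) - L = 2 * MAX_BUFFER_LEN - TEST_BUFFER_LEN.
static_assert(SumOfBufferSizes() == 2 * kMaxBufferLen - kTestBufferLen,
              "TOTAL_MEM covers exactly the buffers allocated by the test");
```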

http://git-wip-us.apache.org/repos/asf/impala/blob/55582168/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 6a111ff..2d06f7b 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -226,12 +226,23 @@ Status BufferPool::ExtractBuffer(
 
 Status BufferPool::AllocateBuffer(
     ClientHandle* client, int64_t len, BufferHandle* handle) {
-  RETURN_IF_ERROR(client->impl_->PrepareToAllocateBuffer(len));
+  RETURN_IF_ERROR(client->impl_->PrepareToAllocateBuffer(len, true, nullptr));
   Status status = allocator_->Allocate(client, len, handle);
-  if (!status.ok()) {
-    // Allocation failed - update client's accounting to reflect the failure.
-    client->impl_->FreedBuffer(len);
-  }
+  // If the allocation failed, update client's accounting to reflect the failure.
+  if (!status.ok()) client->impl_->FreedBuffer(len);
+  return status;
+}
+
+Status BufferPool::AllocateUnreservedBuffer(
+    ClientHandle* client, int64_t len, BufferHandle* handle) {
+  DCHECK(!handle->is_open());
+  bool success;
+  RETURN_IF_ERROR(client->impl_->PrepareToAllocateBuffer(len, false, &success));
+  if (!success) return Status::OK(); // Leave 'handle' closed to indicate failure.
+
+  Status status = allocator_->Allocate(client, len, handle);
+  // If the allocation failed, update client's accounting to reflect the failure.
+  if (!status.ok()) client->impl_->FreedBuffer(len);
   return status;
 }
 
@@ -546,14 +557,34 @@ Status BufferPool::Client::FinishMoveEvictedToPinned(Page* page) {
   return Status::OK();
 }
 
-Status BufferPool::Client::PrepareToAllocateBuffer(int64_t len) {
-  unique_lock<mutex> lock(lock_);
-  // Clean enough pages to allow allocation to proceed without violating our eviction
-  // policy. This can fail, so only update the accounting once success is ensured.
-  RETURN_IF_ERROR(CleanPages(&lock, len));
-  reservation_.AllocateFrom(len);
-  buffers_allocated_bytes_ += len;
-  DCHECK_CONSISTENCY();
+Status BufferPool::Client::PrepareToAllocateBuffer(
+    int64_t len, bool reserved, bool* success) {
+  if (success != nullptr) *success = false;
+  // Don't need to hold the client's 'lock_' yet because 'reservation_' operations are
+  // threadsafe.
+  if (reserved) {
+    // The client must have already reserved the memory.
+    reservation_.AllocateFrom(len);
+  } else {
+    DCHECK(success != nullptr);
+    // The client may not have reserved the memory.
+    if (!reservation_.IncreaseReservationToFitAndAllocate(len)) return Status::OK();
+  }
+
+  {
+    unique_lock<mutex> lock(lock_);
+    // Clean enough pages to allow allocation to proceed without violating our eviction
+    // policy.
+    Status status = CleanPages(&lock, len);
+    if (!status.ok()) {
+      // Reverse the allocation.
+      reservation_.ReleaseTo(len);
+      return status;
+    }
+    buffers_allocated_bytes_ += len;
+    DCHECK_CONSISTENCY();
+  }
+  if (success != nullptr) *success = true;
   return Status::OK();
 }
 

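PrepareToAllocateBuffer() now deducts from the reservation before taking the client's lock_, then reverses the deduction with ReleaseTo() if CleanPages() fails, so a failed call leaves the accounting unchanged. A minimal sketch of that "deduct first, undo on failure" ordering follows; ClientSketch and its clean_pages hook are hypothetical names, and two std::mutex objects stand in for the tracker's SpinLock and the client's lock_.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <mutex>

class ClientSketch {
 public:
  explicit ClientSketch(int64_t reserved) : unused_reservation_(reserved) {}

  // Hypothetical hook standing in for CleanPages(); returning false simulates an error.
  std::function<bool(int64_t)> clean_pages = [](int64_t) { return true; };

  // Returns false if the reservation is insufficient or cleaning fails. On failure,
  // neither the reservation nor the allocated-bytes counter is changed.
  bool PrepareToAllocate(int64_t len) {
    {
      // Step 1: deduct from the reservation under the tracker's own lock only.
      std::lock_guard<std::mutex> r(reservation_lock_);
      if (unused_reservation_ < len) return false;
      unused_reservation_ -= len;
    }
    // Step 2: fallible work under the client lock; undo step 1 if it fails.
    std::lock_guard<std::mutex> l(client_lock_);
    if (!clean_pages(len)) {
      std::lock_guard<std::mutex> r(reservation_lock_);
      unused_reservation_ += len;  // Plays the role of reservation_.ReleaseTo(len).
      return false;
    }
    buffers_allocated_bytes_ += len;
    return true;
  }

  int64_t unused_reservation() {
    std::lock_guard<std::mutex> r(reservation_lock_);
    return unused_reservation_;
  }
  int64_t buffers_allocated_bytes() {
    std::lock_guard<std::mutex> l(client_lock_);
    return buffers_allocated_bytes_;
  }

 private:
  std::mutex reservation_lock_;
  std::mutex client_lock_;
  int64_t unused_reservation_;
  int64_t buffers_allocated_bytes_ = 0;
};
```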
http://git-wip-us.apache.org/repos/asf/impala/blob/55582168/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 5b98579..285aacb 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -239,6 +239,21 @@ class BufferPool : public CacheLineAligned {
   Status AllocateBuffer(
       ClientHandle* client, int64_t len, BufferHandle* handle) WARN_UNUSED_RESULT;
 
+  /// Like AllocateBuffer(), except used when the client may not have the reservation
+  /// to allocate the buffer. Tries to increase reservation on behalf of the client
+  /// if needed to allocate the buffer. If the reservation isn't available, 'handle'
+  /// isn't opened and OK is returned. If an unexpected error occurs, an error is
+  /// returned and any reservation increase remains in effect. Safe to call concurrently
+  /// with any other operations for 'client', except for operations on the same 'handle'.
+  ///
+  /// This function is a transitional mechanism for components to allocate memory from
+  /// the buffer pool without implementing the reservation accounting required to operate
+  /// within a predetermined memory constraint. Wherever possible, clients should reserve
+  /// memory ahead of time and allocate out of that instead of relying on this "best
+  /// effort" interface.
+  Status AllocateUnreservedBuffer(
+      ClientHandle* client, int64_t len, BufferHandle* handle) WARN_UNUSED_RESULT;
+
   /// If 'handle' is open, close 'handle', free the buffer and decrease the reservation
   /// usage from 'client'. Idempotent. Safe to call concurrently with other operations
   /// for 'client', except for operations on the same 'handle'.
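AllocateUnreservedBuffer() has three outcomes: an error status, OK with 'handle' left closed (no reservation available), or OK with 'handle' open. This mirrors the unit test, which checks ASSERT_OK and then ASSERT_FALSE(tmp_handle.is_open()). A minimal sketch of the caller-side pattern follows; SketchStatus, SketchHandle, AllocateUnreservedSketch() and TryAllocate() are hypothetical stand-ins, not Impala's Status, BufferHandle or BufferPool API.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical minimal stand-ins for Status and BufferHandle.
struct SketchStatus {
  std::string msg;  // Empty means OK.
  bool ok() const { return msg.empty(); }
};
struct SketchHandle {
  int64_t len = 0;
  bool is_open() const { return len > 0; }
};

// Models the documented contract of AllocateUnreservedBuffer().
SketchStatus AllocateUnreservedSketch(int64_t* available, int64_t len,
                                      bool simulate_error, SketchHandle* handle) {
  if (simulate_error) return SketchStatus{"simulated error"};
  if (*available < len) return SketchStatus{};  // Best effort: 'handle' stays closed.
  *available -= len;
  handle->len = len;
  return SketchStatus{};
}

enum class Outcome { kError, kNoMemory, kAllocated };

// Caller pattern: check the status first, then whether the handle was opened.
// (Real code would keep the open handle or release it with FreeBuffer().)
Outcome TryAllocate(int64_t* available, int64_t len, bool simulate_error) {
  SketchHandle handle;
  SketchStatus s = AllocateUnreservedSketch(available, len, simulate_error, &handle);
  if (!s.ok()) return Outcome::kError;
  if (!handle.is_open()) return Outcome::kNoMemory;
  return Outcome::kAllocated;
}
```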

http://git-wip-us.apache.org/repos/asf/impala/blob/55582168/be/src/runtime/bufferpool/reservation-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.cc b/be/src/runtime/bufferpool/reservation-tracker.cc
index aba5dce..f0e1839 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker.cc
@@ -141,6 +141,14 @@ bool ReservationTracker::IncreaseReservationToFit(int64_t bytes, Status* error_s
   return IncreaseReservationInternalLocked(bytes, true, false, error_status);
 }
 
+bool ReservationTracker::IncreaseReservationToFitAndAllocate(
+    int64_t bytes, Status* error_status) {
+  lock_guard<SpinLock> l(lock_);
+  if (!IncreaseReservationInternalLocked(bytes, true, false, error_status)) return false;
+  AllocateFromLocked(bytes);
+  return true;
+}
+
 bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
     bool use_existing_reservation, bool is_child_reservation, Status* error_status) {
   DCHECK(initialized_);
@@ -359,6 +367,10 @@ vector<ReservationTracker*> ReservationTracker::FindPathToRoot() {
 
 void ReservationTracker::AllocateFrom(int64_t bytes) {
   lock_guard<SpinLock> l(lock_);
+  AllocateFromLocked(bytes);
+}
+
+void ReservationTracker::AllocateFromLocked(int64_t bytes) {
   DCHECK(initialized_);
   DCHECK_GE(bytes, 0);
   DCHECK_LE(bytes, unused_reservation());
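The split into a locking wrapper plus an AllocateFromLocked() helper is what lets IncreaseReservationToFitAndAllocate() do both steps under a single acquisition of 'lock_'. A minimal single-level sketch follows. TrackerSketch is a hypothetical simplification (no parent trackers, std::mutex instead of SpinLock), and the first half of the test replays the non-atomic interleaving that the commit message describes.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>

class TrackerSketch {
 public:
  explicit TrackerSketch(int64_t limit) : limit_(limit) {}

  // Grows the reservation until at least 'bytes' of it is unused.
  bool IncreaseReservationToFit(int64_t bytes) {
    std::lock_guard<std::mutex> l(lock_);
    return IncreaseToFitLocked(bytes);
  }

  // Consumes 'bytes' of unused reservation; false if there is not enough.
  bool AllocateFrom(int64_t bytes) {
    std::lock_guard<std::mutex> l(lock_);
    return AllocateFromLocked(bytes);
  }

  // Both steps under one lock acquisition, so no other caller can run in between.
  bool IncreaseReservationToFitAndAllocate(int64_t bytes) {
    std::lock_guard<std::mutex> l(lock_);
    return IncreaseToFitLocked(bytes) && AllocateFromLocked(bytes);
  }

 private:
  // Both helpers require 'lock_' to be held by the caller.
  bool IncreaseToFitLocked(int64_t bytes) {
    int64_t unused = reservation_ - used_;
    if (unused >= bytes) return true;  // Already fits; no increase needed.
    int64_t needed = bytes - unused;
    if (reservation_ + needed > limit_) return false;
    reservation_ += needed;
    return true;
  }
  bool AllocateFromLocked(int64_t bytes) {
    if (reservation_ - used_ < bytes) return false;
    used_ += bytes;
    return true;
  }

  std::mutex lock_;
  const int64_t limit_;
  int64_t reservation_ = 0;
  int64_t used_ = 0;
};
```

In Impala, the failing allocation in the non-atomic interleaving would trip the DCHECK_LE(bytes, unused_reservation()) above rather than return false.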

http://git-wip-us.apache.org/repos/asf/impala/blob/55582168/be/src/runtime/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h
index ff4b77e..3bf2de1 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -129,8 +129,13 @@ class ReservationTracker {
   /// Returns true if the reservation increase was successful or not necessary. Otherwise
   /// returns false and if 'error_status' is non-null, it returns an appropriate status
   /// message in it.
-  bool IncreaseReservationToFit(int64_t bytes, Status* error_status = nullptr)
-      WARN_UNUSED_RESULT;
+  bool IncreaseReservationToFit(
+      int64_t bytes, Status* error_status = nullptr) WARN_UNUSED_RESULT;
+
+  /// Like IncreaseReservationToFit(), except 'bytes' is also allocated from
+  /// the reservation on success.
+  bool IncreaseReservationToFitAndAllocate(
+      int64_t bytes, Status* error_status = nullptr) WARN_UNUSED_RESULT;
 
   /// Decrease reservation by 'bytes' on this tracker and all ancestors. This tracker's
   /// reservation must be at least 'bytes' before calling this method.
@@ -247,6 +252,9 @@ class ReservationTracker {
   /// 'lock_' must be held by caller.
   void CheckConsistency() const;
 
+  /// Same as AllocateFrom() except 'lock_' must be held by caller.
+  void AllocateFromLocked(int64_t bytes);
+
   /// Increase or decrease 'used_reservation_' and update profile counters accordingly.
   /// 'lock_' must be held by caller.
   void UpdateUsedReservation(int64_t delta);


[4/4] impala git commit: IMPALA-6416: extend Thread::Create to track instance id

Posted by ta...@apache.org.
IMPALA-6416: extend Thread::Create to track instance id

This commit builds upon IMPALA-3703. Each thread that
was created through Thread::Create() has a ThreadDebugInfo
object on the stack frame of Thread::SuperviseThread().
This object has stack allocated char buffers that can be
read during a debug session even if we only have minidumps.

However, with the old solution ThreadDebugInfo::instance_id
was set manually for each thread. It is too easy to forget
to set instance_id every time we create a new thread.

This commit assumes that if a thread has an associated
instance id, then the threads it spawns always work on the
same instance id. In Thread::StartThread() the parent thread
passes its ThreadDebugInfo object to its child, which copies
the instance id and also stores the name and system thread
id of its parent.

This means if we set ThreadDebugInfo::instance_id in some
"root thread", then all descendant threads will annotate
themselves with the instance id automatically. Since threads
also record the name (and a system thread id) of their parent,
it might also be possible to reconstruct the thread creation
graph.
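The inheritance rule above can be illustrated with a thread_local pointer to per-thread debug info that each new thread fills in from its creator. This is a minimal sketch, assuming hypothetical names (DebugInfoSketch, current_info, CreateThreadSketch) and std::string fields instead of the fixed-size char buffers that keep Impala's ThreadDebugInfo readable from minidumps. As a design choice, it copies the creator's fields on the creating thread, which sidesteps the lifetime question; Impala instead passes the parent pointer into SuperviseThread() and copies there.

```cpp
#include <cassert>
#include <string>
#include <thread>

struct DebugInfoSketch {
  std::string name;
  std::string instance_id;
  std::string parent_name;
};

// Points at the DebugInfoSketch on the current thread's stack, or is null.
thread_local DebugInfoSketch* current_info = nullptr;

// Starts 'fn' on a new thread whose debug info inherits the creator's instance id.
template <typename Fn>
std::thread CreateThreadSketch(const std::string& name, Fn fn) {
  // Read the creator's fields now, on the creating thread.
  std::string inherited_id;
  std::string parent_name;
  if (current_info != nullptr) {
    inherited_id = current_info->instance_id;
    parent_name = current_info->name;
  }
  return std::thread([name, inherited_id, parent_name, fn]() {
    DebugInfoSketch info{name, inherited_id, parent_name};
    current_info = &info;  // Threads created from here inherit 'inherited_id'.
    fn();
    current_info = nullptr;
  });
}
```

Setting the instance id once on a "root" thread is then enough for every descendant, as the grandchild in the test shows.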

I used GDB to verify that the instance id is copied at every
place where we previously had to set it manually.

I added an automated test to thread-debug-info-test.cc.

Change-Id: I27de2962cf0b224c17b685d77dcba3bf2e9db187
Reviewed-on: http://gerrit.cloudera.org:8080/9053
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/354293d8
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/354293d8
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/354293d8

Branch: refs/heads/2.x
Commit: 354293d8c4ca0f8c874b88cfbc1fee54e82877e8
Parents: cd7dc4b
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
Authored: Thu Jan 18 17:03:32 2018 +0100
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 15 22:10:15 2018 +0000

----------------------------------------------------------------------
 be/src/common/thread-debug-info-test.cc   | 31 ++++++++++++++++++++++++
 be/src/common/thread-debug-info.cc        |  1 -
 be/src/common/thread-debug-info.h         | 33 +++++++++++++++++++++-----
 be/src/exec/blocking-join-node.cc         |  2 --
 be/src/exec/hdfs-scan-node.cc             |  7 +-----
 be/src/runtime/fragment-instance-state.cc |  6 +----
 be/src/util/thread.cc                     | 10 ++++----
 be/src/util/thread.h                      | 10 +++++++-
 8 files changed, 75 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/354293d8/be/src/common/thread-debug-info-test.cc
----------------------------------------------------------------------
diff --git a/be/src/common/thread-debug-info-test.cc b/be/src/common/thread-debug-info-test.cc
index 1700c48..c2ac4bb 100644
--- a/be/src/common/thread-debug-info-test.cc
+++ b/be/src/common/thread-debug-info-test.cc
@@ -19,6 +19,7 @@
 
 #include "common/thread-debug-info.h"
 #include "testutil/gtest-util.h"
+#include "util/thread.h"
 
 #include "common/names.h"
 
@@ -66,6 +67,36 @@ TEST(ThreadDebugInfo, Global) {
   EXPECT_EQ(&thread_debug_info, global_thread_debug_info);
 }
 
+TEST(ThreadDebugInfo, ThreadCreateRelationships) {
+  // Checks if child thread extracts debug info from parent automatically.
+  // Child's thread name is given in Thread::Create
+  // Child's instance_id_ should be the same as parent's instance_id_
+  // Child should store a copy of its parent's thread name.
+  // Child should store its parent's system thread id.
+  string parent_name = "Parent";
+  string child_name = "Child";
+
+  ThreadDebugInfo parent_tdi;
+  parent_tdi.SetThreadName(parent_name);
+  TUniqueId uid;
+  uid.hi = 123;
+  uid.lo = 456;
+  parent_tdi.SetInstanceId(uid);
+
+  std::unique_ptr<Thread> child_thread;
+  auto f = [uid, child_name, parent_name, &parent_tdi]() {
+    // In child's thread the global ThreadDebugInfo object points to the child's own
+    // ThreadDebugInfo object which was automatically created in Thread::SuperviseThread
+    ThreadDebugInfo* child_tdi = GetThreadDebugInfo();
+    EXPECT_EQ(child_name, child_tdi->GetThreadName());
+    EXPECT_EQ(PrintId(uid), child_tdi->GetInstanceId());
+    EXPECT_EQ(parent_name, child_tdi->GetParentThreadName());
+    EXPECT_EQ(parent_tdi.GetSystemThreadId(), child_tdi->GetParentSystemThreadId());
+  };
+  ASSERT_OK(Thread::Create("Test", child_name, f, &child_thread));
+  child_thread->Join();
+}
+
 }
 
 IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/impala/blob/354293d8/be/src/common/thread-debug-info.cc
----------------------------------------------------------------------
diff --git a/be/src/common/thread-debug-info.cc b/be/src/common/thread-debug-info.cc
index 5e642e6..b775dc0 100644
--- a/be/src/common/thread-debug-info.cc
+++ b/be/src/common/thread-debug-info.cc
@@ -33,7 +33,6 @@ void ThreadDebugInfo::CloseThreadDebugInfo() {
 }
 
 ThreadDebugInfo* GetThreadDebugInfo() {
-  DCHECK(thread_debug_info != nullptr);
   return thread_debug_info;
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/354293d8/be/src/common/thread-debug-info.h
----------------------------------------------------------------------
diff --git a/be/src/common/thread-debug-info.h b/be/src/common/thread-debug-info.h
index 6219ee8..d5100c0 100644
--- a/be/src/common/thread-debug-info.h
+++ b/be/src/common/thread-debug-info.h
@@ -19,13 +19,14 @@
 #define IMPALA_COMMON_THREAD_DEBUG_INFO_H
 
 #include <string>
+#include <sys/syscall.h>
+#include <unistd.h>
 
 #include "glog/logging.h"
 #include "gutil/macros.h"
+#include "gutil/strings/util.h"
 #include "util/debug-util.h"
 
-#include "common/names.h"
-
 namespace impala {
 
 /// Stores information about the current thread that can be useful in a debug session.
@@ -39,6 +40,8 @@ public:
   /// Only one ThreadDebugInfo object can be alive per thread at a time.
   /// This object is not copyable, nor movable
   ThreadDebugInfo() {
+    system_thread_id_ = syscall(SYS_gettid);
+
     // This call makes the global (thread local) pointer point to this object.
     InitializeThreadDebugInfo(this);
   }
@@ -50,10 +53,13 @@ public:
 
   const char* GetInstanceId() const { return instance_id_; }
   const char* GetThreadName() const { return thread_name_; }
+  int64_t GetSystemThreadId() const { return system_thread_id_; }
+  int64_t GetParentSystemThreadId() const { return parent_.system_thread_id_; }
+  const char* GetParentThreadName() const { return parent_.thread_name_; }
 
   /// Saves the string representation of param 'instance_id' to member 'instance_id_'
   void SetInstanceId(const TUniqueId& instance_id) {
-    string id_str = PrintId(instance_id);
+    std::string id_str = PrintId(instance_id);
     DCHECK_LT(id_str.length(), TUNIQUE_ID_STRING_SIZE);
     id_str.copy(instance_id_, id_str.length());
   }
@@ -62,7 +68,7 @@ public:
   /// If the length of param 'thread_name' is larger than THREAD_NAME_SIZE,
   /// we store the front of 'thread_name' + '...' + the last few bytes
   /// of thread name, e.g.: "Long Threadname with more te...001afec4)"
-  void SetThreadName(const string& thread_name) {
+  void SetThreadName(const std::string& thread_name) {
     const int64_t length = thread_name.length();
 
     if (length < THREAD_NAME_SIZE) {
@@ -81,6 +87,13 @@ public:
     }
   }
 
+  void SetParentInfo(const ThreadDebugInfo* parent) {
+    if (parent == nullptr) return;
+    parent_.system_thread_id_ = parent->system_thread_id_;
+    strings::strlcpy(instance_id_, parent->instance_id_, TUNIQUE_ID_STRING_SIZE);
+    strings::strlcpy(parent_.thread_name_, parent->thread_name_, THREAD_NAME_SIZE);
+  }
+
 private:
   /// Initializes a thread local pointer with thread_debug_info.
   static void InitializeThreadDebugInfo(ThreadDebugInfo* thread_debug_info);
@@ -91,14 +104,22 @@ private:
   static constexpr int64_t THREAD_NAME_SIZE = 256;
   static constexpr int64_t THREAD_NAME_TAIL_LENGTH = 8;
 
-  char instance_id_[TUNIQUE_ID_STRING_SIZE] = {};
+  /// This struct contains information we want to store about the parent.
+  struct ParentInfo {
+    int64_t system_thread_id_ = 0;
+    char thread_name_[THREAD_NAME_SIZE] = {};
+  };
+
+  ParentInfo parent_;
+  int64_t system_thread_id_ = 0;
   char thread_name_[THREAD_NAME_SIZE] = {};
+  char instance_id_[TUNIQUE_ID_STRING_SIZE] = {};
 
   DISALLOW_COPY_AND_ASSIGN(ThreadDebugInfo);
 };
 
 /// Returns a pointer to the ThreadDebugInfo object for this thread.
-/// The ThreadDebugInfo object needs to be created before this function is called.
+/// Returns nullptr if there is no ThreadDebugInfo object for the current thread.
 ThreadDebugInfo* GetThreadDebugInfo();
 
 }
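The SetThreadName() comment describes storing an over-long name as its front, then "...", then the last few bytes. The body of that function is not part of this diff, so the exact split below is an assumption: it uses the header's THREAD_NAME_SIZE (256) and THREAD_NAME_TAIL_LENGTH (8), fills the buffer to its last byte, and always NUL-terminates. StoreThreadName() is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>

constexpr size_t kNameSize = 256;  // THREAD_NAME_SIZE, including the trailing NUL.
constexpr size_t kTailLength = 8;  // THREAD_NAME_TAIL_LENGTH.

// Copies 'name' into a fixed buffer. If it does not fit, keeps the head, "...",
// and the last kTailLength characters, leaving the buffer NUL-terminated.
void StoreThreadName(const std::string& name, char (&out)[kNameSize]) {
  if (name.length() < kNameSize) {
    std::memcpy(out, name.c_str(), name.length() + 1);  // Includes the NUL.
    return;
  }
  const size_t head = kNameSize - 1 - 3 - kTailLength;
  std::memcpy(out, name.data(), head);
  std::memcpy(out + head, "...", 3);
  std::memcpy(out + head + 3, name.data() + name.length() - kTailLength, kTailLength);
  out[kNameSize - 1] = '\0';
}
```

Keeping the tail preserves suffixes such as the "001afec4)" in the header's example, which often distinguish otherwise similar thread names.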

http://git-wip-us.apache.org/repos/asf/impala/blob/354293d8/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index 944aba8..7adea7f 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -19,7 +19,6 @@
 
 #include <sstream>
 
-#include "common/thread-debug-info.h"
 #include "exec/data-sink.h"
 #include "exprs/scalar-expr.h"
 #include "runtime/fragment-instance-state.h"
@@ -196,7 +195,6 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
     unique_ptr<Thread> build_thread;
     Status thread_status = Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME,
         thread_name, [this, state, build_sink, status=&build_side_status]() {
-          GetThreadDebugInfo()->SetInstanceId(state->fragment_instance_id());
           ProcessBuildInputAsync(state, build_sink, status);
         }, &build_thread, true);
     if (!thread_status.ok()) {

http://git-wip-us.apache.org/repos/asf/impala/blob/354293d8/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 235c799..710a8af 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -21,7 +21,6 @@
 #include <sstream>
 
 #include "common/logging.h"
-#include "common/thread-debug-info.h"
 #include "exec/base-sequence-scanner.h"
 #include "exec/hdfs-scanner.h"
 #include "exec/scanner-context.h"
@@ -348,11 +347,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
         PrintId(runtime_state_->fragment_instance_id()), id(),
         num_scanner_threads_started_counter_->value());
 
-    auto fn = [this]() {
-      RuntimeState* state = this->runtime_state();
-      GetThreadDebugInfo()->SetInstanceId(state->fragment_instance_id());
-      this->ScannerThread();
-    };
+    auto fn = [this]() { this->ScannerThread(); };
     std::unique_ptr<Thread> t;
     status =
       Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);

http://git-wip-us.apache.org/repos/asf/impala/blob/354293d8/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 91b33a7..c5c57a3 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -26,7 +26,6 @@
 #include <boost/date_time/posix_time/posix_time_types.hpp>
 
 #include "common/names.h"
-#include "common/thread-debug-info.h"
 #include "codegen/llvm-codegen.h"
 #include "exec/plan-root-sink.h"
 #include "exec/exec-node.h"
@@ -225,10 +224,7 @@ Status FragmentInstanceState::Prepare() {
     string thread_name = Substitute("profile-report (finst:$0)", PrintId(instance_id()));
     unique_lock<mutex> l(report_thread_lock_);
     RETURN_IF_ERROR(Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME,
-        thread_name, [this]() {
-          GetThreadDebugInfo()->SetInstanceId(this->instance_id());
-          this->ReportProfileThread();
-        }, &report_thread_, true));
+        thread_name, [this]() { this->ReportProfileThread(); }, &report_thread_, true));
     // Make sure the thread started up, otherwise ReportProfileThread() might get into
     // a race with StopReportThread().
     while (!report_thread_active_) report_thread_started_cv_.Wait(l);

http://git-wip-us.apache.org/repos/asf/impala/blob/354293d8/be/src/util/thread.cc
----------------------------------------------------------------------
diff --git a/be/src/util/thread.cc b/be/src/util/thread.cc
index 8397f35..e3f07ba 100644
--- a/be/src/util/thread.cc
+++ b/be/src/util/thread.cc
@@ -311,7 +311,7 @@ Status Thread::StartThread(const std::string& category, const std::string& name,
   try {
     t->thread_.reset(
         new boost::thread(&Thread::SuperviseThread, t->name_, t->category_, functor,
-            &thread_started));
+            GetThreadDebugInfo(), &thread_started));
   } catch (boost::thread_resource_error& e) {
     return Status(TErrorCode::THREAD_CREATION_FAILED, name, category, e.what());
   }
@@ -327,7 +327,8 @@ Status Thread::StartThread(const std::string& category, const std::string& name,
 }
 
 void Thread::SuperviseThread(const string& name, const string& category,
-    Thread::ThreadFunctor functor, Promise<int64_t>* thread_started) {
+    Thread::ThreadFunctor functor, const ThreadDebugInfo* parent_thread_info,
+    Promise<int64_t>* thread_started) {
   int64_t system_tid = syscall(SYS_gettid);
   if (system_tid == -1) {
     string error_msg = GetStrErrMsg();
@@ -340,12 +341,13 @@ void Thread::SuperviseThread(const string& name, const string& category,
 
   // Use boost's get_id rather than the system thread ID as the unique key for this thread
   // since the latter is more prone to being recycled.
-
   thread_mgr_ref->AddThread(this_thread::get_id(), name_copy, category_copy, system_tid);
-  thread_started->Set(system_tid);
 
   ThreadDebugInfo thread_debug_info;
   thread_debug_info.SetThreadName(name_copy);
+  thread_debug_info.SetParentInfo(parent_thread_info);
+
+  thread_started->Set(system_tid);
 
   // Any reference to any parameter not copied in by value may no longer be valid after
   // this point, since the caller that is waiting on *tid != 0 may wake, take the lock and

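The hunk above moves thread_started->Set() after SetParentInfo(). The parent's ThreadDebugInfo lives on the creating thread's stack, and once the creator is signalled it may continue and eventually destroy that object, so the child has to copy what it needs before signalling. A minimal sketch of this copy-before-signal handshake follows, with std::promise in place of Impala's Promise<int64_t>. ParentInfoSketch and StartChild() are hypothetical names.

```cpp
#include <cassert>
#include <future>
#include <string>
#include <thread>
#include <utility>

// Stands in for data on the creator's stack, like the parent's ThreadDebugInfo.
struct ParentInfoSketch {
  std::string name;
};

// Starts a thread that copies from '*parent' and only then signals the creator.
// After the signal, '*parent' may be destroyed, so the child uses only its copy.
std::thread StartChild(const ParentInfoSketch* parent, std::string* copied_out) {
  std::promise<void> started;
  std::future<void> started_future = started.get_future();
  std::thread t(
      [parent, copied_out](std::promise<void> p) {
        std::string parent_name = parent->name;  // Copy while 'parent' is valid...
        p.set_value();                           // ...then let the creator continue.
        *copied_out = parent_name;               // Only the local copy is used now.
      },
      std::move(started));
  started_future.wait();  // Plays the role of waiting on 'thread_started'.
  return t;
}
```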
http://git-wip-us.apache.org/repos/asf/impala/blob/354293d8/be/src/util/thread.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread.h b/be/src/util/thread.h
index 6898517..b114e26 100644
--- a/be/src/util/thread.h
+++ b/be/src/util/thread.h
@@ -32,6 +32,7 @@
 namespace impala {
 
 class MetricGroup;
+class ThreadDebugInfo;
 class Webserver;
 
 /// Thin wrapper around boost::thread that can register itself with the singleton
@@ -179,8 +180,15 @@ class Thread {
   /// As a result, the 'functor' parameter is deliberately copied into this method, since
   /// it is used after the notification completes. The tid parameter is written to
   /// exactly once before SuperviseThread() notifies the caller.
+  ///
+  /// parent_thread_info points to the parent thread's ThreadDebugInfo object if the
+  /// parent has one, otherwise it's a nullptr. As part of the initialisation
+  /// SuperviseThread() copies the useful information from the parent's ThreadDebugInfo
+  /// info object to its own TDI object. This way the TDI objects can preserve the thread
+  /// creation graph.
   static void SuperviseThread(const std::string& name, const std::string& category,
-      ThreadFunctor functor, Promise<int64_t>* thread_started);
+      Thread::ThreadFunctor functor, const ThreadDebugInfo* parent_thread_info,
+      Promise<int64_t>* thread_started);
 };
 
 /// Utility class to group together a set of threads. A replacement for


[3/4] impala git commit: IMPALA-6512: Fix test_exchange_delays for KRPC

Posted by ta...@apache.org.
IMPALA-6512: Fix test_exchange_delays for KRPC

The "sender timed out" error message differs slightly between Thrift and
KRPC because the source address is not readily available in the Thrift
RPC implementation. This causes test_exchange_delays to fail when KRPC
is enabled.

This change fixes the problem by shortening the expected error message
string so that it matches the message from either RPC implementation.

Testing done: Tested with KRPC enabled in the code and verified the tests passed.

Change-Id: Idd9410381dbb931231c92f084917265e5067b4c9
Reviewed-on: http://gerrit.cloudera.org:8080/9331
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/cd7dc4be
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/cd7dc4be
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/cd7dc4be

Branch: refs/heads/2.x
Commit: cd7dc4bee4e728e33f33354080fdeded845486ec
Parents: 774656a
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Feb 14 16:54:39 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 15 11:10:17 2018 +0000

----------------------------------------------------------------------
 .../queries/QueryTest/exchange-delays-zero-rows.test               | 2 +-
 .../functional-query/queries/QueryTest/exchange-delays.test        | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/cd7dc4be/testdata/workloads/functional-query/queries/QueryTest/exchange-delays-zero-rows.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/exchange-delays-zero-rows.test b/testdata/workloads/functional-query/queries/QueryTest/exchange-delays-zero-rows.test
index 79b7db4..9c8a096 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/exchange-delays-zero-rows.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/exchange-delays-zero-rows.test
@@ -7,5 +7,5 @@ where l_linenumber = -1
 group by l_orderkey
 ---- RESULTS
 ---- CATCH
-Sender timed out waiting for receiver fragment instance
+timed out waiting for receiver fragment instance
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/cd7dc4be/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test b/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
index 0dac1d9..716085a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/exchange-delays.test
@@ -6,5 +6,5 @@ from tpch.lineitem
 ---- RESULTS
 6001215
 ---- CATCH
-Sender timed out waiting for receiver fragment instance
+timed out waiting for receiver fragment instance
 ====