You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jb...@apache.org on 2017/09/07 03:50:19 UTC

[1/8] incubator-impala git commit: IMPALA-5892: Allow reporting status independent of fragment instance

Repository: incubator-impala
Updated Branches:
  refs/heads/master 545eab6d6 -> e993b9712


IMPALA-5892: Allow reporting status independent of fragment instance

Queries can hit an error that is not specific to a
particular fragment instance. For example, QueryState::StartFInstances()
calls DescriptorTbl::Create() before any fragment instances
start. This location has no reason to report status via a
particular fragment, and there is currently no way to report
status otherwise. This leads to a query hang, because the
status is never propagated back to the coordinator.

This adds the ability to report status that is not associated
with a particular fragment instance. Because this status is now
reported, the coordinator will correctly abort the query in the
QueryState::StartFInstances() scenario described above.

Change-Id: I4cd98022f1d62a999c7c80ff5474fa8d069eb12c
Reviewed-on: http://gerrit.cloudera.org:8080/7943
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/91f7bc19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/91f7bc19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/91f7bc19

Branch: refs/heads/master
Commit: 91f7bc1947c1800e689fee040d4820fd8dbf94e4
Parents: 545eab6
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Fri Sep 1 14:50:45 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Sep 6 21:26:24 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/coordinator-backend-state.cc | 14 ++++++++++-
 be/src/runtime/coordinator-backend-state.h  | 21 +++++++++++++---
 be/src/runtime/coordinator.cc               | 32 ++++++++++++++----------
 be/src/runtime/coordinator.h                | 15 +++++++----
 be/src/runtime/query-state.cc               |  1 +
 common/thrift/ImpalaInternalService.thrift  | 12 +++++++++
 6 files changed, 73 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 34e0671..1b7fd20 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -211,9 +211,12 @@ void Coordinator::BackendState::Exec(
   VLOG_FILE << "rpc succeeded: ExecQueryFInstances query_id=" << PrintId(query_id_);
 }
 
-Status Coordinator::BackendState::GetStatus(TUniqueId* failed_instance_id) {
+Status Coordinator::BackendState::GetStatus(bool* is_fragment_failure,
+    TUniqueId* failed_instance_id) {
   lock_guard<mutex> l(lock_);
+  DCHECK_EQ(is_fragment_failure == nullptr, failed_instance_id == nullptr);
   if (!status_.ok() && failed_instance_id != nullptr) {
+    *is_fragment_failure = is_fragment_failure_;
     *failed_instance_id = failed_instance_id_;
   }
   return status_;
@@ -278,6 +281,7 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
       if (status_.ok() || status_.IsCancelled()) {
         status_ = instance_status;
         failed_instance_id_ = instance_exec_status.fragment_instance_id;
+        is_fragment_failure_ = true;
       }
     }
     DCHECK_GT(num_remaining_instances_, 0);
@@ -302,6 +306,14 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
     }
   }
 
+  // status_ has incorporated the status from all fragment instances. If the overall
+  // backend status is not OK, but no specific fragment instance reported an error, then
+  // this is a general backend error. Incorporate the general error into status_.
+  Status overall_backend_status(backend_exec_status.status);
+  if (!overall_backend_status.ok() && (status_.ok() || status_.IsCancelled())) {
+    status_ = overall_backend_status;
+  }
+
   // Log messages aggregated by type
   if (backend_exec_status.__isset.error_log && backend_exec_status.error_log.size() > 0) {
     // Append the log messages from each update with the global state of the query

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index ccc3618..4ea2f33 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -90,9 +90,19 @@ class Coordinator::BackendState {
   /// if cancellation was attempted, false otherwise.
   bool Cancel();
 
-  /// Return the overall execution status. For an error status, also return the id
-  /// of the instance that caused that status, if failed_instance_id != nullptr.
-  Status GetStatus(TUniqueId* failed_instance_id = nullptr) WARN_UNUSED_RESULT;
+  /// Return the overall execution status. For an error status, the error could come
+  /// from the fragment instance level or it can be a general error from the backend
+  /// (with no specific fragment responsible). For a caller to distinguish between
+  /// these errors and to determine the specific fragment instance (if applicable),
+  /// both 'is_fragment_failure' and 'failed_instance_id' must be non-null.
+  /// A general error will set *is_fragment_failure to false and leave
+  /// failed_instance_id untouched.
+  /// A fragment-specific error will set *is_fragment_failure to true and set
+  /// *failed_instance_id to the id of the fragment instance that failed.
+  /// If the caller does not need this information, both 'is_fragment_failure' and
+  /// 'failed_instance_id' must be omitted (using the default value of nullptr).
+  Status GetStatus(bool* is_fragment_failure = nullptr,
+      TUniqueId* failed_instance_id = nullptr) WARN_UNUSED_RESULT;
 
   /// Return peak memory consumption.
   int64_t GetPeakConsumption();
@@ -199,6 +209,11 @@ class Coordinator::BackendState {
   /// initiated; either way, execution must not be cancelled.
   Status status_;
 
+  /// Used to distinguish between errors reported by a specific fragment instance,
+  /// which would set failed_instance_id_, rather than an error independent of any
+  /// specific fragment.
+  bool is_fragment_failure_ = false;
+
   /// Id of the first fragment instance that reports an error status.
   /// Invalid if no fragment instance has reported an error status.
   TUniqueId failed_instance_id_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 029e0bc..c8df1f5 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -471,8 +471,8 @@ Status Coordinator::GetStatus() {
   return query_status_;
 }
 
-Status Coordinator::UpdateStatus(const Status& status, const TUniqueId& instance_id,
-    const string& instance_hostname) {
+  Status Coordinator::UpdateStatus(const Status& status, const string& backend_hostname,
+     bool is_fragment_failure, const TUniqueId& instance_id) {
   {
     lock_guard<mutex> l(lock_);
 
@@ -490,10 +490,14 @@ Status Coordinator::UpdateStatus(const Status& status, const TUniqueId& instance
     CancelInternal();
   }
 
-  // Log the id of the fragment that first failed so we can track it down more easily.
-  VLOG_QUERY << "Query id=" << query_id() << " failed because instance id="
-             << instance_id << " on host=" << instance_hostname << " failed.";
-
+  if (is_fragment_failure) {
+    // Log the id of the fragment that first failed so we can track it down more easily.
+    VLOG_QUERY << "Query id=" << query_id() << " failed because instance id="
+               << instance_id << " on host=" << backend_hostname << " failed.";
+  } else {
+    VLOG_QUERY << "Query id=" << query_id() << " failed due to error on host="
+               << backend_hostname;
+  }
   return query_status_;
 }
 
@@ -822,8 +826,8 @@ Status Coordinator::Wait() {
 
   if (stmt_type_ == TStmtType::QUERY) {
     DCHECK(coord_instance_ != nullptr);
-    return UpdateStatus(coord_instance_->WaitForOpen(),
-        runtime_state()->fragment_instance_id(), FLAGS_hostname);
+    return UpdateStatus(coord_instance_->WaitForOpen(), FLAGS_hostname, true,
+        runtime_state()->fragment_instance_id());
   }
 
   DCHECK_EQ(stmt_type_, TStmtType::DML);
@@ -867,8 +871,8 @@ Status Coordinator::GetNext(QueryResultSet* results, int max_rows, bool* eos) {
   // if there was an error, we need to return the query's error status rather than
   // the status we just got back from the local executor (which may well be CANCELLED
   // in that case).  Coordinator fragment failed in this case so we log the query_id.
-  RETURN_IF_ERROR(
-      UpdateStatus(status, runtime_state()->fragment_instance_id(), FLAGS_hostname));
+  RETURN_IF_ERROR(UpdateStatus(status, FLAGS_hostname, true,
+      runtime_state()->fragment_instance_id()));
 
   if (*eos) {
     returned_all_results_ = true;
@@ -950,11 +954,13 @@ Status Coordinator::UpdateBackendExecStatus(const TReportExecStatusParams& param
     // true (UpdateStatus() initiates cancellation, if it hasn't already been)
     // TODO: clarify control flow here, it's unclear we should even process this status
     // report if returned_all_results_ is true
+    bool is_fragment_failure;
     TUniqueId failed_instance_id;
-    Status status = backend_state->GetStatus(&failed_instance_id);
+    Status status = backend_state->GetStatus(&is_fragment_failure, &failed_instance_id);
     if (!status.ok() && !returned_all_results_) {
-      Status ignored = UpdateStatus(status, failed_instance_id,
-          TNetworkAddressToString(backend_state->impalad_address()));
+      Status ignored =
+          UpdateStatus(status, TNetworkAddressToString(backend_state->impalad_address()),
+              is_fragment_failure, failed_instance_id);
       return Status::OK();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 4edef88..e67ef13 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -363,12 +363,17 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
   void CancelInternal();
 
   /// Acquires lock_ and updates query_status_ with 'status' if it's not already
-  /// an error status, and returns the current query_status_.
+  /// an error status, and returns the current query_status_. The status may be
+  /// due to an error in a specific fragment instance, or it can be a general error
+  /// not tied to a specific fragment instance.
   /// Calls CancelInternal() when switching to an error status.
-  /// failed_fragment is the fragment_id that has failed, used for error reporting along
-  /// with instance_hostname.
-  Status UpdateStatus(const Status& status, const TUniqueId& failed_fragment,
-      const std::string& instance_hostname) WARN_UNUSED_RESULT;
+  /// When an error is due to a specific fragment instance, 'is_fragment_failure' must
+  /// be true and 'failed_fragment' is the fragment_id that has failed, used for error
+  /// reporting. For a general error not tied to a specific instance,
+  /// 'is_fragment_failure' must be false and 'failed_fragment' will be ignored.
+  /// 'backend_hostname' is used for error reporting in either case.
+  Status UpdateStatus(const Status& status, const std::string& backend_hostname,
+      bool is_fragment_failure, const TUniqueId& failed_fragment) WARN_UNUSED_RESULT;
 
   /// Update per_partition_status_ and files_to_move_.
   void UpdateInsertExecStatus(const TInsertExecStatus& insert_exec_status);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 5ac4998..4311e27 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -207,6 +207,7 @@ void QueryState::ReportExecStatusAux(bool done, const Status& status,
   params.__set_query_id(query_ctx().query_id);
   DCHECK(rpc_params().__isset.coord_state_idx);
   params.__set_coord_state_idx(rpc_params().coord_state_idx);
+  status.SetTStatus(&params);
 
   if (fis != nullptr) {
     // create status for 'fis'

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/91f7bc19/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 39df289..db6ffbb 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -624,6 +624,18 @@ struct TReportExecStatusParams {
   // New errors that have not been reported to the coordinator by any of the
   // instances included in instance_exec_status
   6: optional map<ErrorCodes.TErrorCode, TErrorLogEntry> error_log;
+
+  // Cumulative status for this backend. A backend can have an error from a specific
+  // fragment instance, or it can have a general error that is independent of any
+  // individual fragment. If reporting a single error, this status is always set to
+  // the error being reported. If reporting multiple errors, the status is set by the
+  // following rules:
+  // 1. A general error takes precedence over any fragment instance error.
+  // 2. Any fragment instance error takes precedence over any cancelled status.
+  // 3. If multiple fragments have errors, prefer the error that comes first in the
+  // 'instance_exec_status' list.
+  // This status is only OK if all fragment instances included are OK.
+  7: optional Status.TStatus status;
 }
 
 struct TReportExecStatusResult {


[4/8] incubator-impala git commit: IMPALA-5902: add ThreadSanitizer build

Posted by jb...@apache.org.
IMPALA-5902: add ThreadSanitizer build

This is sufficient to get Impala to come up and run queries with
thread sanitizer enabled.

I have not triaged or fixed the data races that are reported; that
is left for follow-on work.

Change-Id: I22f8faeefa5e157279c5973fe28bc573b7606d50
Reviewed-on: http://gerrit.cloudera.org:8080/7977
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b1edaf21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b1edaf21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b1edaf21

Branch: refs/heads/master
Commit: b1edaf215e537d8cef5ffb305973b6c5baa63583
Parents: be98aaa
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Aug 31 23:18:20 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Sep 7 01:22:41 2017 +0000

----------------------------------------------------------------------
 CMakeLists.txt                                |  3 ++-
 be/CMakeLists.txt                             | 16 ++++++++++---
 be/src/common/init.cc                         | 18 +++++++-------
 be/src/gutil/atomicops-internals-tsan.h       | 18 ++++++++++++++
 be/src/runtime/bufferpool/system-allocator.cc |  2 +-
 be/src/runtime/exec-env.cc                    |  2 +-
 be/src/runtime/query-exec-mgr.cc              |  4 ++--
 be/src/util/default-path-handlers.cc          |  6 ++---
 be/src/util/memory-metrics.cc                 | 10 ++++----
 be/src/util/memory-metrics.h                  | 16 ++++++-------
 be/src/util/metrics-test.cc                   |  2 +-
 be/src/util/pprof-path-handlers.cc            | 12 +++++-----
 bin/make_impala.sh                            |  3 ++-
 bin/run-backend-tests.sh                      |  1 +
 bin/start-catalogd.sh                         |  1 +
 bin/start-impalad.sh                          |  1 +
 bin/start-statestored.sh                      |  1 +
 buildall.sh                                   |  7 ++++++
 common/thrift/metrics.json                    |  6 ++---
 tests/common/environ.py                       | 28 +++++++++++++++-------
 20 files changed, 105 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index e5c2fdb..e8a2355 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -209,7 +209,8 @@ find_package(LlvmBinaries REQUIRED)
 if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG"
     OR "${CMAKE_BUILD_TYPE}" STREQUAL "ADDRESS_SANITIZER"
     OR "${CMAKE_BUILD_TYPE}" STREQUAL "TIDY"
-    OR "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN")
+    OR "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN"
+    OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
   # Use the LLVM libaries with assertions for debug builds.
   set(LLVM_ROOT ${LLVM_DEBUG_ROOT})
 endif()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/be/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 5b0d89d..bf7aa26 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -110,6 +110,11 @@ SET(CXX_FLAGS_UBSAN "${CXX_FLAGS_UBSAN} -fno-wrapv")
 # To ease debugging, turn off all optimizations:
 SET(CXX_FLAGS_UBSAN "${CXX_FLAGS_UBSAN} -O0")
 
+# Set the flags to the thread sanitizer, also known as "tsan"
+# Turn on sanitizer and debug symbols to get stack traces:
+SET(CXX_FLAGS_TSAN "${CXX_CLANG_FLAGS} -O1 -ggdb3 -fno-omit-frame-pointer")
+SET(CXX_FLAGS_TSAN "${CXX_FLAGS_TSAN} -fsanitize=thread -DTHREAD_SANITIZER")
+
 SET(CXX_FLAGS_TIDY "${CXX_CLANG_FLAGS}")
 # Catching unused variables requires an optimization level greater than 0
 SET(CXX_FLAGS_TIDY "${CXX_FLAGS_TIDY} -O1")
@@ -136,6 +141,8 @@ elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "TIDY")
   SET(CMAKE_CXX_FLAGS "${CXX_FLAGS_TIDY}")
 elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN")
   SET(CMAKE_CXX_FLAGS "${CXX_FLAGS_UBSAN}")
+elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
+  SET(CMAKE_CXX_FLAGS "${CXX_FLAGS_TSAN}")
 else()
   message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
 endif()
@@ -159,7 +166,8 @@ if (CCACHE AND NOT DEFINED ENV{DISABLE_CCACHE})
   set(RULE_LAUNCH_PREFIX ccache)
   if ("${CMAKE_BUILD_TYPE}" STREQUAL "ADDRESS_SANITIZER"
       OR "${CMAKE_BUILD_TYPE}" STREQUAL "TIDY"
-      OR "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN")
+      OR "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN"
+      OR "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
     # Need to set CCACHE_CPP so that ccache calls clang with the original source file for
     # both preprocessing and compilation. Otherwise, ccache will use clang to preprocess
     # the file and then call clang with the preprocessed output if not cached. However,
@@ -294,7 +302,8 @@ add_definitions(-fPIC)
 # set compile output directory
 if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG" OR
     "${CMAKE_BUILD_TYPE}" STREQUAL "ADDRESS_SANITIZER" OR
-    "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN")
+    "${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN" OR
+    "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
   set(BUILD_OUTPUT_ROOT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/build/debug/")
 else()
   set(BUILD_OUTPUT_ROOT_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}/build/release/")
@@ -436,7 +445,8 @@ set (IMPALA_LINK_LIBS ${IMPALA_LINK_LIBS}
 # sanitizer build. Address sanitizer is incompatible with tcmalloc (they both intercept
 # malloc/free)
 set (IMPALA_LINK_LIBS_NO_TCMALLOC ${IMPALA_LINK_LIBS})
-if (NOT "${CMAKE_BUILD_TYPE}" STREQUAL "ADDRESS_SANITIZER")
+if (NOT "${CMAKE_BUILD_TYPE}" STREQUAL "ADDRESS_SANITIZER" AND
+    NOT "${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
   set (IMPALA_LINK_LIBS ${IMPALA_LINK_LIBS} tcmallocstatic)
 endif()
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/be/src/common/init.cc
----------------------------------------------------------------------
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index f2df173..ce55067 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -137,17 +137,15 @@ static scoped_ptr<impala::Thread> pause_monitor;
       BufferPool* buffer_pool = env->buffer_pool();
       if (buffer_pool != nullptr) buffer_pool->Maintenance();
 
-#ifndef ADDRESS_SANITIZER
-      // When using tcmalloc, the process limit as measured by our trackers will
-      // be out of sync with the process usage. The metric is refreshed whenever
-      // memory is consumed or released via a MemTracker, so on a system with
-      // queries executing it will be refreshed frequently. However if the system
-      // is idle, we need to refresh the tracker occasionally since untracked
-      // memory may be allocated or freed, e.g. by background threads.
+      // The process limit as measured by our trackers may get out of sync with the
+      // process usage if memory is allocated or freed without updating a MemTracker.
+      // The metric is refreshed whenever memory is consumed or released via a MemTracker,
+      // so on a system with queries executing it will be refreshed frequently. However
+      // if the system is idle, we need to refresh the tracker occasionally since
+      // untracked memory may be allocated or freed, e.g. by background threads.
       if (env->process_mem_tracker() != nullptr) {
         env->process_mem_tracker()->RefreshConsumptionFromMetric();
       }
-#endif
     }
     // Periodically refresh values of the aggregate memory metrics to ensure they are
     // somewhat up-to-date.
@@ -198,7 +196,9 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
   impala::InitGoogleLoggingSafe(argv[0]);
   // Breakpad needs flags and logging to initialize.
   ABORT_IF_ERROR(RegisterMinidump(argv[0]));
+#ifndef THREAD_SANITIZER
   AtomicOps_x86CPUFeaturesInit();
+#endif
   impala::InitThreading();
   impala::TimestampParser::Init();
   impala::SeedOpenSSLRNG();
@@ -243,7 +243,7 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
 
   if (impala::KuduIsAvailable()) impala::InitKuduLogging();
 
-#ifndef ADDRESS_SANITIZER
+#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
   // tcmalloc and address sanitizer can not be used together
   if (FLAGS_enable_process_lifetime_heap_profiling) {
     HeapProfilerStart(FLAGS_heap_profile_dir.c_str());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/be/src/gutil/atomicops-internals-tsan.h
----------------------------------------------------------------------
diff --git a/be/src/gutil/atomicops-internals-tsan.h b/be/src/gutil/atomicops-internals-tsan.h
index aecaefc..a1fa71b 100644
--- a/be/src/gutil/atomicops-internals-tsan.h
+++ b/be/src/gutil/atomicops-internals-tsan.h
@@ -202,6 +202,24 @@ inline Atomic64 Release_CompareAndSwap(volatile Atomic64 *ptr,
   return cmp;
 }
 
+inline Atomic32 Barrier_CompareAndSwap(volatile Atomic32 *ptr,
+                                  Atomic32 old_value,
+                                  Atomic32 new_value) {
+  Atomic32 cmp = old_value;
+  __tsan_atomic32_compare_exchange_strong(ptr, &cmp, new_value,
+      __tsan_memory_order_acq_rel, __tsan_memory_order_relaxed);
+  return cmp;
+}
+
+inline Atomic64 Barrier_CompareAndSwap(volatile Atomic64 *ptr,
+                                  Atomic64 old_value,
+                                  Atomic64 new_value) {
+  Atomic64 cmp = old_value;
+  __tsan_atomic64_compare_exchange_strong(ptr, &cmp, new_value,
+      __tsan_memory_order_acq_rel, __tsan_memory_order_relaxed);
+  return cmp;
+}
+
 inline void MemoryBarrier() {
   __tsan_atomic_thread_fence(__tsan_memory_order_seq_cst);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/be/src/runtime/bufferpool/system-allocator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/system-allocator.cc b/be/src/runtime/bufferpool/system-allocator.cc
index 8bbce70..b3ba2b8 100644
--- a/be/src/runtime/bufferpool/system-allocator.cc
+++ b/be/src/runtime/bufferpool/system-allocator.cc
@@ -46,7 +46,7 @@ static int64_t HUGE_PAGE_SIZE = 2LL * 1024 * 1024;
 SystemAllocator::SystemAllocator(int64_t min_buffer_len)
   : min_buffer_len_(min_buffer_len) {
   DCHECK(BitUtil::IsPowerOf2(min_buffer_len));
-#ifndef ADDRESS_SANITIZER
+#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
   // Free() assumes that aggressive decommit is enabled for TCMalloc.
   size_t aggressive_decommit_enabled;
   MallocExtension::instance()->GetNumericProperty(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 2907768..fe4825d 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -294,7 +294,7 @@ Status ExecEnv::StartServices() {
         BufferPoolMetric::UNUSED_RESERVATION_BYTES));
   obj_pool_->Add(new MemTracker(negated_unused_reservation, -1,
       "Buffer Pool: Unused Reservation", mem_tracker_.get()));
-#ifndef ADDRESS_SANITIZER
+#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
   // Aggressive decommit is required so that unused pages in the TCMalloc page heap are
   // not backed by physical pages and do not contribute towards memory consumption.
   size_t aggressive_decommit_enabled = 0;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 22c2826..901ddbd 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -111,8 +111,8 @@ QueryState* QueryExecMgr::GetOrCreateQueryState(
 void QueryExecMgr::StartQueryHelper(QueryState* qs) {
   qs->StartFInstances();
 
-#ifndef ADDRESS_SANITIZER
-  // tcmalloc and address sanitizer cannot be used together
+#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
+  // tcmalloc and address or thread sanitizer cannot be used together
   if (FLAGS_log_mem_usage_interval > 0) {
     uint64_t num_complete = ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->value();
     if (num_complete % FLAGS_log_mem_usage_interval == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/be/src/util/default-path-handlers.cc
----------------------------------------------------------------------
diff --git a/be/src/util/default-path-handlers.cc b/be/src/util/default-path-handlers.cc
index 9334316..88d23f1 100644
--- a/be/src/util/default-path-handlers.cc
+++ b/be/src/util/default-path-handlers.cc
@@ -135,8 +135,8 @@ void MemUsageHandler(MemTracker* mem_tracker, MetricGroup* metric_group,
   document->AddMember("consumption", consumption, document->GetAllocator());
 
   stringstream ss;
-#ifdef ADDRESS_SANITIZER
-  ss << "Memory tracking is not available with address sanitizer builds.";
+#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
+  ss << "Memory tracking is not available with address or thread sanitizer builds.";
 #else
   char buf[2048];
   MallocExtension::instance()->GetStats(buf, 2048);
@@ -238,7 +238,7 @@ void AddDefaultUrlCallbacks(
     webserver->RegisterUrlCallback("/memz", "memz.tmpl", callback);
   }
 
-#ifndef ADDRESS_SANITIZER
+#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
   // Remote (on-demand) profiling is disabled if the process is already being profiled.
   if (!FLAGS_enable_process_lifetime_heap_profiling) {
     AddPprofUrlCallbacks(webserver);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/be/src/util/memory-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.cc b/be/src/util/memory-metrics.cc
index 7c41bdf..d1e7a16 100644
--- a/be/src/util/memory-metrics.cc
+++ b/be/src/util/memory-metrics.cc
@@ -47,7 +47,7 @@ TcmallocMetric* TcmallocMetric::TOTAL_BYTES_RESERVED = nullptr;
 TcmallocMetric* TcmallocMetric::PAGEHEAP_UNMAPPED_BYTES = nullptr;
 TcmallocMetric::PhysicalBytesMetric* TcmallocMetric::PHYSICAL_BYTES_RESERVED = nullptr;
 
-AsanMallocMetric* AsanMallocMetric::BYTES_ALLOCATED = nullptr;
+SanitizerMallocMetric* SanitizerMallocMetric::BYTES_ALLOCATED = nullptr;
 
 BufferPoolMetric* BufferPoolMetric::LIMIT = nullptr;
 BufferPoolMetric* BufferPoolMetric::SYSTEM_ALLOCATED = nullptr;
@@ -81,10 +81,10 @@ Status impala::RegisterMemoryMetrics(MetricGroup* metrics, bool register_jvm_met
     used_metrics.push_back(BufferPoolMetric::SYSTEM_ALLOCATED);
   }
 
-#ifdef ADDRESS_SANITIZER
-  AsanMallocMetric::BYTES_ALLOCATED = metrics->RegisterMetric(
-      new AsanMallocMetric(MetricDefs::Get("asan-total-bytes-allocated")));
-  used_metrics.push_back(AsanMallocMetric::BYTES_ALLOCATED);
+#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
+  SanitizerMallocMetric::BYTES_ALLOCATED = metrics->RegisterMetric(
+      new SanitizerMallocMetric(MetricDefs::Get("sanitizer-total-bytes-allocated")));
+  used_metrics.push_back(SanitizerMallocMetric::BYTES_ALLOCATED);
 #else
   MetricGroup* tcmalloc_metrics = metrics->GetOrCreateChildGroup("tcmalloc");
   // We rely on TCMalloc for our global memory metrics, so skip setting them up

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index e1f1e4a..5491f8c 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -23,7 +23,7 @@
 #include <boost/bind.hpp>
 #include <boost/thread/mutex.hpp>
 #include <gperftools/malloc_extension.h>
-#ifdef ADDRESS_SANITIZER
+#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
 #include <sanitizer/allocator_interface.h>
 #endif
 
@@ -125,7 +125,7 @@ class TcmallocMetric : public IntGauge {
       : IntGauge(def, 0), tcmalloc_var_(tcmalloc_var) { }
 
   virtual void CalculateValue() {
-#ifndef ADDRESS_SANITIZER
+#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
     DCHECK_EQ(sizeof(size_t), sizeof(value_));
     MallocExtension::instance()->GetNumericProperty(tcmalloc_var_.c_str(),
         reinterpret_cast<size_t*>(&value_));
@@ -133,15 +133,15 @@ class TcmallocMetric : public IntGauge {
   }
 };
 
-/// Alternative to TCMallocMetric if we're running under Address Sanitizer, which
-/// does not provide the same metrics.
-class AsanMallocMetric : public IntGauge {
+/// Alternative to TCMallocMetric if we're running under a sanitizer that replaces
+/// malloc(), e.g. address or thread sanitizer.
+class SanitizerMallocMetric : public IntGauge {
  public:
-  AsanMallocMetric(const TMetricDef& def) : IntGauge(def, 0) {}
-  static AsanMallocMetric* BYTES_ALLOCATED;
+  SanitizerMallocMetric(const TMetricDef& def) : IntGauge(def, 0) {}
+  static SanitizerMallocMetric* BYTES_ALLOCATED;
  private:
   virtual void CalculateValue() override {
-#ifdef ADDRESS_SANITIZER
+#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
     value_ = __sanitizer_get_current_allocated_bytes();
 #endif
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/be/src/util/metrics-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/metrics-test.cc b/be/src/util/metrics-test.cc
index 08ca266..0126281 100644
--- a/be/src/util/metrics-test.cc
+++ b/be/src/util/metrics-test.cc
@@ -215,7 +215,7 @@ TEST_F(MetricsTest, StatsMetricsSingle) {
 }
 
 TEST_F(MetricsTest, MemMetric) {
-#ifndef ADDRESS_SANITIZER
+#if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
   MetricGroup metrics("MemMetrics");
   ASSERT_OK(RegisterMemoryMetrics(&metrics, false, nullptr, nullptr));
   // Smoke test to confirm that tcmalloc metrics are returning reasonable values.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/be/src/util/pprof-path-handlers.cc
----------------------------------------------------------------------
diff --git a/be/src/util/pprof-path-handlers.cc b/be/src/util/pprof-path-handlers.cc
index b8f245c..bc04b2c 100644
--- a/be/src/util/pprof-path-handlers.cc
+++ b/be/src/util/pprof-path-handlers.cc
@@ -54,9 +54,9 @@ void PprofCmdLineHandler(const Webserver::ArgumentMap& args, stringstream* outpu
 // by calling HeapProfileStart(filename), continue to do work, and then, some number of
 // seconds later, call GetHeapProfile() followed by HeapProfilerStop().
 void PprofHeapHandler(const Webserver::ArgumentMap& args, stringstream* output) {
-#ifdef ADDRESS_SANITIZER
+#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
   (void)PPROF_DEFAULT_SAMPLE_SECS; // Avoid unused variable warning.
-  (*output) << "Heap profiling is not available with address sanitizer builds.";
+  (*output) << "Heap profiling is not available with address/thread sanitizer builds.";
 #else
   Webserver::ArgumentMap::const_iterator it = args.find("seconds");
   int seconds = PPROF_DEFAULT_SAMPLE_SECS;
@@ -78,8 +78,8 @@ void PprofHeapHandler(const Webserver::ArgumentMap& args, stringstream* output)
 // The server should respond by calling ProfilerStart(), continuing to do its work,
 // and then, XX seconds later, calling ProfilerStop().
 void PprofCpuProfileHandler(const Webserver::ArgumentMap& args, stringstream* output) {
-#ifdef ADDRESS_SANITIZER
-  (*output) << "CPU profiling is not available with address sanitizer builds.";
+#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
+  (*output) << "CPU profiling is not available with address/thread sanitizer builds.";
 #else
   Webserver::ArgumentMap::const_iterator it = args.find("seconds");
   int seconds = PPROF_DEFAULT_SAMPLE_SECS;
@@ -106,8 +106,8 @@ void PprofCpuProfileHandler(const Webserver::ArgumentMap& args, stringstream* ou
 // The server should respond by calling:
 // MallocExtension::instance()->GetHeapGrowthStacks(&output);
 void PprofGrowthHandler(const Webserver::ArgumentMap& args, stringstream* output) {
-#ifdef ADDRESS_SANITIZER
-  (*output) << "Growth profiling is not available with address sanitizer builds.";
+#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
+  (*output) << "Growth profiling is not available with address/thread sanitizer builds.";
 #else
   string heap_growth_stack;
   MallocExtension::instance()->GetHeapGrowthStacks(&heap_growth_stack);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/bin/make_impala.sh
----------------------------------------------------------------------
diff --git a/bin/make_impala.sh b/bin/make_impala.sh
index 8d164c2..cf05278 100755
--- a/bin/make_impala.sh
+++ b/bin/make_impala.sh
@@ -149,7 +149,8 @@ then
 
       if [[ ("$TARGET_BUILD_TYPE" == "ADDRESS_SANITIZER") \
                 || ("$TARGET_BUILD_TYPE" == "TIDY") \
-                || ("$TARGET_BUILD_TYPE" == "UBSAN") ]]
+                || ("$TARGET_BUILD_TYPE" == "UBSAN") \
+                || ("$TARGET_BUILD_TYPE" == "TSAN") ]]
       then
         CMAKE_ARGS+=(-DCMAKE_TOOLCHAIN_FILE=$IMPALA_HOME/cmake_modules/clang_toolchain.cmake)
       else

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/bin/run-backend-tests.sh
----------------------------------------------------------------------
diff --git a/bin/run-backend-tests.sh b/bin/run-backend-tests.sh
index 98630b0..d4d3142 100755
--- a/bin/run-backend-tests.sh
+++ b/bin/run-backend-tests.sh
@@ -40,5 +40,6 @@ export CTEST_OUTPUT_ON_FAILURE=1
 export ASAN_OPTIONS="handle_segv=0 detect_leaks=0 allocator_may_return_null=1"
 export UBSAN_OPTIONS="print_stacktrace=1"
 UBSAN_OPTIONS="${UBSAN_OPTIONS} suppressions=${IMPALA_HOME}/bin/ubsan-suppressions.txt"
+export TSAN_OPTIONS="halt_on_error=1 history_size=7"
 export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"
 "${MAKE_CMD:-make}" test ARGS="${BE_TEST_ARGS}"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/bin/start-catalogd.sh
----------------------------------------------------------------------
diff --git a/bin/start-catalogd.sh b/bin/start-catalogd.sh
index 3eeaf2e..4ec6846 100755
--- a/bin/start-catalogd.sh
+++ b/bin/start-catalogd.sh
@@ -73,5 +73,6 @@ fi
 export ASAN_OPTIONS="handle_segv=0 detect_leaks=0 allocator_may_return_null=1"
 export UBSAN_OPTIONS="print_stacktrace=1"
 UBSAN_OPTIONS="${UBSAN_OPTIONS} suppressions=${IMPALA_HOME}/bin/ubsan-suppressions.txt"
+export TSAN_OPTIONS="halt_on_error=0 history_size=7"
 export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"
 exec ${BINARY_BASE_DIR}/${BUILD_TYPE}/catalog/catalogd ${CATALOGD_ARGS}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/bin/start-impalad.sh
----------------------------------------------------------------------
diff --git a/bin/start-impalad.sh b/bin/start-impalad.sh
index d4602b5..76a5f2c 100755
--- a/bin/start-impalad.sh
+++ b/bin/start-impalad.sh
@@ -101,5 +101,6 @@ fi
 export ASAN_OPTIONS="handle_segv=0 detect_leaks=0 allocator_may_return_null=1"
 export UBSAN_OPTIONS="print_stacktrace=1"
 UBSAN_OPTIONS="${UBSAN_OPTIONS} suppressions=${IMPALA_HOME}/bin/ubsan-suppressions.txt"
+export TSAN_OPTIONS="halt_on_error=0 history_size=7"
 export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"
 exec ${TOOL_PREFIX} ${IMPALA_CMD} ${IMPALAD_ARGS}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/bin/start-statestored.sh
----------------------------------------------------------------------
diff --git a/bin/start-statestored.sh b/bin/start-statestored.sh
index 85fe221..02cf09f 100755
--- a/bin/start-statestored.sh
+++ b/bin/start-statestored.sh
@@ -62,5 +62,6 @@ fi
 export ASAN_OPTIONS="handle_segv=0 detect_leaks=0 allocator_may_return_null=1"
 export UBSAN_OPTIONS="print_stacktrace=1"
 UBSAN_OPTIONS="${UBSAN_OPTIONS} suppressions=${IMPALA_HOME}/bin/ubsan-suppressions.txt"
+export TSAN_OPTIONS="halt_on_error=0 history_size=7"
 export PATH="${IMPALA_TOOLCHAIN}/llvm-${IMPALA_LLVM_VERSION}/bin:${PATH}"
 exec ${BINARY_BASE_DIR}/${BUILD_TYPE}/statestore/statestored ${STATESTORED_ARGS}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/buildall.sh
----------------------------------------------------------------------
diff --git a/buildall.sh b/buildall.sh
index bcc6ae3..d14c2ea 100755
--- a/buildall.sh
+++ b/buildall.sh
@@ -60,6 +60,7 @@ BUILD_ASAN=0
 BUILD_FE_ONLY=0
 BUILD_TIDY=0
 BUILD_UBSAN=0
+BUILD_TSAN=0
 # Export MAKE_CMD so it is visible in scripts that invoke make, e.g. copy-udfs-udas.sh
 export MAKE_CMD=make
 LZO_CMAKE_ARGS=
@@ -118,6 +119,9 @@ do
     -ubsan)
       BUILD_UBSAN=1
       ;;
+    -tsan)
+      BUILD_TSAN=1
+      ;;
     -testpairwise)
       EXPLORATION_STRATEGY=pairwise
       ;;
@@ -268,6 +272,9 @@ fi
 if [[ ${BUILD_UBSAN} -eq 1 ]]; then
   CMAKE_BUILD_TYPE=UBSAN
 fi
+if [[ ${BUILD_TSAN} -eq 1 ]]; then
+  CMAKE_BUILD_TYPE=TSAN
+fi
 
 MAKE_IMPALA_ARGS+=" -build_type=${CMAKE_BUILD_TYPE}"
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 567e1bf..4ba94be 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1114,16 +1114,16 @@
     "key": "tcmalloc.total-bytes-reserved"
   },
   {
-    "description": "Bytes allocated from Address Sanitizer's malloc (Address Sanitizer debug builds only)",
+    "description": "Bytes allocated from the sanitizer malloc (Sanitizer debug builds only)",
     "contexts": [
       "STATESTORE",
       "CATALOGSERVER",
       "IMPALAD"
     ],
-    "label": "Address Sanitizer Malloc Bytes Allocated",
+    "label": "Sanitizer Malloc Bytes Allocated",
     "units": "BYTES",
     "kind": "GAUGE",
-    "key": "asan-total-bytes-allocated"
+    "key": "sanitizer-total-bytes-allocated"
   },
   {
     "description": "Maximum allowed bytes allocated by the buffer pool.",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b1edaf21/tests/common/environ.py
----------------------------------------------------------------------
diff --git a/tests/common/environ.py b/tests/common/environ.py
index 571dc65..48c4cba 100644
--- a/tests/common/environ.py
+++ b/tests/common/environ.py
@@ -35,7 +35,6 @@ except ImportError as e:
 
 LOG = logging.getLogger('tests.common.environ')
 
-
 test_start_cluster_args = os.environ.get("TEST_START_CLUSTER_ARGS", "")
 
 # Find the likely BuildType of the running Impala. Assume it's found through the path
@@ -58,8 +57,8 @@ IMPALAD_PATH = os.path.join(impalad_basedir, 'service', 'impalad')
 
 class SpecificImpaladBuildTypes:
   """
-  Represent a specific build type. In reality, there 5 specific build types. These
-  specific build types are needed by Python test code.
+  Represent a specific build type. These specific build types are needed by Python test
+  code.
 
   The specific build types and their *most distinguishing* compiler options are:
 
@@ -68,6 +67,7 @@ class SpecificImpaladBuildTypes:
   3. DEBUG_CODE_COVERAGE (gcc -ggdb -ftest-coverage)
   4. RELEASE (gcc)
   5. RELEASE_CODE_COVERAGE (gcc -ftest-coverage)
+  6. THREAD_SANITIZER (clang -fsanitize=thread)
   """
   # ./buildall.sh -asan
   ADDRESS_SANITIZER = 'address_sanitizer'
@@ -79,6 +79,8 @@ class SpecificImpaladBuildTypes:
   RELEASE = 'release'
   # ./buildall.sh -release -codecoverage
   RELEASE_CODE_COVERAGE = 'release_code_coverage'
+  # ./buildall.sh -tsan
+  THREAD_SANITIZER = 'thread_sanitizer'
 
 
 class ImpaladBuild(object):
@@ -111,6 +113,12 @@ class ImpaladBuild(object):
     """
     return self.specific_build_type == SpecificImpaladBuildTypes.ADDRESS_SANITIZER
 
+  def is_tsan(self):
+    """
+    Return whether the Impala under test was compiled with TSAN.
+    """
+    return self.specific_build_type == SpecificImpaladBuildTypes.THREAD_SANITIZER
+
   def is_dev(self):
     """
     Return whether the Impala under test is a development build (i.e., any debug or ASAN
@@ -118,14 +126,15 @@ class ImpaladBuild(object):
     """
     return self.specific_build_type in (
         SpecificImpaladBuildTypes.ADDRESS_SANITIZER, SpecificImpaladBuildTypes.DEBUG,
-        SpecificImpaladBuildTypes.DEBUG_CODE_COVERAGE)
+        SpecificImpaladBuildTypes.DEBUG_CODE_COVERAGE,
+        SpecificImpaladBuildTypes.THREAD_SANITIZER)
 
   def runs_slowly(self):
     """
     Return whether the Impala under test "runs slowly". For our purposes this means
-    either compiled with code coverage enabled or ASAN.
+    either compiled with code coverage enabled, ASAN or TSAN.
     """
-    return self.has_code_coverage() or self.is_asan()
+    return self.has_code_coverage() or self.is_asan() or self.is_tsan()
 
   def _get_impalad_dwarf_info(self):
     """
@@ -170,7 +179,8 @@ class ImpaladBuild(object):
     assuming a debug build and log a warning.
     """
     ASAN_CU_NAME = 'asan_preinit.cc'
-    NON_ASAN_CU_NAME = 'daemon-main.cc'
+    TSAN_CU_NAME = 'tsan_clock.cc'
+    DEFAULT_CU_NAME = 'daemon-main.cc'
     GDB_FLAG = '-ggdb'
     CODE_COVERAGE_FLAG = '-ftest-coverage'
 
@@ -185,7 +195,9 @@ class ImpaladBuild(object):
 
     if die_name.endswith(ASAN_CU_NAME):
       specific_build_type = SpecificImpaladBuildTypes.ADDRESS_SANITIZER
-    elif not die_name.endswith(NON_ASAN_CU_NAME):
+    elif die_name.endswith(TSAN_CU_NAME):
+      specific_build_type = SpecificImpaladBuildTypes.THREAD_SANITIZER
+    elif not die_name.endswith(DEFAULT_CU_NAME):
       LOG.warn('Unexpected DW_AT_name in first CU: {0}; choosing '
                'DEBUG'.format(die_name))
       specific_build_type = SpecificImpaladBuildTypes.DEBUG


[2/8] incubator-impala git commit: IMPALA-5812: Fix NPE when joining on empty const select

Posted by jb...@apache.org.
IMPALA-5812: Fix NPE when joining on empty const select

An NPE is thrown during the creation of the single-node plan of a query
consisting of a cross join with a constant select that returns
an empty result set. This happens because when an empty-set plan node
is created, its tupleIds_ and tblRefIds_ are initialized with the
tuple ID of a newly created tuple that does not map to any existing
TableRef. This causes a null precondition check to fail during the
creation of the join node, when it tries to fetch the TableRef for that
new tuple ID in the empty-set node but doesn't find one.

Testing:
Added a planner test.

Change-Id: I6e425dbcb442aeeac687e103774823d3f50e6436
Reviewed-on: http://gerrit.cloudera.org:8080/7971
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4c9b46a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4c9b46a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4c9b46a9

Branch: refs/heads/master
Commit: 4c9b46a904c5b660ab5faa33cf1c58ab6417d799
Parents: 91f7bc1
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Tue Sep 5 15:56:36 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Sep 6 22:49:05 2017 +0000

----------------------------------------------------------------------
 .../apache/impala/planner/SingleNodePlanner.java   |  2 ++
 .../queries/PlannerTest/empty.test                 | 17 +++++++++++++++++
 2 files changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4c9b46a9/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 8d82409..d72e818 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -1057,6 +1057,8 @@ public class SingleNodePlanner {
           // true for an outer-joined inline view that has no table refs.
           Preconditions.checkState(!analyzer.isOuterJoined(inlineViewRef.getId()));
           emptySetNode.setOutputSmap(inlineViewRef.getSmap());
+          // The tblRef materialized by this node is still the 'inlineViewRef'.
+          emptySetNode.setTblRefIds(Lists.newArrayList(inlineViewRef.getId()));
           return emptySetNode;
         }
         // Analysis should have generated a tuple id into which to materialize the exprs.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4c9b46a9/testdata/workloads/functional-planner/queries/PlannerTest/empty.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/empty.test b/testdata/workloads/functional-planner/queries/PlannerTest/empty.test
index 7ea44e0..c85d8ec 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/empty.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/empty.test
@@ -550,3 +550,20 @@ PLAN-ROOT SINK
 |
 00:EMPTYSET
 ====
+---- QUERY
+# IMPALA-5812: Test that a cross join with a constant select that returns an empty result
+# set translates into an EMPTYSET in the final plan
+select count(*) from functional.alltypes x cross join (select 1 as j) y where j is null
+---- PLAN
+PLAN-ROOT SINK
+|
+03:AGGREGATE [FINALIZE]
+|  output: count(*)
+|
+02:NESTED LOOP JOIN [CROSS JOIN]
+|
+|--01:EMPTYSET
+|
+00:SCAN HDFS [functional.alltypes x]
+   partitions=24/24 files=24 size=478.45KB
+====


[3/8] incubator-impala git commit: IMPALA-5888: free other local allocations in Parquet

Posted by jb...@apache.org.
IMPALA-5888: free other local allocations in Parquet

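Free local allocations made while evaluating min/max statistics conjuncts
and dictionary filter conjuncts. Dictionary filter evaluation frees them
every 1024 dictionary entries and again once a column's filter has been
evaluated, so allocations do not accumulate for large dictionaries or
across many row groups.

Testing:
I wasn't able to produce any abnormal memory consumption from dictionary
or min/max filters, so I haven't included a regression test.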

Change-Id: I7792552510b54aa95044e44218e3351a36d6f9a8
Reviewed-on: http://gerrit.cloudera.org:8080/7933
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/be98aaac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/be98aaac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/be98aaac

Branch: refs/heads/master
Commit: be98aaacada6966ca39741696d8f95d24f2d3117
Parents: 4c9b46a
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Aug 31 22:19:48 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Sep 7 00:52:56 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be98aaac/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index f20818a..4cd4340 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -556,7 +556,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
       // comparisons cannot happen here, since predicates with NULL literals are filtered
       // in the frontend.
       *skip_row_group = true;
-      return Status::OK();
+      break;
     }
 
     if (pos_field) {
@@ -597,10 +597,13 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts(
       row.SetTuple(0, min_max_tuple);
       if (!ExecNode::EvalPredicate(eval, &row)) {
         *skip_row_group = true;
-        return Status::OK();
+        break;
       }
     }
   }
+
+  // Free any local allocations accumulated during conjunct evaluation.
+  ScalarExprEvaluator::FreeLocalAllocations(min_max_conjunct_evals_);
   return Status::OK();
 }
 
@@ -911,6 +914,11 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
     void* slot = dict_filter_tuple->GetSlot(slot_desc->tuple_offset());
     bool column_has_match = false;
     for (int dict_idx = 0; dict_idx < dictionary->num_entries(); ++dict_idx) {
+      if (dict_idx % 1024 == 0) {
+        // Don't let local allocations accumulate too much for large dictionaries or
+        // many row groups.
+        ScalarExprEvaluator::FreeLocalAllocations(dict_filter_conjunct_evals);
+      }
       dictionary->GetValue(dict_idx, slot);
 
       // We can only eliminate this row group if no value from the dictionary matches.
@@ -923,6 +931,8 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
         break;
       }
     }
+    // Free all local allocations now that we're done with the filter.
+    ScalarExprEvaluator::FreeLocalAllocations(dict_filter_conjunct_evals);
 
     if (!column_has_match) {
       // The column contains no value that matches the conjunct. The row group


[5/8] incubator-impala git commit: IMPALA-5317: add DATE_TRUNC() function

Posted by jb...@apache.org.
IMPALA-5317: add DATE_TRUNC() function

Added a builtin UDF, date_trunc().
Reused many of the truncation helpers already implemented for trunc(),
including the TruncUnit enum, but not StrToTruncUnit(): date_trunc() maps
its unit names with a new StrToDateTruncUnit() instead.
Added checks to ensure that truncation results that fall outside of the
valid timestamp range are returned as NULL.
Added backend (expr-test) tests for the date_trunc function.

Change-Id: I953ba006cbb166dcc78e8c0c12dfbf70f093b584
Reviewed-on: http://gerrit.cloudera.org:8080/7313
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f538b439
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f538b439
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f538b439

Branch: refs/heads/master
Commit: f538b43911eb4bdc4ce5269038f0011e3e83a3de
Parents: b1edaf2
Author: Sandeep Akinapelli <sa...@cloudera.com>
Authored: Fri Jun 16 11:56:10 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Sep 7 01:29:01 2017 +0000

----------------------------------------------------------------------
 be/src/exprs/expr-test.cc                    | 115 ++++++++-
 be/src/exprs/udf-builtins-ir.cc              |   5 +
 be/src/exprs/udf-builtins.cc                 | 272 ++++++++++++++++++----
 be/src/exprs/udf-builtins.h                  |  32 ++-
 common/function-registry/impala_functions.py |   4 +
 5 files changed, 382 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f538b439/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index e04abc5..3899a7a 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -7300,9 +7300,122 @@ TEST_F(ExprTest, UuidTest) {
   EXPECT_TRUE(string_set.size() == NUM_UUIDS);
 }
 
+TEST_F(ExprTest, DateTruncTest) {
+  TestTimestampValue("date_trunc('MILLENNIUM', '2016-05-08 10:30:00')",
+      TimestampValue::Parse("2000-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('MILLENNIUM', '2000-01-01 00:00:00')",
+      TimestampValue::Parse("2000-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('CENTURY', '2016-05-08 10:30:00')",
+      TimestampValue::Parse("2000-01-01 00:00:00  "));
+  TestTimestampValue("date_trunc('CENTURY', '2116-05-08 10:30:00')",
+      TimestampValue::Parse("2100-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('DECADE', '2116-05-08 10:30:00')",
+      TimestampValue::Parse("2110-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('YEAR', '2016-05-08 10:30:00')",
+      TimestampValue::Parse("2016-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('MONTH', '2016-05-08 00:00:00')",
+      TimestampValue::Parse("2016-05-01 00:00:00"));
+  TestTimestampValue("date_trunc('WEEK', '2116-05-08 10:30:00')",
+      TimestampValue::Parse("2116-05-04 00:00:00"));
+  TestTimestampValue("date_trunc('WEEK', '2017-01-01 10:37:03.455722111')",
+      TimestampValue::Parse("2016-12-26 00:00:00"));
+  TestTimestampValue("date_trunc('WEEK', '2017-01-02 10:37:03.455722111')",
+      TimestampValue::Parse("2017-01-02 00:00:00"));
+  TestTimestampValue("date_trunc('WEEK', '2017-01-07 10:37:03.455722111')",
+      TimestampValue::Parse("2017-01-02 00:00:00"));
+  TestTimestampValue("date_trunc('WEEK', '2017-01-08 10:37:03.455722111')",
+      TimestampValue::Parse("2017-01-02 00:00:00"));
+  TestTimestampValue("date_trunc('WEEK', '2017-01-09 10:37:03.455722111')",
+      TimestampValue::Parse("2017-01-09 00:00:00"));
+  TestTimestampValue("date_trunc('DAY', '1416-05-08 10:37:03.455722111')",
+      TimestampValue::Parse("1416-05-08 00:00:00"));
+
+  TestTimestampValue("date_trunc('HOUR', '1416-05-08 10:30:03.455722111')",
+      TimestampValue::Parse("1416-05-08 10:00:00"));
+  TestTimestampValue("date_trunc('HOUR', '1416-05-08 23:30:03.455722111')",
+      TimestampValue::Parse("1416-05-08 23:00:00"));
+  TestTimestampValue("date_trunc('MINUTE', '1416-05-08 10:37:03.455722111')",
+      TimestampValue::Parse("1416-05-08 10:37:00"));
+  TestTimestampValue("date_trunc('SECOND', '1416-05-08 10:37:03.455722111')",
+      TimestampValue::Parse("1416-05-08 10:37:03"));
+  TestTimestampValue("date_trunc('MILLISECONDS', '1416-05-08 10:37:03.455722111')",
+      TimestampValue::Parse("1416-05-08 10:37:03.455000000"));
+  TestTimestampValue("date_trunc('MICROSECONDS', '1416-05-08 10:37:03.455722111')",
+      TimestampValue::Parse("1416-05-08 10:37:03.455722000"));
+
+  // Test corner cases.
+  TestTimestampValue("date_trunc('MILLENNIUM', '9999-12-31 23:59:59.999999999')",
+      TimestampValue::Parse("9000-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('CENTURY', '9999-12-31 23:59:59.999999999')",
+      TimestampValue::Parse("9900-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('DECADE', '9999-12-31 23:59:59.999999999')",
+      TimestampValue::Parse("9990-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('YEAR', '9999-12-31 23:59:59.999999999')",
+      TimestampValue::Parse("9999-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('MONTH', '9999-12-31 23:59:59.999999999')",
+      TimestampValue::Parse("9999-12-01 00:00:00"));
+  TestTimestampValue("date_trunc('WEEK', '9999-12-31 23:59:59.999999999')",
+      TimestampValue::Parse("9999-12-27 00:00:00"));
+  TestTimestampValue("date_trunc('WEEK', '1400-01-06 23:59:59.999999999')",
+      TimestampValue::Parse("1400-01-06 00:00:00"));
+  TestTimestampValue("date_trunc('WEEK', '1400-01-07 23:59:59.999999999')",
+      TimestampValue::Parse("1400-01-06 00:00:00"));
+  TestTimestampValue("date_trunc('DAY', '9999-12-31 23:59:59.999999999')",
+      TimestampValue::Parse("9999-12-31 00:00:00"));
+  TestTimestampValue("date_trunc('HOUR', '9999-12-31 23:59:59.999999999')",
+      TimestampValue::Parse("9999-12-31 23:00:00"));
+  TestTimestampValue("date_trunc('MINUTE', '9999-12-31 23:59:59.999999999')",
+      TimestampValue::Parse("9999-12-31 23:59:00"));
+  TestTimestampValue("date_trunc('SECOND', '9999-12-31 23:59:59.999999999')",
+      TimestampValue::Parse("9999-12-31 23:59:59"));
+  TestTimestampValue("date_trunc('MILLISECONDS', '9999-12-31 23:59:59.999999999')",
+      TimestampValue::Parse("9999-12-31 23:59:59.999"));
+  TestTimestampValue("date_trunc('MICROSECONDS', '9999-12-31 23:59:59.999999999')",
+      TimestampValue::Parse("9999-12-31 23:59:59.999999"));
+
+  TestTimestampValue("date_trunc('CENTURY', '1400-01-01 00:00:00')",
+      TimestampValue::Parse("1400-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('DECADE', '1400-01-01 00:00:00')",
+      TimestampValue::Parse("1400-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('YEAR', '1400-01-01 00:00:00')",
+      TimestampValue::Parse("1400-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('MONTH', '1400-01-01 00:00:00')",
+      TimestampValue::Parse("1400-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('DAY', '1400-01-01 00:00:00')",
+      TimestampValue::Parse("1400-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('HOUR', '1400-01-01 00:00:00')",
+      TimestampValue::Parse("1400-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('MINUTE', '1400-01-01 00:00:00')",
+      TimestampValue::Parse("1400-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('SECOND', '1400-01-01 00:00:00')",
+      TimestampValue::Parse("1400-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('MILLISECONDS', '1400-01-01 00:00:00')",
+      TimestampValue::Parse("1400-01-01 00:00:00"));
+  TestTimestampValue("date_trunc('MICROSECONDS', '1400-01-01 00:00:00')",
+      TimestampValue::Parse("1400-01-01 00:00:00"));
+
+  // valid input with invalid output
+  TestIsNull("date_trunc('MILLENNIUM', '1416-05-08 10:30:00')", TYPE_TIMESTAMP);
+  TestIsNull("date_trunc('MILLENNIUM', '1999-12-31 11:59:59.999999')", TYPE_TIMESTAMP);
+  TestIsNull("date_trunc('WEEK', '1400-01-01 00:00:00')", TYPE_TIMESTAMP);
+  TestIsNull("date_trunc('WEEK', '1400-01-05 00:00:00')", TYPE_TIMESTAMP);
+
+  // Test invalid input.
+  TestIsNull("date_trunc('HOUR', '12202010')", TYPE_TIMESTAMP);
+  TestIsNull("date_trunc('HOUR', '')", TYPE_TIMESTAMP);
+  TestIsNull("date_trunc('HOUR', NULL)", TYPE_TIMESTAMP);
+  TestIsNull("date_trunc('HOUR', '02-13-2014')", TYPE_TIMESTAMP);
+  TestIsNull("date_trunc('CENTURY', '16-05-08 10:30:00')", TYPE_TIMESTAMP);
+  TestIsNull("date_trunc('CENTURY', '1116-05-08 10:30:00')", TYPE_TIMESTAMP);
+  TestIsNull("date_trunc('DAY', '00:00:00')", TYPE_TIMESTAMP);
+  TestError("date_trunc('YsEAR', '2016-05-08 10:30:00')");
+  TestError("date_trunc('D', '2116-05-08 10:30:00')");
+  TestError("date_trunc('2017-01-09', '2017-01-09 10:37:03.455722111' )");
+  TestError("date_trunc('2017-01-09 10:00:00', 'HOUR')");
+}
 } // namespace impala
 
-int main(int argc, char **argv) {
+int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
   InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST);
   ABORT_IF_ERROR(TimezoneDatabase::Initialize());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f538b439/be/src/exprs/udf-builtins-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/udf-builtins-ir.cc b/be/src/exprs/udf-builtins-ir.cc
index b02a422..03b88ba 100644
--- a/be/src/exprs/udf-builtins-ir.cc
+++ b/be/src/exprs/udf-builtins-ir.cc
@@ -108,6 +108,11 @@ TimestampVal UdfBuiltins::Trunc(FunctionContext* context, const TimestampVal& tv
   return TruncImpl(context, tv, unit_str);
 }
 
+TimestampVal UdfBuiltins::DateTrunc(
+    FunctionContext* context, const StringVal& unit_str, const TimestampVal& tv) {
+  return DateTruncImpl(context, tv, unit_str);
+}
+
 // Maps the user facing name of a unit to a TExtractField
 // Returns the TExtractField for the given unit
 TExtractField::type StrToExtractField(FunctionContext* ctx, const StringVal& unit_str) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f538b439/be/src/exprs/udf-builtins.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/udf-builtins.cc b/be/src/exprs/udf-builtins.cc
index 4662da0..01178e3 100644
--- a/be/src/exprs/udf-builtins.cc
+++ b/be/src/exprs/udf-builtins.cc
@@ -32,23 +32,31 @@ using boost::gregorian::date;
 using boost::gregorian::date_duration;
 using boost::posix_time::ptime;
 using boost::posix_time::time_duration;
+using boost::posix_time::milliseconds;
+using boost::posix_time::microseconds;
 using namespace impala;
 using namespace strings;
 
 // The units which can be used when Truncating a Timestamp
-struct TruncUnit {
-  enum Type {
-    UNIT_INVALID,
-    YEAR,
-    QUARTER,
-    MONTH,
-    WW,
-    W,
-    DAY,
-    DAY_OF_WEEK,
-    HOUR,
-    MINUTE
-  };
+enum class TruncUnit {
+  UNIT_INVALID,
+  YEAR,
+  QUARTER,
+  MONTH,
+  WW,
+  W,
+  DAY,
+  DAY_OF_WEEK,
+  HOUR,
+  MINUTE,
+  // Below units are only used by DateTrunc
+  MILLENNIUM,
+  CENTURY,
+  DECADE,
+  WEEK,
+  SECOND,
+  MILLISECONDS,
+  MICROSECONDS,
 };
 
 // Put non-exported functions in anonymous namespace to encourage inlining.
@@ -68,13 +76,48 @@ date GoBackToWeekday(const date& orig_date, int week_day) {
   return orig_date - date_duration(7 + diff);
 }
 
+// Maps the user facing name of a unit to a TruncUnit used by DateTrunc function
+// Returns the TruncUnit for the given string
+TruncUnit StrToDateTruncUnit(FunctionContext* ctx, const StringVal& unit_str) {
+  StringVal unit = UdfBuiltins::Lower(ctx, unit_str);
+  if (UNLIKELY(unit.is_null)) return TruncUnit::UNIT_INVALID;
+
+  if (unit == "millennium") {
+    return TruncUnit::MILLENNIUM;
+  } else if (unit == "century") {
+    return TruncUnit::CENTURY;
+  } else if (unit == "decade") {
+    return TruncUnit::DECADE;
+  } else if (unit == "year") {
+    return TruncUnit::YEAR;
+  } else if (unit == "month") {
+    return TruncUnit::MONTH;
+  } else if (unit == "week") {
+    return TruncUnit::WEEK;
+  } else if (unit == "day") {
+    return TruncUnit::DAY;
+  } else if (unit == "hour") {
+    return TruncUnit::HOUR;
+  } else if (unit == "minute") {
+    return TruncUnit::MINUTE;
+  } else if (unit == "second") {
+    return TruncUnit::SECOND;
+  } else if (unit == "milliseconds") {
+    return TruncUnit::MILLISECONDS;
+  } else if (unit == "microseconds") {
+    return TruncUnit::MICROSECONDS;
+  } else {
+    return TruncUnit::UNIT_INVALID;
+  }
+}
+
 // Maps the user facing name of a unit to a TruncUnit
 // Returns the TruncUnit for the given string
-TruncUnit::Type StrToTruncUnit(FunctionContext* ctx, const StringVal& unit_str) {
+TruncUnit StrToTruncUnit(FunctionContext* ctx, const StringVal& unit_str) {
   StringVal unit = UdfBuiltins::Lower(ctx, unit_str);
   if (UNLIKELY(unit.is_null)) return TruncUnit::UNIT_INVALID;
-  if ((unit == "syyyy") || (unit == "yyyy") || (unit == "year") || (unit == "syear") ||
-      (unit == "yyy") || (unit == "yy") || (unit == "y")) {
+  if ((unit == "syyyy") || (unit == "yyyy") || (unit == "year") || (unit == "syear")
+      || (unit == "yyy") || (unit == "yy") || (unit == "y")) {
     return TruncUnit::YEAR;
   } else if (unit == "q") {
     return TruncUnit::QUARTER;
@@ -97,6 +140,27 @@ TruncUnit::Type StrToTruncUnit(FunctionContext* ctx, const StringVal& unit_str)
   }
 }
 
+// Truncate to first day of millennium
+TimestampValue TruncMillenium(const date& orig_date) {
+  date new_date(orig_date.year() / 1000 * 1000, 1, 1);
+  time_duration new_time(0, 0, 0, 0);
+  return TimestampValue(new_date, new_time);
+}
+
+// Truncate to first day of century
+TimestampValue TruncCentury(const date& orig_date) {
+  date new_date(orig_date.year() / 100 * 100, 1, 1);
+  time_duration new_time(0, 0, 0, 0);
+  return TimestampValue(new_date, new_time);
+}
+
+// Truncate to first day of decade
+TimestampValue TruncDecade(const date& orig_date) {
+  date new_date(orig_date.year() / 10 * 10, 1, 1);
+  time_duration new_time(0, 0, 0, 0);
+  return TimestampValue(new_date, new_time);
+}
+
 // Truncate to first day of year
 TimestampValue TruncYear(const date& orig_date) {
   date new_date(orig_date.year(), 1, 1);
@@ -119,6 +183,14 @@ TimestampValue TruncMonth(const date& orig_date) {
   return TimestampValue(new_date, new_time);
 }
 
+// Truncate to first day of the week (monday)
+TimestampValue TruncWeek(const date& orig_date) {
+  // ISO-8601 week starts on monday. go back to monday
+  date new_date = GoBackToWeekday(orig_date, 1);
+  time_duration new_time(0, 0, 0, 0);
+  return TimestampValue(new_date, new_time);
+}
+
 // Same day of the week as the first day of the year
 TimestampValue TruncWW(const date& orig_date) {
   const date& first_day_of_year = TruncYear(orig_date).date();
@@ -160,37 +232,53 @@ TimestampValue TruncMinute(const date& orig_date, const time_duration& orig_time
   time_duration new_time(orig_time.hours(), orig_time.minutes(), 0, 0);
   return TimestampValue(orig_date, new_time);
 }
+
+// Truncate parts of seconds
+TimestampValue TruncSecond(const date& orig_date, const time_duration& orig_time) {
+  time_duration new_time(orig_time.hours(), orig_time.minutes(), orig_time.seconds());
+  return TimestampValue(orig_date, new_time);
 }
 
-TimestampVal UdfBuiltins::TruncImpl(FunctionContext* context, const TimestampVal& tv,
-    const StringVal &unit_str) {
-  if (tv.is_null) return TimestampVal::null();
-  const TimestampValue& ts = TimestampValue::FromTimestampVal(tv);
-  const date& orig_date = ts.date();
-  const time_duration& orig_time = ts.time();
+// Truncate parts of milliseconds
+TimestampValue TruncMilliSeconds(const date& orig_date, const time_duration& orig_time) {
+  time_duration new_time(orig_time.hours(), orig_time.minutes(), orig_time.seconds());
+  // Fractional seconds are nanoseconds because Boost is configured to use nanoseconds
+  // precision.
+  time_duration fraction = milliseconds(orig_time.fractional_seconds() / 1000000);
+  new_time = new_time + fraction;
+  return TimestampValue(orig_date, new_time);
+}
 
-  // resolve trunc_unit using the prepared state if possible, o.w. parse now
-  // TruncPrepare() can only parse trunc_unit if user passes it as a string literal
-  // TODO: it would be nice to resolve the branch before codegen so we can optimise
-  // this better.
-  TruncUnit::Type trunc_unit;
-  void* state = context->GetFunctionState(FunctionContext::THREAD_LOCAL);
-  if (state != NULL) {
-    trunc_unit = *reinterpret_cast<TruncUnit::Type*>(state);
-  } else {
-    trunc_unit = StrToTruncUnit(context, unit_str);
-    if (trunc_unit == TruncUnit::UNIT_INVALID) {
-      string string_unit(reinterpret_cast<char*>(unit_str.ptr), unit_str.len);
-      context->SetError(Substitute("Invalid Truncate Unit: $0", string_unit).c_str());
-      return TimestampVal::null();
-    }
-  }
+// Truncate parts of microseconds
+TimestampValue TruncMicroSeconds(const date& orig_date, const time_duration& orig_time) {
+  time_duration new_time(orig_time.hours(), orig_time.minutes(), orig_time.seconds());
+  // Fractional seconds are nanoseconds because Boost is configured to use nanoseconds
+  // precision.
+  time_duration fraction = microseconds(orig_time.fractional_seconds() / 1000);
+  new_time = new_time + fraction;
+  return TimestampValue(orig_date, new_time);
+}
 
+// used by both Trunc and DateTrunc functions to perform the truncation
+TimestampVal DoTrunc(
+    const TimestampValue ts, TruncUnit trunc_unit, FunctionContext* context) {
+  const date& orig_date = ts.date();
+  const time_duration& orig_time = ts.time();
   TimestampValue ret;
   TimestampVal ret_val;
 
   // check for invalid or malformed timestamps
   switch (trunc_unit) {
+    case TruncUnit::MILLENNIUM:
+      // for millenium < 2000 year value goes to 1000 (outside the supported range)
+      if (orig_date.is_special()) return TimestampVal::null();
+      if (orig_date.year() < 2000) return TimestampVal::null();
+      break;
+    case TruncUnit::WEEK:
+      // anything less than 1400-1-6 we have to move to year 1399
+      if (orig_date.is_special()) return TimestampVal::null();
+      if (orig_date < date(1400, 1, 6)) return TimestampVal::null();
+      break;
     case TruncUnit::YEAR:
     case TruncUnit::QUARTER:
     case TruncUnit::MONTH:
@@ -198,10 +286,15 @@ TimestampVal UdfBuiltins::TruncImpl(FunctionContext* context, const TimestampVal
     case TruncUnit::W:
     case TruncUnit::DAY:
     case TruncUnit::DAY_OF_WEEK:
+    case TruncUnit::CENTURY:
+    case TruncUnit::DECADE:
       if (orig_date.is_special()) return TimestampVal::null();
       break;
     case TruncUnit::HOUR:
     case TruncUnit::MINUTE:
+    case TruncUnit::SECOND:
+    case TruncUnit::MILLISECONDS:
+    case TruncUnit::MICROSECONDS:
       if (orig_time.is_special()) return TimestampVal::null();
       break;
     case TruncUnit::UNIT_INVALID:
@@ -236,27 +329,73 @@ TimestampVal UdfBuiltins::TruncImpl(FunctionContext* context, const TimestampVal
     case TruncUnit::MINUTE:
       ret = TruncMinute(orig_date, orig_time);
       break;
+    case TruncUnit::MILLENNIUM:
+      ret = TruncMillenium(orig_date);
+      break;
+    case TruncUnit::CENTURY:
+      ret = TruncCentury(orig_date);
+      break;
+    case TruncUnit::DECADE:
+      ret = TruncDecade(orig_date);
+      break;
+    case TruncUnit::WEEK:
+      ret = TruncWeek(orig_date);
+      break;
+    case TruncUnit::SECOND:
+      ret = TruncSecond(orig_date, orig_time);
+      break;
+    case TruncUnit::MILLISECONDS:
+      ret = TruncMilliSeconds(orig_date, orig_time);
+      break;
+    case TruncUnit::MICROSECONDS:
+      ret = TruncMicroSeconds(orig_date, orig_time);
+      break;
     default:
       // internal error: implies StrToTruncUnit out of sync with this switch
-      context->SetError(Substitute("truncate unit $0 not supported", trunc_unit).c_str());
+      context->SetError("truncate unit not supported");
       return TimestampVal::null();
   }
 
   ret.ToTimestampVal(&ret_val);
   return ret_val;
 }
+}
 
-void UdfBuiltins::TruncPrepare(FunctionContext* ctx,
-    FunctionContext::FunctionStateScope scope) {
+TimestampVal UdfBuiltins::TruncImpl(
+    FunctionContext* context, const TimestampVal& tv, const StringVal& unit_str) {
+  if (tv.is_null) return TimestampVal::null();
+  const TimestampValue& ts = TimestampValue::FromTimestampVal(tv);
+
+  // resolve trunc_unit using the prepared state if possible, o.w. parse now
+  // TruncPrepare() can only parse trunc_unit if user passes it as a string literal
+  // TODO: it would be nice to resolve the branch before codegen so we can optimise
+  // this better.
+  TruncUnit trunc_unit;
+  void* state = context->GetFunctionState(FunctionContext::THREAD_LOCAL);
+  if (state != NULL) {
+    trunc_unit = *reinterpret_cast<TruncUnit*>(state);
+  } else {
+    trunc_unit = StrToTruncUnit(context, unit_str);
+    if (trunc_unit == TruncUnit::UNIT_INVALID) {
+      string string_unit(reinterpret_cast<char*>(unit_str.ptr), unit_str.len);
+      context->SetError(Substitute("Invalid Truncate Unit: $0", string_unit).c_str());
+      return TimestampVal::null();
+    }
+  }
+  return DoTrunc(ts, trunc_unit, context);
+}
+
+void UdfBuiltins::TruncPrepare(
+    FunctionContext* ctx, FunctionContext::FunctionStateScope scope) {
   // Parse the unit up front if we can, otherwise do it on the fly in Trunc()
   if (ctx->IsArgConstant(1)) {
     StringVal* unit_str = reinterpret_cast<StringVal*>(ctx->GetConstantArg(1));
-    TruncUnit::Type trunc_unit = StrToTruncUnit(ctx, *unit_str);
+    TruncUnit trunc_unit = StrToTruncUnit(ctx, *unit_str);
     if (trunc_unit == TruncUnit::UNIT_INVALID) {
       string string_unit(reinterpret_cast<char*>(unit_str->ptr), unit_str->len);
       ctx->SetError(Substitute("Invalid Truncate Unit: $0", string_unit).c_str());
     } else {
-      TruncUnit::Type* state = ctx->Allocate<TruncUnit::Type>();
+      TruncUnit* state = ctx->Allocate<TruncUnit>();
       RETURN_IF_NULL(ctx, state);
       *state = trunc_unit;
       ctx->SetFunctionState(scope, state);
@@ -271,3 +410,50 @@ void UdfBuiltins::TruncClose(FunctionContext* ctx,
   ctx->SetFunctionState(scope, nullptr);
 }
 
+TimestampVal UdfBuiltins::DateTruncImpl(
+    FunctionContext* context, const TimestampVal& tv, const StringVal& unit_str) {
+  if (tv.is_null) return TimestampVal::null();
+  const TimestampValue& ts = TimestampValue::FromTimestampVal(tv);
+
+  // resolve date_trunc_unit using the prepared state if possible, o.w. parse now
+  // DateTruncPrepare() can only parse trunc_unit if user passes it as a string literal
+  TruncUnit date_trunc_unit;
+  void* state = context->GetFunctionState(FunctionContext::THREAD_LOCAL);
+  if (state != NULL) {
+    date_trunc_unit = *reinterpret_cast<TruncUnit*>(state);
+  } else {
+    date_trunc_unit = StrToDateTruncUnit(context, unit_str);
+    if (date_trunc_unit == TruncUnit::UNIT_INVALID) {
+      string string_unit(reinterpret_cast<char*>(unit_str.ptr), unit_str.len);
+      context->SetError(
+          Substitute("Invalid Date Truncate Unit: $0", string_unit).c_str());
+      return TimestampVal::null();
+    }
+  }
+  return DoTrunc(ts, date_trunc_unit, context);
+}
+
+void UdfBuiltins::DateTruncPrepare(
+    FunctionContext* ctx, FunctionContext::FunctionStateScope scope) {
+  // Parse the unit up front if we can, otherwise do it on the fly in DateTrunc()
+  if (ctx->IsArgConstant(0)) {
+    StringVal* unit_str = reinterpret_cast<StringVal*>(ctx->GetConstantArg(0));
+    TruncUnit date_trunc_unit = StrToDateTruncUnit(ctx, *unit_str);
+    if (date_trunc_unit == TruncUnit::UNIT_INVALID) {
+      string string_unit(reinterpret_cast<char*>(unit_str->ptr), unit_str->len);
+      ctx->SetError(Substitute("Invalid Date Truncate Unit: $0", string_unit).c_str());
+    } else {
+      TruncUnit* state = ctx->Allocate<TruncUnit>();
+      RETURN_IF_NULL(ctx, state);
+      *state = date_trunc_unit;
+      ctx->SetFunctionState(scope, state);
+    }
+  }
+}
+
+void UdfBuiltins::DateTruncClose(
+    FunctionContext* ctx, FunctionContext::FunctionStateScope scope) {
+  void* state = ctx->GetFunctionState(scope);
+  ctx->Free(reinterpret_cast<uint8_t*>(state));
+  ctx->SetFunctionState(scope, nullptr);
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f538b439/be/src/exprs/udf-builtins.h
----------------------------------------------------------------------
diff --git a/be/src/exprs/udf-builtins.h b/be/src/exprs/udf-builtins.h
index 2ef439b..4eeb33a 100644
--- a/be/src/exprs/udf-builtins.h
+++ b/be/src/exprs/udf-builtins.h
@@ -79,8 +79,36 @@ class UdfBuiltins {
       const StringVal& unit_str);
   static void TruncPrepare(FunctionContext* context,
       FunctionContext::FunctionStateScope scope);
-  static void TruncClose(FunctionContext* context,
-      FunctionContext::FunctionStateScope scope);
+  static void TruncClose(
+      FunctionContext* context, FunctionContext::FunctionStateScope scope);
+
+  /// Rounds (truncating down) a Timestamp to the specified unit.
+  ///    Units:
+  ///    MILLENNIUM: The millennium number.
+  ///    CENTURY: The century number.
+  ///    DECADE: The year field divided by 10.
+  ///    YEAR: The year field (1400 - 9999).
+  ///    MONTH: The number of the month within the year (1–12)
+  ///    WEEK: The number of the week of the year that the day is in.
+  ///    DAY: The day (of the month) field (1–31).
+  ///    HOUR: The hour field (0–23).
+  ///    MINUTE: The minutes field (0–59).
+  ///    SECOND: The seconds field (0–59).
+  ///    MILLISECONDS: The milliseconds fraction in the seconds.
+  ///    MICROSECONDS: The microseconds fraction in the seconds.
+
+  ///    Reference:
+  ///    https://my.vertica.com/docs/8.1.x/HTML/index.htm#Authoring/
+  ///       SQLReferenceManual/Functions/Date-Time/DATE_TRUNC.htm
+  static TimestampVal DateTrunc(
+      FunctionContext* context, const StringVal& unit_str, const TimestampVal& date);
+  /// Implementation of DateTrunc, not cross-compiled.
+  static TimestampVal DateTruncImpl(
+      FunctionContext* context, const TimestampVal& date, const StringVal& unit_str);
+  static void DateTruncPrepare(
+      FunctionContext* context, FunctionContext::FunctionStateScope scope);
+  static void DateTruncClose(
+      FunctionContext* context, FunctionContext::FunctionStateScope scope);
 
   /// Returns a single field from a timestamp
   ///    Fields:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f538b439/common/function-registry/impala_functions.py
----------------------------------------------------------------------
diff --git a/common/function-registry/impala_functions.py b/common/function-registry/impala_functions.py
index f43c7c1..0e469e6 100644
--- a/common/function-registry/impala_functions.py
+++ b/common/function-registry/impala_functions.py
@@ -124,6 +124,10 @@ visible_functions = [
   [['millisecond'], 'INT', ['TIMESTAMP'], '_ZN6impala18TimestampFunctions11MillisecondEPN10impala_udf15FunctionContextERKNS1_12TimestampValE'],
   [['to_date'], 'STRING', ['TIMESTAMP'], '_ZN6impala18TimestampFunctions6ToDateEPN10impala_udf15FunctionContextERKNS1_12TimestampValE'],
   [['dayname'], 'STRING', ['TIMESTAMP'], '_ZN6impala18TimestampFunctions7DayNameEPN10impala_udf15FunctionContextERKNS1_12TimestampValE'],
+  [['date_trunc'], 'TIMESTAMP', ['STRING', 'TIMESTAMP'],
+   '_ZN6impala11UdfBuiltins9DateTruncEPN10impala_udf15FunctionContextERKNS1_9StringValERKNS1_12TimestampValE',
+   '_ZN6impala11UdfBuiltins16DateTruncPrepareEPN10impala_udf15FunctionContextENS2_18FunctionStateScopeE',
+   '_ZN6impala11UdfBuiltins14DateTruncCloseEPN10impala_udf15FunctionContextENS2_18FunctionStateScopeE'],
   [['years_add'], 'TIMESTAMP', ['TIMESTAMP', 'INT'],
       '_ZN6impala18TimestampFunctions6AddSubILb1EN10impala_udf6IntValEN5boost9date_time14years_durationINS4_9gregorian21greg_durations_configEEELb0EEENS2_12TimestampValEPNS2_15FunctionContextERKSA_RKT0_'],
   [['years_add'], 'TIMESTAMP', ['TIMESTAMP', 'BIGINT'],


[7/8] incubator-impala git commit: IMPALA-5750: Catch exceptions from boost thread creation

Posted by jb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/util/thread.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread.h b/be/src/util/thread.h
index 18f3a75..6898517 100644
--- a/be/src/util/thread.h
+++ b/be/src/util/thread.h
@@ -48,10 +48,10 @@ class Webserver;
 /// TODO: Consider allowing fragment IDs as category parameters.
 class Thread {
  public:
-  /// This constructor pattern mimics that in boost::thread. There is
-  /// one constructor for each number of arguments that the thread
+  /// This static Create method pattern mimics that in the boost::thread constructors.
+  /// There is one static Create method for each number of arguments that the thread
   /// function accepts. To extend the set of acceptable signatures, add
-  /// another constructor with <class F, class A1.... class An>.
+  /// another static Create method with <class F, class A1.... class An>.
   //
   /// In general:
   ///  - category: string identifying the thread category to which this thread belongs,
@@ -61,44 +61,56 @@ class Thread {
   ///  - F - a method type that supports operator(), and the instance passed to the
   ///    constructor is executed immediately in a separate thread.
   ///  - A1...An - argument types whose instances are passed to f(...)
+  ///  - thread - unique_ptr<Thread>* to reset with the created Thread.
+  ///  - fault_injection_eligible - If set to true, allow fault injection at this
+  ///    callsite (see thread_creation_fault_injection). If set to false, fault
+  ///    injection is disabled at this callsite. Thread creation sites that crash
+  ///    Impala or abort startup must have this set to false.
   template <class F>
-  Thread(const std::string& category, const std::string& name, const F& f)
-      : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
-    StartThread(f);
+  static Status Create(const std::string& category, const std::string& name,
+      const F& f, std::unique_ptr<Thread>* thread,
+      bool fault_injection_eligible = false) {
+    return StartThread(category, name, f, thread, fault_injection_eligible);
   }
 
   template <class F, class A1>
-  Thread(const std::string& category, const std::string& name, const F& f, const A1& a1)
-      : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
-    StartThread(boost::bind(f, a1));
+  static Status Create(const std::string& category, const std::string& name,
+      const F& f, const A1& a1, std::unique_ptr<Thread>* thread,
+      bool fault_injection_eligible = false) {
+    return StartThread(category, name, boost::bind(f, a1), thread,
+        fault_injection_eligible);
   }
 
   template <class F, class A1, class A2>
-  Thread(const std::string& category, const std::string& name, const F& f,
-      const A1& a1, const A2& a2)
-      : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
-    StartThread(boost::bind(f, a1, a2));
+  static Status Create(const std::string& category, const std::string& name,
+      const F& f, const A1& a1, const A2& a2, std::unique_ptr<Thread>* thread,
+      bool fault_injection_eligible = false) {
+    return StartThread(category, name, boost::bind(f, a1, a2), thread,
+        fault_injection_eligible);
   }
 
   template <class F, class A1, class A2, class A3>
-  Thread(const std::string& category, const std::string& name, const F& f,
-      const A1& a1, const A2& a2, const A3& a3)
-      : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
-    StartThread(boost::bind(f, a1, a2, a3));
+  static Status Create(const std::string& category, const std::string& name,
+      const F& f, const A1& a1, const A2& a2, const A3& a3,
+      std::unique_ptr<Thread>* thread, bool fault_injection_eligible = false) {
+    return StartThread(category, name, boost::bind(f, a1, a2, a3), thread,
+        fault_injection_eligible);
   }
 
   template <class F, class A1, class A2, class A3, class A4>
-  Thread(const std::string& category, const std::string& name, const F& f,
-      const A1& a1, const A2& a2, const A3& a3, const A4& a4)
-      : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
-    StartThread(boost::bind(f, a1, a2, a3, a4));
+  static Status Create(const std::string& category, const std::string& name,
+      const F& f, const A1& a1, const A2& a2, const A3& a3, const A4& a4,
+      std::unique_ptr<Thread>* thread, bool fault_injection_eligible = false) {
+    return StartThread(category, name, boost::bind(f, a1, a2, a3, a4), thread,
+        fault_injection_eligible);
   }
 
   template <class F, class A1, class A2, class A3, class A4, class A5>
-  Thread(const std::string& category, const std::string& name, const F& f,
-      const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5)
-      : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {
-    StartThread(boost::bind(f, a1, a2, a3, a4, a5));
+  static Status Create(const std::string& category, const std::string& name,
+      const F& f, const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5,
+      std::unique_ptr<Thread>* thread, bool fault_injection_eligible = false) {
+    return StartThread(category, name, boost::bind(f, a1, a2, a3, a4, a5), thread,
+        fault_injection_eligible);
   }
 
   /// Blocks until this thread finishes execution. Once this method returns, the thread
@@ -117,6 +129,9 @@ class Thread {
   static const int64_t INVALID_THREAD_ID = -1;
 
  private:
+  Thread(const std::string& category, const std::string& name)
+    : category_(category), name_(name), tid_(UNINITIALISED_THREAD_ID) {}
+
   /// To distinguish between a thread ID that can't be determined, and one that hasn't
   /// been assigned. Since tid_ is set in the constructor, this value will never be seen
   /// by clients of this class.
@@ -137,10 +152,13 @@ class Thread {
   /// non-negative integer, or INVALID_THREAD_ID.
   int64_t tid_;
 
-  /// Starts the thread running SuperviseThread(), and returns once that thread has
-  /// initialised and its TID read. Waits for notification from the started thread that
-  /// initialisation is complete before returning.
-  void StartThread(const ThreadFunctor& functor);
+  /// Creates a new thread and starts the thread running SuperviseThread(). It waits
+  /// for notification from the started thread that initialisation is complete and
+  /// the TID read before returning. This will return an error if thread create fails.
+  /// In the event of success, 'thread' will be set to the created Thread.
+  static Status StartThread(const std::string& category, const std::string& name,
+      const ThreadFunctor& functor, std::unique_ptr<Thread>* thread,
+      bool fault_injection_eligible) WARN_UNUSED_RESULT;
 
   /// Wrapper for the user-supplied function. Always invoked from thread_. Executes the
   /// method in functor_, but before doing so registers with the global ThreadMgr and
@@ -175,7 +193,7 @@ class ThreadGroup {
   /// will destroy it when the ThreadGroup is destroyed.  Threads will linger until that
   /// point (even if terminated), however, so callers should be mindful of the cost of
   /// placing very many threads in this set.
-  void AddThread(std::unique_ptr<Thread> thread);
+  void AddThread(std::unique_ptr<Thread>&& thread);
 
   /// Waits for all threads to finish. DO NOT call this from a thread inside this set;
   /// deadlock will predictably ensue.
@@ -196,7 +214,7 @@ void InitThreading();
 /// the "thread-manager." If 'include_jvm_threads' is true, shows information about
 /// live JVM threads in the web UI.
 Status StartThreadInstrumentation(MetricGroup* metrics, Webserver* webserver,
-    bool include_jvm_threads);
+    bool include_jvm_threads) WARN_UNUSED_RESULT;
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 393daba..c08268e 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -331,6 +331,8 @@ error_codes = (
 
   ("ADMISSION_TIMED_OUT", 108, "Admission for query exceeded timeout $0ms in pool $1. "
      "Queued reason: $2"),
+
+  ("THREAD_CREATION_FAILED", 109, "Failed to create thread $0 in category $1: $2"),
 )
 
 import sys


[6/8] incubator-impala git commit: IMPALA-4620: Refactor evalcost computation in query analysis

Posted by jb...@apache.org.
IMPALA-4620: Refactor evalcost computation in query analysis

This patch adds a computeEvalCost abstract function to class Expr and
calls it explicitly when an expr is analyzed. Existing code sets evalcost
at random places in analyzeImpl(), causing a bug where a child expr is
cast without its evalcost being recomputed. Furthermore, if a child of an
Expr is substituted from one with known evalcost to one with unknown
evalcost, the evalcost of the parent won't be updated by subsequent analyze() calls.
This patch fixes these problems.
A planner test case with wrong predicate order is also fixed.

Change-Id: Ibec3d648532c185d4318476796b1ba432b0fe59e
Reviewed-on: http://gerrit.cloudera.org:8080/7948
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/897f025c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/897f025c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/897f025c

Branch: refs/heads/master
Commit: 897f025c69bd0dd45520721a0d6eb9c9f14aa342
Parents: f538b43
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Tue Aug 29 19:06:12 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Sep 7 01:47:41 2017 +0000

----------------------------------------------------------------------
 .../apache/impala/analysis/AnalyticExpr.java    |  5 ++--
 .../apache/impala/analysis/ArithmeticExpr.java  |  6 +++-
 .../impala/analysis/BetweenPredicate.java       |  3 ++
 .../apache/impala/analysis/BinaryPredicate.java | 25 ++++++++--------
 .../org/apache/impala/analysis/BoolLiteral.java |  2 --
 .../org/apache/impala/analysis/CaseExpr.java    | 14 ++++-----
 .../org/apache/impala/analysis/CastExpr.java    |  8 ++++--
 .../impala/analysis/CompoundPredicate.java      |  6 +++-
 .../apache/impala/analysis/ExistsPredicate.java |  3 ++
 .../java/org/apache/impala/analysis/Expr.java   |  9 +++++-
 .../impala/analysis/FunctionCallExpr.java       |  5 +++-
 .../org/apache/impala/analysis/InPredicate.java | 10 ++++---
 .../impala/analysis/IsNotEmptyPredicate.java    |  7 ++++-
 .../apache/impala/analysis/IsNullPredicate.java |  6 +++-
 .../impala/analysis/KuduPartitionExpr.java      |  3 ++
 .../apache/impala/analysis/LikePredicate.java   | 30 +++++++++++---------
 .../org/apache/impala/analysis/LiteralExpr.java |  6 ++++
 .../org/apache/impala/analysis/NullLiteral.java |  1 -
 .../apache/impala/analysis/NumericLiteral.java  |  3 --
 .../org/apache/impala/analysis/SlotRef.java     |  6 +++-
 .../apache/impala/analysis/StringLiteral.java   |  1 -
 .../org/apache/impala/analysis/Subquery.java    |  3 ++
 .../analysis/TimestampArithmeticExpr.java       |  6 +++-
 .../impala/analysis/TimestampLiteral.java       |  1 -
 .../impala/analysis/TupleIsNullPredicate.java   |  6 +++-
 .../queries/PlannerTest/inline-view.test        |  4 +--
 26 files changed, 117 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java b/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
index 534d09a..15d5b4d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalyticExpr.java
@@ -35,8 +35,6 @@ import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.TColumnValue;
 import org.apache.impala.thrift.TExprNode;
 import org.apache.impala.util.TColumnValueUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
@@ -554,6 +552,9 @@ public class AnalyticExpr extends Expr {
     setChildren();
   }
 
+  @Override
+  protected float computeEvalCost() { return UNKNOWN_COST; }
+
   /**
    * If necessary, rewrites the analytic function, window, and/or order-by elements into
    * a standard format for the purpose of simpler backend execution, as follows:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/ArithmeticExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ArithmeticExpr.java b/fe/src/main/java/org/apache/impala/analysis/ArithmeticExpr.java
index 763a38d..48ac2f2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ArithmeticExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ArithmeticExpr.java
@@ -190,7 +190,6 @@ public class ArithmeticExpr extends Expr {
       Preconditions.checkState(children_.size() == 2);
       t1 = getChild(1).getType();
     }
-    if (hasChildCosts()) evalCost_ = getChildCosts() + ARITHMETIC_OP_COST;
 
     String fnName = op_.getName();
     switch (op_) {
@@ -263,5 +262,10 @@ public class ArithmeticExpr extends Expr {
   }
 
   @Override
+  protected float computeEvalCost() {
+    return hasChildCosts() ? getChildCosts() + ARITHMETIC_OP_COST : UNKNOWN_COST;
+  }
+
+  @Override
   public Expr clone() { return new ArithmeticExpr(this); }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/BetweenPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/BetweenPredicate.java b/fe/src/main/java/org/apache/impala/analysis/BetweenPredicate.java
index 8806990..4cedeac 100644
--- a/fe/src/main/java/org/apache/impala/analysis/BetweenPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/BetweenPredicate.java
@@ -57,6 +57,9 @@ public class BetweenPredicate extends Predicate {
     analyzer.castAllToCompatibleType(children_);
   }
 
+  @Override
+  protected float computeEvalCost() { return UNKNOWN_COST; }
+
   public boolean isNotBetween() { return isNotBetween_; }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java b/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java
index cc4b030..afdfaa6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/BinaryPredicate.java
@@ -228,19 +228,20 @@ public class BinaryPredicate extends Predicate {
         selectivity_ = Math.max(0, Math.min(1, selectivity_));
       }
     }
+  }
 
-    // Compute cost.
-    if (hasChildCosts()) {
-      if (getChild(0).getType().isFixedLengthType()) {
-        evalCost_ = getChildCosts() + BINARY_PREDICATE_COST;
-      } else if (getChild(0).getType().isStringType()) {
-        evalCost_ = getChildCosts() +
-            (float) (getAvgStringLength(getChild(0)) + getAvgStringLength(getChild(1)) *
-            BINARY_PREDICATE_COST);
-      } else {
-        //TODO(tmarshall): Handle other var length types here.
-        evalCost_ = getChildCosts() + VAR_LEN_BINARY_PREDICATE_COST;
-      }
+  @Override
+  protected float computeEvalCost() {
+    if (!hasChildCosts()) return UNKNOWN_COST;
+    if (getChild(0).getType().isFixedLengthType()) {
+      return getChildCosts() + BINARY_PREDICATE_COST;
+    } else if (getChild(0).getType().isStringType()) {
+      return getChildCosts() +
+          (float) (getAvgStringLength(getChild(0)) + getAvgStringLength(getChild(1))) *
+              BINARY_PREDICATE_COST;
+    } else {
+      //TODO(tmarshall): Handle other var length types here.
+      return getChildCosts() + VAR_LEN_BINARY_PREDICATE_COST;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/BoolLiteral.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/BoolLiteral.java b/fe/src/main/java/org/apache/impala/analysis/BoolLiteral.java
index 0198704..838f120 100644
--- a/fe/src/main/java/org/apache/impala/analysis/BoolLiteral.java
+++ b/fe/src/main/java/org/apache/impala/analysis/BoolLiteral.java
@@ -31,12 +31,10 @@ public class BoolLiteral extends LiteralExpr {
   public BoolLiteral(boolean value) {
     this.value_ = value;
     type_ = Type.BOOLEAN;
-    evalCost_ = LITERAL_COST;
   }
 
   public BoolLiteral(String value) throws AnalysisException {
     type_ = Type.BOOLEAN;
-    evalCost_ = LITERAL_COST;
     if (value.toLowerCase().equals("true")) {
       this.value_ = true;
     } else if (value.toLowerCase().equals("false")) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/CaseExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CaseExpr.java b/fe/src/main/java/org/apache/impala/analysis/CaseExpr.java
index 9d6ea69..237076a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CaseExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CaseExpr.java
@@ -345,18 +345,16 @@ public class CaseExpr extends Expr {
         CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
     Preconditions.checkNotNull(fn_);
     type_ = returnType;
+  }
 
+  @Override
+  protected float computeEvalCost() {
+    if (!hasChildCosts()) return UNKNOWN_COST;
     // Compute cost as the sum of evaluating all of the WHEN exprs, plus
     // the max of the THEN/ELSE exprs.
     float maxThenCost = 0;
     float whenCosts = 0;
-    boolean hasChildCosts = true;
     for (int i = 0; i < children_.size(); ++i) {
-      if (!getChild(i).hasCost()) {
-        hasChildCosts = false;
-        break;
-      }
-
       if (hasCaseExpr_ && i % 2 == 1) {
         // This child is a WHEN expr. BINARY_PREDICATE_COST accounts for the cost of
         // comparing the CASE expr to the WHEN expr.
@@ -371,9 +369,7 @@ public class CaseExpr extends Expr {
         if (thenCost > maxThenCost) maxThenCost = thenCost;
       }
     }
-    if (hasChildCosts) {
-      evalCost_ =  whenCosts + maxThenCost;
-    }
+    return whenCosts + maxThenCost;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CastExpr.java b/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
index 3619c46..f185696 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CastExpr.java
@@ -66,6 +66,7 @@ public class CastExpr extends Expr {
     try {
       analyze();
       computeNumDistinctValues();
+      evalCost_ = computeEvalCost();
     } catch (AnalysisException ex) {
       Preconditions.checkState(false,
           "Implicit casts should never throw analysis exception.");
@@ -205,9 +206,12 @@ public class CastExpr extends Expr {
     analyze();
   }
 
-  private void analyze() throws AnalysisException {
-    if (getChild(0).hasCost()) evalCost_ = getChild(0).getCost() + CAST_COST;
+  @Override
+  protected float computeEvalCost() {
+    return getChild(0).hasCost() ? getChild(0).getCost() + CAST_COST : UNKNOWN_COST;
+  }
 
+  private void analyze() throws AnalysisException {
     Preconditions.checkNotNull(type_);
     if (type_.isComplexType()) {
       throw new AnalysisException(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java b/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java
index 6354004..ba1ef8d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CompoundPredicate.java
@@ -134,7 +134,6 @@ public class CompoundPredicate extends Predicate {
     Preconditions.checkState(fn_ != null);
     Preconditions.checkState(fn_.getReturnType().isBoolean());
     castForFunctionCall(false);
-    if (hasChildCosts()) evalCost_ = getChildCosts() + COMPOUND_PREDICATE_COST;
 
     if (!getChild(0).hasSelectivity() ||
         (children_.size() == 2 && !getChild(1).hasSelectivity())) {
@@ -158,6 +157,11 @@ public class CompoundPredicate extends Predicate {
     selectivity_ = Math.max(0.0, Math.min(1.0, selectivity_));
   }
 
+  @Override
+  protected float computeEvalCost() {
+    return hasChildCosts() ? getChildCosts() + COMPOUND_PREDICATE_COST : UNKNOWN_COST;
+  }
+
   /**
    * Negates a CompoundPredicate.
    */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/ExistsPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ExistsPredicate.java b/fe/src/main/java/org/apache/impala/analysis/ExistsPredicate.java
index 355f562..5acd04a 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ExistsPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ExistsPredicate.java
@@ -70,6 +70,9 @@ public class ExistsPredicate extends Predicate {
   public Expr clone() { return new ExistsPredicate(this); }
 
   @Override
+  protected float computeEvalCost() { return UNKNOWN_COST; }
+
+  @Override
   public String toSqlImpl() {
     StringBuilder strBuilder = new StringBuilder();
     if (notExists_) strBuilder.append("NOT ");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/Expr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index 774d7b3..5dc0438 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -26,7 +26,6 @@ import java.util.ListIterator;
 import java.util.Set;
 
 import org.apache.impala.analysis.BinaryPredicate.Operator;
-import org.apache.impala.analysis.BoolLiteral;
 import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.Function.CompareMode;
@@ -86,6 +85,7 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
   public final static float LITERAL_COST = 1;
   public final static float SLOT_REF_COST = 1;
   public final static float TIMESTAMP_ARITHMETIC_COST = 5;
+  public final static float UNKNOWN_COST = -1;
 
   // To be used when estimating the cost of Exprs of type string where we don't otherwise
   // have an estimate of how long the strings produced by that Expr are.
@@ -341,6 +341,7 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
 
     // Do all the analysis for the expr subclass before marking the Expr analyzed.
     analyzeImpl(analyzer);
+    evalCost_ = computeEvalCost();
     analysisDone();
   }
 
@@ -363,6 +364,12 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
     }
   }
 
+  /**
+   * Compute and return evalcost of this expr given the evalcost of all children has been
+   * computed. Should be called bottom-up whenever the structure of subtree is modified.
+   */
+  abstract protected float computeEvalCost();
+
   protected void computeNumDistinctValues() {
     if (isConstant()) {
       numDistinctValues_ = 1;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
index 9364f12..5de6f6b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FunctionCallExpr.java
@@ -569,9 +569,12 @@ public class FunctionCallExpr extends Expr {
     if (type_.isWildcardChar() || type_.isWildcardVarchar()) {
       type_ = ScalarType.STRING;
     }
+  }
 
+  @Override
+  protected float computeEvalCost() {
     // TODO(tmarshall): Differentiate based on the specific function.
-    if (hasChildCosts()) evalCost_ = getChildCosts() + FUNCTION_CALL_COST;
+    return hasChildCosts() ? getChildCosts() + FUNCTION_CALL_COST : UNKNOWN_COST;
   }
 
   public FunctionCallExpr getMergeAggInputFn() { return mergeAggInputFn_; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/InPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InPredicate.java b/fe/src/main/java/org/apache/impala/analysis/InPredicate.java
index de12167..e34752f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InPredicate.java
@@ -184,11 +184,13 @@ public class InPredicate extends Predicate {
       }
       selectivity_ = Math.max(0.0, Math.min(1.0, selectivity_));
     }
+  }
 
-    if (hasChildCosts()) {
-      // BINARY_PREDICATE_COST accounts for the cost of performing the comparison.
-      evalCost_ = getChildCosts() + BINARY_PREDICATE_COST * (children_.size() - 1);
-    }
+  @Override
+  protected float computeEvalCost() {
+    if (!hasChildCosts()) return UNKNOWN_COST;
+    // BINARY_PREDICATE_COST accounts for the cost of performing the comparison.
+    return getChildCosts() + BINARY_PREDICATE_COST * (children_.size() - 1);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/IsNotEmptyPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/IsNotEmptyPredicate.java b/fe/src/main/java/org/apache/impala/analysis/IsNotEmptyPredicate.java
index 380fcf4..9ea1122 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IsNotEmptyPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IsNotEmptyPredicate.java
@@ -48,7 +48,12 @@ public class IsNotEmptyPredicate extends Predicate {
     }
     // Avoid influencing cardinality estimates.
     selectivity_ = 1.0;
-    if (getChild(0).hasCost()) evalCost_ = getChild(0).getCost() + IS_NOT_EMPTY_COST;
+  }
+
+  @Override
+  protected float computeEvalCost() {
+    if (!getChild(0).hasCost()) return UNKNOWN_COST;
+    return getChild(0).getCost() + IS_NOT_EMPTY_COST;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/IsNullPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/IsNullPredicate.java b/fe/src/main/java/org/apache/impala/analysis/IsNullPredicate.java
index fb3e964..fe160c8 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IsNullPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IsNullPredicate.java
@@ -134,7 +134,6 @@ public class IsNullPredicate extends Predicate {
       fn_ = getBuiltinFunction(
           analyzer, IS_NULL, collectChildReturnTypes(), CompareMode.IS_IDENTICAL);
     }
-    if (getChild(0).hasCost()) evalCost_ = getChild(0).getCost() + IS_NULL_COST;
 
     // determine selectivity
     // TODO: increase this to make sure we don't end up favoring broadcast joins
@@ -158,6 +157,11 @@ public class IsNullPredicate extends Predicate {
   }
 
   @Override
+  protected float computeEvalCost() {
+    return getChild(0).hasCost() ? getChild(0).getCost() + IS_NULL_COST : UNKNOWN_COST;
+  }
+
+  @Override
   protected void toThrift(TExprNode msg) {
     msg.node_type = TExprNodeType.FUNCTION_CALL;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
index 8d52d59..08c53fd 100644
--- a/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/KuduPartitionExpr.java
@@ -79,6 +79,9 @@ public class KuduPartitionExpr extends Expr {
   }
 
   @Override
+  protected float computeEvalCost() { return UNKNOWN_COST; }
+
+  @Override
   protected String toSqlImpl() {
     StringBuilder sb = new StringBuilder("KuduPartition(");
     for (int i = 0; i < children_.size(); ++i) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java b/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java
index 9fab11e..32ccddb 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LikePredicate.java
@@ -143,21 +143,23 @@ public class LikePredicate extends Predicate {
       }
     }
     castForFunctionCall(false);
+  }
 
-    if (hasChildCosts()) {
-      if (getChild(1).isLiteral() && !getChild(1).isNullLiteral() &&
-          Pattern.matches("[%_]*[^%_]*[%_]*", ((StringLiteral) getChild(1)).getValue())) {
-        // This pattern only has wildcards as leading or trailing character,
-        // so it is linear.
-        evalCost_ = getChildCosts() +
-            (float) (getAvgStringLength(getChild(0)) + getAvgStringLength(getChild(1)) *
-            BINARY_PREDICATE_COST) + LIKE_COST;
-      } else {
-        // This pattern is more expensive, so calculate its cost as quadratic.
-        evalCost_ = getChildCosts() +
-            (float) (getAvgStringLength(getChild(0)) * getAvgStringLength(getChild(1)) *
-            BINARY_PREDICATE_COST) + LIKE_COST;
-      }
+  @Override
+  protected float computeEvalCost() {
+    if (!hasChildCosts()) return UNKNOWN_COST;
+    if (getChild(1).isLiteral() && !getChild(1).isNullLiteral() &&
+      Pattern.matches("[%_]*[^%_]*[%_]*", ((StringLiteral) getChild(1)).getValue())) {
+      // This pattern only has wildcards as leading or trailing character,
+      // so it is linear.
+      return getChildCosts() +
+          (float) (getAvgStringLength(getChild(0)) + getAvgStringLength(getChild(1)) *
+              BINARY_PREDICATE_COST) + LIKE_COST;
+    } else {
+      // This pattern is more expensive, so calculate its cost as quadratic.
+      return getChildCosts() +
+          (float) (getAvgStringLength(getChild(0)) * getAvgStringLength(getChild(1)) *
+              BINARY_PREDICATE_COST) + LIKE_COST;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/LiteralExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/LiteralExpr.java b/fe/src/main/java/org/apache/impala/analysis/LiteralExpr.java
index 45a65f9..ccc9c7e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LiteralExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LiteralExpr.java
@@ -43,6 +43,7 @@ public abstract class LiteralExpr extends Expr implements Comparable<LiteralExpr
   private final static Logger LOG = LoggerFactory.getLogger(LiteralExpr.class);
 
   public LiteralExpr() {
+    evalCost_ = LITERAL_COST;
     numDistinctValues_ = 1;
   }
 
@@ -101,6 +102,11 @@ public abstract class LiteralExpr extends Expr implements Comparable<LiteralExpr
     // Literals require no analysis.
   }
 
+  @Override
+  protected float computeEvalCost() {
+    return LITERAL_COST;
+  }
+
   /**
    * Returns an analyzed literal from the thrift object.
    */

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/NullLiteral.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/NullLiteral.java b/fe/src/main/java/org/apache/impala/analysis/NullLiteral.java
index 0dcb264..e2ea869 100644
--- a/fe/src/main/java/org/apache/impala/analysis/NullLiteral.java
+++ b/fe/src/main/java/org/apache/impala/analysis/NullLiteral.java
@@ -28,7 +28,6 @@ public class NullLiteral extends LiteralExpr {
 
   public NullLiteral() {
     type_ = Type.NULL;
-    evalCost_ = LITERAL_COST;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/NumericLiteral.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/NumericLiteral.java b/fe/src/main/java/org/apache/impala/analysis/NumericLiteral.java
index 79c906c..98b9dbd 100644
--- a/fe/src/main/java/org/apache/impala/analysis/NumericLiteral.java
+++ b/fe/src/main/java/org/apache/impala/analysis/NumericLiteral.java
@@ -90,7 +90,6 @@ public class NumericLiteral extends LiteralExpr {
   public NumericLiteral(BigInteger value, Type type) {
     value_ = new BigDecimal(value);
     type_ = type;
-    evalCost_ = LITERAL_COST;
     explicitlyCast_ = true;
     analysisDone();
   }
@@ -98,7 +97,6 @@ public class NumericLiteral extends LiteralExpr {
   public NumericLiteral(BigDecimal value, Type type) {
     value_ = value;
     type_ = type;
-    evalCost_ = LITERAL_COST;
     explicitlyCast_ = true;
     analysisDone();
   }
@@ -222,7 +220,6 @@ public class NumericLiteral extends LiteralExpr {
         }
       }
     }
-    evalCost_ = LITERAL_COST;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
index 21bfd22..f90d7c6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
@@ -111,7 +111,6 @@ public class SlotRef extends Expr {
       // HMS string.
       throw new AnalysisException("Unsupported type in '" + toSql() + "'.");
     }
-    evalCost_ = SLOT_REF_COST;
 
     numDistinctValues_ = desc_.getStats().getNumDistinctValues();
     Table rootTable = resolvedPath.getRootTable();
@@ -122,6 +121,11 @@ public class SlotRef extends Expr {
   }
 
   @Override
+  protected float computeEvalCost() {
+    return SLOT_REF_COST;
+  }
+
+  @Override
   protected boolean isConstantImpl() { return false; }
 
   public SlotDescriptor getDesc() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/StringLiteral.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/StringLiteral.java b/fe/src/main/java/org/apache/impala/analysis/StringLiteral.java
index 4f838ff..1dbe5a4 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StringLiteral.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StringLiteral.java
@@ -47,7 +47,6 @@ public class StringLiteral extends LiteralExpr {
   public StringLiteral(String value, Type type, boolean needsUnescaping) {
     value_ = value;
     type_ = type;
-    evalCost_ = LITERAL_COST;
     needsUnescaping_ = needsUnescaping;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/Subquery.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/Subquery.java b/fe/src/main/java/org/apache/impala/analysis/Subquery.java
index 1626d18..bad3d7e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Subquery.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Subquery.java
@@ -99,6 +99,9 @@ public class Subquery extends Expr {
   }
 
   @Override
+  protected float computeEvalCost() { return UNKNOWN_COST; }
+
+  @Override
   protected boolean isConstantImpl() { return false; }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/TimestampArithmeticExpr.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TimestampArithmeticExpr.java b/fe/src/main/java/org/apache/impala/analysis/TimestampArithmeticExpr.java
index 104cd1a..5a5468d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TimestampArithmeticExpr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TimestampArithmeticExpr.java
@@ -164,7 +164,11 @@ public class TimestampArithmeticExpr extends Expr {
     Preconditions.checkNotNull(fn_);
     Preconditions.checkState(fn_.getReturnType().isTimestamp());
     type_ = fn_.getReturnType();
-    if (hasChildCosts()) evalCost_ = getChildCosts() + TIMESTAMP_ARITHMETIC_COST;
+  }
+
+  @Override
+  protected float computeEvalCost() {
+    return hasChildCosts() ? getChildCosts() + TIMESTAMP_ARITHMETIC_COST : UNKNOWN_COST;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/TimestampLiteral.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TimestampLiteral.java b/fe/src/main/java/org/apache/impala/analysis/TimestampLiteral.java
index 568f6e7..25f111e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TimestampLiteral.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TimestampLiteral.java
@@ -45,7 +45,6 @@ public class TimestampLiteral extends LiteralExpr {
     value_ = value;
     strValue_ = strValue;
     type_ = Type.TIMESTAMP;
-    evalCost_ = Expr.LITERAL_COST;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java b/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java
index f55ed81..8da0237 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TupleIsNullPredicate.java
@@ -60,7 +60,11 @@ public class TupleIsNullPredicate extends Predicate {
   protected void analyzeImpl(Analyzer analyzer) throws AnalysisException {
     super.analyzeImpl(analyzer);
     analyzer_ = analyzer;
-    evalCost_ = tupleIds_.size() * IS_NULL_COST;
+  }
+
+  @Override
+  protected float computeEvalCost() {
+    return tupleIds_.size() * IS_NULL_COST;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/897f025c/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test b/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
index 7d10dc6..2c866e3 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/inline-view.test
@@ -215,7 +215,7 @@ PLAN-ROOT SINK
 |
 |--01:SCAN HDFS [functional.alltypessmall]
 |     partitions=2/4 files=2 size=3.17KB
-|     predicates: functional.alltypessmall.id + 15 = 27, functional.alltypessmall.string_col = '15'
+|     predicates: functional.alltypessmall.string_col = '15', functional.alltypessmall.id + 15 = 27
 |
 00:SCAN HDFS [functional.alltypesagg]
    partitions=5/11 files=5 size=372.38KB
@@ -235,7 +235,7 @@ PLAN-ROOT SINK
 |  |
 |  01:SCAN HDFS [functional.alltypessmall]
 |     partitions=2/4 files=2 size=3.17KB
-|     predicates: functional.alltypessmall.id + 15 = 27, functional.alltypessmall.string_col = '15'
+|     predicates: functional.alltypessmall.string_col = '15', functional.alltypessmall.id + 15 = 27
 |
 03:EXCHANGE [HASH(id,int_col)]
 |


[8/8] incubator-impala git commit: IMPALA-5750: Catch exceptions from boost thread creation

Posted by jb...@apache.org.
IMPALA-5750: Catch exceptions from boost thread creation

The boost thread constructor will throw boost::thread_resource_error
if it is unable to spawn a thread on the system
(e.g. due to a ulimit). This uncaught exception crashes
Impala. Systems with a large number of nodes and threads
are hitting this limit.

This change catches the exception from the thread
constructor and converts it to a Status. This requires
several changes:
1. util/thread.h's Thread constructor is now private
and all Threads are constructed via a new Create()
static factory method.
2. util/thread-pool.h's ThreadPool requires that Init()
be called after the ThreadPool is constructed.
3. To propagate the Status, Threads cannot be created in
constructors, so this is moved to initialization methods
that can return Status.
4. Threads now use unique_ptrs for management in all
cases. Threads cannot be used as stack-allocated local
variables or as direct (by-value) members of classes.

Query execution code paths will now handle the error:
1. If the scan node fails to spawn any scanner thread,
it will abort the query.
2. Failing to spawn a fragment instance from the query
state in StartFInstances() will correctly report the error
to the coordinator and tear down the query.

Testing:
This introduces the parameter thread_creation_fault_injection,
which will cause Thread::Create() calls in eligible
locations to fail randomly roughly 1% of the time.
Quite a few call sites of Thread::Create() and
ThreadPool::Init() are needed for startup and cannot
be made eligible. However, all the call sites used for query
execution are marked as eligible and governed by this
parameter. The code was tested by setting this parameter
to true and running queries to verify that queries either
run to completion with the correct result or fail with
appropriate status.

Change-Id: I15a2f278dc71892b7fec09593f81b1a57ab725c0
Reviewed-on: http://gerrit.cloudera.org:8080/7730
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/e993b971
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/e993b971
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/e993b971

Branch: refs/heads/master
Commit: e993b9712c81dfb66fdf65bb5269cdc38a8eef18
Parents: 897f025
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Wed Aug 16 17:16:45 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Sep 7 03:25:30 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/thread-create-benchmark.cc    | 10 ++-
 be/src/catalog/catalog-server.cc                | 12 ++-
 be/src/catalog/catalog-server.h                 |  3 +-
 be/src/catalog/catalogd-main.cc                 |  2 +-
 be/src/common/global-flags.cc                   |  3 +
 be/src/common/init.cc                           | 21 ++---
 be/src/common/init.h                            |  3 +-
 be/src/exec/blocking-join-node.cc               | 13 +++-
 be/src/exec/hdfs-scan-node.cc                   | 64 ++++++++++------
 be/src/exec/hdfs-scan-node.h                    |  7 +-
 be/src/exec/kudu-scan-node.cc                   | 52 +++++++++----
 be/src/exec/kudu-scan-node.h                    |  7 ++
 be/src/rpc/TAcceptQueueServer.cpp               |  9 +++
 be/src/rpc/auth-provider.h                      |  3 +-
 be/src/rpc/authentication.cc                    |  4 +-
 be/src/rpc/thrift-server-test.cc                |  1 +
 be/src/rpc/thrift-server.cc                     |  7 +-
 be/src/rpc/thrift-server.h                      |  2 +-
 be/src/rpc/thrift-thread.cc                     | 14 +++-
 be/src/rpc/thrift-thread.h                      |  2 +-
 be/src/runtime/data-stream-sender.cc            |  3 +-
 be/src/runtime/disk-io-mgr-handle-cache.h       |  3 +-
 .../runtime/disk-io-mgr-handle-cache.inline.h   |  6 +-
 be/src/runtime/disk-io-mgr.cc                   |  8 +-
 be/src/runtime/exec-env.cc                      |  5 ++
 be/src/runtime/fragment-instance-state.cc       |  5 +-
 be/src/runtime/fragment-instance-state.h        |  2 +-
 be/src/runtime/parallel-executor.cc             | 11 ++-
 be/src/runtime/query-exec-mgr.cc                | 14 +++-
 be/src/runtime/query-state.cc                   | 37 ++++++---
 be/src/runtime/query-state.h                    |  6 +-
 be/src/runtime/thread-resource-mgr.h            | 11 ++-
 be/src/scheduling/admission-controller.cc       | 11 ++-
 be/src/scheduling/admission-controller.h        |  2 +-
 be/src/service/child-query.cc                   |  9 ++-
 be/src/service/child-query.h                    |  6 +-
 be/src/service/client-request-state.cc          | 10 ++-
 be/src/service/client-request-state.h           |  4 +-
 be/src/service/impala-beeswax-server.cc         |  8 +-
 be/src/service/impala-hs2-server.cc             | 18 ++---
 be/src/service/impala-server.cc                 | 22 +++---
 be/src/service/impala-server.h                  | 10 +--
 be/src/service/impalad-main.cc                  |  3 +-
 be/src/statestore/statestore-subscriber.cc      |  4 +-
 be/src/statestore/statestore-subscriber.h       |  2 +-
 be/src/statestore/statestore.cc                 |  6 ++
 be/src/statestore/statestore.h                  |  5 ++
 be/src/statestore/statestored-main.cc           |  3 +-
 be/src/testutil/in-process-servers.cc           |  5 +-
 be/src/testutil/in-process-servers.h            |  2 +-
 be/src/util/thread-pool-test.cc                 |  1 +
 be/src/util/thread-pool.h                       | 70 +++++++++++++----
 be/src/util/thread.cc                           | 39 ++++++++--
 be/src/util/thread.h                            | 80 ++++++++++++--------
 common/thrift/generate_error_codes.py           |  2 +
 55 files changed, 456 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/benchmarks/thread-create-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/thread-create-benchmark.cc b/be/src/benchmarks/thread-create-benchmark.cc
index cac1f1c..7b57242 100644
--- a/be/src/benchmarks/thread-create-benchmark.cc
+++ b/be/src/benchmarks/thread-create-benchmark.cc
@@ -87,14 +87,16 @@ void NativeThreadStarter(int num_threads, const function<void ()>& f) {
 
 // Runs N Impala Threads, each executing 'f'
 void ImpalaThreadStarter(int num_threads, const function<void ()>& f) {
-  vector<Thread*> threads;
+  vector<unique_ptr<Thread>> threads;
   threads.reserve(num_threads);
   for (int i=0; i < num_threads; ++i) {
-    threads.push_back(new Thread("mythreadgroup", "thread", f));
+    unique_ptr<Thread> thread;
+    Status s = Thread::Create("mythreadgroup", "thread", f, &thread);
+    DCHECK(s.ok());
+    threads.push_back(move(thread));
   }
-  for (Thread* thread: threads) {
+  for (unique_ptr<Thread>& thread: threads) {
     thread->Join();
-    delete thread;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 0e0cca0..38a64d1 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -170,9 +170,13 @@ Status CatalogServer::Start() {
 
   // This will trigger a full Catalog metadata load.
   catalog_.reset(new Catalog());
-  catalog_update_gathering_thread_.reset(new Thread("catalog-server",
-      "catalog-update-gathering-thread",
-      &CatalogServer::GatherCatalogUpdatesThread, this));
+  Status status = Thread::Create("catalog-server", "catalog-update-gathering-thread",
+      &CatalogServer::GatherCatalogUpdatesThread, this,
+      &catalog_update_gathering_thread_);
+  if (!status.ok()) {
+    status.AddDetail("CatalogService failed to start");
+    return status;
+  }
 
   statestore_subscriber_.reset(new StatestoreSubscriber(
      Substitute("catalog-server@$0", TNetworkAddressToString(server_address)),
@@ -180,7 +184,7 @@ Status CatalogServer::Start() {
 
   StatestoreSubscriber::UpdateCallback cb =
       bind<void>(mem_fn(&CatalogServer::UpdateCatalogTopicCallback), this, _1, _2);
-  Status status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC, false, cb);
+  status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC, false, cb);
   if (!status.ok()) {
     status.AddDetail("CatalogService failed to start");
     return status;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 17b8732..9d33591 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -36,7 +36,6 @@ namespace impala {
 
 class StatestoreSubscriber;
 class Catalog;
-class TGetAllCatalogObjectsResponse;
 
 /// The Impala CatalogServer manages the caching and persistence of cluster-wide metadata.
 /// The CatalogServer aggregates the metadata from the Hive Metastore, the NameNode,
@@ -86,7 +85,7 @@ class CatalogServer {
   StatsMetric<double>* topic_processing_time_metric_;
 
   /// Thread that polls the catalog for any updates.
-  boost::scoped_ptr<Thread> catalog_update_gathering_thread_;
+  std::unique_ptr<Thread> catalog_update_gathering_thread_;
 
   /// Tracks the set of catalog objects that exist via their topic entry key.
   /// During each IMPALA_CATALOG_TOPIC heartbeat, stores the set of known catalog objects

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/catalog/catalogd-main.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalogd-main.cc b/be/src/catalog/catalogd-main.cc
index a3a0edb..f98a406 100644
--- a/be/src/catalog/catalogd-main.cc
+++ b/be/src/catalog/catalogd-main.cc
@@ -75,7 +75,7 @@ int CatalogdMain(int argc, char** argv) {
 
   ABORT_IF_ERROR(metrics->Init(FLAGS_enable_webserver ? webserver.get() : nullptr));
   ABORT_IF_ERROR(RegisterMemoryMetrics(metrics.get(), true, nullptr, nullptr));
-  StartMemoryMaintenanceThread();
+  ABORT_IF_ERROR(StartMemoryMaintenanceThread());
   ABORT_IF_ERROR(StartThreadInstrumentation(metrics.get(), webserver.get(), true));
 
   InitRpcEventTracing(webserver.get());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index ccffcb3..e5420a2 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -137,6 +137,9 @@ DEFINE_int32(fault_injection_rpc_exception_type, 0, "A fault injection option th
     "in debug builds only");
 DEFINE_int32(stress_scratch_write_delay_ms, 0, "A stress option which causes writes to "
     "scratch files to be to be delayed to simulate slow writes.");
+DEFINE_bool(thread_creation_fault_injection, false, "A fault injection option that "
+    " causes calls to Thread::Create() to fail randomly 1% of the time on eligible "
+    " codepaths. Effective in debug builds only.");
 #endif
 
 // Used for testing the path where the Kudu client is stubbed.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/common/init.cc
----------------------------------------------------------------------
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index ce55067..003ebf9 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -95,18 +95,18 @@ using std::string;
 // glog only automatically flushes the log file if logbufsecs has passed since the
 // previous flush when a new log is written. That means that on a quiet system, logs
 // will be buffered indefinitely. It also rotates log files.
-static scoped_ptr<impala::Thread> log_maintenance_thread;
+static unique_ptr<impala::Thread> log_maintenance_thread;
 
 // Memory Maintenance thread that runs periodically to free up memory. It does the
 // following things every memory_maintenance_sleep_time_ms secs:
 // 1) Releases BufferPool memory that is not currently in use.
 // 2) Frees excess memory that TCMalloc has left in its pageheap.
-static scoped_ptr<impala::Thread> memory_maintenance_thread;
+static unique_ptr<impala::Thread> memory_maintenance_thread;
 
 // A pause monitor thread to monitor process pauses in impala daemons. The thread sleeps
 // for a short interval of time (THREAD_SLEEP_TIME_MS), wakes up and calculates the actual
 // time slept. If that exceeds PAUSE_WARN_THRESHOLD_MS, a warning is logged.
-static scoped_ptr<impala::Thread> pause_monitor;
+static unique_ptr<impala::Thread> pause_monitor;
 
 [[noreturn]] static void LogMaintenanceThread() {
   while (true) {
@@ -205,10 +205,13 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
   ABORT_IF_ERROR(impala::InitAuth(argv[0]));
 
   // Initialize maintenance_thread after InitGoogleLoggingSafe and InitThreading.
-  log_maintenance_thread.reset(
-      new Thread("common", "log-maintenance-thread", &LogMaintenanceThread));
+  Status thread_spawn_status = Thread::Create("common", "log-maintenance-thread",
+      &LogMaintenanceThread, &log_maintenance_thread);
+  if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());
 
-  pause_monitor.reset(new Thread("common", "pause-monitor", &PauseMonitorLoop));
+  thread_spawn_status = Thread::Create("common", "pause-monitor",
+      &PauseMonitorLoop, &pause_monitor);
+  if (!thread_spawn_status.ok()) CLEAN_EXIT_WITH_ERROR(thread_spawn_status.GetDetail());
 
   PeriodicCounterUpdater::Init();
 
@@ -251,8 +254,8 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
 #endif
 }
 
-void impala::StartMemoryMaintenanceThread() {
+Status impala::StartMemoryMaintenanceThread() {
   DCHECK(AggregateMemoryMetrics::NUM_MAPS != nullptr) << "Mem metrics not registered.";
-  memory_maintenance_thread.reset(
-      new Thread("common", "memory-maintenance-thread", &MemoryMaintenanceThread));
+  return Thread::Create("common", "memory-maintenance-thread",
+      &MemoryMaintenanceThread, &memory_maintenance_thread);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/common/init.h
----------------------------------------------------------------------
diff --git a/be/src/common/init.h b/be/src/common/init.h
index b1de6d8..fe8a063 100644
--- a/be/src/common/init.h
+++ b/be/src/common/init.h
@@ -19,6 +19,7 @@
 #define IMPALA_COMMON_INIT_H
 
 #include "util/test-info.h"
+#include "common/status.h"
 
 namespace impala {
 
@@ -33,7 +34,7 @@ void InitCommonRuntime(int argc, char** argv, bool init_jvm,
 /// Starts background memory maintenance thread. Must be called after
 /// RegisterMemoryMetrics(). This thread is needed for daemons to free memory and
 /// refresh metrics but is not needed for standalone tests.
-void StartMemoryMaintenanceThread();
+Status StartMemoryMaintenanceThread() WARN_UNUSED_RESULT;
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index 7ccddc0..477e934 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -193,10 +193,15 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
     runtime_profile()->AppendExecOption("Join Build-Side Prepared Asynchronously");
     string thread_name = Substitute("join-build-thread (finst:$0, plan-node-id:$1)",
         PrintId(state->fragment_instance_id()), id());
-    Thread build_thread(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
-        [this, state, build_sink, status=&build_side_status]() {
+    unique_ptr<Thread> build_thread;
+    Status thread_status = Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME,
+        thread_name, [this, state, build_sink, status=&build_side_status]() {
           ProcessBuildInputAsync(state, build_sink, status);
-        });
+        }, &build_thread, true);
+    if (!thread_status.ok()) {
+      state->resource_pool()->ReleaseThreadToken(false);
+      return thread_status;
+    }
     // Open the left child so that it may perform any initialisation in parallel.
     // Don't exit even if we see an error, we still need to wait for the build thread
     // to finish.
@@ -207,7 +212,7 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
 
     // Blocks until ProcessBuildInput has returned, after which the build side structures
     // are fully constructed.
-    build_thread.Join();
+    build_thread->Join();
     RETURN_IF_ERROR(build_side_status);
     RETURN_IF_ERROR(open_status);
   } else if (IsInSubplan()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 2dc8d7b..528e290 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -320,6 +320,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
   // TODO: It would be good to have a test case for that.
   if (!initial_ranges_issued_) return;
 
+  Status status = Status::OK();
   while (true) {
     // The lock must be given up between loops in order to give writers to done_,
     // all_ranges_started_ etc. a chance to grab the lock.
@@ -347,14 +348,32 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
     }
 
     COUNTER_ADD(&active_scanner_thread_counter_, 1);
-    COUNTER_ADD(num_scanner_threads_started_counter_, 1);
     string name = Substitute("scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)",
         PrintId(runtime_state_->fragment_instance_id()), id(),
         num_scanner_threads_started_counter_->value());
 
     auto fn = [this]() { this->ScannerThread(); };
-    scanner_threads_.AddThread(
-        make_unique<Thread>(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn));
+    std::unique_ptr<Thread> t;
+    status =
+      Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);
+    if (!status.ok()) {
+      COUNTER_ADD(&active_scanner_thread_counter_, -1);
+      // Release the token and skip running callbacks to find a replacement. Skipping
+      // serves two purposes. First, it prevents a mutual recursion between this function
+      // and ReleaseThreadToken()->InvokeCallbacks(). Second, Thread::Create() failed and
+      // is likely to continue failing for future callbacks.
+      pool->ReleaseThreadToken(false, true);
+
+      // Abort the query. This is still holding the lock_, so done_ is known to be
+      // false and status_ must be ok.
+      DCHECK(status_.ok());
+      status_ = status;
+      SetDoneInternal();
+      break;
+    }
+    // Thread successfully started
+    COUNTER_ADD(num_scanner_threads_started_counter_, 1);
+    scanner_threads_.AddThread(move(t));
   }
 }
 
@@ -420,21 +439,18 @@ void HdfsScanNode::ScannerThread() {
     }
 
     if (!status.ok()) {
-      {
-        unique_lock<mutex> l(lock_);
-        // If there was already an error, the main thread will do the cleanup
-        if (!status_.ok()) break;
-
-        if (status.IsCancelled() && done_) {
-          // Scan node initiated scanner thread cancellation.  No need to do anything.
-          break;
-        }
-        // Set status_ before calling SetDone() (which shuts down the RowBatchQueue),
-        // to ensure that GetNextInternal() notices the error status.
-        status_ = status;
-      }
+      unique_lock<mutex> l(lock_);
+      // If there was already an error, the main thread will do the cleanup
+      if (!status_.ok()) break;
 
-      SetDone();
+      if (status.IsCancelled() && done_) {
+        // Scan node initiated scanner thread cancellation.  No need to do anything.
+        break;
+      }
+      // Set status_ before calling SetDone() (which shuts down the RowBatchQueue),
+      // to ensure that GetNextInternal() notices the error status.
+      status_ = status;
+      SetDoneInternal();
       break;
     }
 
@@ -542,14 +558,16 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
   return status;
 }
 
-void HdfsScanNode::SetDone() {
-  {
-    unique_lock<mutex> l(lock_);
-    if (done_) return;
-    done_ = true;
-  }
+void HdfsScanNode::SetDoneInternal() {
+  if (done_) return;
+  done_ = true;
   if (reader_context_ != NULL) {
     runtime_state_->io_mgr()->CancelContext(reader_context_);
   }
   materialized_row_batches_->Shutdown();
 }
+
+void HdfsScanNode::SetDone() {
+  unique_lock<mutex> l(lock_);
+  SetDoneInternal();
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index b056e2e..18a74ad 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -180,8 +180,11 @@ class HdfsScanNode : public HdfsScanNodeBase {
   Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos)
       WARN_UNUSED_RESULT;
 
-  /// sets done_ to true and triggers threads to cleanup. Cannot be called with
-  /// any locks taken. Calling it repeatedly ignores subsequent calls.
+  /// Sets done_ to true and triggers threads to cleanup. Must be called with lock_
+  /// taken. Calling it repeatedly ignores subsequent calls.
+  void SetDoneInternal();
+
+  /// Gets lock_ and calls SetDoneInternal()
   void SetDone();
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index e192a86..7f18710 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -110,9 +110,7 @@ Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
       COUNTER_SET(rows_returned_counter_, num_rows_returned_);
       *eos = true;
 
-      unique_lock<mutex> l(lock_);
-      done_ = true;
-      materialized_row_batches_->Shutdown();
+      SetDone();
     }
     materialized_batch.reset();
   } else {
@@ -130,11 +128,7 @@ void KuduScanNode::Close(RuntimeState* state) {
     state->resource_pool()->RemoveThreadAvailableCb(thread_avail_cb_id_);
   }
 
-  if (!done_) {
-    unique_lock<mutex> l(lock_);
-    done_ = true;
-    materialized_row_batches_->Shutdown();
-  }
+  SetDone();
 
   scanner_threads_.JoinAll();
   DCHECK_EQ(num_active_scanners_, 0);
@@ -151,9 +145,6 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
     // Check if we can get a token.
     if (!pool->TryAcquireThreadToken()) break;
 
-    ++num_active_scanners_;
-    COUNTER_ADD(num_scanner_threads_started_counter_, 1);
-
     string name = Substitute(
         "kudu-scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)",
         PrintId(runtime_state_->fragment_instance_id()), id(),
@@ -162,9 +153,28 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
     // Reserve the first token so no other thread picks it up.
     const string* token = GetNextScanToken();
     auto fn = [this, token, name]() { this->RunScannerThread(name, token); };
+    std::unique_ptr<Thread> t;
+    Status status =
+      Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);
+    if (!status.ok()) {
+      // Release the token and skip running callbacks to find a replacement. Skipping
+      // serves two purposes. First, it prevents a mutual recursion between this function
+      // and ReleaseThreadToken()->InvokeCallbacks(). Second, Thread::Create() failed and
+      // is likely to continue failing for future callbacks.
+      pool->ReleaseThreadToken(false, true);
+
+      // Abort the query. This is still holding the lock_, so done_ is known to be
+      // false and status_ must be ok.
+      DCHECK(status_.ok());
+      status_ = status;
+      SetDoneInternal();
+      break;
+    }
+    // Thread successfully started
+    COUNTER_ADD(num_scanner_threads_started_counter_, 1);
+    ++num_active_scanners_;
     VLOG_RPC << "Thread started: " << name;
-    scanner_threads_.AddThread(
-        make_unique<Thread>(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn));
+    scanner_threads_.AddThread(move(t));
   }
 }
 
@@ -231,14 +241,13 @@ void KuduScanNode::RunScannerThread(const string& name, const string* initial_to
     unique_lock<mutex> l(lock_);
     if (!status.ok() && status_.ok()) {
       status_ = status;
-      done_ = true;
+      SetDoneInternal();
     }
     // Decrement num_active_scanners_ unless handling the case of an early exit when
     // optional threads have been exceeded, in which case it already was decremented.
     if (!optional_thread_exiting) --num_active_scanners_;
     if (num_active_scanners_ == 0) {
-      done_ = true;
-      materialized_row_batches_->Shutdown();
+      SetDoneInternal();
     }
   }
 
@@ -248,4 +257,15 @@ void KuduScanNode::RunScannerThread(const string& name, const string* initial_to
   runtime_state_->resource_pool()->ReleaseThreadToken(false);
 }
 
+void KuduScanNode::SetDoneInternal() {
+  if (done_) return;
+  done_ = true;
+  materialized_row_batches_->Shutdown();
+}
+
+void KuduScanNode::SetDone() {
+  unique_lock<mutex> l(lock_);
+  SetDoneInternal();
+}
+
 }  // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/exec/kudu-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h
index 6341cb6..4759f0a 100644
--- a/be/src/exec/kudu-scan-node.h
+++ b/be/src/exec/kudu-scan-node.h
@@ -96,6 +96,13 @@ class KuduScanNode : public KuduScanNodeBase {
   /// in 'materialized_row_batches_' until the scanner reports eos, an error occurs, or
   /// the limit is reached.
   Status ProcessScanToken(KuduScanner* scanner, const std::string& scan_token);
+
+  /// Sets done_ to true and triggers threads to cleanup. Must be called with lock_
+  /// taken. Calling it repeatedly ignores subsequent calls.
+  void SetDoneInternal();
+
+  /// Gets lock_ and calls SetDoneInternal()
+  void SetDone();
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/rpc/TAcceptQueueServer.cpp
----------------------------------------------------------------------
diff --git a/be/src/rpc/TAcceptQueueServer.cpp b/be/src/rpc/TAcceptQueueServer.cpp
index 65fdc46..030d714 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -31,6 +31,7 @@
 #include <unistd.h>
 #endif
 
+#include "common/status.h"
 #include "util/thread-pool.h"
 
 DEFINE_int32(accepted_cnxn_queue_depth, 10000,
@@ -217,6 +218,14 @@ void TAcceptQueueServer::serve() {
       [this](int tid, const shared_ptr<TTransport>& item) {
         this->SetupConnection(item);
       });
+  // Initialize the thread pool
+  Status status = connection_setup_pool.Init();
+  if (!status.ok()) {
+    status.AddDetail("TAcceptQueueServer: thread pool could not start.");
+    string errStr = status.GetDetail();
+    GlobalOutput(errStr.c_str());
+    stop_ = true;
+  }
 
   while (!stop_) {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/rpc/auth-provider.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/auth-provider.h b/be/src/rpc/auth-provider.h
index 0021dc7..9286154 100644
--- a/be/src/rpc/auth-provider.h
+++ b/be/src/rpc/auth-provider.h
@@ -19,7 +19,6 @@
 #define IMPALA_RPC_AUTH_PROVIDER_H
 
 #include <string>
-#include <boost/scoped_ptr.hpp>
 #include <boost/thread/mutex.hpp>
 #include <sasl/sasl.h>
 
@@ -142,7 +141,7 @@ class SaslAuthProvider : public AuthProvider {
   bool needs_kinit_;
 
   /// Runs "RunKinit" below if needs_kinit_ is true.
-  boost::scoped_ptr<Thread> kinit_thread_;
+  std::unique_ptr<Thread> kinit_thread_;
 
   /// Periodically (roughly once every FLAGS_kerberos_reinit_interval minutes) calls kinit
   /// to get a ticket granting ticket from the kerberos server for principal_, which is

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/rpc/authentication.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index 1a56f7c..275963f 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -835,8 +835,8 @@ Status SaslAuthProvider::Start() {
     Promise<Status> first_kinit;
     stringstream thread_name;
     thread_name << "kinit-" << principal_;
-    kinit_thread_.reset(new Thread("authentication", thread_name.str(),
-        &SaslAuthProvider::RunKinit, this, &first_kinit));
+    RETURN_IF_ERROR(Thread::Create("authentication", thread_name.str(),
+        &SaslAuthProvider::RunKinit, this, &first_kinit, &kinit_thread_));
     LOG(INFO) << "Waiting for Kerberos ticket for principal: " << principal_;
     RETURN_IF_ERROR(first_kinit.Get());
     LOG(INFO) << "Kerberos ticket granted to " << principal_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/rpc/thrift-server-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc
index 6ceaefd..ef50160 100644
--- a/be/src/rpc/thrift-server-test.cc
+++ b/be/src/rpc/thrift-server-test.cc
@@ -415,6 +415,7 @@ TEST(ConcurrencyTest, DISABLED_ManyConcurrentConnections) {
         Status status = client->Open();
         ASSERT_OK(status);
       });
+  ASSERT_OK(pool.Init());
   for (int i = 0; i < 1024 * 16; ++i) pool.Offer(i);
   pool.DrainAndShutdown();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index 467c004..c385a66 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -172,9 +172,9 @@ Status ThriftServer::ThriftServerEventProcessor::StartAndWaitForServer() {
 
   stringstream name;
   name << "supervise-" << thrift_server_->name_;
-  thrift_server_->server_thread_.reset(
-      new Thread("thrift-server", name.str(),
-                 &ThriftServer::ThriftServerEventProcessor::Supervise, this));
+  RETURN_IF_ERROR(Thread::Create("thrift-server", name.str(),
+      &ThriftServer::ThriftServerEventProcessor::Supervise, this,
+      &thrift_server_->server_thread_));
 
   system_time deadline = get_system_time() +
       posix_time::milliseconds(ThriftServer::ThriftServerEventProcessor::TIMEOUT_MS);
@@ -335,7 +335,6 @@ ThriftServer::ThriftServer(const string& name,
     num_worker_threads_(num_worker_threads),
     server_type_(server_type),
     name_(name),
-    server_thread_(NULL),
     server_(NULL),
     processor_(processor),
     connection_handler_(NULL),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/rpc/thrift-server.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index 2002f7f..f889a4e 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -209,7 +209,7 @@ class ThriftServer {
   const std::string name_;
 
   /// Thread that runs ThriftServerEventProcessor::Supervise() in a separate loop
-  boost::scoped_ptr<Thread> server_thread_;
+  std::unique_ptr<Thread> server_thread_;
 
   /// Thrift housekeeping
   boost::scoped_ptr<apache::thrift::server::TServer> server_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/rpc/thrift-thread.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-thread.cc b/be/src/rpc/thrift-thread.cc
index cdda36b..ef2144d 100644
--- a/be/src/rpc/thrift-thread.cc
+++ b/be/src/rpc/thrift-thread.cc
@@ -20,7 +20,10 @@
 #include <boost/bind.hpp>
 #include <sstream>
 
+#include <thrift/concurrency/Exception.h>
+
 #include "common/names.h"
+#include "common/status.h"
 
 using namespace impala;
 
@@ -29,8 +32,15 @@ namespace atc = apache::thrift::concurrency;
 
 void ThriftThread::start() {
   Promise<atc::Thread::id_t> promise;
-  impala_thread_.reset(new impala::Thread(group_, name_,
-      bind(&ThriftThread::RunRunnable, this, runnable(), &promise)));
+  Status status = impala::Thread::Create(group_, name_,
+      bind(&ThriftThread::RunRunnable, this, runnable(), &promise), &impala_thread_);
+
+  // Thread creation failed. Thrift expects an exception in this case. See
+  // the implementation of atc::PosixThreadFactory.cpp or atc::BoostThreadFactory.cpp.
+  if (!status.ok()) {
+    throw atc::SystemResourceException(
+        Substitute("Thread::Create() failed: $0", status.GetDetail()));
+  }
 
   // Blocks until the thread id has been set
   tid_ = promise.Get();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/rpc/thrift-thread.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-thread.h b/be/src/rpc/thrift-thread.h
index 397703f..14943b2 100644
--- a/be/src/rpc/thrift-thread.h
+++ b/be/src/rpc/thrift-thread.h
@@ -93,7 +93,7 @@ class ThriftThread : public apache::thrift::concurrency::Thread {
 
   /// Impala thread that runs the runnable and registers itself with the global
   /// ThreadManager.
-  boost::scoped_ptr<impala::Thread> impala_thread_;
+  std::unique_ptr<impala::Thread> impala_thread_;
 
   /// Thrift thread ID, set by RunRunnable.
   apache::thrift::concurrency::Thread::id_t tid_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index 752e3ce..f39a595 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -77,7 +77,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
       dest_node_id_(dest_node_id),
       num_data_bytes_sent_(0),
       rpc_thread_("DataStreamSender", "SenderThread", 1, 1,
-          bind<void>(mem_fn(&Channel::TransmitData), this, _1, _2)),
+          bind<void>(mem_fn(&Channel::TransmitData), this, _1, _2), true),
       rpc_in_flight_(false) {}
 
   // Initialize channel.
@@ -156,6 +156,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
 };
 
 Status DataStreamSender::Channel::Init(RuntimeState* state) {
+  RETURN_IF_ERROR(rpc_thread_.Init());
   runtime_state_ = state;
   // TODO: figure out how to size batch_
   int capacity = max(1, buffer_size_ / max(row_desc_->GetRowSize(), 1));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/disk-io-mgr-handle-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-handle-cache.h b/be/src/runtime/disk-io-mgr-handle-cache.h
index 96add9f..4ba2342 100644
--- a/be/src/runtime/disk-io-mgr-handle-cache.h
+++ b/be/src/runtime/disk-io-mgr-handle-cache.h
@@ -26,6 +26,7 @@
 #include <boost/thread/mutex.hpp>
 
 #include "common/hdfs.h"
+#include "common/status.h"
 #include "util/aligned-new.h"
 #include "util/impalad-metrics.h"
 #include "util/spinlock.h"
@@ -96,7 +97,7 @@ class FileHandleCache {
 
   /// Starts up a thread that monitors the age of file handles and evicts any that
   /// exceed the limit.
-  void Init();
+  Status Init() WARN_UNUSED_RESULT;
 
   /// Get a file handle from the cache for the specified filename (fname) and
   /// last modification time (mtime). This will hash the filename to determine

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/disk-io-mgr-handle-cache.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-handle-cache.inline.h b/be/src/runtime/disk-io-mgr-handle-cache.inline.h
index 76bef95..3068971 100644
--- a/be/src/runtime/disk-io-mgr-handle-cache.inline.h
+++ b/be/src/runtime/disk-io-mgr-handle-cache.inline.h
@@ -69,9 +69,9 @@ FileHandleCache<NUM_PARTITIONS>::~FileHandleCache() {
 }
 
 template <size_t NUM_PARTITIONS>
-void FileHandleCache<NUM_PARTITIONS>::Init() {
-  eviction_thread_.reset(new Thread("disk-io-mgr-handle-cache", "File Handle Timeout",
-      &FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop, this));
+Status FileHandleCache<NUM_PARTITIONS>::Init() {
+  return Thread::Create("disk-io-mgr-handle-cache", "File Handle Timeout",
+      &FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop, this, &eviction_thread_);
 }
 
 template <size_t NUM_PARTITIONS>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index dff6ec5..0fe16d2 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -406,12 +406,14 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
     for (int j = 0; j < num_threads_per_disk; ++j) {
       stringstream ss;
       ss << "work-loop(Disk: " << i << ", Thread: " << j << ")";
-      disk_thread_group_.AddThread(make_unique<Thread>("disk-io-mgr", ss.str(),
-          &DiskIoMgr::WorkLoop, this, disk_queues_[i]));
+      std::unique_ptr<Thread> t;
+      RETURN_IF_ERROR(Thread::Create("disk-io-mgr", ss.str(), &DiskIoMgr::WorkLoop,
+          this, disk_queues_[i], &t));
+      disk_thread_group_.AddThread(move(t));
     }
   }
   request_context_cache_.reset(new RequestContextCache(this));
-  file_handle_cache_.Init();
+  RETURN_IF_ERROR(file_handle_cache_.Init());
 
   cached_read_options_ = hadoopRzOptionsAlloc();
   DCHECK(cached_read_options_ != nullptr);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index fe4825d..3652280 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -216,6 +216,11 @@ Status ExecEnv::InitForFeTests() {
 Status ExecEnv::StartServices() {
   LOG(INFO) << "Starting global services";
 
+  // Initialize thread pools
+  RETURN_IF_ERROR(exec_rpc_thread_pool_->Init());
+  RETURN_IF_ERROR(async_rpc_pool_->Init());
+  RETURN_IF_ERROR(hdfs_op_thread_pool_->Init());
+
   // Initialize global memory limit.
   // Depending on the system configuration, we will have to calculate the process
   // memory limit either based on the available physical memory, or if overcommitting

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 3f5a72f..74f5495 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -226,9 +226,8 @@ Status FragmentInstanceState::Prepare() {
   if (FLAGS_status_report_interval > 0) {
     string thread_name = Substitute("profile-report (finst:$0)", PrintId(instance_id()));
     unique_lock<mutex> l(report_thread_lock_);
-    report_thread_.reset(
-        new Thread(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
-            [this]() { this->ReportProfileThread(); }));
+    RETURN_IF_ERROR(Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME,
+        thread_name, [this]() { this->ReportProfileThread(); }, &report_thread_, true));
     // Make sure the thread started up, otherwise ReportProfileThread() might get into
     // a race with StopReportThread().
     while (!report_thread_active_) report_thread_started_cv_.wait(l);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/fragment-instance-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.h b/be/src/runtime/fragment-instance-state.h
index 750f983..fa35c6b 100644
--- a/be/src/runtime/fragment-instance-state.h
+++ b/be/src/runtime/fragment-instance-state.h
@@ -133,7 +133,7 @@ class FragmentInstanceState {
   RuntimeState* runtime_state_ = nullptr;  // lives in obj_pool()
 
   /// profile reporting-related
-  boost::scoped_ptr<Thread> report_thread_;
+  std::unique_ptr<Thread> report_thread_;
   boost::mutex report_thread_lock_;
 
   /// Indicates that profile reporting thread should stop.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/parallel-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/parallel-executor.cc b/be/src/runtime/parallel-executor.cc
index b7b3cc2..f3dd708 100644
--- a/be/src/runtime/parallel-executor.cc
+++ b/be/src/runtime/parallel-executor.cc
@@ -35,8 +35,15 @@ Status ParallelExecutor::Exec(Function function, void** args, int num_args,
   for (int i = 0; i < num_args; ++i) {
     stringstream ss;
     ss << "worker-thread(" << i << ")";
-    worker_threads.AddThread(make_unique<Thread>("parallel-executor", ss.str(),
-        &ParallelExecutor::Worker, function, args[i], &lock, &status, latencies));
+    std::unique_ptr<Thread> t;
+    Status thread_status = Thread::Create("parallel-executor", ss.str(),
+        &ParallelExecutor::Worker, function, args[i], &lock, &status, latencies, &t);
+    if (!thread_status.ok()) {
+      unique_lock<mutex> l(lock);
+      status = thread_status;
+      break;
+    }
+    worker_threads.AddThread(move(t));
   }
   worker_threads.JoinAll();
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 901ddbd..071c5dd 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -55,10 +55,18 @@ Status QueryExecMgr::StartQuery(const TExecQueryFInstancesParams& params) {
   }
   // avoid blocking the rpc handler thread for too long by starting a new thread for
   // query startup (which takes ownership of the QueryState reference)
-  Thread t("query-exec-mgr",
+  unique_ptr<Thread> t;
+  status = Thread::Create("query-exec-mgr",
       Substitute("start-query-finstances-$0", PrintId(query_id)),
-      &QueryExecMgr::StartQueryHelper, this, qs);
-  t.Detach();
+          &QueryExecMgr::StartQueryHelper, this, qs, &t, true);
+  if (!status.ok()) {
+    // decrement refcount taken in QueryState::Init()
+    qs->ReleaseInitialReservationRefcount();
+    // decrement refcount taken in GetOrCreateQueryState()
+    ReleaseQueryState(qs);
+    return status;
+  }
+  t->Detach();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 4311e27..4c5eb17 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -291,6 +291,7 @@ void QueryState::StartFInstances() {
   VLOG_QUERY << "descriptor table for query=" << PrintId(query_id())
              << "\n" << desc_tbl_->DebugString();
 
+  Status thread_create_status;
   DCHECK_GT(rpc_params_.fragment_ctxs.size(), 0);
   TPlanFragmentCtx* fragment_ctx = &rpc_params_.fragment_ctxs[0];
   int fragment_ctx_idx = 0;
@@ -305,25 +306,35 @@ void QueryState::StartFInstances() {
     }
     FragmentInstanceState* fis = obj_pool_.Add(
         new FragmentInstanceState(this, *fragment_ctx, instance_ctx));
-    fis_map_.emplace(fis->instance_id(), fis);
-
-    // update fragment_map_
-    vector<FragmentInstanceState*>& fis_list = fragment_map_[instance_ctx.fragment_idx];
-    fis_list.push_back(fis);
 
     // start new thread to execute instance
     refcnt_.Add(1);  // decremented in ExecFInstance()
     initial_reservation_refcnt_.Add(1);  // decremented in ExecFInstance()
     string thread_name = Substitute(
         "exec-finstance (finst:$0)", PrintId(instance_ctx.fragment_instance_id));
-    Thread t(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name,
-        [this, fis]() { this->ExecFInstance(fis); });
-    t.Detach();
+    unique_ptr<Thread> t;
+    thread_create_status = Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME,
+        thread_name, [this, fis]() { this->ExecFInstance(fis); }, &t, true);
+    if (!thread_create_status.ok()) {
+      // Undo refcnt increments done immediately prior to Thread::Create(). The
+      // reference counts were both greater than zero before the increments, so
+      // neither of these decrements will free any structures.
+      ReleaseInitialReservationRefcount();
+      ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this);
+      break;
+    }
+    // Fragment instance successfully started
+    fis_map_.emplace(fis->instance_id(), fis);
+    // update fragment_map_
+    vector<FragmentInstanceState*>& fis_list = fragment_map_[instance_ctx.fragment_idx];
+    fis_list.push_back(fis);
+    t->Detach();
   }
 
   // don't return until every instance is prepared and record the first non-OK
-  // (non-CANCELLED if available) status
-  Status prepare_status;
+  // (non-CANCELLED if available) status (including any error from thread creation
+  // above).
+  Status prepare_status = thread_create_status;
   for (auto entry: fis_map_) {
     Status instance_status = entry.second->WaitForPrepare();
     // don't wipe out an error in one instance with the resulting CANCELLED from
@@ -333,6 +344,12 @@ void QueryState::StartFInstances() {
     }
   }
   instances_prepared_promise_.Set(prepare_status);
+  // If this is aborting due to failure in thread creation, report status to the
+  // coordinator to start query cancellation. (Other errors are reported by the
+  // fragment instance itself.)
+  if (!thread_create_status.ok()) {
+    ReportExecStatusAux(true, thread_create_status, nullptr, true);
+  }
 }
 
 void QueryState::ReleaseInitialReservationRefcount() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index fc71772..d9606be 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -155,8 +155,10 @@ class QueryState {
   void ReleaseResources();
 
   /// Sends a ReportExecStatus rpc to the coordinator. If fis == nullptr, the
-  /// status must be an error. If fis is given, expects that fis finished its Prepare
-  /// phase; it then sends a report for that instance, including its profile.
+  /// status must be an error. If fis is given, the content will depend on whether
+  /// the fis has finished its Prepare phase. It sends a report for the instance,
+  /// and it will include the profile if the fis is prepared. If the fis is not
+  /// prepared, the status must be an error.
   /// If there is an error during the rpc, initiates cancellation.
   void ReportExecStatus(bool done, const Status& status, FragmentInstanceState* fis);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/runtime/thread-resource-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/thread-resource-mgr.h b/be/src/runtime/thread-resource-mgr.h
index 8b86dcc..bf601bd 100644
--- a/be/src/runtime/thread-resource-mgr.h
+++ b/be/src/runtime/thread-resource-mgr.h
@@ -111,8 +111,10 @@ class ThreadResourceMgr {
     /// each call to AcquireThreadToken and each successful call to TryAcquireThreadToken
     /// If the thread token is from AcquireThreadToken, required must be true; false
     /// if from TryAcquireThreadToken.
-    /// Must not be called from from ThreadAvailableCb.
-    void ReleaseThreadToken(bool required);
+    /// If 'skip_callbacks' is true, ReleaseThreadToken() will not run callbacks to find
+    /// a replacement for this thread. This is dangerous and can lead to underutilization
+    /// of the system.
+    void ReleaseThreadToken(bool required, bool skip_callbacks = false);
 
     /// Register a callback to be notified when a thread is available.
     /// Returns a unique id to be used when removing the callback.
@@ -266,7 +268,8 @@ inline bool ThreadResourceMgr::ResourcePool::TryAcquireThreadToken(bool* is_rese
   }
 }
 
-inline void ThreadResourceMgr::ResourcePool::ReleaseThreadToken(bool required) {
+inline void ThreadResourceMgr::ResourcePool::ReleaseThreadToken(
+    bool required, bool skip_callbacks) {
   if (required) {
     DCHECK_GT(num_required_threads(), 0);
     __sync_fetch_and_add(&num_threads_, -1);
@@ -282,7 +285,7 @@ inline void ThreadResourceMgr::ResourcePool::ReleaseThreadToken(bool required) {
       }
     }
   }
-  InvokeCallbacks();
+  if (!skip_callbacks) InvokeCallbacks();
 }
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 1aadf22..c23f4be 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -218,12 +218,13 @@ AdmissionController::AdmissionController(StatestoreSubscriber* subscriber,
       metrics_group_(metrics),
       host_id_(TNetworkAddressToString(host_addr)),
       thrift_serializer_(false),
-      done_(false) {
-  dequeue_thread_.reset(new Thread("scheduling", "admission-thread",
-        &AdmissionController::DequeueLoop, this));
-}
+      done_(false) {}
 
 AdmissionController::~AdmissionController() {
+  // If the dequeue thread is not running (e.g. if Init() fails), then there is
+  // nothing to do.
+  if (dequeue_thread_ == nullptr) return;
+
   // The AdmissionController should live for the lifetime of the impalad, but
   // for unit tests we need to ensure that no thread is waiting on the
   // condition variable. This notifies the dequeue thread to stop and waits
@@ -238,6 +239,8 @@ AdmissionController::~AdmissionController() {
 }
 
 Status AdmissionController::Init() {
+  RETURN_IF_ERROR(Thread::Create("scheduling", "admission-thread",
+      &AdmissionController::DequeueLoop, this, &dequeue_thread_));
   StatestoreSubscriber::UpdateCallback cb =
     bind<void>(mem_fn(&AdmissionController::UpdatePoolStats), this, _1, _2);
   Status status = subscriber_->AddTopic(IMPALA_REQUEST_QUEUE_TOPIC, true, cb);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 3e49cfb..81b2968 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -219,7 +219,7 @@ class AdmissionController {
   MetricGroup* metrics_group_;
 
   /// Thread dequeuing and admitting queries.
-  boost::scoped_ptr<Thread> dequeue_thread_;
+  std::unique_ptr<Thread> dequeue_thread_;
 
   // The local impalad's host/port id, used to construct topic keys.
   const std::string host_id_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/child-query.cc
----------------------------------------------------------------------
diff --git a/be/src/service/child-query.cc b/be/src/service/child-query.cc
index f58aacc..21a2b07 100644
--- a/be/src/service/child-query.cc
+++ b/be/src/service/child-query.cc
@@ -152,16 +152,17 @@ ChildQueryExecutor::~ChildQueryExecutor() {
   DCHECK(!is_running_);
 }
 
-void ChildQueryExecutor::ExecAsync(vector<ChildQuery>&& child_queries) {
+Status ChildQueryExecutor::ExecAsync(vector<ChildQuery>&& child_queries) {
   DCHECK(!child_queries.empty());
   lock_guard<SpinLock> lock(lock_);
   DCHECK(child_queries_.empty());
   DCHECK(child_queries_thread_.get() == NULL);
-  if (is_cancelled_) return;
+  if (is_cancelled_) return Status::OK();
   child_queries_ = move(child_queries);
-  child_queries_thread_.reset(new Thread("query-exec-state", "async child queries",
-      bind(&ChildQueryExecutor::ExecChildQueries, this)));
+  RETURN_IF_ERROR(Thread::Create("query-exec-state", "async child queries",
+      bind(&ChildQueryExecutor::ExecChildQueries, this), &child_queries_thread_));
   is_running_ = true;
+  return Status::OK();
 }
 
 void ChildQueryExecutor::ExecChildQueries() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/child-query.h
----------------------------------------------------------------------
diff --git a/be/src/service/child-query.h b/be/src/service/child-query.h
index 36f6197..83bcb58 100644
--- a/be/src/service/child-query.h
+++ b/be/src/service/child-query.h
@@ -19,7 +19,7 @@
 #define IMPALA_SERVICE_CHILD_QUERY_H
 
 #include <string>
-#include <boost/thread.hpp>
+#include <boost/thread/mutex.hpp>
 
 #include "common/status.h"
 #include "impala-server.h"
@@ -156,7 +156,7 @@ class ChildQueryExecutor {
   /// Asynchronously executes 'child_queries' one by one in a new thread. 'child_queries'
   /// must be non-empty. May clear or modify the 'child_queries' arg. Can only be called
   /// once. Does nothing if Cancel() was already called.
-  void ExecAsync(std::vector<ChildQuery>&& child_queries);
+  Status ExecAsync(std::vector<ChildQuery>&& child_queries) WARN_UNUSED_RESULT;
 
   /// Waits for all child queries to complete successfully or with an error. Returns a
   /// non-OK status if a child query fails. Returns OK if ExecAsync() was not called,
@@ -200,7 +200,7 @@ class ChildQueryExecutor {
 
   /// Thread to execute 'child_queries_' in. Immutable after the first time it is set or
   /// after 'is_cancelled_' is true.
-  boost::scoped_ptr<Thread> child_queries_thread_;
+  std::unique_ptr<Thread> child_queries_thread_;
 
   /// The status of the child queries. The status is OK iff all child queries complete
   /// successfully. Otherwise, status contains the error of the first child query that

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 60f799c..2a5b379 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -495,7 +495,9 @@ Status ClientRequestState::ExecDdlRequest() {
           ChildQuery(compute_stats_params.col_stats_query, this, parent_server_));
     }
 
-    if (child_queries.size() > 0) child_query_executor_->ExecAsync(move(child_queries));
+    if (child_queries.size() > 0) {
+      RETURN_IF_ERROR(child_query_executor_->ExecAsync(move(child_queries)));
+    }
     return Status::OK();
   }
 
@@ -592,9 +594,9 @@ Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) {
   return Status::OK();
 }
 
-void ClientRequestState::WaitAsync() {
-  wait_thread_.reset(new Thread(
-      "query-exec-state", "wait-thread", &ClientRequestState::Wait, this));
+Status ClientRequestState::WaitAsync() {
+  return Thread::Create("query-exec-state", "wait-thread",
+      &ClientRequestState::Wait, this, &wait_thread_, true);
 }
 
 void ClientRequestState::BlockOnWait() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 66e206d..6846165 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -78,7 +78,7 @@ class ClientRequestState {
   void Wait();
 
   /// Calls Wait() asynchronously in a thread and returns immediately.
-  void WaitAsync();
+  Status WaitAsync();
 
   /// BlockOnWait() may be called after WaitAsync() has been called in order to wait
   /// for the asynchronous thread (wait_thread_) to complete. It is safe to call this
@@ -250,7 +250,7 @@ class ClientRequestState {
   ExecEnv* exec_env_;
 
   /// Thread for asynchronously running Wait().
-  boost::scoped_ptr<Thread> wait_thread_;
+  std::unique_ptr<Thread> wait_thread_;
 
   /// Condition variable to make BlockOnWait() thread-safe. One thread joins
   /// wait_thread_, and all other threads block on this cv. Used with lock_.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index a170ea1..bcf76b6 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -68,10 +68,14 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
   request_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
   // start thread to wait for results to become available, which will allow
   // us to advance query state to FINISHED or EXCEPTION
-  request_state->WaitAsync();
+  Status status = request_state->WaitAsync();
+  if (!status.ok()) {
+    discard_result(UnregisterQuery(request_state->query_id(), false, &status));
+    RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
+  }
   // Once the query is running do a final check for session closure and add it to the
   // set of in-flight queries.
-  Status status = SetQueryInflight(session, request_state);
+  status = SetQueryInflight(session, request_state);
   if (!status.ok()) {
     discard_result(UnregisterQuery(request_state->query_id(), false, &status));
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index f32d04e..da8d606 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -464,21 +464,16 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
         QueryResultSet::CreateHS2ResultSet(
             session->hs2_version, *request_state->result_metadata(), nullptr),
         cache_num_rows);
-    if (!status.ok()) {
-      discard_result(UnregisterQuery(request_state->query_id(), false, &status));
-      HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
-    }
+    if (!status.ok()) goto return_error;
   }
   request_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
   // Start thread to wait for results to become available.
-  request_state->WaitAsync();
+  status = request_state->WaitAsync();
+  if (!status.ok()) goto return_error;
   // Once the query is running do a final check for session closure and add it to the
   // set of in-flight queries.
   status = SetQueryInflight(session, request_state);
-  if (!status.ok()) {
-    discard_result(UnregisterQuery(request_state->query_id(), false, &status));
-    HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
-  }
+  if (!status.ok()) goto return_error;
   return_val.__isset.operationHandle = true;
   return_val.operationHandle.__set_operationType(TOperationType::EXECUTE_STATEMENT);
   return_val.operationHandle.__set_hasResultSet(request_state->returns_result_set());
@@ -489,6 +484,11 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
       apache::hive::service::cli::thrift::TStatusCode::SUCCESS_STATUS);
 
   VLOG_QUERY << "ExecuteStatement(): return_val=" << ThriftDebugString(return_val);
+  return;
+
+ return_error:
+  discard_result(UnregisterQuery(request_state->query_id(), false, &status));
+  HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
 }
 
 void ImpalaServer::GetTypeInfo(TGetTypeInfoResp& return_val,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 18582da..7eae5b1 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -383,15 +383,16 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
           "impala-server", "cancellation-worker",
       FLAGS_cancellation_thread_pool_size, MAX_CANCELLATION_QUEUE_SIZE,
       bind<void>(&ImpalaServer::CancelFromThreadPool, this, _1, _2)));
+  ABORT_IF_ERROR(cancellation_thread_pool_->Init());
 
   // Initialize a session expiry thread which blocks indefinitely until the first session
   // with non-zero timeout value is opened. Note that a session which doesn't specify any
   // idle session timeout value will use the default value FLAGS_idle_session_timeout.
-  session_timeout_thread_.reset(new Thread("impala-server", "session-expirer",
-      bind<void>(&ImpalaServer::ExpireSessions, this)));
+  ABORT_IF_ERROR(Thread::Create("impala-server", "session-expirer",
+      bind<void>(&ImpalaServer::ExpireSessions, this), &session_timeout_thread_));
 
-  query_expiration_thread_.reset(new Thread("impala-server", "query-expirer",
-      bind<void>(&ImpalaServer::ExpireQueries, this)));
+  ABORT_IF_ERROR(Thread::Create("impala-server", "query-expirer",
+      bind<void>(&ImpalaServer::ExpireQueries, this), &query_expiration_thread_));
 
   is_coordinator_ = FLAGS_is_coordinator;
   is_executor_ = FLAGS_is_executor;
@@ -448,8 +449,8 @@ Status ImpalaServer::InitLineageLogging() {
   lineage_logger_.reset(new SimpleLogger(FLAGS_lineage_event_log_dir,
       LINEAGE_LOG_FILE_PREFIX, FLAGS_max_lineage_log_file_size));
   RETURN_IF_ERROR(lineage_logger_->Init());
-  lineage_logger_flush_thread_.reset(new Thread("impala-server",
-        "lineage-log-flush", &ImpalaServer::LineageLoggerFlushThread, this));
+  RETURN_IF_ERROR(Thread::Create("impala-server", "lineage-log-flush",
+      &ImpalaServer::LineageLoggerFlushThread, this, &lineage_logger_flush_thread_));
   return Status::OK();
 }
 
@@ -540,8 +541,9 @@ Status ImpalaServer::InitAuditEventLogging() {
   audit_event_logger_.reset(new SimpleLogger(FLAGS_audit_event_log_dir,
      AUDIT_EVENT_LOG_FILE_PREFIX, FLAGS_max_audit_event_log_file_size));
   RETURN_IF_ERROR(audit_event_logger_->Init());
-  audit_event_logger_flush_thread_.reset(new Thread("impala-server",
-        "audit-event-log-flush", &ImpalaServer::AuditEventLoggerFlushThread, this));
+  RETURN_IF_ERROR(Thread::Create("impala-server", "audit-event-log-flush",
+      &ImpalaServer::AuditEventLoggerFlushThread, this,
+      &audit_event_logger_flush_thread_));
   return Status::OK();
 }
 
@@ -600,8 +602,8 @@ Status ImpalaServer::InitProfileLogging() {
       PROFILE_LOG_FILE_PREFIX, FLAGS_max_profile_log_file_size,
       FLAGS_max_profile_log_files));
   RETURN_IF_ERROR(profile_logger_->Init());
-  profile_log_file_flush_thread_.reset(new Thread("impala-server", "log-flush-thread",
-      &ImpalaServer::LogFileFlushThread, this));
+  RETURN_IF_ERROR(Thread::Create("impala-server", "log-flush-thread",
+      &ImpalaServer::LogFileFlushThread, this, &profile_log_file_flush_thread_));
 
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 1e83de5..eb3251c 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -745,13 +745,13 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
   boost::scoped_ptr<SimpleLogger> lineage_logger_;
 
   /// If profile logging is enabled, wakes once every 5s to flush query profiles to disk
-  boost::scoped_ptr<Thread> profile_log_file_flush_thread_;
+  std::unique_ptr<Thread> profile_log_file_flush_thread_;
 
   /// If audit event logging is enabled, wakes once every 5s to flush audit events to disk
-  boost::scoped_ptr<Thread> audit_event_logger_flush_thread_;
+  std::unique_ptr<Thread> audit_event_logger_flush_thread_;
 
   /// If lineage logging is enabled, wakes once every 5s to flush lineage events to disk
-  boost::scoped_ptr<Thread> lineage_logger_flush_thread_;
+  std::unique_ptr<Thread> lineage_logger_flush_thread_;
 
   /// global, per-server state
   ExecEnv* exec_env_;  // not owned
@@ -762,7 +762,7 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
 
   /// Thread that runs ExpireSessions. It will wake up periodically to check for sessions
   /// which are idle for more their timeout values.
-  boost::scoped_ptr<Thread> session_timeout_thread_;
+  std::unique_ptr<Thread> session_timeout_thread_;
 
   /// Contains all the non-zero idle session timeout values.
   std::multiset<int32_t> session_timeout_set_;
@@ -966,7 +966,7 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
   ExpirationQueue queries_by_timestamp_;
 
   /// Container for a thread that runs ExpireQueries() if FLAGS_idle_query_timeout is set.
-  boost::scoped_ptr<Thread> query_expiration_thread_;
+  std::unique_ptr<Thread> query_expiration_thread_;
 
   /// Serializes TBackendDescriptors when creating topic updates
   ThriftSerializer thrift_serializer_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/service/impalad-main.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index 8a7961c..53b7d3e 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -97,7 +97,8 @@ int ImpaladMain(int argc, char** argv) {
     ShutdownLogging();
     exit(1);
   }
-  StartMemoryMaintenanceThread(); // Memory metrics are created in StartServices().
+  // Memory metrics are created in StartServices().
+  ABORT_IF_ERROR(StartMemoryMaintenanceThread());
 
   DCHECK(exec_env.process_mem_tracker() != nullptr)
       << "ExecEnv::StartServices() must be called before starting RPC services";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/statestore/statestore-subscriber.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 7cfcaf7..fa839aa 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -224,8 +224,8 @@ Status StatestoreSubscriber::Start() {
   }
 
   // Registration is finished at this point, so it's fine to release the lock.
-  recovery_mode_thread_.reset(new Thread("statestore-subscriber", "recovery-mode-thread",
-      &StatestoreSubscriber::RecoveryModeChecker, this));
+  RETURN_IF_ERROR(Thread::Create("statestore-subscriber", "recovery-mode-thread",
+      &StatestoreSubscriber::RecoveryModeChecker, this, &recovery_mode_thread_));
 
   return status;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/statestore/statestore-subscriber.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index 65dcac9..49db5d0 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -143,7 +143,7 @@ class StatestoreSubscriber {
   boost::scoped_ptr<impala::TimeoutFailureDetector> failure_detector_;
 
   /// Thread in which RecoveryModeChecker runs.
-  boost::scoped_ptr<Thread> recovery_mode_thread_;
+  std::unique_ptr<Thread> recovery_mode_thread_;
 
   /// Class-wide lock. Protects all subsequent members. Most private methods must
   /// be called holding this lock; this is noted in the method comments.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 5d7738c..75ba5c7 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -256,6 +256,12 @@ Statestore::Statestore(MetricGroup* metrics)
   heartbeat_client_cache_->InitMetrics(metrics, "subscriber-heartbeat");
 }
 
+Status Statestore::Init() {
+  RETURN_IF_ERROR(subscriber_topic_update_threadpool_.Init());
+  RETURN_IF_ERROR(subscriber_heartbeat_threadpool_.Init());
+  return Status::OK();
+}
+
 void Statestore::RegisterWebpages(Webserver* webserver) {
   Webserver::UrlCallback topics_callback =
       bind<void>(mem_fn(&Statestore::TopicsHandler), this, _1, _2);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index b3ba315..05feac3 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -28,6 +28,7 @@
 #include <boost/unordered_map.hpp>
 #include <boost/uuid/uuid_generators.hpp>
 
+#include "common/status.h"
 #include "gen-cpp/StatestoreService.h"
 #include "gen-cpp/StatestoreSubscriber.h"
 #include "gen-cpp/Types_types.h"
@@ -97,6 +98,10 @@ class Statestore : public CacheLineAligned {
   /// The only constructor; initialises member variables only.
   Statestore(MetricGroup* metrics);
 
+  /// Initialize the ThreadPools used for updates and heartbeats. Returns an error if
+  /// ThreadPool initialization fails.
+  Status Init() WARN_UNUSED_RESULT;
+
   /// Registers a new subscriber with the given unique subscriber ID, running a subscriber
   /// service at the given location, with the provided list of topic subscriptions.
   /// The registration_id output parameter is the unique ID for this registration, used to

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/statestore/statestored-main.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestored-main.cc b/be/src/statestore/statestored-main.cc
index 1f06f04..1a11237 100644
--- a/be/src/statestore/statestored-main.cc
+++ b/be/src/statestore/statestored-main.cc
@@ -70,7 +70,7 @@ int StatestoredMain(int argc, char** argv) {
   ABORT_IF_ERROR(
       metrics->Init(FLAGS_enable_webserver ? webserver.get() : nullptr));
   ABORT_IF_ERROR(RegisterMemoryMetrics(metrics.get(), false, nullptr, nullptr));
-  StartMemoryMaintenanceThread();
+  ABORT_IF_ERROR(StartMemoryMaintenanceThread());
   ABORT_IF_ERROR(
     StartThreadInstrumentation(metrics.get(), webserver.get(), false));
   InitRpcEventTracing(webserver.get());
@@ -81,6 +81,7 @@ int StatestoredMain(int argc, char** argv) {
   CommonMetrics::InitCommonMetrics(metrics.get());
 
   Statestore statestore(metrics.get());
+  ABORT_IF_ERROR(statestore.Init());
   statestore.RegisterWebpages(webserver.get());
   boost::shared_ptr<TProcessor> processor(
       new StatestoreServiceProcessor(statestore.thrift_iface()));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index 5b2a4e5..d0036af 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -165,6 +165,7 @@ InProcessStatestore::InProcessStatestore(int statestore_port, int webserver_port
 }
 
 Status InProcessStatestore::Start() {
+  RETURN_IF_ERROR(statestore_->Init());
   RETURN_IF_ERROR(webserver_->Start());
   boost::shared_ptr<TProcessor> processor(
       new StatestoreServiceProcessor(statestore_->thrift_iface()));
@@ -177,8 +178,8 @@ Status InProcessStatestore::Start() {
   ThriftServer* server;
   ABORT_IF_ERROR(builder.metrics(metrics_.get()).Build(&server));
   statestore_server_.reset(server);
-  statestore_main_loop_.reset(
-      new Thread("statestore", "main-loop", &Statestore::MainLoop, statestore_.get()));
+  RETURN_IF_ERROR(Thread::Create("statestore", "main-loop",
+      &Statestore::MainLoop, statestore_.get(), &statestore_main_loop_));
 
   RETURN_IF_ERROR(statestore_server_->Start());
   return WaitForServer("localhost", statestore_port_, 10, 100);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/testutil/in-process-servers.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h
index d22c441..9e3b2f5 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -141,7 +141,7 @@ class InProcessStatestore {
   /// Statestore Thrift server
   boost::scoped_ptr<ThriftServer> statestore_server_;
 
-  boost::scoped_ptr<Thread> statestore_main_loop_;
+  std::unique_ptr<Thread> statestore_main_loop_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/util/thread-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/thread-pool-test.cc b/be/src/util/thread-pool-test.cc
index edfe881..a7e4df7 100644
--- a/be/src/util/thread-pool-test.cc
+++ b/be/src/util/thread-pool-test.cc
@@ -46,6 +46,7 @@ TEST(ThreadPoolTest, BasicTest) {
   }
 
   ThreadPool<int> thread_pool("thread-pool", "worker", 5, 250, Count);
+  ASSERT_OK(thread_pool.Init());
   for (int i = 0; i <= OFFERED_RANGE; ++i) {
     ASSERT_TRUE(thread_pool.Offer(i));
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/util/thread-pool.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread-pool.h b/be/src/util/thread-pool.h
index 800f690..4b5dbf0 100644
--- a/be/src/util/thread-pool.h
+++ b/be/src/util/thread-pool.h
@@ -38,24 +38,23 @@ class ThreadPool : public CacheLineAligned {
   /// process.
   typedef boost::function<void (int thread_id, const T& workitem)> WorkFunction;
 
-  /// Creates a new thread pool and start num_threads threads.
+  /// Creates a new thread pool without starting any threads. Code must call
+  /// Init() on this thread pool before any calls to Offer().
   ///  -- num_threads: how many threads are part of this pool
   ///  -- queue_size: the maximum size of the queue on which work items are offered. If the
   ///     queue exceeds this size, subsequent calls to Offer will block until there is
   ///     capacity available.
   ///  -- work_function: the function to run every time an item is consumed from the queue
+  ///  -- fault_injection_eligible: if true, allow fault injection at this
+  ///     callsite (see thread_creation_fault_injection). If false, fault
+  ///     injection is disabled at this callsite. Thread creation sites that crash
+  ///     Impala or abort startup must have this set to false.
   ThreadPool(const std::string& group, const std::string& thread_prefix,
-      uint32_t num_threads, uint32_t queue_size, const WorkFunction& work_function)
-    : work_function_(work_function),
-      work_queue_(queue_size),
-      shutdown_(false) {
-    for (int i = 0; i < num_threads; ++i) {
-      std::stringstream threadname;
-      threadname << thread_prefix << "(" << i + 1 << ":" << num_threads << ")";
-      threads_.AddThread(std::make_unique<Thread>(group, threadname.str(),
-          boost::bind<void>(boost::mem_fn(&ThreadPool<T>::WorkerThread), this, i)));
-    }
-  }
+      uint32_t num_threads, uint32_t queue_size, const WorkFunction& work_function,
+      bool fault_injection_eligible = false)
+    : group_(group), thread_prefix_(thread_prefix), num_threads_(num_threads),
+      work_function_(work_function), work_queue_(queue_size),
+      fault_injection_eligible_(fault_injection_eligible) {}
 
   /// Destructor ensures that all threads are terminated before this object is freed
   /// (otherwise they may continue to run and reference member variables)
@@ -64,8 +63,32 @@ class ThreadPool : public CacheLineAligned {
     Join();
   }
 
+  /// Create the threads needed for this ThreadPool. Returns an error on any
+  /// error spawning the threads.
+  Status Init() {
+    for (int i = 0; i < num_threads_; ++i) {
+      std::stringstream threadname;
+      threadname << thread_prefix_ << "(" << i + 1 << ":" << num_threads_ << ")";
+      std::unique_ptr<Thread> t;
+      Status status = Thread::Create(group_, threadname.str(),
+          boost::bind<void>(boost::mem_fn(&ThreadPool<T>::WorkerThread), this, i), &t,
+          fault_injection_eligible_);
+      if (!status.ok()) {
+        // The thread pool initialization failed. Shutdown any threads that were
+        // spawned. Note: Shutdown() and Join() are safe to call multiple times.
+        Shutdown();
+        Join();
+        return status;
+      }
+      threads_.AddThread(std::move(t));
+    }
+    initialized_ = true;
+    return Status::OK();
+  }
+
   /// Blocking operation that puts a work item on the queue. If the queue is full, blocks
-  /// until there is capacity available.
+  /// until there is capacity available. The ThreadPool must be initialized before
+  /// calling this method.
   //
   /// 'work' is copied into the work queue, but may be referenced at any time in the
   /// future. Therefore the caller needs to ensure that any data referenced by work (if T
@@ -77,6 +100,7 @@ class ThreadPool : public CacheLineAligned {
   /// (which typically means that the thread pool has already been shut down).
   template <typename V>
   bool Offer(V&& work) {
+    DCHECK(initialized_);
     return work_queue_.BlockingPut(std::forward<V>(work));
   }
 
@@ -108,6 +132,8 @@ class ThreadPool : public CacheLineAligned {
   void DrainAndShutdown() {
     {
       boost::unique_lock<boost::mutex> l(lock_);
+      // If the ThreadPool is not initialized, then the queue must be empty.
+      DCHECK(initialized_ || work_queue_.Size() == 0);
       while (work_queue_.Size() != 0) {
         empty_cv_.wait(l);
       }
@@ -141,6 +167,15 @@ class ThreadPool : public CacheLineAligned {
     return shutdown_;
   }
 
+  /// Group string to tag threads for this pool
+  const std::string group_;
+
+  /// Thread name prefix
+  const std::string thread_prefix_;
+
+  /// The number of threads to start in this pool
+  uint32_t num_threads_;
+
   /// User-supplied method to call to process each work item.
   WorkFunction work_function_;
 
@@ -148,14 +183,21 @@ class ThreadPool : public CacheLineAligned {
   /// FIFO order.
   BlockingQueue<T> work_queue_;
 
+  /// Whether a failure in Init() is tolerated by the caller (e.g. by aborting a
+  /// query), which makes it safe to inject thread creation errors into Init().
+  bool fault_injection_eligible_;
+
   /// Collection of worker threads that process work from the queue.
   ThreadGroup threads_;
 
   /// Guards shutdown_ and empty_cv_
   boost::mutex lock_;
 
+  /// Set to true when Init() has finished spawning the threads.
+  bool initialized_ = false;
+
   /// Set to true when threads should stop doing work and terminate.
-  bool shutdown_;
+  bool shutdown_ = false;
 
   /// Signalled when the queue becomes empty
   boost::condition_variable empty_cv_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/e993b971/be/src/util/thread.cc
----------------------------------------------------------------------
diff --git a/be/src/util/thread.cc b/be/src/util/thread.cc
index 0e08ab1..cd38c9a 100644
--- a/be/src/util/thread.cc
+++ b/be/src/util/thread.cc
@@ -33,6 +33,10 @@
 
 #include "common/names.h"
 
+#ifndef NDEBUG
+DECLARE_bool(thread_creation_fault_injection);
+#endif
+
 namespace this_thread = boost::this_thread;
 using namespace rapidjson;
 
@@ -286,22 +290,41 @@ void ThreadMgr::ThreadGroupUrlCallback(const Webserver::ArgumentMap& args,
   document->AddMember("threads", lst, document->GetAllocator());
 }
 
-void Thread::StartThread(const ThreadFunctor& functor) {
+Status Thread::StartThread(const std::string& category, const std::string& name,
+    const ThreadFunctor& functor, unique_ptr<Thread>* thread,
+    bool fault_injection_eligible) {
   DCHECK(thread_manager.get() != nullptr)
       << "Thread created before InitThreading called";
-  DCHECK(tid_ == UNINITIALISED_THREAD_ID) << "StartThread called twice";
+  DCHECK(thread->get() == nullptr);
+
+#ifndef NDEBUG
+  if (fault_injection_eligible && FLAGS_thread_creation_fault_injection) {
+    // Fail roughly 1% of the time on eligible codepaths.
+    if ((rand() % 100) == 1) {
+      return Status(Substitute("Fake thread creation failure (category: $0, name: $1)",
+          category, name));
+    }
+  }
+#endif
 
+  unique_ptr<Thread> t(new Thread(category, name));
   Promise<int64_t> thread_started;
-  thread_.reset(
-      new thread(&Thread::SuperviseThread, name_, category_, functor, &thread_started));
-
+  try {
+    t->thread_.reset(
+        new boost::thread(&Thread::SuperviseThread, t->name_, t->category_, functor,
+            &thread_started));
+  } catch (boost::thread_resource_error& e) {
+    return Status(TErrorCode::THREAD_CREATION_FAILED, name, category, e.what());
+  }
   // TODO: This slows down thread creation although not enormously. To make this faster,
   // consider delaying thread_started.Get() until the first call to tid(), but bear in
   // mind that some coordination is required between SuperviseThread() and this to make
   // sure that the thread is still available to have its tid set.
-  tid_ = thread_started.Get();
+  t->tid_ = thread_started.Get();
 
-  VLOG(2) << "Started thread " << tid_ << " - " << category_ << ":" << name_;
+  VLOG(2) << "Started thread " << t->tid() << " - " << category << ":" << name;
+  *thread = move(t);
+  return Status::OK();
 }
 
 void Thread::SuperviseThread(const string& name, const string& category,
@@ -330,7 +353,7 @@ void Thread::SuperviseThread(const string& name, const string& category,
   thread_mgr_ref->RemoveThread(this_thread::get_id(), category_copy);
 }
 
-void ThreadGroup::AddThread(unique_ptr<Thread> thread) {
+void ThreadGroup::AddThread(unique_ptr<Thread>&& thread) {
   threads_.emplace_back(move(thread));
 }