Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/18 15:12:47 UTC

[1/9] incubator-impala git commit: IMPALA-4039: Increase width of Operator column in runtime profile

Repository: incubator-impala
Updated Branches:
  refs/heads/master c7db60aa4 -> dc2f69e5a


IMPALA-4039: Increase width of Operator column in runtime profile

This patch raises the maximum column width set in impala::PrintExecSummary
from 30 to 1000. It affects:
1. The Summary page in the web UI
2. The "ExecSummary" section of the runtime profile, as shown by
    a) the profile command in impala-shell
    b) the Profile page in the web UI
The summary command in impala-shell is unaffected because there is no
width limit on the client side.
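
To illustrate what a maximum column width means for the Operator column, here is
a minimal sketch. TruncateCell is a hypothetical helper, not part of Impala, and
it assumes that over-wide cells are cut and marked with a trailing "...". The
real TablePrinter may behave differently.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper (not Impala's TablePrinter): clamp 'cell' to at most
// 'max_width' characters, marking any truncation with a trailing "...".
std::string TruncateCell(const std::string& cell, std::size_t max_width) {
  assert(max_width >= 3);  // Need room for the "..." marker.
  if (cell.size() <= max_width) return cell;
  return cell.substr(0, max_width - 3) + "...";
}
```

Under that assumption, a 40-character operator name is cut to 30 characters with
the old limit and passes through unchanged with a limit of 1000.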

Change-Id: I1ae559913a98e32f77782161aa2b76e7c8a5dabd
Reviewed-on: http://gerrit.cloudera.org:8080/7691
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/da60a9a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/da60a9a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/da60a9a1

Branch: refs/heads/master
Commit: da60a9a15ab21fd2569a90e1e91df56ae2640819
Parents: c7db60a
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Wed Aug 16 16:05:13 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Aug 17 05:48:02 2017 +0000

----------------------------------------------------------------------
 be/src/util/summary-util.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/da60a9a1/be/src/util/summary-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/summary-util.cc b/be/src/util/summary-util.cc
index 5061efe..01b3d7e 100644
--- a/be/src/util/summary-util.cc
+++ b/be/src/util/summary-util.cc
@@ -113,7 +113,7 @@ string impala::PrintExecSummary(const TExecSummary& exec_summary) {
   if (!exec_summary.__isset.nodes) return "";
 
   TablePrinter printer;
-  printer.set_max_output_width(30);
+  printer.set_max_output_width(1000);
   printer.AddColumn("Operator", true);
   printer.AddColumn("#Hosts", false);
   printer.AddColumn("Avg Time", false);


[8/9] incubator-impala git commit: IMPALA-5677: limit clean page memory consumption

Posted by ta...@apache.org.
IMPALA-5677: limit clean page memory consumption

Adds the following flag:

  -buffer_pool_clean_pages_limit ((Advanced) Limit on bytes of clean spilled
    pages that will be accumulated in the buffer pool. Specified as number of
    bytes ('<int>[bB]?'), megabytes ('<float>[mM]'), gigabytes
    ('<float>[gG]'), or percentage of the buffer pool limit ('<int>%').
    Defaults to bytes if no unit is given..) type: string default: "10%"
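
The value formats accepted by the flag can be sketched roughly as follows.
ParseMemSpecSketch is a hypothetical stand-in written for this note, not
Impala's actual parser. It assumes binary units (1m = 1024 * 1024 bytes) and is
looser than the help text in that it also accepts fractional byte and percentage
values.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <string>

// Hypothetical parser for values such as "512", "512b", "1.5m", "2G" or "10%".
// 'relative_to' is the quantity a percentage refers to (here, the buffer pool
// limit). Returns false if 'spec' is malformed; otherwise stores the result in
// 'bytes'.
bool ParseMemSpecSketch(
    const std::string& spec, std::int64_t relative_to, std::int64_t* bytes) {
  if (spec.empty()) return false;
  double multiplier = 1.0;
  bool is_percent = false;
  std::string number = spec;
  switch (spec.back()) {
    case 'g': case 'G': multiplier = 1024.0 * 1024.0 * 1024.0; number.pop_back(); break;
    case 'm': case 'M': multiplier = 1024.0 * 1024.0; number.pop_back(); break;
    case 'b': case 'B': number.pop_back(); break;
    case '%': is_percent = true; number.pop_back(); break;
    default: break;  // No unit suffix: the value is in bytes.
  }
  if (number.empty()) return false;
  char* end = nullptr;
  double value = std::strtod(number.c_str(), &end);
  // Reject trailing garbage, negative values, and inf/nan.
  if (end != number.c_str() + number.size()) return false;
  if (!std::isfinite(value) || value < 0) return false;
  if (is_percent) {
    if (value > 100) return false;
    *bytes = static_cast<std::int64_t>(
        (static_cast<double>(relative_to) * value) / 100.0);
  } else {
    *bytes = static_cast<std::int64_t>(value * multiplier);
  }
  return true;
}
```

With the default of "10%" and a buffer pool limit of 1000 bytes, this sketch
yields a clean page limit of 100 bytes.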

This helps prevent Impala from accumulating excessive amounts of clean
pages, which can cause problems in practice:
* The OS buffer cache is squeezed, reducing I/O performance from HDFS
  and potentially reducing the ability of the OS to absorb writes from
  Impala without blocking.
* Impala process memory consumption can grow more than users or admins
  might expect. E.g. if one query is running with a mem_limit of 1GB,
  many people will be surprised if the process inflates to the full
  process limit of 100GB. Impala doesn't provide any guarantees apart
  from staying within the process mem_limit, but this could be a
  surprising divergence from past behaviour.
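
The limiting behaviour can be pictured with a simplified, single-threaded model.
CleanPageFifoSketch is a hypothetical class written for this note. It assumes a
single page size and identifies pages by integer ids, whereas the real allocator
keeps per-core arenas and per-size lists and uses atomic counters.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical single-threaded model of a byte-limited FIFO of clean pages.
class CleanPageFifoSketch {
 public:
  CleanPageFifoSketch(std::int64_t page_len, std::int64_t clean_page_bytes_limit)
    : page_len_(page_len),
      limit_(clean_page_bytes_limit),
      bytes_remaining_(clean_page_bytes_limit) {}

  // Adds a clean page. Returns the id of the page whose buffer had to be freed
  // to respect the limit, or -1 if no eviction was needed.
  int AddCleanPage(int page_id) {
    if (bytes_remaining_ >= page_len_) {
      bytes_remaining_ -= page_len_;
      clean_pages_.push_back(page_id);
      return -1;
    }
    // Over the limit and nothing older to evict: drop the new page itself.
    if (clean_pages_.empty()) return page_id;
    // Over the limit: replace the oldest page. Both pages have the same
    // length, so the remaining budget is unchanged.
    int evicted = clean_pages_.front();
    clean_pages_.pop_front();
    clean_pages_.push_back(page_id);
    return evicted;
  }

  // Evicts the oldest clean page (e.g. to satisfy an allocation) and returns
  // its id, or -1 if there are no clean pages.
  int EvictOldest() {
    if (clean_pages_.empty()) return -1;
    int evicted = clean_pages_.front();
    clean_pages_.pop_front();
    bytes_remaining_ += page_len_;
    return evicted;
  }

  std::int64_t CleanPageBytes() const { return limit_ - bytes_remaining_; }
  std::size_t NumCleanPages() const { return clean_pages_.size(); }

 private:
  const std::int64_t page_len_;
  const std::int64_t limit_;
  std::int64_t bytes_remaining_;
  std::deque<int> clean_pages_;
};
```

With a limit of two pages, adding a third page evicts the first one, and the
clean page bytes never exceed the limit. With a limit of zero, every page is
evicted as soon as it becomes clean.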

Observability:
A new metric buffer-pool.clean-pages-limit is added.

Testing:
Added a backend test to directly test that clean pages are evicted.
Ran in a loop to flush out any flakiness.

Ran exhaustive tests.

Change-Id: Ib6b687ab4bdddf07d9d6c997ff814aa3976042f9
Reviewed-on: http://gerrit.cloudera.org:8080/7653
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/039255a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/039255a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/039255a6

Branch: refs/heads/master
Commit: 039255a6ad01be785fb7b5ee02758dd125218ba9
Parents: 79fba27
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Aug 9 17:40:46 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Aug 18 04:22:32 2017 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc                   |   6 +
 .../runtime/bufferpool/buffer-allocator-test.cc |   7 +-
 be/src/runtime/bufferpool/buffer-allocator.cc   | 110 ++++++++++------
 be/src/runtime/bufferpool/buffer-allocator.h    |  21 +++-
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 124 ++++++++++++++-----
 be/src/runtime/bufferpool/buffer-pool.cc        |  10 +-
 be/src/runtime/bufferpool/buffer-pool.h         |   8 +-
 be/src/runtime/bufferpool/suballocator-test.cc  |   2 +-
 be/src/runtime/exec-env.cc                      |  15 ++-
 be/src/runtime/exec-env.h                       |   2 +-
 be/src/runtime/test-env.cc                      |   3 +-
 be/src/util/memory-metrics.cc                   |   7 ++
 be/src/util/memory-metrics.h                    |   2 +
 common/thrift/metrics.json                      |  10 ++
 14 files changed, 243 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 98417f2..5d61edc 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -57,6 +57,12 @@ static const string buffer_pool_limit_help_msg = "(Advanced) Limit on buffer poo
     "The default value and behaviour of this flag may change between releases.";
 DEFINE_string(buffer_pool_limit, "80%", buffer_pool_limit_help_msg.c_str());
 
+static const string buffer_pool_clean_pages_limit_help_msg = "(Advanced) Limit on bytes "
+    "of clean pages that will be accumulated in the buffer pool. "
+     + Substitute(MEM_UNITS_HELP_MSG, "the buffer pool limit") + ".";
+DEFINE_string(buffer_pool_clean_pages_limit, "10%",
+    buffer_pool_clean_pages_limit_help_msg.c_str());
+
 DEFINE_int64(min_buffer_size, 64 * 1024,
     "(Advanced) The minimum buffer size to use in the buffer pool");
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/bufferpool/buffer-allocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator-test.cc b/be/src/runtime/bufferpool/buffer-allocator-test.cc
index 7ec64ad..6427648 100644
--- a/be/src/runtime/bufferpool/buffer-allocator-test.cc
+++ b/be/src/runtime/bufferpool/buffer-allocator-test.cc
@@ -39,7 +39,7 @@ using BufferHandle = BufferPool::BufferHandle;
 class BufferAllocatorTest : public ::testing::Test {
  public:
   virtual void SetUp() {
-    dummy_pool_ = obj_pool_.Add(new BufferPool(1, 0));
+    dummy_pool_ = obj_pool_.Add(new BufferPool(1, 0, 0));
     dummy_reservation_.InitRootTracker(nullptr, 0);
     ASSERT_OK(dummy_pool_->RegisterClient("", nullptr, &dummy_reservation_, nullptr, 0,
         obj_pool_.Add(new RuntimeProfile(&obj_pool_, "")), &dummy_client_));
@@ -73,7 +73,8 @@ class BufferAllocatorTest : public ::testing::Test {
 // Confirm that ASAN will catch use-after-free errors, even if the BufferAllocator caches
 // returned memory.
 TEST_F(BufferAllocatorTest, Poisoning) {
-  BufferAllocator allocator(dummy_pool_, TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN);
+  BufferAllocator allocator(
+      dummy_pool_, TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN);
   BufferHandle buffer;
   ASSERT_OK(allocator.Allocate(&dummy_client_, TEST_BUFFER_LEN, &buffer));
   uint8_t* data = buffer.data();
@@ -94,7 +95,7 @@ TEST_F(BufferAllocatorTest, FreeListSizes) {
   const int NUM_BUFFERS = 512;
   const int64_t TOTAL_BYTES = NUM_BUFFERS * TEST_BUFFER_LEN;
 
-  BufferAllocator allocator(dummy_pool_, TEST_BUFFER_LEN, TOTAL_BYTES);
+  BufferAllocator allocator(dummy_pool_, TEST_BUFFER_LEN, TOTAL_BYTES, TOTAL_BYTES);
 
   // Allocate a bunch of buffers - all free list checks should miss.
   vector<BufferHandle> buffers(NUM_BUFFERS);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/bufferpool/buffer-allocator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator.cc b/be/src/runtime/bufferpool/buffer-allocator.cc
index 0f05cf3..09c2623 100644
--- a/be/src/runtime/bufferpool/buffer-allocator.cc
+++ b/be/src/runtime/bufferpool/buffer-allocator.cc
@@ -34,6 +34,12 @@ DECLARE_bool(disable_mem_pools);
 
 namespace impala {
 
+/// Decrease 'bytes_remaining' by up to 'max_decrease', down to a minimum of 0.
+/// If 'require_full_decrease' is true, only decrease if it can be decreased by the
+/// full 'max_decrease'. Returns the amount it was decreased by.
+static int64_t DecreaseBytesRemaining(
+    int64_t max_decrease, bool require_full_decrease, AtomicInt64* bytes_remaining);
+
 /// An arena containing free buffers and clean pages that are associated with a
 /// particular core. All public methods are thread-safe.
 class BufferPool::FreeBufferArena : public CacheLineAligned {
@@ -98,10 +104,6 @@ class BufferPool::FreeBufferArena : public CacheLineAligned {
   /// it doesn't acquire the arena lock.
   int64_t GetNumCleanPages();
 
-  /// Return the total bytes of clean pages in the arena. May be approximate since
-  /// it doesn't acquire the arena lock.
-  int64_t GetCleanPageBytes();
-
   string DebugString();
 
  private:
@@ -109,6 +111,15 @@ class BufferPool::FreeBufferArena : public CacheLineAligned {
   /// All members are protected by FreeBufferArena::lock_ unless otherwise mentioned.
   struct PerSizeLists {
     PerSizeLists() : num_free_buffers(0), low_water_mark(0), num_clean_pages(0) {}
+
+    /// Helper to add a free buffer and increment the counter.
+    /// FreeBufferArena::lock_ must be held by the caller.
+    void AddFreeBuffer(BufferHandle&& buffer) {
+      DCHECK_EQ(num_free_buffers.Load(), free_buffers.Size());
+      num_free_buffers.Add(1);
+      free_buffers.AddFreeBuffer(move(buffer));
+    }
+
     /// The number of entries in 'free_buffers'. Can be read without holding a lock to
     /// allow threads to quickly skip over empty lists when trying to find a buffer.
     AtomicInt64 num_free_buffers;
@@ -170,7 +181,8 @@ int64_t BufferPool::BufferAllocator::CalcMaxBufferLen(
 }
 
 BufferPool::BufferAllocator::BufferAllocator(
-    BufferPool* pool, int64_t min_buffer_len, int64_t system_bytes_limit)
+    BufferPool* pool, int64_t min_buffer_len, int64_t system_bytes_limit,
+    int64_t clean_page_bytes_limit)
   : pool_(pool),
     system_allocator_(new SystemAllocator(min_buffer_len)),
     min_buffer_len_(min_buffer_len),
@@ -179,6 +191,8 @@ BufferPool::BufferAllocator::BufferAllocator(
     log_max_buffer_len_(BitUtil::Log2Ceiling64(max_buffer_len_)),
     system_bytes_limit_(system_bytes_limit),
     system_bytes_remaining_(system_bytes_limit),
+    clean_page_bytes_limit_(clean_page_bytes_limit),
+    clean_page_bytes_remaining_(clean_page_bytes_limit),
     per_core_arenas_(CpuInfo::GetMaxNumCores()),
     max_scavenge_attempts_(MAX_SCAVENGE_ATTEMPTS) {
   DCHECK(BitUtil::IsPowerOf2(min_buffer_len_)) << min_buffer_len_;
@@ -197,6 +211,7 @@ BufferPool::BufferAllocator::~BufferAllocator() {
   per_core_arenas_.clear(); // Release all the memory.
   // Check for accounting leaks.
   DCHECK_EQ(system_bytes_limit_, system_bytes_remaining_.Load());
+  DCHECK_EQ(clean_page_bytes_limit_, clean_page_bytes_remaining_.Load());
 }
 
 Status BufferPool::BufferAllocator::Allocate(
@@ -231,7 +246,7 @@ Status BufferPool::BufferAllocator::AllocateInternal(int64_t len, BufferHandle*
   if (current_core_arena->PopFreeBuffer(len, buffer)) return Status::OK();
 
   // Fast-ish path: allocate a new buffer if there is room in 'system_bytes_remaining_'.
-  int64_t delta = DecreaseSystemBytesRemaining(len, true);
+  int64_t delta = DecreaseBytesRemaining(len, true, &system_bytes_remaining_);
   if (delta != len) {
     DCHECK_EQ(0, delta);
     const vector<int>& numa_node_cores = CpuInfo::GetCoresOfSameNumaNode(current_core);
@@ -285,14 +300,14 @@ Status BufferPool::BufferAllocator::AllocateInternal(int64_t len, BufferHandle*
   return Status::OK();
 }
 
-int64_t BufferPool::BufferAllocator::DecreaseSystemBytesRemaining(
-    int64_t max_decrease, bool require_full_decrease) {
+int64_t DecreaseBytesRemaining(
+    int64_t max_decrease, bool require_full_decrease, AtomicInt64* bytes_remaining) {
   while (true) {
-    int64_t old_value = system_bytes_remaining_.Load();
+    int64_t old_value = bytes_remaining->Load();
     if (require_full_decrease && old_value < max_decrease) return 0;
     int64_t decrease = min(old_value, max_decrease);
     int64_t new_value = old_value - decrease;
-    if (system_bytes_remaining_.CompareAndSwap(old_value, new_value)) {
+    if (bytes_remaining->CompareAndSwap(old_value, new_value)) {
       return decrease;
     }
   }
@@ -311,11 +326,12 @@ int64_t BufferPool::BufferAllocator::ScavengeBuffers(
   //    examined (or from 'system_bytes_available_') because in order to do so, it would
   //    have had to return the equivalent amount of memory to an earlier arena or added
   //    it back into 'system_bytes_remaining_'. The former can't happen since we're
-  //    still holding those locks, and the latter is solved by trying
-  //    DecreaseSystemBytesRemaining() at the end.
+  //    still holding those locks, and the latter is solved by trying to decrease
+  //    system_bytes_remaining_ with DecreaseBytesRemaining() at the end.
   DCHECK_GT(target_bytes, 0);
   // First make sure we've used up all the headroom in the buffer limit.
-  int64_t bytes_found = DecreaseSystemBytesRemaining(target_bytes, false);
+  int64_t bytes_found =
+      DecreaseBytesRemaining(target_bytes, false, &system_bytes_remaining_);
   if (bytes_found == target_bytes) return bytes_found;
 
   // In 'slow_but_sure' mode, we will hold locks for multiple arenas at the same time and
@@ -341,7 +357,8 @@ int64_t BufferPool::BufferAllocator::ScavengeBuffers(
   // thread holds the lock while decrementing 'system_bytes_remaining_' in the cases
   // where it may not have reservation corresponding to that memory.
   if (slow_but_sure && bytes_found < target_bytes) {
-    bytes_found += DecreaseSystemBytesRemaining(target_bytes - bytes_found, true);
+    bytes_found += DecreaseBytesRemaining(
+        target_bytes - bytes_found, true, &system_bytes_remaining_);
     DCHECK_EQ(bytes_found, target_bytes) << DebugString();
   }
   return bytes_found;
@@ -428,15 +445,21 @@ int64_t BufferPool::BufferAllocator::GetNumCleanPages() const {
   return SumOverArenas([](FreeBufferArena* arena) { return arena->GetNumCleanPages(); });
 }
 
+int64_t BufferPool::BufferAllocator::GetCleanPageBytesLimit() const {
+  return clean_page_bytes_limit_;
+}
+
 int64_t BufferPool::BufferAllocator::GetCleanPageBytes() const {
-  return SumOverArenas([](FreeBufferArena* arena) { return arena->GetCleanPageBytes(); });
+  return clean_page_bytes_limit_ - clean_page_bytes_remaining_.Load();
 }
 
 string BufferPool::BufferAllocator::DebugString() {
   stringstream ss;
   ss << "<BufferAllocator> " << this << " min_buffer_len: " << min_buffer_len_
      << " system_bytes_limit: " << system_bytes_limit_
-     << " system_bytes_remaining: " << system_bytes_remaining_.Load() << "\n";
+     << " system_bytes_remaining: " << system_bytes_remaining_.Load() << "\n"
+     << " clean_page_bytes_limit: " << clean_page_bytes_limit_
+     << " clean_page_bytes_remaining: " << clean_page_bytes_remaining_.Load() << "\n";
   for (int i = 0; i < per_core_arenas_.size(); ++i) {
     ss << "  Arena " << i << " " << per_core_arenas_[i]->DebugString() << "\n";
   }
@@ -466,10 +489,7 @@ void BufferPool::FreeBufferArena::AddFreeBuffer(BufferHandle&& buffer) {
     return;
   }
   PerSizeLists* lists = GetListsForSize(buffer.len());
-  FreeList* list = &lists->free_buffers;
-  DCHECK_EQ(lists->num_free_buffers.Load(), list->Size());
-  lists->num_free_buffers.Add(1);
-  list->AddFreeBuffer(move(buffer));
+  lists->AddFreeBuffer(move(buffer));
 }
 
 bool BufferPool::FreeBufferArena::RemoveCleanPage(bool claim_buffer, Page* page) {
@@ -478,14 +498,14 @@ bool BufferPool::FreeBufferArena::RemoveCleanPage(bool claim_buffer, Page* page)
   DCHECK_EQ(lists->num_clean_pages.Load(), lists->clean_pages.size());
   if (!lists->clean_pages.Remove(page)) return false;
   lists->num_clean_pages.Add(-1);
+  parent_->clean_page_bytes_remaining_.Add(page->len);
   if (!claim_buffer) {
     BufferHandle buffer;
     {
       lock_guard<SpinLock> pl(page->buffer_lock);
       buffer = move(page->buffer);
     }
-    lists->free_buffers.AddFreeBuffer(move(buffer));
-    lists->num_free_buffers.Add(1);
+    lists->AddFreeBuffer(move(buffer));
   }
   return true;
 }
@@ -517,6 +537,7 @@ bool BufferPool::FreeBufferArena::EvictCleanPage(
   Page* page = lists->clean_pages.Dequeue();
   if (page == nullptr) return false;
   lists->num_clean_pages.Add(-1);
+  parent_->clean_page_bytes_remaining_.Add(buffer_len);
   lock_guard<SpinLock> pl(page->buffer_lock);
   *buffer = move(page->buffer);
   return true;
@@ -560,10 +581,11 @@ pair<int64_t, int64_t> BufferPool::FreeBufferArena::FreeSystemMemory(
     // Evict clean pages by moving their buffers to the free page list before freeing
     // them. This ensures that they are freed based on memory address in the expected
     // order.
+    int num_pages_evicted = 0;
+    int64_t page_bytes_evicted = 0;
     while (bytes_freed + buffer_bytes_to_free < target_bytes_to_free) {
       Page* page = clean_pages->Dequeue();
       if (page == nullptr) break;
-      lists->num_clean_pages.Add(-1);
       BufferHandle page_buffer;
       {
         lock_guard<SpinLock> pl(page->buffer_lock);
@@ -571,9 +593,14 @@ pair<int64_t, int64_t> BufferPool::FreeBufferArena::FreeSystemMemory(
       }
       ++buffers_to_free;
       buffer_bytes_to_free += page_buffer.len();
+      ++num_pages_evicted;
+      page_bytes_evicted += page_buffer.len();
       free_buffers->AddFreeBuffer(move(page_buffer));
-      lists->num_free_buffers.Add(1);
     }
+    lists->num_free_buffers.Add(num_pages_evicted);
+    lists->num_clean_pages.Add(-num_pages_evicted);
+    parent_->clean_page_bytes_remaining_.Add(page_bytes_evicted);
+
     if (buffers_to_free > 0) {
       int64_t buffer_bytes_freed =
           parent_->FreeToSystem(free_buffers->GetBuffersToFree(buffers_to_free));
@@ -598,16 +625,31 @@ pair<int64_t, int64_t> BufferPool::FreeBufferArena::FreeSystemMemory(
 }
 
 void BufferPool::FreeBufferArena::AddCleanPage(Page* page) {
-  if (FLAGS_disable_mem_pools) {
-    // Immediately evict the page.
-    AddFreeBuffer(move(page->buffer));
-    return;
-  }
+  bool eviction_needed = FLAGS_disable_mem_pools
+    || DecreaseBytesRemaining(
+        page->len, true, &parent_->clean_page_bytes_remaining_) == 0;
   lock_guard<SpinLock> al(lock_);
   PerSizeLists* lists = GetListsForSize(page->len);
   DCHECK_EQ(lists->num_clean_pages.Load(), lists->clean_pages.size());
-  lists->clean_pages.Enqueue(page);
-  lists->num_clean_pages.Add(1);
+  if (eviction_needed) {
+    if (lists->clean_pages.empty()) {
+      // No other pages to evict, must evict 'page' instead of adding it.
+      lists->AddFreeBuffer(move(page->buffer));
+    } else {
+      // Evict an older page (FIFO eviction) to make space for this one.
+      Page* page_to_evict = lists->clean_pages.Dequeue();
+      lists->clean_pages.Enqueue(page);
+      BufferHandle page_to_evict_buffer;
+      {
+        lock_guard<SpinLock> pl(page_to_evict->buffer_lock);
+        page_to_evict_buffer = move(page_to_evict->buffer);
+      }
+      lists->AddFreeBuffer(move(page_to_evict_buffer));
+    }
+  } else {
+    lists->clean_pages.Enqueue(page);
+    lists->num_clean_pages.Add(1);
+  }
 }
 
 void BufferPool::FreeBufferArena::Maintenance() {
@@ -663,12 +705,6 @@ int64_t BufferPool::FreeBufferArena::GetNumCleanPages() {
   });
 }
 
-int64_t BufferPool::FreeBufferArena::GetCleanPageBytes() {
-  return SumOverSizes([](PerSizeLists* lists, int64_t buffer_size) {
-    return lists->num_clean_pages.Load() * buffer_size;
-  });
-}
-
 string BufferPool::FreeBufferArena::DebugString() {
   lock_guard<SpinLock> al(lock_);
   stringstream ss;

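The compare-and-swap loop in DecreaseBytesRemaining() above can be reproduced
outside Impala with std::atomic. This is a minimal sketch under that assumption:
std::atomic<std::int64_t> stands in for Impala's AtomicInt64, and the function
name carries a Sketch suffix to mark it as illustrative.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>

// Decrease '*bytes_remaining' by up to 'max_decrease', never going below 0.
// If 'require_full_decrease' is true, decrease only if the full amount is
// available. Returns the amount by which the counter was decreased.
std::int64_t DecreaseBytesRemainingSketch(std::int64_t max_decrease,
    bool require_full_decrease, std::atomic<std::int64_t>* bytes_remaining) {
  std::int64_t old_value = bytes_remaining->load();
  while (true) {
    if (require_full_decrease && old_value < max_decrease) return 0;
    std::int64_t decrease = std::min(old_value, max_decrease);
    // On failure, compare_exchange_weak reloads 'old_value' with the current
    // contents of the counter, so the next iteration retries with fresh data.
    if (bytes_remaining->compare_exchange_weak(old_value, old_value - decrease)) {
      return decrease;
    }
  }
}
```

Because the helper takes the counter as a parameter, the same loop can serve
both the system byte budget and the clean page byte budget.
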
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/bufferpool/buffer-allocator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-allocator.h b/be/src/runtime/bufferpool/buffer-allocator.h
index b360f38..ab4feb6 100644
--- a/be/src/runtime/bufferpool/buffer-allocator.h
+++ b/be/src/runtime/bufferpool/buffer-allocator.h
@@ -79,7 +79,8 @@ namespace impala {
 ///
 class BufferPool::BufferAllocator {
  public:
-  BufferAllocator(BufferPool* pool, int64_t min_buffer_len, int64_t system_bytes_limit);
+  BufferAllocator(BufferPool* pool, int64_t min_buffer_len, int64_t system_bytes_limit,
+      int64_t clean_page_bytes_limit);
   ~BufferAllocator();
 
   /// Allocate a buffer with a power-of-two length 'len'. This function may acquire
@@ -135,6 +136,9 @@ class BufferPool::BufferAllocator {
   /// Return the total bytes of free buffers in the allocator.
   int64_t GetFreeBufferBytes() const;
 
+  /// Return the limit on bytes of clean pages in the allocator.
+  int64_t GetCleanPageBytesLimit() const;
+
   /// Return the total number of clean pages in the allocator.
   int64_t GetNumCleanPages() const;
 
@@ -169,11 +173,6 @@ class BufferPool::BufferAllocator {
   Status AllocateInternal(
       int64_t len, BufferPool::BufferHandle* buffer) WARN_UNUSED_RESULT;
 
-  /// Decrease 'system_bytes_remaining_' by up to 'max_decrease', down to a minimum of 0.
-  /// If 'require_full_decrease' is true, only decrease if we can decrease it
-  /// 'max_decrease'. Returns the amount it was decreased by.
-  int64_t DecreaseSystemBytesRemaining(int64_t max_decrease, bool require_full_decrease);
-
   /// Tries to reclaim enough memory from various sources so that the caller can allocate
   /// a buffer of 'target_bytes' from the system allocator. Scavenges buffers from the
   /// free buffer and clean page lists of all cores and frees them with
@@ -219,6 +218,16 @@ class BufferPool::BufferAllocator {
   /// allocated or after an existing buffer is freed with the system allocator.
   AtomicInt64 system_bytes_remaining_;
 
+  /// The maximum bytes of clean pages that can accumulate across all arenas before
+  /// they will be evicted.
+  const int64_t clean_page_bytes_limit_;
+
+  /// The number of bytes of 'clean_page_bytes_limit_' not used by clean pages. I.e.
+  /// (clean_page_bytes_limit_ - bytes of clean pages in the BufferAllocator).
+  /// 'clean_page_bytes_limit_' is enforced by decreasing this value before a
+  /// clean page is added and increasing it after a clean page is reclaimed or evicted.
+  AtomicInt64 clean_page_bytes_remaining_;
+
   /// Free and clean pages. One arena per core.
   std::vector<std::unique_ptr<FreeBufferArena>> per_core_arenas_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index b2f8695..61d7a39 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -367,6 +367,7 @@ class BufferPoolTest : public ::testing::Test {
   /// Parameterised test implementations.
   void TestMemoryReclamation(BufferPool* pool, int src_core, int dst_core);
   void TestEvictionPolicy(int64_t page_size);
+  void TestCleanPageLimit(int max_clean_pages, bool randomize_core);
   void TestQueryTeardown(bool write_error);
   void TestWriteError(int write_delay_ms);
   void TestRandomInternalSingle(int64_t buffer_len, bool multiple_pins);
@@ -461,7 +462,7 @@ TEST_F(BufferPoolTest, BasicRegistration) {
   int64_t total_mem = sum_initial_reservations * num_concurrent_queries;
   global_reservations_.InitRootTracker(NewProfile(), total_mem);
 
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
 
   RegisterQueriesAndClients(&pool, 0, num_concurrent_queries, sum_initial_reservations,
       reservation_limit, &rng_);
@@ -484,7 +485,7 @@ TEST_F(BufferPoolTest, ConcurrentRegistration) {
   int64_t total_mem = num_concurrent_queries * sum_initial_reservations;
   global_reservations_.InitRootTracker(NewProfile(), total_mem);
 
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
   vector<mt19937> thread_rngs = RandTestUtil::CreateThreadLocalRngs(num_threads, &rng_);
   // Launch threads, each with a different set of query IDs.
   thread_group workers;
@@ -508,7 +509,7 @@ TEST_F(BufferPoolTest, PageCreation) {
   int64_t max_page_len = TEST_BUFFER_LEN << (num_pages - 1);
   int64_t total_mem = 2 * 2 * max_page_len;
   global_reservations_.InitRootTracker(NULL, total_mem);
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
       total_mem, NewProfile(), &client));
@@ -553,7 +554,7 @@ TEST_F(BufferPoolTest, BufferAllocation) {
   int64_t max_buffer_len = TEST_BUFFER_LEN << (num_buffers - 1);
   int64_t total_mem = 2 * 2 * max_buffer_len;
   global_reservations_.InitRootTracker(NULL, total_mem);
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
       total_mem, NewProfile(), &client));
@@ -604,7 +605,7 @@ TEST_F(BufferPoolTest, CleanPageStats) {
   const int MAX_NUM_BUFFERS = 4;
   const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
 
   ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
@@ -647,13 +648,77 @@ TEST_F(BufferPoolTest, CleanPageStats) {
   global_reservations_.Close();
 }
 
+/// Test that the buffer pool respects the clean page limit with all pages in
+/// the same arena.
+TEST_F(BufferPoolTest, CleanPageLimitOneArena) {
+  TestCleanPageLimit(0, false);
+  TestCleanPageLimit(2, false);
+  TestCleanPageLimit(4, false);
+}
+
+/// Test that the buffer pool respects the clean page limit with pages spread across
+/// different arenas.
+TEST_F(BufferPoolTest, CleanPageLimitRandomArenas) {
+  TestCleanPageLimit(0, true);
+  TestCleanPageLimit(2, true);
+  TestCleanPageLimit(4, true);
+}
+
+void BufferPoolTest::TestCleanPageLimit(int max_clean_pages, bool randomize_core) {
+  const int MAX_NUM_BUFFERS = 4;
+  const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
+  const int max_clean_page_bytes = max_clean_pages * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, max_clean_page_bytes);
+
+  ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      nullptr, TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+  if (!randomize_core) CpuTestUtil::PinToCore(0);
+  vector<PageHandle> pages;
+  CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages, randomize_core);
+  WriteData(pages, 0);
+
+  // Unpin pages and wait until they're written out and therefore clean.
+  UnpinAll(&pool, &client, &pages);
+  WaitForAllWrites(&client);
+  EXPECT_EQ(max_clean_pages, pool.GetNumCleanPages());
+  EXPECT_EQ(max_clean_page_bytes, pool.GetCleanPageBytes());
+
+  // Do an allocation to force a buffer to be reclaimed from somewhere.
+  ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
+  if (randomize_core) {
+    // We will either evict a clean page or reclaim a free buffer, depending on the
+    // arena that we pick.
+    EXPECT_LE(pool.GetNumCleanPages(), max_clean_pages);
+    EXPECT_LE(pool.GetCleanPageBytes(), max_clean_page_bytes);
+  } else {
+    // We will reclaim one of the free buffers in arena 0.
+    EXPECT_EQ(min(MAX_NUM_BUFFERS - 1, max_clean_pages), pool.GetNumCleanPages());
+    const int64_t expected_clean_page_bytes =
+        min<int64_t>((MAX_NUM_BUFFERS - 1) * TEST_BUFFER_LEN, max_clean_page_bytes);
+    EXPECT_EQ(expected_clean_page_bytes, pool.GetCleanPageBytes());
+  }
+
+  // Re-pin all the pages - none will be clean afterwards.
+  ASSERT_OK(PinAll(&pool, &client, &pages));
+  VerifyData(pages, 0);
+  EXPECT_EQ(0, pool.GetNumCleanPages());
+  EXPECT_EQ(0, pool.GetCleanPageBytes());
+
+  DestroyAll(&pool, &client, &pages);
+  pool.DeregisterClient(&client);
+  global_reservations_.Close();
+}
+
 /// Test transfer of buffer handles between clients.
 TEST_F(BufferPoolTest, BufferTransfer) {
   // Each client needs to have enough reservation for a buffer.
   const int num_clients = 5;
   int64_t total_mem = num_clients * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NULL, total_mem);
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
   BufferPool::ClientHandle clients[num_clients];
   BufferPool::BufferHandle handles[num_clients];
   for (int i = 0; i < num_clients; ++i) {
@@ -691,7 +756,7 @@ TEST_F(BufferPoolTest, Pin) {
   int64_t total_mem = TEST_BUFFER_LEN * 1024;
   // Set up client with enough reservation to pin twice.
   int64_t child_reservation = TEST_BUFFER_LEN * 2;
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
   global_reservations_.InitRootTracker(NULL, total_mem);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
@@ -743,7 +808,7 @@ TEST_F(BufferPoolTest, AsyncPin) {
   const int DATA_SEED = 1234;
   // Set up pool with enough reservation to keep two buffers in memory.
   const int64_t TOTAL_MEM = 2 * TEST_BUFFER_LEN;
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NULL, TOTAL_MEM);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
@@ -813,7 +878,7 @@ TEST_F(BufferPoolTest, AsyncPin) {
 /// Creating a page or pinning without sufficient reservation should DCHECK.
 TEST_F(BufferPoolTest, PinWithoutReservation) {
   int64_t total_mem = TEST_BUFFER_LEN * 1024;
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
   global_reservations_.InitRootTracker(NULL, total_mem);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
@@ -838,7 +903,7 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
   int64_t total_mem = TEST_BUFFER_LEN * 1024;
   // Set up client with enough reservation for two buffers/pins.
   int64_t child_reservation = TEST_BUFFER_LEN * 2;
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
   global_reservations_.InitRootTracker(NULL, total_mem);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
@@ -897,7 +962,7 @@ TEST_F(BufferPoolTest, ConcurrentPageCreation) {
   int total_mem = num_threads * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NULL, total_mem);
 
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
   // Share a file group between the threads.
   TmpFileMgr::FileGroup* file_group = NewFileGroup();
 
@@ -938,7 +1003,7 @@ void BufferPoolTest::CreatePageLoop(BufferPool* pool, TmpFileMgr::FileGroup* fil
 /// Test that DCHECK fires when trying to unpin a page with spilling disabled.
 TEST_F(BufferPoolTest, SpillingDisabledDcheck) {
   global_reservations_.InitRootTracker(NULL, 2 * TEST_BUFFER_LEN);
-  BufferPool pool(TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN);
+  BufferPool pool(TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN, 2 * TEST_BUFFER_LEN);
   BufferPool::PageHandle handle;
 
   BufferPool::ClientHandle client;
@@ -961,7 +1026,7 @@ TEST_F(BufferPoolTest, SpillingDisabledDcheck) {
 /// Test simple case where pool must evict a page from the same client to fit another.
 TEST_F(BufferPoolTest, EvictPageSameClient) {
   global_reservations_.InitRootTracker(NULL, TEST_BUFFER_LEN);
-  BufferPool pool(TEST_BUFFER_LEN, TEST_BUFFER_LEN);
+  BufferPool pool(TEST_BUFFER_LEN, TEST_BUFFER_LEN, TEST_BUFFER_LEN);
   BufferPool::PageHandle handle1, handle2;
 
   BufferPool::ClientHandle client;
@@ -987,7 +1052,7 @@ TEST_F(BufferPoolTest, EvictPageSameClient) {
 TEST_F(BufferPoolTest, EvictPageDifferentSizes) {
   const int64_t TOTAL_BYTES = 2 * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NULL, TOTAL_BYTES);
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES, TOTAL_BYTES);
   BufferPool::PageHandle handle1, handle2;
 
   BufferPool::ClientHandle client;
@@ -1017,7 +1082,7 @@ TEST_F(BufferPoolTest, EvictPageDifferentClient) {
   const int NUM_CLIENTS = 2;
   const int64_t TOTAL_BYTES = NUM_CLIENTS * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NULL, TOTAL_BYTES);
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES, TOTAL_BYTES);
 
   BufferPool::ClientHandle clients[NUM_CLIENTS];
   for (int i = 0; i < NUM_CLIENTS; ++i) {
@@ -1063,7 +1128,7 @@ TEST_F(BufferPoolTest, MultiplyPinnedPageAccounting) {
   const int NUM_BUFFERS = 3;
   const int64_t TOTAL_BYTES = NUM_BUFFERS * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NULL, TOTAL_BYTES);
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES, TOTAL_BYTES);
 
   BufferPool::ClientHandle client;
   RuntimeProfile* profile = NewProfile();
@@ -1102,7 +1167,8 @@ const int64_t MEM_RECLAMATION_TOTAL_BYTES =
 // Test that we can reclaim buffers and pages from the same arena and from other arenas.
 TEST_F(BufferPoolTest, MemoryReclamation) {
   global_reservations_.InitRootTracker(NULL, MEM_RECLAMATION_TOTAL_BYTES);
-  BufferPool pool(TEST_BUFFER_LEN, MEM_RECLAMATION_TOTAL_BYTES);
+  BufferPool pool(TEST_BUFFER_LEN, MEM_RECLAMATION_TOTAL_BYTES,
+      MEM_RECLAMATION_TOTAL_BYTES);
   // Assume that all cores are online. Test various combinations of cores to validate
   // that it can reclaim from any other core.
   for (int src = 0; src < CpuInfo::num_cores(); ++src) {
@@ -1202,7 +1268,7 @@ void BufferPoolTest::TestEvictionPolicy(int64_t page_size) {
   const int MAX_NUM_BUFFERS = 5;
   int64_t total_mem = MAX_NUM_BUFFERS * page_size;
   global_reservations_.InitRootTracker(NewProfile(), total_mem);
-  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
 
   ClientHandle client;
   RuntimeProfile* profile = NewProfile();
@@ -1274,7 +1340,7 @@ TEST_F(BufferPoolTest, DestroyDuringWrite) {
   const int TRIALS = 20;
   const int MAX_NUM_BUFFERS = 5;
   const int64_t TOTAL_MEM = TEST_BUFFER_LEN * MAX_NUM_BUFFERS;
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
   ClientHandle client;
   for (int trial = 0; trial < TRIALS; ++trial) {
@@ -1366,7 +1432,7 @@ TEST_F(BufferPoolTest, QueryTeardownWriteError) {
 void BufferPoolTest::TestWriteError(int write_delay_ms) {
   int MAX_NUM_BUFFERS = 2;
   int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
   ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
@@ -1422,7 +1488,7 @@ TEST_F(BufferPoolTest, WriteErrorWriteDelay) {
 TEST_F(BufferPoolTest, TmpFileAllocateError) {
   const int MAX_NUM_BUFFERS = 2;
   const int64_t TOTAL_MEM = TEST_BUFFER_LEN * MAX_NUM_BUFFERS;
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
   ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
@@ -1462,7 +1528,7 @@ TEST_F(BufferPoolTest, WriteErrorBlacklist) {
   const int PAGES_PER_QUERY = MAX_NUM_PAGES / TOTAL_QUERIES;
   const int64_t TOTAL_MEM = MAX_NUM_PAGES * TEST_BUFFER_LEN;
   const int64_t MEM_PER_QUERY = PAGES_PER_QUERY * TEST_BUFFER_LEN;
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
   vector<FileGroup*> file_groups;
   vector<ClientHandle> clients(TOTAL_QUERIES);
@@ -1575,7 +1641,7 @@ TEST_F(BufferPoolTest, WriteErrorBlacklist) {
 TEST_F(BufferPoolTest, ScratchReadError) {
   // Only allow one buffer in memory.
   const int64_t TOTAL_MEM = TEST_BUFFER_LEN;
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
 
   // Simulate different types of error.
@@ -1647,7 +1713,7 @@ TEST_F(BufferPoolTest, NoDirsAllocationError) {
   vector<string> tmp_dirs = InitMultipleTmpDirs(2);
   int MAX_NUM_BUFFERS = 2;
   int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
   ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
@@ -1679,7 +1745,7 @@ TEST_F(BufferPoolTest, NoTmpDirs) {
   InitMultipleTmpDirs(0);
   const int MAX_NUM_BUFFERS = 3;
   const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
   ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
@@ -1767,7 +1833,7 @@ void BufferPoolTest::TestRandomInternalSingle(
     int64_t min_buffer_len, bool multiple_pins) {
   const int MAX_NUM_BUFFERS = 200;
   const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * min_buffer_len;
-  BufferPool pool(min_buffer_len, TOTAL_MEM);
+  BufferPool pool(min_buffer_len, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
   MemTracker global_tracker(TOTAL_MEM);
   TestRandomInternalImpl(
@@ -1780,7 +1846,7 @@ void BufferPoolTest::TestRandomInternalMulti(
     int num_threads, int64_t min_buffer_len, bool multiple_pins) {
   const int MAX_NUM_BUFFERS_PER_THREAD = 200;
   const int64_t TOTAL_MEM = num_threads * MAX_NUM_BUFFERS_PER_THREAD * min_buffer_len;
-  BufferPool pool(min_buffer_len, TOTAL_MEM);
+  BufferPool pool(min_buffer_len, TOTAL_MEM, TOTAL_MEM);
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
   MemTracker global_tracker(TOTAL_MEM);
   FileGroup* shared_file_group = NewFileGroup();
@@ -1910,7 +1976,7 @@ void BufferPoolTest::TestRandomInternalImpl(BufferPool* pool, FileGroup* file_gr
 TEST_F(BufferPoolTest, SubReservation) {
   const int64_t TOTAL_MEM = TEST_BUFFER_LEN * 10;
   global_reservations_.InitRootTracker(NULL, TOTAL_MEM);
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
       TOTAL_MEM, NewProfile(), &client));
@@ -1944,7 +2010,7 @@ TEST_F(BufferPoolTest, DecreaseReservation) {
   const int MAX_NUM_BUFFERS = 4;
   const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
-  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
 
   ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 2e9ba3d..6a111ff 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -104,8 +104,10 @@ Status BufferPool::PageHandle::GetBuffer(const BufferHandle** buffer) const {
   return Status::OK();
 }
 
-BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit)
-  : allocator_(new BufferAllocator(this, min_buffer_len, buffer_bytes_limit)),
+BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit,
+      int64_t clean_page_bytes_limit)
+  : allocator_(new BufferAllocator(
+        this, min_buffer_len, buffer_bytes_limit, clean_page_bytes_limit)),
     min_buffer_len_(min_buffer_len) {
   DCHECK_GT(min_buffer_len, 0);
   DCHECK_EQ(min_buffer_len, BitUtil::RoundUpToPowerOfTwo(min_buffer_len));
@@ -272,6 +274,10 @@ int64_t BufferPool::GetSystemBytesAllocated() const {
   return allocator_->GetSystemBytesAllocated();
 }
 
+int64_t BufferPool::GetCleanPageBytesLimit() const {
+  return allocator_->GetCleanPageBytesLimit();
+}
+
 int64_t BufferPool::GetNumCleanPages() const {
   return allocator_->GetNumCleanPages();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 7b11551..3d6145c 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -158,7 +158,10 @@ class BufferPool : public CacheLineAligned {
   /// 'buffer_bytes_limit': the maximum physical memory in bytes that can be used by the
   ///     buffer pool. If 'buffer_bytes_limit' is not a multiple of 'min_buffer_len', the
   ///     remainder will not be usable.
-  BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit);
+  /// 'clean_page_bytes_limit': the maximum bytes of clean pages that will be retained by the
+  ///     buffer pool.
+  BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit,
+      int64_t clean_page_bytes_limit);
   ~BufferPool();
 
   /// Register a client. Returns an error status and does not register the client if the
@@ -264,6 +267,9 @@ class BufferPool : public CacheLineAligned {
   int64_t GetSystemBytesLimit() const;
   int64_t GetSystemBytesAllocated() const;
 
+  /// Return the limit on bytes of clean pages in the pool.
+  int64_t GetCleanPageBytesLimit() const;
+
   /// Return the total number of clean pages in the pool.
   int64_t GetNumCleanPages() const;
 
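
The new 'clean_page_bytes_limit' argument caps how many bytes of clean pages the
pool retains. The following is a minimal sketch of that idea only, not Impala's
BufferAllocator: the class name CleanPageCache, its methods, and the
oldest-first eviction order are all assumptions made for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for a list of clean (already written out) pages.
// It only illustrates a byte cap on retained clean pages; the real
// allocator's data structures and eviction order are not shown in this diff.
class CleanPageCache {
 public:
  explicit CleanPageCache(int64_t clean_page_bytes_limit)
    : limit_(clean_page_bytes_limit) {
    assert(clean_page_bytes_limit >= 0);
  }

  // Adds a clean page of 'len' bytes, then evicts the oldest pages until the
  // retained bytes fit under the limit. Returns the number of bytes evicted.
  int64_t AddCleanPage(int64_t len) {
    assert(len > 0);
    pages_.push_back(len);
    bytes_ += len;
    int64_t evicted = 0;
    while (bytes_ > limit_) {
      evicted += pages_.front();
      bytes_ -= pages_.front();
      pages_.pop_front();
    }
    return evicted;
  }

  int64_t clean_page_bytes() const { return bytes_; }
  int64_t num_clean_pages() const { return static_cast<int64_t>(pages_.size()); }

 private:
  const int64_t limit_;
  int64_t bytes_ = 0;
  std::deque<int64_t> pages_;  // Page lengths in bytes, oldest first.
};
```

In this sketch a limit of 0 (as suballocator-test.cc passes) means no clean page
is ever retained, while a limit equal to the pool size (as buffer-pool-test.cc
passes) leaves the cap effectively inactive.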

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/bufferpool/suballocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator-test.cc b/be/src/runtime/bufferpool/suballocator-test.cc
index ea515fe..32acfaf 100644
--- a/be/src/runtime/bufferpool/suballocator-test.cc
+++ b/be/src/runtime/bufferpool/suballocator-test.cc
@@ -68,7 +68,7 @@ class SuballocatorTest : public ::testing::Test {
   /// bytes of buffers of minimum length 'min_buffer_len'.
   void InitPool(int64_t min_buffer_len, int total_mem) {
     global_reservation_.InitRootTracker(nullptr, total_mem);
-    buffer_pool_.reset(new BufferPool(min_buffer_len, total_mem));
+    buffer_pool_.reset(new BufferPool(min_buffer_len, total_mem, 0));
   }
 
   /// Register a client with 'buffer_pool_'. The client is automatically deregistered

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 7f62c96..0b96e2a 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -86,6 +86,7 @@ DECLARE_int32(num_cores);
 DECLARE_int32(be_port);
 DECLARE_string(mem_limit);
 DECLARE_string(buffer_pool_limit);
+DECLARE_string(buffer_pool_clean_pages_limit);
 DECLARE_int64(min_buffer_size);
 DECLARE_bool(is_coordinator);
 DECLARE_int32(webserver_port);
@@ -263,7 +264,14 @@ Status ExecEnv::StartServices() {
           "positive bytes value: $0", FLAGS_buffer_pool_limit));
   }
   buffer_pool_limit = BitUtil::RoundDown(buffer_pool_limit, FLAGS_min_buffer_size);
-  InitBufferPool(FLAGS_min_buffer_size, buffer_pool_limit);
+
+  int64_t clean_pages_limit = ParseUtil::ParseMemSpec(FLAGS_buffer_pool_clean_pages_limit,
+      &is_percent, buffer_pool_limit);
+  if (clean_pages_limit <= 0) {
+    return Status(Substitute("Invalid --buffer_pool_clean_pages_limit value, must be a percentage or "
+          "positive bytes value: $0", FLAGS_buffer_pool_clean_pages_limit));
+  }
+  InitBufferPool(FLAGS_min_buffer_size, buffer_pool_limit, clean_pages_limit);
 
   RETURN_IF_ERROR(metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr));
   impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends");
@@ -344,8 +352,9 @@ Status ExecEnv::StartServices() {
   return Status::OK();
 }
 
-void ExecEnv::InitBufferPool(int64_t min_buffer_size, int64_t capacity) {
-  buffer_pool_.reset(new BufferPool(min_buffer_size, capacity));
+void ExecEnv::InitBufferPool(int64_t min_buffer_size, int64_t capacity,
+    int64_t clean_pages_limit) {
+  buffer_pool_.reset(new BufferPool(min_buffer_size, capacity, clean_pages_limit));
   buffer_reservation_.reset(new ReservationTracker());
   buffer_reservation_->InitRootTracker(nullptr, capacity);
 }
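
The StartServices() hunk above resolves --buffer_pool_clean_pages_limit relative
to the already-computed buffer pool limit and rejects non-positive results. A
rough sketch of that flow follows. ParseLimitSketch and IsValidCleanPagesLimit
are hypothetical helpers, not ParseUtil::ParseMemSpec(); the sketch assumes only
two input forms ("N%" and a plain byte count), which may be narrower than what
the real parser accepts.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helper: accepts "N%" (a percentage of 'relative_to', N <= 100)
// or a plain non-negative byte count. Returns -1 on malformed input.
int64_t ParseLimitSketch(const std::string& spec, int64_t relative_to) {
  assert(relative_to >= 0);
  if (spec.empty()) return -1;
  const bool is_percent = spec.back() == '%';
  const std::string digits = is_percent ? spec.substr(0, spec.size() - 1) : spec;
  // Cap the digit count so the accumulation below cannot overflow int64_t.
  if (digits.empty() || digits.size() > 15) return -1;
  int64_t value = 0;
  for (char c : digits) {
    if (c < '0' || c > '9') return -1;
    value = value * 10 + (c - '0');
  }
  if (!is_percent) return value;
  if (value > 100) return -1;
  // floor(relative_to * value / 100), split to avoid overflow on large inputs.
  return relative_to / 100 * value + relative_to % 100 * value / 100;
}

// Mirrors the check in the hunk: zero or negative limits are rejected.
bool IsValidCleanPagesLimit(int64_t limit) { return limit > 0; }
```

Under these assumptions, "10%" of a 1000-byte pool resolves to 100 bytes, and
"0%" parses to 0 and would therefore be rejected at startup.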

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 5d7a3d0..9e925a1 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -206,7 +206,7 @@ class ExecEnv {
   KuduClientMap kudu_client_map_;
 
   /// Initialise 'buffer_pool_' and 'buffer_reservation_' with given capacity.
-  void InitBufferPool(int64_t min_page_len, int64_t capacity);
+  void InitBufferPool(int64_t min_page_len, int64_t capacity, int64_t clean_pages_limit);
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 23dfa4c..69bbc50 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -58,7 +58,8 @@ Status TestEnv::Init() {
   } else {
     RETURN_IF_ERROR(tmp_file_mgr()->Init(metrics()));
   }
-  exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_);
+  exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_,
+      static_cast<int64_t>(0.1 * buffer_pool_capacity_));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/util/memory-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.cc b/be/src/util/memory-metrics.cc
index f2ddcb9..7d0d065 100644
--- a/be/src/util/memory-metrics.cc
+++ b/be/src/util/memory-metrics.cc
@@ -54,6 +54,7 @@ BufferPoolMetric* BufferPoolMetric::SYSTEM_ALLOCATED = nullptr;
 BufferPoolMetric* BufferPoolMetric::RESERVED = nullptr;
 BufferPoolMetric* BufferPoolMetric::NUM_FREE_BUFFERS = nullptr;
 BufferPoolMetric* BufferPoolMetric::FREE_BUFFER_BYTES = nullptr;
+BufferPoolMetric* BufferPoolMetric::CLEAN_PAGES_LIMIT = nullptr;
 BufferPoolMetric* BufferPoolMetric::NUM_CLEAN_PAGES = nullptr;
 BufferPoolMetric* BufferPoolMetric::CLEAN_PAGE_BYTES = nullptr;
 
@@ -238,6 +239,9 @@ Status BufferPoolMetric::InitMetrics(MetricGroup* metrics,
   FREE_BUFFER_BYTES = metrics->RegisterMetric(
       new BufferPoolMetric(MetricDefs::Get("buffer-pool.free-buffer-bytes"),
           BufferPoolMetricType::FREE_BUFFER_BYTES, global_reservations, buffer_pool));
+  CLEAN_PAGES_LIMIT = metrics->RegisterMetric(
+      new BufferPoolMetric(MetricDefs::Get("buffer-pool.clean-pages-limit"),
+          BufferPoolMetricType::CLEAN_PAGES_LIMIT, global_reservations, buffer_pool));
   NUM_CLEAN_PAGES = metrics->RegisterMetric(
       new BufferPoolMetric(MetricDefs::Get("buffer-pool.clean-pages"),
           BufferPoolMetricType::NUM_CLEAN_PAGES, global_reservations, buffer_pool));
@@ -271,6 +275,9 @@ void BufferPoolMetric::CalculateValue() {
     case BufferPoolMetricType::FREE_BUFFER_BYTES:
       value_ = buffer_pool_->GetFreeBufferBytes();
       break;
+    case BufferPoolMetricType::CLEAN_PAGES_LIMIT:
+      value_ = buffer_pool_->GetCleanPageBytesLimit();
+      break;
     case BufferPoolMetricType::NUM_CLEAN_PAGES:
       value_ = buffer_pool_->GetNumCleanPages();
       break;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index 6f306f7..4f65933 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -201,6 +201,7 @@ class BufferPoolMetric : public UIntGauge {
   static BufferPoolMetric* RESERVED;
   static BufferPoolMetric* NUM_FREE_BUFFERS;
   static BufferPoolMetric* FREE_BUFFER_BYTES;
+  static BufferPoolMetric* CLEAN_PAGES_LIMIT;
   static BufferPoolMetric* NUM_CLEAN_PAGES;
   static BufferPoolMetric* CLEAN_PAGE_BYTES;
 
@@ -218,6 +219,7 @@ class BufferPoolMetric : public UIntGauge {
     RESERVED,
     NUM_FREE_BUFFERS, // Total number of free buffers in BufferPool.
     FREE_BUFFER_BYTES, // Total bytes of free buffers in BufferPool.
+    CLEAN_PAGES_LIMIT, // Limit on bytes of clean pages in BufferPool.
     NUM_CLEAN_PAGES, // Total number of clean pages in BufferPool.
     CLEAN_PAGE_BYTES, // Total bytes of clean pages in BufferPool.
   };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/039255a6/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 2fbb6fc..6c4f07d 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1176,6 +1176,16 @@
     "key": "buffer-pool.free-buffer-bytes"
   },
   {
+    "description": "Limit on bytes of clean pages cached in the buffer pool.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Buffer Pool Clean Pages Limit.",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "buffer-pool.clean-pages-limit"
+  },
+  {
     "description": "Total number of clean pages cached in the buffer pool.",
     "contexts": [
       "IMPALAD"


[5/9] incubator-impala git commit: IMPALA-5681: release reservation from blocking operators

Posted by ta...@apache.org.
IMPALA-5681: release reservation from blocking operators

When an in-memory blocking aggregation or join is in the GetNext()
phase, where it is outputting accumulated rows, we expect memory
consumption to decrease monotonically because no more rows will be
accumulated in memory.

This change adds support for releasing unused reservation and makes
use of it for in-memory aggregations and sorts.

We don't release memory for operators with spilled data, since they
may need the reservation to bring it back into memory. We also
don't release memory in subplans, since it will probably be used
in a later iteration of the subplan.
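
The policy described above can be sketched roughly as follows. This is a
simplified model, not the patch itself: ReservationSketch and MaybeReleaseUnused
are hypothetical names, and the sketch assumes that the reservation is never
lowered below the bytes currently in use.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified model of a buffer pool client's accounting.
struct ReservationSketch {
  int64_t reservation;      // Bytes currently reserved for the operator.
  int64_t used;             // Bytes of reservation backing pinned pages/buffers.
  int64_t min_reservation;  // Initial reservation the operator keeps.

  // Lowers the reservation towards 'target_bytes', but (by assumption) never
  // below the bytes in use. Returns the number of bytes released.
  int64_t DecreaseReservationTo(int64_t target_bytes) {
    assert(target_bytes >= 0);
    const int64_t floor = used > target_bytes ? used : target_bytes;
    if (floor >= reservation) return 0;
    const int64_t released = reservation - floor;
    reservation = floor;
    return released;
  }
};

// Release only when nothing is spilled and the operator is not inside a
// subplan, as the commit message describes. Returns the bytes released.
int64_t MaybeReleaseUnused(
    ReservationSketch* client, bool in_subplan, bool has_spilled_data) {
  if (in_subplan || has_spilled_data) return 0;
  return client->DecreaseReservationTo(client->min_reservation);
}
```

Calling MaybeReleaseUnused() repeatedly while the operator returns rows mirrors
the incremental release that the commit message reports observing on /memz.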

Testing:
Updated spilling test that now requires less memory.

Ran stress test binary search on tpch_parquet. No changes, except
Q18 now requires 325MB instead of 450MB to execute without spilling.

Ran query with two sorts in the same pipeline and watched /memz to
confirm that the first node in the pipeline was incrementally releasing
memory. Added a regression test based on this experiment.

Added a backend test to directly test reservation decreasing.

Change-Id: I6f4d0ad127d5fcd14b9821a7c127eec11d98692f
Reviewed-on: http://gerrit.cloudera.org:8080/7619
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/8609b09a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/8609b09a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/8609b09a

Branch: refs/heads/master
Commit: 8609b09a95f2b20e42e4ba4aa8b5d605fb545a4e
Parents: 5fcc9cb
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Aug 7 09:15:29 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Aug 17 20:17:48 2017 +0000

----------------------------------------------------------------------
 be/src/exec/exec-node.cc                        |  4 ++
 be/src/exec/exec-node.h                         | 11 +++-
 be/src/exec/partitioned-aggregation-node.cc     | 10 ++++
 be/src/exec/sort-node.cc                        | 14 +++++
 be/src/exec/sort-node.h                         |  4 ++
 .../runtime/bufferpool/buffer-pool-internal.h   | 14 ++++-
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 62 ++++++++++++++++++--
 be/src/runtime/bufferpool/buffer-pool.cc        | 21 +++++++
 be/src/runtime/bufferpool/buffer-pool.h         | 10 ++++
 be/src/runtime/sorter.cc                        |  7 +++
 be/src/runtime/sorter.h                         |  5 +-
 .../queries/QueryTest/spilling-aggs.test        |  2 +-
 .../tpch/queries/sort-reservation-usage.test    | 30 ++++++++++
 tests/query_test/test_sort.py                   |  4 ++
 14 files changed, 187 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index c3a5f80..afd7262 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -247,6 +247,10 @@ Status ExecNode::ClaimBufferReservation(RuntimeState* state) {
   return Status::OK();
 }
 
+Status ExecNode::ReleaseUnusedReservation() {
+  return buffer_pool_client_.DecreaseReservationTo(resource_profile_.min_reservation);
+}
+
 Status ExecNode::CreateTree(
     RuntimeState* state, const TPlan& plan, const DescriptorTbl& descs, ExecNode** root) {
   if (plan.nodes.size() == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 55c51ab..04470f2 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -233,7 +233,16 @@ class ExecNode {
   /// ExecNode. Only needs to be called by ExecNodes that will use the client.
   /// The client is automatically cleaned up in Close(). Should not be called if
   /// the client is already open.
-  Status ClaimBufferReservation(RuntimeState* state);
+  ///
+  /// The ExecNode must return the initial reservation to
+  /// QueryState::initial_reservations(), which is done automatically in Close() as long
+  /// as the initial reservation is not released before Close().
+  Status ClaimBufferReservation(RuntimeState* state) WARN_UNUSED_RESULT;
+
+  /// Release any unused reservation in excess of the node's initial reservation. Returns
+  /// an error if releasing the reservation requires flushing pages to disk, and that
+  /// fails.
+  Status ReleaseUnusedReservation() WARN_UNUSED_RESULT;
 
   /// Extends blocking queue for row batches. Row batches have a property that
   /// they must be processed in the order they were produced, even in cancellation

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 8432bcc..3949041 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -1198,6 +1198,16 @@ int64_t PartitionedAggregationNode::LargestSpilledPartition() const {
 Status PartitionedAggregationNode::NextPartition() {
   DCHECK(output_partition_ == nullptr);
 
+  if (!IsInSubplan() && spilled_partitions_.empty()) {
+    // All partitions are in memory. Release reservation that was used for previous
+    // partitions that is no longer needed. If we have spilled partitions, we want to
+    // hold onto all reservation in case it is needed to process the spilled partitions.
+    DCHECK(!buffer_pool_client_.has_unpinned_pages());
+    Status status = ReleaseUnusedReservation();
+    DCHECK(status.ok()) << "Should not fail - all partitions are in memory so there are "
+                        << "no unpinned pages. " << status.GetDetail();
+  }
+
   // Keep looping until we get to a partition that fits in memory.
   Partition* partition = nullptr;
   while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/exec/sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 440f809..80df214 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -103,6 +103,19 @@ Status SortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     *eos = false;
   }
 
+  if (returned_buffer_) {
+    // If the Sorter returned a buffer on the last call to GetNext(), we might have an
+    // opportunity to release memory. Release reservation, unless it might be needed
+    // for the next subplan iteration or merging spilled runs.
+    returned_buffer_ = false;
+    if (!IsInSubplan() && !sorter_->HasSpilledRuns()) {
+      DCHECK(!buffer_pool_client_.has_unpinned_pages());
+      Status status = ReleaseUnusedReservation();
+      DCHECK(status.ok()) << "Should not fail - no runs were spilled so no pages are "
+                          << "unpinned. " << status.GetDetail();
+    }
+  }
+
   DCHECK_EQ(row_batch->num_rows(), 0);
   RETURN_IF_ERROR(sorter_->GetNext(row_batch, eos));
   while ((num_rows_skipped_ < offset_)) {
@@ -119,6 +132,7 @@ Status SortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     RETURN_IF_ERROR(sorter_->GetNext(row_batch, eos));
   }
 
+  returned_buffer_ = row_batch->num_buffers() > 0;
   num_rows_returned_ += row_batch->num_rows();
   if (ReachedLimit()) {
     row_batch->set_num_rows(row_batch->num_rows() - (num_rows_returned_ - limit_));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/exec/sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index a11d424..d6eef25 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -69,6 +69,10 @@ class SortNode : public ExecNode {
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
 
+  /// Whether the previous call to GetNext() returned a buffer attached to the RowBatch.
+  /// Used to avoid unnecessary calls to ReleaseUnusedReservation().
+  bool returned_buffer_ = false;
+
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index 0c0408b..7094942 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -227,7 +227,7 @@ class BufferPool::Client {
   /// already in memory, ensures the data is in the page's buffer. If the data is on
   /// disk, starts an async read of the data and sets 'pin_in_flight' on the page to
   /// true. Neither the client's lock nor page->buffer_lock should be held by the caller.
-  Status StartMoveToPinned(ClientHandle* client, Page* page);
+  Status StartMoveToPinned(ClientHandle* client, Page* page) WARN_UNUSED_RESULT;
 
   /// Moves a page that has a pin in flight back to the evicted state, undoing
   /// StartMoveToPinned(). Neither the client's lock nor page->buffer_lock should be held
@@ -236,13 +236,16 @@ class BufferPool::Client {
 
   /// Finish the work of bringing the data of an evicted page to memory if
   /// page->pin_in_flight was set to true by StartMoveToPinned().
-  Status FinishMoveEvictedToPinned(Page* page);
+  Status FinishMoveEvictedToPinned(Page* page) WARN_UNUSED_RESULT;
 
   /// Must be called once before allocating a buffer of 'len' via the AllocateBuffer()
   /// API to deduct from the client's reservation and update internal accounting. Cleans
   /// dirty pages if needed to satisfy the buffer pool's internal invariants. No page or
   /// client locks should be held by the caller.
-  Status PrepareToAllocateBuffer(int64_t len);
+  Status PrepareToAllocateBuffer(int64_t len) WARN_UNUSED_RESULT;
+
+  /// Implementation of ClientHandle::DecreaseReservationTo().
+  Status DecreaseReservationTo(int64_t target_bytes) WARN_UNUSED_RESULT;
 
   /// Called after a buffer of 'len' is freed via the FreeBuffer() API to update
   /// internal accounting and release the buffer to the client's reservation. No page or
@@ -272,6 +275,11 @@ class BufferPool::Client {
   const BufferPoolClientCounters& counters() const { return counters_; }
   bool spilling_enabled() const { return file_group_ != NULL; }
   void set_debug_write_delay_ms(int val) { debug_write_delay_ms_ = val; }
+  bool has_unpinned_pages() const {
+    // Safe to read without lock since other threads should not be calling BufferPool
+    // functions that create, destroy or unpin pages.
+    return pinned_pages_.size() < num_pages_;
+  }
 
   std::string DebugString();
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 2eff955..b2f8695 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -157,6 +157,14 @@ class BufferPoolTest : public ::testing::Test {
     return !page->page_->buffer.is_open();
   }
 
+  int NumEvicted(vector<BufferPool::PageHandle>& pages) {
+    int num_evicted = 0;
+    for (PageHandle& page : pages) {
+      if (IsEvicted(&page)) ++num_evicted;
+    }
+    return num_evicted;
+  }
+
   /// Allocate buffers of varying sizes at most 'max_buffer_size' that add up to
   /// 'total_bytes'. Both numbers must be a multiple of the minimum buffer size.
   /// If 'randomize_core' is true, will switch thread between cores randomly before
@@ -606,6 +614,7 @@ TEST_F(BufferPoolTest, CleanPageStats) {
   vector<PageHandle> pages;
   CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
   WriteData(pages, 0);
+  EXPECT_FALSE(client.has_unpinned_pages());
 
   // Pages don't start off clean.
   EXPECT_EQ(0, pool.GetNumCleanPages());
@@ -613,22 +622,27 @@ TEST_F(BufferPoolTest, CleanPageStats) {
 
   // Unpin pages and wait until they're written out and therefore clean.
   UnpinAll(&pool, &client, &pages);
+  EXPECT_TRUE(client.has_unpinned_pages());
   WaitForAllWrites(&client);
   EXPECT_EQ(MAX_NUM_BUFFERS, pool.GetNumCleanPages());
   EXPECT_EQ(TOTAL_MEM, pool.GetCleanPageBytes());
+  EXPECT_TRUE(client.has_unpinned_pages());
 
   // Do an allocation to force eviction of one page.
   ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
   EXPECT_EQ(MAX_NUM_BUFFERS - 1, pool.GetNumCleanPages());
   EXPECT_EQ(TOTAL_MEM - TEST_BUFFER_LEN, pool.GetCleanPageBytes());
+  EXPECT_TRUE(client.has_unpinned_pages());
 
   // Re-pin all the pages - none will be clean afterwards.
   ASSERT_OK(PinAll(&pool, &client, &pages));
   VerifyData(pages, 0);
   EXPECT_EQ(0, pool.GetNumCleanPages());
   EXPECT_EQ(0, pool.GetCleanPageBytes());
+  EXPECT_FALSE(client.has_unpinned_pages());
 
   DestroyAll(&pool, &client, &pages);
+  EXPECT_FALSE(client.has_unpinned_pages());
   pool.DeregisterClient(&client);
   global_reservations_.Close();
 }
@@ -1242,11 +1256,7 @@ void BufferPoolTest::TestEvictionPolicy(int64_t page_size) {
   // No additional memory should have been allocated - it should have been recycled.
   EXPECT_EQ(total_mem, pool.GetSystemBytesAllocated());
   // Check that two pages were evicted.
-  int num_evicted = 0;
-  for (PageHandle& page : pages) {
-    if (IsEvicted(&page)) ++num_evicted;
-  }
-  EXPECT_EQ(NUM_EXTRA_BUFFERS, num_evicted);
+  EXPECT_EQ(NUM_EXTRA_BUFFERS, NumEvicted(pages));
 
   // Free up memory required to pin the original pages again.
   FreeBuffers(&pool, &client, &extra_buffers);
@@ -1928,6 +1938,48 @@ TEST_F(BufferPoolTest, SubReservation) {
   subreservation.Close();
   pool.DeregisterClient(&client);
 }
+
+// Check that we can decrease reservation without violating any buffer pool invariants.
+TEST_F(BufferPoolTest, DecreaseReservation) {
+  const int MAX_NUM_BUFFERS = 4;
+  const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+
+  ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      nullptr, TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+
+  vector<PageHandle> pages;
+  CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
+  WriteData(pages, 0);
+
+  // Unpin pages and decrease reservation while the writes are in flight.
+  UnpinAll(&pool, &client, &pages);
+  ASSERT_OK(client.DecreaseReservationTo(2 * TEST_BUFFER_LEN));
+  // Two pages must be clean to stay within reservation
+  EXPECT_GE(pool.GetNumCleanPages(), 2);
+  EXPECT_EQ(2 * TEST_BUFFER_LEN, client.GetReservation());
+
+  // Decrease it further after the pages are evicted.
+  WaitForAllWrites(&client);
+  ASSERT_OK(client.DecreaseReservationTo(TEST_BUFFER_LEN));
+  EXPECT_GE(pool.GetNumCleanPages(), 3);
+  EXPECT_EQ(TEST_BUFFER_LEN, client.GetReservation());
+
+  // Check that we can still use the reservation.
+  ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
+  EXPECT_EQ(1, NumEvicted(pages));
+
+  // Check that we can decrease it to zero.
+  ASSERT_OK(client.DecreaseReservationTo(0));
+  EXPECT_EQ(0, client.GetReservation());
+
+  DestroyAll(&pool, &client, &pages);
+  pool.DeregisterClient(&client);
+  global_reservations_.Close();
+}
 }
 
 int main(int argc, char** argv) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index df92928..2e9ba3d 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -296,6 +296,10 @@ bool BufferPool::ClientHandle::IncreaseReservationToFit(int64_t bytes) {
   return impl_->reservation()->IncreaseReservationToFit(bytes);
 }
 
+Status BufferPool::ClientHandle::DecreaseReservationTo(int64_t target_bytes) {
+  return impl_->DecreaseReservationTo(target_bytes);
+}
+
 int64_t BufferPool::ClientHandle::GetReservation() const {
   return impl_->reservation()->GetReservation();
 }
@@ -334,6 +338,10 @@ void BufferPool::ClientHandle::SetDebugDenyIncreaseReservation(double probabilit
   impl_->reservation()->SetDebugDenyIncreaseReservation(probability);
 }
 
+bool BufferPool::ClientHandle::has_unpinned_pages() const {
+  return impl_->has_unpinned_pages();
+}
+
 BufferPool::SubReservation::SubReservation(ClientHandle* client) {
   tracker_.reset(new ReservationTracker);
   tracker_->InitChildTracker(
@@ -543,6 +551,19 @@ Status BufferPool::Client::PrepareToAllocateBuffer(int64_t len) {
   return Status::OK();
 }
 
+Status BufferPool::Client::DecreaseReservationTo(int64_t target_bytes) {
+  unique_lock<mutex> lock(lock_);
+  int64_t current_reservation = reservation_.GetReservation();
+  DCHECK_GE(current_reservation, target_bytes);
+  int64_t amount_to_free =
+      min(reservation_.GetUnusedReservation(), current_reservation - target_bytes);
+  if (amount_to_free == 0) return Status::OK();
+  // Clean enough pages to allow us to safely release reservation.
+  RETURN_IF_ERROR(CleanPages(&lock, amount_to_free));
+  reservation_.DecreaseReservation(amount_to_free);
+  return Status::OK();
+}
+
 Status BufferPool::Client::CleanPages(unique_lock<mutex>* client_lock, int64_t len) {
   DCheckHoldsLock(*client_lock);
   DCHECK_CONSISTENCY();
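
The arithmetic in Client::DecreaseReservationTo() in the hunk above can be looked at in isolation. What follows is a minimal sketch, not the Impala implementation: FakeReservation and its fields are hypothetical stand-ins for ReservationTracker, and the CleanPages() step (flushing dirty unpinned pages before releasing memory) is left out entirely. It only shows that the amount released is capped both by the unused reservation and by the distance to the target.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a reservation tracker; not the Impala class.
struct FakeReservation {
  int64_t reservation;  // total bytes currently reserved
  int64_t used;         // bytes of the reservation that are in use
  int64_t unused() const { return reservation - used; }
};

// Releases as much reservation as possible without going below 'target_bytes'
// and without touching reservation that is in use. Returns the bytes released.
int64_t DecreaseReservationTo(FakeReservation* r, int64_t target_bytes) {
  // Same precondition as the DCHECK_GE in the patch.
  assert(r->reservation >= target_bytes);
  int64_t amount_to_free =
      std::min(r->unused(), r->reservation - target_bytes);
  r->reservation -= amount_to_free;
  return amount_to_free;
}
```

In this sketch, a client holding 100 bytes with 80 in use that asks to go down to 50 ends up at 80, because only the 20 unused bytes can be given back.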

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 4798d6c..7b11551 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -324,6 +324,13 @@ class BufferPool::ClientHandle {
   /// if successful, after which 'bytes' can be used.
   bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT;
 
+  /// Try to decrease this client's reservation down to a minimum of 'target_bytes' by
+  /// releasing unused reservation to ancestor ReservationTrackers, all the way up to
+  /// the root of the ReservationTracker tree. This client's reservation must be at least
+  /// 'target_bytes' before calling this method. May fail if decreasing the reservation
+  /// requires flushing unpinned pages to disk and a write to disk fails.
+  Status DecreaseReservationTo(int64_t target_bytes) WARN_UNUSED_RESULT;
+
   /// Move some of this client's reservation to the SubReservation. 'bytes' of unused
   /// reservation must be available in this tracker.
   void SaveReservation(SubReservation* dst, int64_t bytes);
@@ -351,6 +358,9 @@ class BufferPool::ClientHandle {
 
   bool is_registered() const { return impl_ != NULL; }
 
+  /// Return true if there are any unpinned pages for this client.
+  bool has_unpinned_pages() const;
+
   std::string DebugString() const;
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index ee0e4be..de05945 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -1760,4 +1760,11 @@ Status Sorter::ExecuteIntermediateMerge(Sorter::Run* merged_run) {
   RETURN_IF_ERROR(merged_run->FinalizeInput());
   return Status::OK();
 }
+
+bool Sorter::HasSpilledRuns() const {
+  // All runs in 'merging_runs_' are spilled. 'sorted_runs_' can contain at most one
+  // non-spilled run.
+  return !merging_runs_.empty() || sorted_runs_.size() > 1 ||
+      (sorted_runs_.size() == 1 && !sorted_runs_.back()->is_pinned());
+}
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/be/src/runtime/sorter.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index cafab72..5e7240b 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -152,6 +152,9 @@ class Sorter {
   /// sort with the current sorter.
   int64_t ComputeMinReservation();
 
+  /// Return true if the sorter has any spilled runs.
+  bool HasSpilledRuns() const;
+
  private:
   class Page;
   class Run;
@@ -239,7 +242,7 @@ class Sorter {
   /// memory.
   boost::scoped_ptr<SortedRunMerger> merger_;
 
-  /// Runs that are currently processed by the merge_.
+  /// Spilled runs that are currently processed by the merge_.
   /// These runs can be deleted when we are done with the current merge.
   std::deque<Run*> merging_runs_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
index 6fe86c3..d9f60cc 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
@@ -125,7 +125,7 @@ row_regex: .*RowsPassedThrough: .* \([1-9][0-9]*\)
 ====
 ---- QUERY
 # Test aggregation spill with group_concat distinct
-set buffer_pool_limit=50m;
+set buffer_pool_limit=30m;
 select l_orderkey, count(*), group_concat(distinct l_linestatus, '|')
 from lineitem
 group by 1

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/testdata/workloads/tpch/queries/sort-reservation-usage.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/tpch/queries/sort-reservation-usage.test b/testdata/workloads/tpch/queries/sort-reservation-usage.test
new file mode 100644
index 0000000..92f180d
--- /dev/null
+++ b/testdata/workloads/tpch/queries/sort-reservation-usage.test
@@ -0,0 +1,30 @@
+====
+---- QUERY
+# Test that in-mem sorts incrementally give up memory when emitting output.
+# This query and the limit is calibrated to fail if the first sort does not
+# give up memory to the second sort.
+set num_nodes=1;
+set scratch_limit=0;
+set buffer_pool_limit=15m;
+set default_spillable_buffer_size=64kb;
+SELECT *
+FROM   (SELECT
+        Rank() OVER(ORDER BY  l_orderkey) AS rank,
+        Rank() OVER(ORDER BY  l_partkey) AS rank2
+        FROM lineitem
+        WHERE l_shipdate < '1992-05-09') a
+WHERE rank < 10
+ORDER BY rank;
+---- RESULTS
+1,118035
+2,55836
+2,141809
+2,155407
+5,84064
+5,129763
+7,10725
+7,31340
+7,155173
+---- TYPES
+BIGINT,BIGINT
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/8609b09a/tests/query_test/test_sort.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py
index df95ddd..0eae035 100644
--- a/tests/query_test/test_sort.py
+++ b/tests/query_test/test_sort.py
@@ -159,6 +159,10 @@ class TestQueryFullSort(ImpalaTestSuite):
       query, exec_option, table_format=table_format).data)
     assert(result[0] == sorted(result[0]))
 
+  def test_sort_reservation_usage(self, vector):
+    """Tests for sorter reservation usage."""
+    self.run_test_case('sort-reservation-usage', vector)
+
 class TestRandomSort(ImpalaTestSuite):
   @classmethod
   def get_workload(self):


[4/9] incubator-impala git commit: IMPALA-5796: CTAS for Kudu fails with expr rewrite

Posted by ta...@apache.org.
IMPALA-5796: CTAS for Kudu fails with expr rewrite

When an expr rewrite occurs, we reanalyze the statement. Some state
that is set in TableDef::analyze() was not being reset before the
second pass, causing a failure during reanalysis.

This patch adds TableDef::reset(), which clears the TableDef state
that is set during analyze().

Testing:
- Added a regression test in AnalyzeDDLTest

Change-Id: Ia67bb33736b5a843663b226cdd0fa5bd839cbea1
Reviewed-on: http://gerrit.cloudera.org:8080/7666
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Tested-by: Impala Public Jenkins
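
The reset-before-reanalysis pattern described in this commit message can be sketched outside of Impala. The patch itself is Java; the snippet below is a hypothetical C++ analogue, written in C++ only to match the other examples in this thread. MiniTableDef and all of its members are invented for illustration. It assumes, as one plausible failure mode, that analysis appends to a list that would otherwise accumulate duplicates on a second pass.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical analogue of a definition object that caches analysis results.
class MiniTableDef {
 public:
  explicit MiniTableDef(std::vector<std::string> pk_names)
      : pk_names_(std::move(pk_names)) {}

  // Populates derived state. The guard makes repeated calls a no-op until
  // Reset() is called, mirroring the 'if (isAnalyzed_) return;' in the patch.
  void Analyze() {
    if (is_analyzed_) return;
    for (const std::string& name : pk_names_) pk_col_defs_.push_back(name);
    is_analyzed_ = true;
  }

  // Clears everything Analyze() derived so that reanalysis starts clean.
  void Reset() {
    pk_col_defs_.clear();
    is_analyzed_ = false;
  }

  std::size_t num_pk_col_defs() const { return pk_col_defs_.size(); }

 private:
  const std::vector<std::string> pk_names_;  // parsed input, never reset
  std::vector<std::string> pk_col_defs_;     // derived during Analyze()
  bool is_analyzed_ = false;
};
```

Without the clear() in Reset(), an Analyze() / Reset() / Analyze() sequence in this sketch would leave two entries in pk_col_defs_ for a single key column.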


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5fcc9cb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5fcc9cb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5fcc9cb4

Branch: refs/heads/master
Commit: 5fcc9cb4cc2a9848c7d8ba677494588d8014fa92
Parents: 86e88ca
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Mon Aug 14 11:39:03 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Aug 17 20:01:27 2017 +0000

----------------------------------------------------------------------
 .../impala/analysis/CreateTableAsSelectStmt.java  |  4 ++--
 .../apache/impala/analysis/CreateTableStmt.java   | 12 ++++++++++++
 .../java/org/apache/impala/analysis/TableDef.java | 18 +++++++++++++++---
 .../apache/impala/analysis/AnalyzeDDLTest.java    |  3 +++
 4 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5fcc9cb4/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index eb492c6..516a06e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -49,8 +49,6 @@ import com.google.common.collect.Lists;
  * select statement.
  */
 public class CreateTableAsSelectStmt extends StatementBase {
-  private final CreateTableStmt createStmt_;
-
   // List of partition columns from the PARTITIONED BY (...) clause. Set to null if no
   // partition was given.
   private final List<String> partitionKeys_;
@@ -58,6 +56,7 @@ public class CreateTableAsSelectStmt extends StatementBase {
   /////////////////////////////////////////
   // BEGIN: Members that need to be reset()
 
+  private final CreateTableStmt createStmt_;
   private final InsertStmt insertStmt_;
 
   // END: Members that need to be reset()
@@ -234,6 +233,7 @@ public class CreateTableAsSelectStmt extends StatementBase {
   @Override
   public void reset() {
     super.reset();
+    createStmt_.reset();
     insertStmt_.reset();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5fcc9cb4/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
index 6169997..5810a40 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableStmt.java
@@ -51,9 +51,15 @@ public class CreateTableStmt extends StatementBase {
   final static String KUDU_STORAGE_HANDLER_ERROR_MESSAGE = "Kudu tables must be"
       + " specified using 'STORED AS KUDU'.";
 
+  /////////////////////////////////////////
+  // BEGIN: Members that need to be reset()
+
   // Table parameters specified in a CREATE TABLE statement
   private final TableDef tableDef_;
 
+  // END: Members that need to be reset()
+  /////////////////////////////////////////
+
   // Table owner. Set during analysis
   private String owner_;
 
@@ -71,6 +77,12 @@ public class CreateTableStmt extends StatementBase {
   }
 
   @Override
+  public void reset() {
+    super.reset();
+    tableDef_.reset();
+  }
+
+  @Override
   public CreateTableStmt clone() { return new CreateTableStmt(this); }
 
   public String getTbl() { return getTblName().getTbl(); }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5fcc9cb4/fe/src/main/java/org/apache/impala/analysis/TableDef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index bd07af8..f4901f9 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -71,9 +71,6 @@ class TableDef {
   // mean no primary keys were specified as the columnDefs_ could contain primary keys.
   private final List<String> primaryKeyColNames_ = Lists.newArrayList();
 
-  // Authoritative list of primary key column definitions populated during analysis.
-  private final List<ColumnDef> primaryKeyColDefs_ = Lists.newArrayList();
-
   // If true, the table's data will be preserved if dropped.
   private final boolean isExternal_;
 
@@ -83,9 +80,18 @@ class TableDef {
   // Partitioning parameters.
   private final TableDataLayout dataLayout_;
 
+  /////////////////////////////////////////
+  // BEGIN: Members that need to be reset()
+
+  // Authoritative list of primary key column definitions populated during analysis.
+  private final List<ColumnDef> primaryKeyColDefs_ = Lists.newArrayList();
+
   // True if analyze() has been called.
   private boolean isAnalyzed_ = false;
 
+  // END: Members that need to be reset()
+  /////////////////////////////////////////
+
   /**
    * Set of table options. These options are grouped together for convenience while
    * parsing CREATE TABLE statements. They are typically found at the end of CREATE
@@ -150,6 +156,11 @@ class TableDef {
     dataLayout_ = TableDataLayout.createEmptyLayout();
   }
 
+  public void reset() {
+    primaryKeyColDefs_.clear();
+    isAnalyzed_ = false;
+  }
+
   public TableName getTblName() {
     return fqTableName_ != null ? fqTableName_ : tableName_;
   }
@@ -191,6 +202,7 @@ class TableDef {
    * Analyzes the parameters of a CREATE TABLE statement.
    */
   void analyze(Analyzer analyzer) throws AnalysisException {
+    if (isAnalyzed_) return;
     Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
     fqTableName_ = analyzer.getFqTableName(getTblName());
     fqTableName_.analyze();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5fcc9cb4/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
index 9e2ffd7..4d5ac0a 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java
@@ -1591,6 +1591,9 @@ public class AnalyzeDDLTest extends FrontendTestBase {
     AnalyzesOk("create table t primary key(id) stored as kudu as select id, bool_col " +
         "from functional.alltypestiny",
         "Unpartitioned Kudu tables are inefficient for large data sizes.");
+    // IMPALA-5796: CTAS into a Kudu table with expr rewriting.
+    AnalyzesOk("create table t primary key(id) stored as kudu as select id, bool_col " +
+        "from functional.alltypestiny where id between 0 and 10");
     // CTAS in an external Kudu table
     AnalysisError("create external table t stored as kudu " +
         "tblproperties('kudu.table_name'='t') as select id, int_col from " +


[6/9] incubator-impala git commit: Propagate HAVE_PIPE2 compile time value to files that use it

Posted by ta...@apache.org.
Propagate HAVE_PIPE2 compile time value to files that use it

HAVE_PIPE2 is a compile-time variable that tracks whether the platform
provides the system function pipe2().

Its value was not propagated to the file that uses it
(be/src/kudu/util/subprocess.cc), so it always evaluated to 0 there
and the wrong branch was taken at compile time.

This patch fixes that by including common/config.h in that file.

Change-Id: I6cdc343da35a34be8d95fbea3543d080dbc1ec29
Reviewed-on: http://gerrit.cloudera.org:8080/7705
Reviewed-by: Henry Robinson <he...@cloudera.com>
Tested-by: Impala Public Jenkins
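
The failure mode described above follows from a general preprocessor rule: an identifier that is not defined as a macro evaluates to 0 inside an #if condition. The snippet below is a minimal sketch of that rule, assuming the affected code tests the macro with a plain #if. HYPOTHETICAL_HAVE_FEATURE and SelectedBranch() are made-up names and do not come from the Impala or Kudu sources.

```cpp
#include <string>

// HYPOTHETICAL_HAVE_FEATURE is intentionally never defined in this file. This
// mimics a source file that does not include the generated config header.
#if HYPOTHETICAL_HAVE_FEATURE
static const char* const kSelectedBranch = "feature";
#else
// An undefined identifier in an #if condition is treated as 0, so the
// preprocessor silently selects this branch.
static const char* const kSelectedBranch = "fallback";
#endif

// Reports which branch the preprocessor kept.
std::string SelectedBranch() { return kSelectedBranch; }
```

Because nothing fails to compile, this kind of omission is easy to miss. Compilers such as GCC and Clang can flag it with -Wundef.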


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5c82f9d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5c82f9d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5c82f9d8

Branch: refs/heads/master
Commit: 5c82f9d84cc46735ed515a92d2e5d43fa5b88d84
Parents: 8609b09
Author: Sailesh Mukil <sa...@apache.org>
Authored: Thu Aug 17 11:54:42 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Aug 17 23:05:56 2017 +0000

----------------------------------------------------------------------
 be/src/kudu/util/subprocess.cc | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5c82f9d8/be/src/kudu/util/subprocess.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/subprocess.cc b/be/src/kudu/util/subprocess.cc
index b072ed8..2812f76 100644
--- a/be/src/kudu/util/subprocess.cc
+++ b/be/src/kudu/util/subprocess.cc
@@ -52,6 +52,8 @@
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/status.h"
 
+#include "common/config.h"
+
 using std::string;
 using std::unique_ptr;
 using std::vector;


[3/9] incubator-impala git commit: IMPALA-2615: support [[nodiscard]] on Status

Posted by ta...@apache.org.
IMPALA-2615: support [[nodiscard]] on Status

This is the set of changes required to get Impala to compile on
GCC 7 using the [[nodiscard]] attribute, which generates a warning
whenever a status is dropped. It is not enabled on the current
default compiler GCC 4.9.2 or Clang 3.8 so I added WARN_UNUSED_RESULT
in various classes so that we can catch the dropped statuses with
our current toolchain.

The changes are:
* Use the new [[nodiscard]] attribute and fix all the dropped
  statuses. Many were innocuous or very improbable, but some appear to
  be actual bugs. Add a discard_result() function that explicitly
  ignores the result of a function.
* Remove the bad JNI pattern of checking for exceptions after
  DeleteGlobalRef(), which doesn't throw.
* Fix miscellaneous compile errors and warnings.
* Remove use of ptr_vector, which pulls in headers with deprecated
  things.
* Fix a memory lifetime bug with default_fs_ (it was masked by
  the old refcounted std::string implementation).

Change-Id: I972543af2e9f98b12dcbb5479b4c1a7d53952197
Reviewed-on: http://gerrit.cloudera.org:8080/7253
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
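
The combination of [[nodiscard]] and an explicit discard helper described in this commit message can be sketched as follows. This is an illustration under stated assumptions, not the Impala code: MiniStatus, CloseFile() and BestEffortCleanup() are hypothetical, and the signature shown for discard_result() is a guess at its shape based only on the description above.

```cpp
#include <string>
#include <utility>

// Simplified, hypothetical status type. Marking the class [[nodiscard]]
// (C++17) asks the compiler to warn whenever a returned value is dropped.
class [[nodiscard]] MiniStatus {
 public:
  static MiniStatus OK() { return MiniStatus(""); }
  static MiniStatus Error(std::string msg) {
    return MiniStatus(std::move(msg));
  }
  bool ok() const { return msg_.empty(); }
  const std::string& msg() const { return msg_; }

 private:
  explicit MiniStatus(std::string msg) : msg_(std::move(msg)) {}
  std::string msg_;
};

// Explicitly ignores a value. Binding the result to a parameter counts as a
// use, so no warning is issued. The exact signature is an assumption.
template <typename T>
inline void discard_result(const T&) {}

// Hypothetical operation that may fail.
MiniStatus CloseFile(bool fail) {
  return fail ? MiniStatus::Error("close failed") : MiniStatus::OK();
}

// Best-effort cleanup where the caller has decided a failure is not
// actionable and says so explicitly.
void BestEffortCleanup() {
  // CloseFile(true);               // would warn: result is discarded
  discard_result(CloseFile(true));  // deliberate and warning-free
}
```

The point of the helper is that every ignored status becomes a visible, searchable decision in the code rather than an accident.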


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/86e88cad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/86e88cad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/86e88cad

Branch: refs/heads/master
Commit: 86e88cad5d7ee41264bbb66742b267ef67dec47e
Parents: da60a9a
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Jun 14 17:35:08 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Aug 17 09:04:56 2017 +0000

----------------------------------------------------------------------
 CMakeLists.txt                                  |  3 +-
 be/CMakeLists.txt                               | 13 +++++-
 be/src/benchmarks/bit-packing-benchmark.cc      |  1 +
 be/src/benchmarks/expr-benchmark.cc             |  6 +--
 be/src/benchmarks/hash-benchmark.cc             |  2 +-
 be/src/benchmarks/network-perf-benchmark.cc     |  2 +-
 .../benchmarks/row-batch-serialize-benchmark.cc |  8 ++--
 be/src/catalog/catalog-server.cc                |  4 +-
 be/src/catalog/catalog-util.cc                  |  4 +-
 be/src/catalog/catalogd-main.cc                 |  4 +-
 be/src/codegen/codegen-symbol-emitter.cc        |  2 +-
 be/src/codegen/llvm-codegen-test.cc             |  2 +-
 be/src/common/compiler-util.h                   | 20 +++++++++
 be/src/common/init.cc                           |  4 +-
 be/src/common/status.h                          |  6 +--
 be/src/exec/external-data-source-executor.cc    |  2 +-
 be/src/exec/hbase-scan-node.cc                  |  2 +-
 be/src/exec/hbase-table-scanner.cc              | 27 ++++++-----
 be/src/exec/hbase-table-scanner.h               | 44 ++++++++++--------
 be/src/exec/hbase-table-writer.cc               | 13 ++----
 be/src/exec/hdfs-scan-node.cc                   |  2 +-
 be/src/exec/kudu-scan-node.cc                   |  2 +-
 be/src/exec/kudu-table-sink.h                   |  2 +-
 be/src/exec/kudu-util.h                         |  4 +-
 be/src/experiments/compression-test.cc          |  7 +--
 be/src/exprs/expr-codegen-test.cc               | 10 ++---
 be/src/exprs/expr-test.cc                       |  4 +-
 be/src/exprs/hive-udf-call.cc                   |  3 +-
 be/src/rpc/auth-provider.h                      | 14 +++---
 be/src/rpc/thrift-client.cc                     | 10 ++---
 be/src/rpc/thrift-client.h                      | 11 ++---
 be/src/runtime/buffered-tuple-stream-test.cc    |  6 +--
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 27 ++++++-----
 be/src/runtime/client-cache.cc                  |  4 +-
 be/src/runtime/client-cache.h                   | 23 +++++-----
 be/src/runtime/collection-value-builder.h       |  2 +-
 be/src/runtime/coordinator-backend-state.cc     |  5 ++-
 be/src/runtime/data-stream-recvr.cc             |  6 ++-
 be/src/runtime/data-stream-test.cc              | 13 +++---
 be/src/runtime/disk-io-mgr.cc                   |  2 +-
 be/src/runtime/exec-env.cc                      |  4 +-
 be/src/runtime/exec-env.h                       | 10 ++---
 be/src/runtime/fragment-instance-state.cc       |  4 +-
 be/src/runtime/hbase-table-factory.cc           |  2 +-
 be/src/runtime/hbase-table.cc                   |  3 +-
 be/src/runtime/parallel-executor.cc             |  2 +-
 be/src/runtime/tmp-file-mgr-test.cc             |  9 ++--
 be/src/runtime/tmp-file-mgr.cc                  |  3 +-
 be/src/scheduling/scheduler-test-util.cc        |  3 +-
 be/src/scheduling/scheduler-test.cc             | 38 ++++++++--------
 be/src/service/client-request-state.cc          |  8 ++--
 be/src/service/fe-support.cc                    | 20 ++++++---
 be/src/service/impala-beeswax-server.cc         |  8 ++--
 be/src/service/impala-hs2-server.cc             | 14 +++---
 be/src/service/impala-http-handler.cc           |  9 ++--
 be/src/service/impala-server.cc                 | 47 +++++++++++++-------
 be/src/service/impalad-main.cc                  |  3 +-
 be/src/service/query-options-test.cc            |  3 +-
 be/src/statestore/statestore.cc                 | 10 +++--
 be/src/statestore/statestore.h                  | 19 ++++----
 be/src/statestore/statestored-main.cc           |  6 ++-
 be/src/testutil/death-test-util.h               |  8 ++--
 be/src/testutil/fault-injection-util.cc         |  2 +
 be/src/testutil/impalad-query-executor.cc       |  4 +-
 be/src/testutil/in-process-servers.cc           | 10 +++--
 be/src/testutil/in-process-servers.h            |  2 +-
 be/src/util/benchmark.cc                        |  1 +
 be/src/util/bit-util-test.cc                    |  6 ++-
 be/src/util/codec.h                             |  9 ++--
 be/src/util/filesystem-util.h                   | 13 +++---
 be/src/util/hdfs-util-test.cc                   |  3 +-
 be/src/util/jni-util.cc                         |  6 ---
 be/src/util/jni-util.h                          | 31 +++++++------
 be/src/util/memory-metrics.h                    |  4 +-
 be/src/util/metrics-test.cc                     |  4 +-
 be/src/util/network-util.h                      |  4 +-
 be/src/util/parquet-reader.cc                   |  5 ++-
 be/src/util/runtime-profile.cc                  | 27 ++++++-----
 be/src/util/runtime-profile.h                   |  4 +-
 be/src/util/thread-pool.h                       |  2 +-
 be/src/util/thread.cc                           |  8 ++--
 be/src/util/thread.h                            |  8 ++--
 82 files changed, 403 insertions(+), 309 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 63e5d43..31a3fc4 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -144,7 +144,8 @@ endfunction()
 
 
 find_package(Boost REQUIRED COMPONENTS thread regex filesystem system date_time)
-include_directories(${Boost_INCLUDE_DIRS})
+# Mark Boost as a system header to avoid compile warnings.
+include_directories(SYSTEM ${Boost_INCLUDE_DIRS})
 message(STATUS "Boost include dir: " ${Boost_INCLUDE_DIRS})
 message(STATUS "Boost libraries: " ${Boost_LIBRARIES})
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 22c18b6..3ac4193 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -70,6 +70,16 @@ SET(CXX_CLANG_FLAGS "${CXX_CLANG_FLAGS} -Wno-mismatched-tags")
 #   -g: Enable symbols for profiler tools
 #   -Wno-unused-local-typedefs: Do not warn for local typedefs that are unused.
 SET(CXX_GCC_FLAGS "-g -Wno-unused-local-typedefs")
+if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
+  # We need to add additional arguments for GCC 7+. We go down this branch if building
+  # with a non-GCC compiler of version 7+, but in that case CXX_GCC_FLAGS is not used,
+  # so it is inconsequential. TODO: IMPALA-5490: make this non-conditional when we
+  # upgrade GCC.
+  #  -faligned-new: new will automatically align types. Otherwise "new Counter()" in the
+  #       Kudu util code produces a warning (see KUDU-2094).
+  #   TODO: -faligned-new is part of C++17, remove flag when we bump language version.
+  SET(CXX_GCC_FLAGS "${CXX_GCC_FLAGS} -faligned-new")
+endif()
 
 # compiler flags for different build types (run 'cmake -DCMAKE_BUILD_TYPE=<type> .')
 # For CMAKE_BUILD_TYPE=Debug
@@ -264,7 +274,8 @@ set(CLANG_INCLUDE_FLAGS
   "-I${GFLAGS_INCLUDE_DIR}"
   "-I${RAPIDJSON_INCLUDE_DIR}"
   "-I${AVRO_INCLUDE_DIR}"
-  "-I${BOOST_INCLUDEDIR}"
+  # Include Boost as a system directory to suppress warnings from headers.
+  "-isystem${BOOST_INCLUDEDIR}"
   # Required so that jni.h can be found during Clang compilation
   "-I${JAVA_INCLUDE_PATH}"
   "-I${JAVA_INCLUDE_PATH2}"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/benchmarks/bit-packing-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/bit-packing-benchmark.cc b/be/src/benchmarks/bit-packing-benchmark.cc
index 6e80d83..955c0fb 100644
--- a/be/src/benchmarks/bit-packing-benchmark.cc
+++ b/be/src/benchmarks/bit-packing-benchmark.cc
@@ -261,6 +261,7 @@
 
 #include <algorithm>
 #include <iostream>
+#include <numeric>
 #include <vector>
 
 #include "gutil/strings/substitute.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/benchmarks/expr-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/expr-benchmark.cc b/be/src/benchmarks/expr-benchmark.cc
index c55ee3d..093d2b1 100644
--- a/be/src/benchmarks/expr-benchmark.cc
+++ b/be/src/benchmarks/expr-benchmark.cc
@@ -65,8 +65,8 @@ using namespace impala;
 class Planner {
  public:
   Planner() {
-    frontend_.SetCatalogInitialized();
-    exec_env_.InitForFeTests();
+    ABORT_IF_ERROR(frontend_.SetCatalogInitialized());
+    ABORT_IF_ERROR(exec_env_.InitForFeTests());
   }
 
   Status GeneratePlan(const string& stmt, TExecRequest* result) {
@@ -561,7 +561,7 @@ Benchmark* BenchmarkTimestampFunctions() {
 int main(int argc, char** argv) {
   impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
   impala::InitFeSupport(false);
-  impala::LlvmCodeGen::InitializeLlvm();
+  ABORT_IF_ERROR(impala::LlvmCodeGen::InitializeLlvm());
 
   // Dynamically construct at runtime as the planner initialization depends on
   // static objects being initialized in other compilation modules.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/benchmarks/hash-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/hash-benchmark.cc b/be/src/benchmarks/hash-benchmark.cc
index 1b56b7b..dadad25 100644
--- a/be/src/benchmarks/hash-benchmark.cc
+++ b/be/src/benchmarks/hash-benchmark.cc
@@ -420,7 +420,7 @@ int main(int argc, char **argv) {
   cout << Benchmark::GetMachineInfo() << endl;
   impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
   impala::InitFeSupport();
-  LlvmCodeGen::InitializeLlvm();
+  ABORT_IF_ERROR(LlvmCodeGen::InitializeLlvm());
 
   const int NUM_ROWS = 1024;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/benchmarks/network-perf-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/network-perf-benchmark.cc b/be/src/benchmarks/network-perf-benchmark.cc
index 6cebaaf..1a0de24 100644
--- a/be/src/benchmarks/network-perf-benchmark.cc
+++ b/be/src/benchmarks/network-perf-benchmark.cc
@@ -81,7 +81,7 @@ class TestServer : public NetworkTestServiceIf {
   }
 
   void Server(ThriftServer* server) {
-    server->Start();
+    ABORT_IF_ERROR(server->Start());
     server->Join();
   }
 };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/benchmarks/row-batch-serialize-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/row-batch-serialize-benchmark.cc b/be/src/benchmarks/row-batch-serialize-benchmark.cc
index ee531e6..699b91b 100644
--- a/be/src/benchmarks/row-batch-serialize-benchmark.cc
+++ b/be/src/benchmarks/row-batch-serialize-benchmark.cc
@@ -289,7 +289,7 @@ class RowBatchSerializeBenchmark {
     SerializeArgs* args = reinterpret_cast<SerializeArgs*>(data);
     for (int iter = 0; iter < batch_size; ++iter) {
       TRowBatch trow_batch;
-      args->batch->Serialize(&trow_batch, args->full_dedup);
+      ABORT_IF_ERROR(args->batch->Serialize(&trow_batch, args->full_dedup));
     }
   }
 
@@ -338,19 +338,19 @@ class RowBatchSerializeBenchmark {
     RowBatch* no_dup_batch = obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, &tracker));
     FillBatch(no_dup_batch, 12345, 1, -1);
     TRowBatch no_dup_tbatch;
-    no_dup_batch->Serialize(&no_dup_tbatch);
+    ABORT_IF_ERROR(no_dup_batch->Serialize(&no_dup_tbatch));
 
     RowBatch* adjacent_dup_batch =
         obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, &tracker));
     FillBatch(adjacent_dup_batch, 12345, 5, -1);
     TRowBatch adjacent_dup_tbatch;
-    adjacent_dup_batch->Serialize(&adjacent_dup_tbatch, false);
+    ABORT_IF_ERROR(adjacent_dup_batch->Serialize(&adjacent_dup_tbatch, false));
 
     RowBatch* dup_batch = obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, &tracker));
     // Non-adjacent duplicates.
     FillBatch(dup_batch, 12345, 1, NUM_ROWS / 5);
     TRowBatch dup_tbatch;
-    dup_batch->Serialize(&dup_tbatch, true);
+    ABORT_IF_ERROR(dup_batch->Serialize(&dup_tbatch, true));
 
     int baseline;
     Benchmark ser_suite("serialize");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index bf2892f..0e0cca0 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -401,11 +401,11 @@ void CatalogServer::CatalogObjectsUrlCallback(const Webserver::ArgumentMap& args
 
     // Get the object type and name from the topic entry key
     TCatalogObject request;
-    TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request);
+    Status status = TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request);
 
     // Get the object and dump its contents.
     TCatalogObject result;
-    Status status = catalog_->GetCatalogObject(request, &result);
+    if (status.ok()) status = catalog_->GetCatalogObject(request, &result);
     if (status.ok()) {
       Value debug_string(ThriftDebugString(result).c_str(), document->GetAllocator());
       document->AddMember("thrift_string", debug_string, document->GetAllocator());
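
Note on the hunk above: the second call now runs only if the first succeeded, so the first error is the one reported. A minimal sketch of that chaining pattern follows. It is not part of the patch; the Status class, ParseName() and Lookup() are hypothetical stand-ins for impala::Status, TCatalogObjectFromObjectName() and GetCatalogObject().

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for impala::Status; only ok() and a message are modeled.
class Status {
 public:
  static Status OK() { return Status(); }
  explicit Status(const std::string& msg) : ok_(false), msg_(msg) {}
  bool ok() const { return ok_; }
  const std::string& msg() const { return msg_; }

 private:
  Status() : ok_(true) {}
  bool ok_;
  std::string msg_;
};

// Hypothetical first step: fails on an empty name.
Status ParseName(const std::string& name) {
  return name.empty() ? Status("empty object name") : Status::OK();
}

// Hypothetical second step: counts its invocations so the short-circuit is observable.
Status Lookup(const std::string& name, int* calls) {
  ++*calls;
  return name == "missing" ? Status("object not found") : Status::OK();
}

// Runs the second step only if the first succeeded; the first error is returned.
Status ParseThenLookup(const std::string& name, int* lookup_calls) {
  Status status = ParseName(name);
  if (status.ok()) status = Lookup(name, lookup_calls);
  return status;
}
```

With this shape a failed parse never reaches the lookup, which is the behavior the changed lines appear to aim for.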

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/catalog/catalog-util.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-util.cc b/be/src/catalog/catalog-util.cc
index 57d21b3..de4f2fd 100644
--- a/be/src/catalog/catalog-util.cc
+++ b/be/src/catalog/catalog-util.cc
@@ -201,9 +201,9 @@ Status CompressCatalogObject(string* catalog_object) {
       const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(output_buffer.data()));
   ReadWriteUtil::PutInt(output_buffer_ptr, static_cast<uint32_t>(catalog_object->size()));
   output_buffer_ptr += sizeof(uint32_t);
-  compressor->ProcessBlock(true, catalog_object->size(),
+  RETURN_IF_ERROR(compressor->ProcessBlock(true, catalog_object->size(),
       reinterpret_cast<const uint8_t*>(catalog_object->data()), &compressed_data_len,
-      &output_buffer_ptr);
+      &output_buffer_ptr));
   output_buffer.resize(compressed_data_len + sizeof(uint32_t));
   *catalog_object = move(output_buffer);
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/catalog/catalogd-main.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalogd-main.cc b/be/src/catalog/catalogd-main.cc
index fe681d0..a3a0edb 100644
--- a/be/src/catalog/catalogd-main.cc
+++ b/be/src/catalog/catalogd-main.cc
@@ -73,10 +73,10 @@ int CatalogdMain(int argc, char** argv) {
     LOG(INFO) << "Not starting webserver";
   }
 
-  metrics->Init(FLAGS_enable_webserver ? webserver.get() : nullptr);
+  ABORT_IF_ERROR(metrics->Init(FLAGS_enable_webserver ? webserver.get() : nullptr));
   ABORT_IF_ERROR(RegisterMemoryMetrics(metrics.get(), true, nullptr, nullptr));
   StartMemoryMaintenanceThread();
-  StartThreadInstrumentation(metrics.get(), webserver.get(), true);
+  ABORT_IF_ERROR(StartThreadInstrumentation(metrics.get(), webserver.get(), true));
 
   InitRpcEventTracing(webserver.get());
   metrics->AddProperty<string>("catalog.version", GetVersionString(true));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/codegen/codegen-symbol-emitter.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/codegen-symbol-emitter.cc b/be/src/codegen/codegen-symbol-emitter.cc
index b2c64e2..02008c7 100644
--- a/be/src/codegen/codegen-symbol-emitter.cc
+++ b/be/src/codegen/codegen-symbol-emitter.cc
@@ -61,7 +61,7 @@ void CodegenSymbolEmitter::NotifyObjectEmitted(const ObjectFile &obj,
     if (asm_file.fail()) {
       // Log error and continue if we can't write the disassembly to a file. Note that
       // fstream operations don't throw exceptions by default unless configured to do so.
-      LOG(ERROR) << "Could not save disassembly to: " << asm_file;
+      LOG(ERROR) << "Could not save disassembly to: " << asm_path_;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/codegen/llvm-codegen-test.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen-test.cc b/be/src/codegen/llvm-codegen-test.cc
index 61b942f..f6e1a57 100644
--- a/be/src/codegen/llvm-codegen-test.cc
+++ b/be/src/codegen/llvm-codegen-test.cc
@@ -462,6 +462,6 @@ int main(int argc, char **argv) {
   ::testing::InitGoogleTest(&argc, argv);
   impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
   impala::InitFeSupport(false);
-  impala::LlvmCodeGen::InitializeLlvm();
+  ABORT_IF_ERROR(impala::LlvmCodeGen::InitializeLlvm());
   return RUN_ALL_TESTS();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/common/compiler-util.h
----------------------------------------------------------------------
diff --git a/be/src/common/compiler-util.h b/be/src/common/compiler-util.h
index 5132eb0..592c08c 100644
--- a/be/src/common/compiler-util.h
+++ b/be/src/common/compiler-util.h
@@ -53,6 +53,26 @@
 #define RESTRICT __restrict__
 #endif
 
+/// GCC 5+ and Clang 3.6+ support __has_cpp_attribute(). Always return false on compilers
+/// that don't know about __has_cpp_attribute().
+#if !defined(__GNUC__) || __GNUC__ >= 5
+#define HAS_CPP_ATTRIBUTE(attr) __has_cpp_attribute(attr)
+#else
+#define HAS_CPP_ATTRIBUTE(attr) 0
+#endif
+
+// Use [[nodiscard]] specifier if supported by our compiler.
+#if HAS_CPP_ATTRIBUTE(nodiscard)
+#define NODISCARD [[nodiscard]]
+#else
+#define NODISCARD
+#endif
+
+// Suppress warnings when ignoring the return value from a function annotated with
+// WARN_UNUSED_RESULT. Based on ignore_result() in gutil/basictypes.h.
+template<typename T>
+inline void discard_result(const T&) {}
+
 namespace impala {
 
 /// The size of an L1 cache line in bytes on x86-64.
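
Note on the hunk above: a minimal, self-contained sketch of how a class-level nodiscard macro and discard_result() fit together. It is not part of the patch. The SKETCH_* macro names, MiniStatus, Flush() and FlushTwice() are hypothetical, and the feature test uses #ifdef __has_cpp_attribute rather than the GCC version check in the patch.

```cpp
#include <cassert>

// Feature test: fall back to 0 on compilers without __has_cpp_attribute.
#ifdef __has_cpp_attribute
#define SKETCH_HAS_CPP_ATTRIBUTE(attr) __has_cpp_attribute(attr)
#else
#define SKETCH_HAS_CPP_ATTRIBUTE(attr) 0
#endif

// Expands to [[nodiscard]] when the compiler reports support, otherwise to nothing.
#if SKETCH_HAS_CPP_ATTRIBUTE(nodiscard)
#define SKETCH_NODISCARD [[nodiscard]]
#else
#define SKETCH_NODISCARD
#endif

// Hypothetical miniature status type. Annotating the class makes every function that
// returns it by value subject to the unused-result warning.
class SKETCH_NODISCARD MiniStatus {
 public:
  explicit MiniStatus(bool ok) : ok_(ok) {}
  bool ok() const { return ok_; }

 private:
  bool ok_;
};

// Same signature as the helper added in the patch: binding the value to a const
// reference parameter counts as a use of the result.
template <typename T>
inline void discard_result(const T&) {}

// Hypothetical operation that reports success and counts its invocations.
MiniStatus Flush(int* flushed) {
  ++*flushed;
  return MiniStatus(true);
}

// Calls Flush() twice: once checking the result, once deliberately ignoring it.
int FlushTwice() {
  int flushed = 0;
  bool ok = Flush(&flushed).ok();   // Result inspected.
  discard_result(Flush(&flushed));  // Result intentionally dropped.
  return ok ? flushed : -1;
}
```

Writing a bare `Flush(&flushed);` statement instead would be the case the annotation is meant to flag on compilers that support the attribute.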

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/common/init.cc
----------------------------------------------------------------------
diff --git a/be/src/common/init.cc b/be/src/common/init.cc
index d778523..9734610 100644
--- a/be/src/common/init.cc
+++ b/be/src/common/init.cc
@@ -184,7 +184,7 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
   CpuInfo::VerifyCpuRequirements();
 
   // Set the default hostname. The user can override this with the hostname flag.
-  GetHostname(&FLAGS_hostname);
+  ABORT_IF_ERROR(GetHostname(&FLAGS_hostname));
 
   google::SetVersionString(impala::GetBuildVersion());
   google::ParseCommandLineFlags(&argc, &argv, true);
@@ -225,7 +225,7 @@ void impala::InitCommonRuntime(int argc, char** argv, bool init_jvm,
   LOG(INFO) << "Process ID: " << getpid();
 
   // Required for the FE's Catalog
-  impala::LibCache::Init();
+  ABORT_IF_ERROR(impala::LibCache::Init());
   Status fs_cache_init_status = impala::HdfsFsCache::Init();
   if (!fs_cache_init_status.ok()) CLEAN_EXIT_WITH_ERROR(fs_cache_init_status.GetDetail());
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/common/status.h
----------------------------------------------------------------------
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 91ad907..cf7481b 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -80,7 +80,7 @@ namespace impala {
 /// TODO: macros:
 /// RETURN_IF_ERROR(status) << "msg"
 /// MAKE_ERROR() << "msg"
-class Status {
+class NODISCARD Status {
  public:
   typedef strings::internal::SubstituteArg ArgType;
 
@@ -265,7 +265,7 @@ std::ostream& operator<<(std::ostream& os, const Status& status);
 /// some generally useful macros
 #define RETURN_IF_ERROR(stmt)                          \
   do {                                                 \
-    Status __status__ = (stmt);                        \
+    ::impala::Status __status__ = (stmt);              \
     if (UNLIKELY(!__status__.ok())) return __status__; \
   } while (false)
 
@@ -276,7 +276,7 @@ std::ostream& operator<<(std::ostream& os, const Status& status);
 
 #define ABORT_IF_ERROR(stmt) \
   do { \
-    Status __status__ = (stmt); \
+    ::impala::Status __status__ = (stmt); \
     if (UNLIKELY(!__status__.ok())) { \
       ABORT_WITH_ERROR(__status__.GetDetail()); \
     } \
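
Note on the two macro hunks above: the commit message shown here does not say why the temporary is now declared as ::impala::Status. One plausible reason is that a fully qualified name lets the macros expand correctly outside namespace impala, or where another type named Status is in scope. A sketch under that assumption follows; namespaces sketch and other, SKETCH_RETURN_IF_ERROR, Step() and RunSteps() are all hypothetical.

```cpp
#include <cassert>
#include <string>

namespace sketch {  // Hypothetical stand-in for namespace impala.
class Status {
 public:
  static Status OK() { return Status(""); }
  explicit Status(const std::string& error) : error_(error) {}
  bool ok() const { return error_.empty(); }
  const std::string& error() const { return error_; }

 private:
  std::string error_;
};
}  // namespace sketch

// The temporary uses a fully qualified type, so name lookup at the expansion site
// cannot pick a different Status.
#define SKETCH_RETURN_IF_ERROR(stmt)     \
  do {                                   \
    ::sketch::Status status_ = (stmt);   \
    if (!status_.ok()) return status_;   \
  } while (false)

namespace other {
// An unrelated type with the same name. An unqualified 'Status' inside the macro
// would resolve to this type when the macro is expanded in this namespace.
struct Status {};

::sketch::Status Step(int n) {
  return n < 0 ? ::sketch::Status("negative input") : ::sketch::Status::OK();
}

// Stops at the first failing step and returns its error.
::sketch::Status RunSteps(int a, int b) {
  SKETCH_RETURN_IF_ERROR(Step(a));
  SKETCH_RETURN_IF_ERROR(Step(b));
  return ::sketch::Status::OK();
}
}  // namespace other
```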

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/external-data-source-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/external-data-source-executor.cc b/be/src/exec/external-data-source-executor.cc
index df93893..7c810c6 100644
--- a/be/src/exec/external-data-source-executor.cc
+++ b/be/src/exec/external-data-source-executor.cc
@@ -204,7 +204,7 @@ Status ExternalDataSourceExecutor::Close(const TCloseParams& params,
   Status status = CallJniMethod(executor_, s.close_id_, params,
       result);
   JNIEnv* env = getJNIEnv();
-  if (executor_ != NULL) status.MergeStatus(JniUtil::FreeGlobalRef(env, executor_));
+  if (executor_ != NULL) env->DeleteGlobalRef(executor_);
   is_initialized_ = false;
   return status;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/hbase-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc
index 3d74d81..e783731 100644
--- a/be/src/exec/hbase-scan-node.cc
+++ b/be/src/exec/hbase-scan-node.cc
@@ -239,7 +239,7 @@ Status HBaseScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eo
         ss << "hbase table: " << table_name_ << endl;
         void* key;
         int key_length;
-        hbase_scanner_->GetRowKey(env, &key, &key_length);
+        RETURN_IF_ERROR(hbase_scanner_->GetRowKey(env, &key, &key_length));
         ss << "row key: " << string(reinterpret_cast<const char*>(key), key_length);
         state->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str()));
       }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/hbase-table-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-scanner.cc b/be/src/exec/hbase-table-scanner.cc
index 6e0d1ac..cf13e75 100644
--- a/be/src/exec/hbase-table-scanner.cc
+++ b/be/src/exec/hbase-table-scanner.cc
@@ -430,7 +430,7 @@ Status HBaseTableScanner::HandleResultScannerTimeout(JNIEnv* env, bool* timeout)
   jbyteArray start_bytes =
     (jbyteArray) env->CallObjectMethod(cell, cell_get_row_array_);
   jbyteArray end_bytes;
-  CreateByteArray(env, scan_range.stop_key(), &end_bytes);
+  RETURN_IF_ERROR(CreateByteArray(env, scan_range.stop_key(), &end_bytes));
   return InitScanRange(env, start_bytes, end_bytes);
 }
 
@@ -438,9 +438,9 @@ Status HBaseTableScanner::InitScanRange(JNIEnv* env, const ScanRange& scan_range
   JniLocalFrame jni_frame;
   RETURN_IF_ERROR(jni_frame.push(env));
   jbyteArray start_bytes;
-  CreateByteArray(env, scan_range.start_key(), &start_bytes);
+  RETURN_IF_ERROR(CreateByteArray(env, scan_range.start_key(), &start_bytes));
   jbyteArray end_bytes;
-  CreateByteArray(env, scan_range.stop_key(), &end_bytes);
+  RETURN_IF_ERROR(CreateByteArray(env, scan_range.stop_key(), &end_bytes));
   return InitScanRange(env, start_bytes, end_bytes);
 }
 
@@ -457,7 +457,8 @@ Status HBaseTableScanner::InitScanRange(JNIEnv* env, jbyteArray start_bytes,
   if (resultscanner_ != NULL) {
     // resultscanner_.close();
     env->CallObjectMethod(resultscanner_, resultscanner_close_id_);
-    RETURN_IF_ERROR(JniUtil::FreeGlobalRef(env, resultscanner_));
+    RETURN_ERROR_IF_EXC(env);
+    env->DeleteGlobalRef(resultscanner_);
     resultscanner_ = NULL;
   }
   // resultscanner_ = htable_.getScanner(scan_);
@@ -542,7 +543,7 @@ Status HBaseTableScanner::Next(JNIEnv* env, bool* has_next) {
     return Status::OK();
   }
 
-  if (cells_ != NULL) RETURN_IF_ERROR(JniUtil::FreeGlobalRef(env, cells_));
+  if (cells_ != NULL) env->DeleteGlobalRef(cells_);
   // cells_ = result.raw();
   jobject local_cells = reinterpret_cast<jobjectArray>(
       env->CallObjectMethod(result, result_raw_cells_id_));
@@ -656,7 +657,7 @@ Status HBaseTableScanner::GetRowKey(JNIEnv* env, const SlotDescriptor* slot_desc
   void* key;
   int key_length;
   jobject cell = env->GetObjectArrayElement(cells_, 0);
-  GetRowKey(env, cell, &key, &key_length);
+  RETURN_IF_ERROR(GetRowKey(env, cell, &key, &key_length));
   DCHECK_EQ(key_length, slot_desc->type().GetByteSize());
   WriteTupleSlot(slot_desc, tuple, reinterpret_cast<char*>(key));
   RETURN_ERROR_IF_EXC(env);
@@ -700,7 +701,7 @@ Status HBaseTableScanner::GetCurrentValue(JNIEnv* env, const string& family,
 Status HBaseTableScanner::GetValue(JNIEnv* env, const string& family,
     const string& qualifier, void** value, int* value_length) {
   bool is_null;
-  GetCurrentValue(env, family, qualifier, value, value_length, &is_null);
+  RETURN_IF_ERROR(GetCurrentValue(env, family, qualifier, value, value_length, &is_null));
   RETURN_ERROR_IF_EXC(env);
   if (is_null) {
     *value = NULL;
@@ -716,7 +717,8 @@ Status HBaseTableScanner::GetValue(JNIEnv* env, const string& family,
   void* value;
   int value_length;
   bool is_null;
-  GetCurrentValue(env, family, qualifier, &value, &value_length, &is_null);
+  RETURN_IF_ERROR(
+      GetCurrentValue(env, family, qualifier, &value, &value_length, &is_null));
   RETURN_ERROR_IF_EXC(env);
   if (is_null) {
     tuple->SetNull(slot_desc->null_indicator_offset());
@@ -755,15 +757,16 @@ void HBaseTableScanner::Close(JNIEnv* env) {
                   << "(this does not necessarily indicate a problem)";
       } else {
         // GetJniExceptionMsg will clear the exception status and log
-        JniUtil::GetJniExceptionMsg(env, true,
+        Status status = JniUtil::GetJniExceptionMsg(env, true,
             "Unknown error occurred while closing ResultScanner: ");
+        if (!status.ok()) LOG(WARNING) << "Error closing ResultScanner()";
       }
     }
-    JniUtil::FreeGlobalRef(env, resultscanner_);
+    env->DeleteGlobalRef(resultscanner_);
     resultscanner_ = NULL;
   }
-  if (scan_ != NULL) JniUtil::FreeGlobalRef(env, scan_);
-  if (cells_ != NULL) JniUtil::FreeGlobalRef(env, cells_);
+  if (scan_ != NULL) env->DeleteGlobalRef(scan_);
+  if (cells_ != NULL) env->DeleteGlobalRef(cells_);
 
   // Close the HTable so that the connections are not kept around.
   if (htable_.get() != NULL) htable_->Close(state_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/hbase-table-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-scanner.h b/be/src/exec/hbase-table-scanner.h
index 865dafc..0d9abe2 100644
--- a/be/src/exec/hbase-table-scanner.h
+++ b/be/src/exec/hbase-table-scanner.h
@@ -85,7 +85,7 @@ class HBaseTableScanner {
 
   /// JNI setup. Create global references to classes,
   /// and find method ids.
-  static Status Init();
+  static Status Init() WARN_UNUSED_RESULT;
 
   /// HBase scan range; "" means unbounded
   class ScanRange {
@@ -114,33 +114,34 @@ class HBaseTableScanner {
   /// If start_/stop_key is not empty, is used for the corresponding role in the scan.
   /// Note: scan_range_vector cannot be modified for the duration of the scan.
   Status StartScan(JNIEnv* env, const TupleDescriptor* tuple_desc,
-                   const ScanRangeVector& scan_range_vector,
-                   const std::vector<THBaseFilter>& filters);
+      const ScanRangeVector& scan_range_vector,
+      const std::vector<THBaseFilter>& filters) WARN_UNUSED_RESULT;
 
   /// Position cursor to next row. Sets has_next to true if more rows exist, false
   /// otherwise.
   /// Returns non-ok status if an error occurred.
-  Status Next(JNIEnv* env, bool* has_next);
+  Status Next(JNIEnv* env, bool* has_next) WARN_UNUSED_RESULT;
 
   /// Get the current HBase row key.
-  Status GetRowKey(JNIEnv* env, void** key, int* key_length);
+  Status GetRowKey(JNIEnv* env, void** key, int* key_length) WARN_UNUSED_RESULT;
 
   /// Write the current HBase row key into the tuple slot.
   /// This is used for retrieving binary encoded data directly into the tuple.
-  Status GetRowKey(JNIEnv* env, const SlotDescriptor* slot_desc, Tuple* tuple);
+  Status GetRowKey(
+      JNIEnv* env, const SlotDescriptor* slot_desc, Tuple* tuple) WARN_UNUSED_RESULT;
 
   /// Used to fetch HBase values in order of family/qualifier.
   /// Fetch the next value matching family and qualifier into value/value_length.
   /// If there is no match, value is set to NULL and value_length to 0.
   Status GetValue(JNIEnv* env, const std::string& family, const std::string& qualifier,
-      void** value, int* value_length);
+      void** value, int* value_length) WARN_UNUSED_RESULT;
 
   /// Used to fetch HBase values in order of family/qualifier.
   /// Fetch the next value matching family and qualifier into the tuple slot.
   /// If there is no match, the tuple slot is set to null.
   /// This is used for retrieving binary encoded data directly into the tuple.
   Status GetValue(JNIEnv* env, const std::string& family, const std::string& qualifier,
-      const SlotDescriptor* slot_desc, Tuple* tuple);
+      const SlotDescriptor* slot_desc, Tuple* tuple) WARN_UNUSED_RESULT;
 
   /// Close HTable and ResultScanner.
   void Close(JNIEnv* env);
@@ -262,7 +263,7 @@ class HBaseTableScanner {
   /// is returned in the status. In HBase 2.0, ScannerTimeoutException no longer
   /// exists and the error message is returned in the status.
   /// 'timeout' is true if a ScannerTimeoutException was thrown, false otherwise.
-  Status HandleResultScannerTimeout(JNIEnv* env, bool* timeout);
+  Status HandleResultScannerTimeout(JNIEnv* env, bool* timeout) WARN_UNUSED_RESULT;
 
   /// Lexicographically compares s with the string in data having given length.
   /// Returns a value > 0 if s is greater, a value < 0 if s is smaller,
@@ -270,39 +271,46 @@ class HBaseTableScanner {
   int CompareStrings(const std::string& s, void* data, int length);
 
   /// Turn strings into Java byte array.
-  Status CreateByteArray(JNIEnv* env, const std::string& s, jbyteArray* bytes);
+  Status CreateByteArray(
+      JNIEnv* env, const std::string& s, jbyteArray* bytes) WARN_UNUSED_RESULT;
 
   /// First time scanning the table, do some setup
   Status ScanSetup(JNIEnv* env, const TupleDescriptor* tuple_desc,
-                   const std::vector<THBaseFilter>& filters);
+      const std::vector<THBaseFilter>& filters) WARN_UNUSED_RESULT;
 
   /// Initialize the scan to the given range
-  Status InitScanRange(JNIEnv* env, const ScanRange& scan_range);
+  Status InitScanRange(JNIEnv* env, const ScanRange& scan_range) WARN_UNUSED_RESULT;
   /// Initialize the scan range to the scan range specified by the start and end byte
   /// arrays
-  Status InitScanRange(JNIEnv* env, jbyteArray start_bytes, jbyteArray end_bytes);
+  Status InitScanRange(
+      JNIEnv* env, jbyteArray start_bytes, jbyteArray end_bytes) WARN_UNUSED_RESULT;
 
   /// Copies the row key of cell into value_pool_ and returns it via *data and *length.
   /// Returns error status if memory limit is exceeded.
-  inline Status GetRowKey(JNIEnv* env, jobject cell, void** data, int* length);
+  inline Status GetRowKey(
+      JNIEnv* env, jobject cell, void** data, int* length) WARN_UNUSED_RESULT;
 
   /// Copies the column family of cell into value_pool_ and returns it
   /// via *data and *length. Returns error status if memory limit is exceeded.
-  inline Status GetFamily(JNIEnv* env, jobject cell, void** data, int* length);
+  inline Status GetFamily(
+      JNIEnv* env, jobject cell, void** data, int* length) WARN_UNUSED_RESULT;
 
   /// Copies the column qualifier of cell into value_pool_ and returns it
   /// via *data and *length. Returns error status if memory limit is exceeded.
-  inline Status GetQualifier(JNIEnv* env, jobject cell, void** data, int* length);
+  inline Status GetQualifier(
+      JNIEnv* env, jobject cell, void** data, int* length) WARN_UNUSED_RESULT;
 
   /// Copies the value of cell into value_pool_ and returns it via *data and *length.
   /// Returns error status if memory limit is exceeded.
-  inline Status GetValue(JNIEnv* env, jobject cell, void** data, int* length);
+  inline Status GetValue(
+      JNIEnv* env, jobject cell, void** data, int* length) WARN_UNUSED_RESULT;
 
   /// Returns the current value of cells_[cell_index_] in *data and *length
   /// if its family/qualifier match the given family/qualifier.
   /// Otherwise, sets *is_null to true indicating a mismatch in family or qualifier.
   inline Status GetCurrentValue(JNIEnv* env, const std::string& family,
-      const std::string& qualifier, void** data, int* length, bool* is_null);
+      const std::string& qualifier, void** data, int* length,
+      bool* is_null) WARN_UNUSED_RESULT;
 
   /// Write to a tuple slot with the given hbase binary formatted data, which is in
   /// big endian.
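
Note on the declarations above: a minimal sketch of a per-function unused-result annotation placed after the parameter list, as in this header. It is not part of the patch. The definition of WARN_UNUSED_RESULT is not shown in this diff, so the sketch assumes a GCC/Clang-style attribute behind a hypothetical SKETCH_WARN_UNUSED_RESULT macro; MiniScanner and CountRows() are also hypothetical.

```cpp
#include <cassert>

// Assumption: GCC/Clang attribute syntax. Expands to nothing on other compilers.
#if defined(__GNUC__) || defined(__clang__)
#define SKETCH_WARN_UNUSED_RESULT __attribute__((warn_unused_result))
#else
#define SKETCH_WARN_UNUSED_RESULT
#endif

// Hypothetical scanner with one annotated method; the attribute trails the
// declaration, mirroring the placement used in the header above.
class MiniScanner {
 public:
  explicit MiniScanner(int rows) : remaining_(rows) {}

  // Returns false on error (here: a negative row count). Sets *has_next otherwise.
  bool Next(bool* has_next) SKETCH_WARN_UNUSED_RESULT;

 private:
  int remaining_;
};

bool MiniScanner::Next(bool* has_next) {
  if (remaining_ < 0) return false;
  *has_next = remaining_ > 0;
  if (*has_next) --remaining_;
  return true;
}

// Counts rows and propagates a failure as -1 instead of ignoring Next()'s result.
int CountRows(MiniScanner* scanner) {
  int rows = 0;
  bool has_next = true;
  while (has_next) {
    if (!scanner->Next(&has_next)) return -1;
    if (has_next) ++rows;
  }
  return rows;
}
```

The attribute is attached to the declaration only; the out-of-line definition stays unannotated.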

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/hbase-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-writer.cc b/be/src/exec/hbase-table-writer.cc
index 14fe121..2723eb7 100644
--- a/be/src/exec/hbase-table-writer.cc
+++ b/be/src/exec/hbase-table-writer.cc
@@ -194,7 +194,7 @@ Status HBaseTableWriter::AppendRows(RowBatch* batch) {
     RETURN_IF_ERROR(table_->Put(put_list_));
   }
   // Now clean put_list_.
-  RETURN_IF_ERROR(JniUtil::FreeGlobalRef(env, put_list_));
+  env->DeleteGlobalRef(put_list_);
   put_list_ = NULL;
   return Status::OK();
 }
@@ -204,17 +204,12 @@ Status HBaseTableWriter::CleanUpJni() {
   if (env == NULL) return Status("Error getting JNIEnv.");
 
   if (put_list_ != NULL) {
-    RETURN_IF_ERROR(JniUtil::FreeGlobalRef(env, put_list_));
+    env->DeleteGlobalRef(put_list_);
     put_list_ = NULL;
   }
 
-  for (jbyteArray ref: cf_arrays_) {
-    RETURN_IF_ERROR(JniUtil::FreeGlobalRef(env, ref));
-  }
-  for (jbyteArray ref: qual_arrays_) {
-    RETURN_IF_ERROR(JniUtil::FreeGlobalRef(env, ref));
-  }
-
+  for (jbyteArray ref: cf_arrays_) env->DeleteGlobalRef(ref);
+  for (jbyteArray ref: qual_arrays_) env->DeleteGlobalRef(ref);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 1ab87ac..557d346 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -354,7 +354,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
 
     auto fn = [this]() { this->ScannerThread(); };
     scanner_threads_.AddThread(
-        new Thread(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn));
+        make_unique<Thread>(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 0845d9a..e192a86 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -164,7 +164,7 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
     auto fn = [this, token, name]() { this->RunScannerThread(name, token); };
     VLOG_RPC << "Thread started: " << name;
     scanner_threads_.AddThread(
-        new Thread(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn));
+        make_unique<Thread>(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/kudu-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index 23e7033..06f5f96 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -82,7 +82,7 @@ class KuduTableSink : public DataSink {
   /// appropriate counters for ignored errors.
   //
   /// Returns a bad Status if there are non-ignorable errors.
-  Status CheckForErrors(RuntimeState* state);
+  Status CheckForErrors(RuntimeState* state) WARN_UNUSED_RESULT;
 
   /// Used to get the KuduTableDescriptor from the RuntimeState
   TableId table_id_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exec/kudu-util.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h
index eb32bcd..4d9b77b 100644
--- a/be/src/exec/kudu-util.h
+++ b/be/src/exec/kudu-util.h
@@ -48,7 +48,7 @@ bool KuduClientIsSupported();
 /// Returns OK if Kudu is available or an error status containing the reason Kudu is not
 /// available. Kudu may not be available if no Kudu client is available for the platform
 /// or if Kudu was disabled by the startup flag --disable_kudu.
-Status CheckKuduAvailability();
+Status CheckKuduAvailability() WARN_UNUSED_RESULT;
 
 /// Convenience function for the bool equivalent of CheckKuduAvailability().
 bool KuduIsAvailable();
@@ -56,7 +56,7 @@ bool KuduIsAvailable();
 /// Creates a new KuduClient using the specified master addresses. If any error occurs,
 /// 'client' is not set and an error status is returned.
 Status CreateKuduClient(const std::vector<std::string>& master_addrs,
-    kudu::client::sp::shared_ptr<kudu::client::KuduClient>* client);
+    kudu::client::sp::shared_ptr<kudu::client::KuduClient>* client) WARN_UNUSED_RESULT;
 
 /// Returns a debug string for the KuduSchema.
 std::string KuduSchemaDebugString(const kudu::client::KuduSchema& schema);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/experiments/compression-test.cc
----------------------------------------------------------------------
diff --git a/be/src/experiments/compression-test.cc b/be/src/experiments/compression-test.cc
index f6b87bb..48ed666 100644
--- a/be/src/experiments/compression-test.cc
+++ b/be/src/experiments/compression-test.cc
@@ -74,12 +74,13 @@ void TestCompression(int num, int min_len, int max_len, THdfsCompression::type c
 
   int64_t compressed_len = compressor->MaxOutputLen(offset);
   uint8_t* compressed_buffer = (uint8_t*)malloc(compressed_len);
-  compressor->ProcessBlock(true, offset, buffer, &compressed_len, &compressed_buffer);
+  ABORT_IF_ERROR(
+      compressor->ProcessBlock(true, offset, buffer, &compressed_len, &compressed_buffer));
 
   int64_t sorted_compressed_len = compressor->MaxOutputLen(offset);
   uint8_t* sorted_compressed_buffer = (uint8_t*)malloc(sorted_compressed_len);
-  compressor->ProcessBlock(true, offset, sorted_buffer, &sorted_compressed_len,
-                           &sorted_compressed_buffer);
+  ABORT_IF_ERROR(compressor->ProcessBlock(true, offset, sorted_buffer,
+        &sorted_compressed_len, &sorted_compressed_buffer));
 
   cout << "NumStrings=" << num << " MinLen=" << min_len << " MaxLen=" << max_len
        << " Codec=" << codec << endl;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exprs/expr-codegen-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-codegen-test.cc b/be/src/exprs/expr-codegen-test.cc
index 24b897e..9ea865a 100644
--- a/be/src/exprs/expr-codegen-test.cc
+++ b/be/src/exprs/expr-codegen-test.cc
@@ -20,6 +20,10 @@
 #include "exprs/scalar-expr.h"
 #include "udf/udf.h"
 
+#ifdef IR_COMPILE
+#include "exprs/decimal-operators-ir.cc"
+#endif
+
 using namespace impala;
 using namespace impala_udf;
 
@@ -31,10 +35,6 @@ struct FnAttr {
   int arg2_type_size;
 };
 
-#ifdef IR_COMPILE
-#include "exprs/decimal-operators-ir.cc"
-#endif
-
 DecimalVal TestGetFnAttrs(
     FunctionContext* ctx, const DecimalVal& arg0, BooleanVal& arg1, StringVal& arg2) {
   FnAttr* state = reinterpret_cast<FnAttr*>(
@@ -359,7 +359,7 @@ int main(int argc, char **argv) {
   ::testing::InitGoogleTest(&argc, argv);
   InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST);
   InitFeSupport();
-  LlvmCodeGen::InitializeLlvm();
+  ABORT_IF_ERROR(LlvmCodeGen::InitializeLlvm());
 
   return RUN_ALL_TESTS();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 1e3366f..e04abc5 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -3927,7 +3927,7 @@ TEST_F(ExprTest, SessionFunctions) {
 
   map<Session, map<Query, string>> results;
   for (Session session: {S1, S2}) {
-    executor_->Setup(); // Starts new session
+    ASSERT_OK(executor_->Setup()); // Starts new session
     results[session][Q1] = GetValue("current_session()", TYPE_STRING);
     results[session][Q2] = GetValue("current_sid()", TYPE_STRING);
   }
@@ -7307,7 +7307,7 @@ int main(int argc, char **argv) {
   InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST);
   ABORT_IF_ERROR(TimezoneDatabase::Initialize());
   InitFeSupport(false);
-  impala::LlvmCodeGen::InitializeLlvm();
+  ABORT_IF_ERROR(impala::LlvmCodeGen::InitializeLlvm());
 
   // Disable llvm optimization passes if the env var is no set to true. Running without
   // the optimizations makes the tests run much faster.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/exprs/hive-udf-call.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/hive-udf-call.cc b/be/src/exprs/hive-udf-call.cc
index 3e071bf..a3e0fe6 100644
--- a/be/src/exprs/hive-udf-call.cc
+++ b/be/src/exprs/hive-udf-call.cc
@@ -246,8 +246,7 @@ void HiveUdfCall::CloseEvaluator(FunctionContext::FunctionStateScope scope,
       if (jni_ctx->executor != NULL) {
         env->CallNonvirtualVoidMethodA(
             jni_ctx->executor, executor_cl_, executor_close_id_, NULL);
-        Status status = JniUtil::FreeGlobalRef(env, jni_ctx->executor);
-        if (!status.ok()) LOG(ERROR) << status.GetDetail();
+        env->DeleteGlobalRef(jni_ctx->executor);
       }
       if (jni_ctx->input_values_buffer != NULL) {
         delete[] jni_ctx->input_values_buffer;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/rpc/auth-provider.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/auth-provider.h b/be/src/rpc/auth-provider.h
index 2b5b747..0021dc7 100644
--- a/be/src/rpc/auth-provider.h
+++ b/be/src/rpc/auth-provider.h
@@ -39,19 +39,21 @@ class Thread;
 class AuthProvider {
  public:
   /// Initialises any state required to perform authentication using this provider.
-  virtual Status Start() = 0;
+  virtual Status Start() WARN_UNUSED_RESULT = 0;
 
   /// Creates a new Thrift transport factory in the out parameter that performs
   /// authorisation per this provider's protocol.
   virtual Status GetServerTransportFactory(
-      boost::shared_ptr<apache::thrift::transport::TTransportFactory>* factory) = 0;
+      boost::shared_ptr<apache::thrift::transport::TTransportFactory>* factory)
+      WARN_UNUSED_RESULT = 0;
 
-  /// Called by Thrift clients to wrap a raw transport with any intermediate transport that
-  /// an auth protocol requires.
+  /// Called by Thrift clients to wrap a raw transport with any intermediate transport
+  /// that an auth protocol requires.
   virtual Status WrapClientTransport(const std::string& hostname,
       boost::shared_ptr<apache::thrift::transport::TTransport> raw_transport,
       const std::string& service_name,
-      boost::shared_ptr<apache::thrift::transport::TTransport>* wrapped_transport) = 0;
+      boost::shared_ptr<apache::thrift::transport::TTransport>* wrapped_transport)
+      WARN_UNUSED_RESULT = 0;
 
   /// Returns true if this provider uses Sasl at the transport layer.
   virtual bool is_sasl() = 0;
@@ -151,7 +153,7 @@ class SaslAuthProvider : public AuthProvider {
   void RunKinit(Promise<Status>* first_kinit);
 
   /// One-time kerberos-specific environment variable setup.  Called by InitKerberos().
-  Status InitKerberosEnv();
+  Status InitKerberosEnv() WARN_UNUSED_RESULT;
 };
 
 /// This provider implements no authentication, so any connection is immediately

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/rpc/thrift-client.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-client.cc b/be/src/rpc/thrift-client.cc
index a64cdd3..f8da136 100644
--- a/be/src/rpc/thrift-client.cc
+++ b/be/src/rpc/thrift-client.cc
@@ -42,16 +42,16 @@ ThriftClientImpl::ThriftClientImpl(const std::string& ipaddress, int port, bool
   : address_(MakeNetworkAddress(ipaddress, port)), ssl_(ssl) {
   if (ssl_) {
     SSLProtocol version;
-    socket_create_status_ =
+    init_status_ =
         SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &version);
-    if (!socket_create_status_.ok()) return;
+    if (!init_status_.ok()) return;
     ssl_factory_.reset(new TSSLSocketFactory(version));
   }
-  socket_create_status_ = CreateSocket();
+  init_status_ = CreateSocket();
 }
 
 Status ThriftClientImpl::Open() {
-  if (!socket_create_status_.ok()) return socket_create_status_;
+  RETURN_IF_ERROR(init_status_);
   try {
     if (!transport_->isOpen()) {
       transport_->open();
@@ -71,7 +71,7 @@ Status ThriftClientImpl::Open() {
 
 Status ThriftClientImpl::OpenWithRetry(uint32_t num_tries, uint64_t wait_ms) {
   // Socket creation failures are not recoverable.
-  if (!socket_create_status_.ok()) return socket_create_status_;
+  RETURN_IF_ERROR(init_status_);
 
   uint32_t try_count = 0L;
   while (true) {
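
Note on the init_status_ rename above: a constructor cannot return a Status, so the failure is stored in a member and surfaced by the first method that can return one. The sketch below illustrates that pattern under stated assumptions: the Status class, the simplified RETURN_IF_ERROR macro, and the Client class are hypothetical stand-ins, not the code in thrift-client.cc.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for impala::Status: an empty message means OK.
class Status {
 public:
  Status() {}
  explicit Status(const std::string& msg) : msg_(msg) {}
  bool ok() const { return msg_.empty(); }
  const std::string& msg() const { return msg_; }

 private:
  std::string msg_;
};

// Assumption: a simplified form of the RETURN_IF_ERROR idiom. It evaluates
// the expression once and returns early from the enclosing function on error.
#define RETURN_IF_ERROR(stmt)                                    \
  do {                                                           \
    const Status return_if_error_status = (stmt);                \
    if (!return_if_error_status.ok()) return return_if_error_status; \
  } while (false)

// Hypothetical client whose construction can fail.
class Client {
 public:
  explicit Client(bool fail_init) : open_(false) {
    // Record the failure instead of dropping it; the caller checks it later.
    if (fail_init) init_status_ = Status("could not create socket");
  }

  // Refuses to open if construction failed, propagating the stored error.
  Status Open() {
    RETURN_IF_ERROR(init_status_);
    open_ = true;
    return Status();
  }

  bool is_open() const { return open_; }
  const Status& init_status() const { return init_status_; }

 private:
  Status init_status_;
  bool open_;
};

// Small self-check: a failed construction must keep Open() from succeeding.
inline void SelfCheck() {
  Client bad(true);
  assert(!bad.Open().ok());
  assert(!bad.is_open());
}
```

Naming the member after initialization in general, rather than socket creation, lets later constructor steps (such as wrapping the transport) report through the same field.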

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/rpc/thrift-client.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-client.h b/be/src/rpc/thrift-client.h
index 778075c..8194016 100644
--- a/be/src/rpc/thrift-client.h
+++ b/be/src/rpc/thrift-client.h
@@ -68,7 +68,7 @@ class ThriftClientImpl {
   /// Set send timeout on the underlying TSocket.
   void setSendTimeout(int32_t ms) { socket_->setSendTimeout(ms); }
 
-  Status socket_create_status() { return socket_create_status_; }
+  Status init_status() { return init_status_; }
 
  protected:
   ThriftClientImpl(const std::string& ipaddress, int port, bool ssl);
@@ -83,7 +83,7 @@ class ThriftClientImpl {
   /// True if ssl encryption is enabled on this connection.
   bool ssl_;
 
-  Status socket_create_status_;
+  Status init_status_;
 
   /// Sasl Client object.  Contains client kerberos identification data.
   /// Will be NULL if kerberos is not being used.
@@ -145,15 +145,16 @@ ThriftClient<InterfaceType>::ThriftClient(const std::string& ipaddress, int port
   // not use the client after that.
   // TODO: Move initialization code that can fail into a separate Init() method.
   if (socket_ == NULL) {
-    DCHECK(!socket_create_status_.ok());
+    DCHECK(!init_status_.ok());
     return;
   }
 
   // transport_ is created by wrapping the socket_ in the TTransport provided by the
   // auth_provider_ and then a TBufferedTransport (IMPALA-1928).
   transport_ = socket_;
-  auth_provider_->WrapClientTransport(address_.hostname, transport_, service_name,
-      &transport_);
+  init_status_ = auth_provider_->WrapClientTransport(address_.hostname, transport_,
+      service_name, &transport_);
+  if (!init_status_.ok()) return; // The caller will decide what to do with the Status.
   ThriftServer::BufferedTransportFactory factory;
   transport_ = factory.getTransport(transport_);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 9fe0618..0b89498 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -1317,7 +1317,7 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
         cv, *item_desc, mem_pool_.get(), runtime_state_, array_len);
     Tuple* array_data;
     int num_rows;
-    builder.GetFreeMemory(&array_data, &num_rows);
+    ASSERT_OK(builder.GetFreeMemory(&array_data, &num_rows));
     expected_row_size += item_desc->byte_size() * array_len;
 
     // Fill the array with pointers to our constant strings.
@@ -1423,7 +1423,7 @@ TEST_F(ArrayTupleStreamTest, TestComputeRowSize) {
       cv, *item_desc, mem_pool_.get(), runtime_state_, array_len);
   Tuple* array_data;
   int num_rows;
-  builder.GetFreeMemory(&array_data, &num_rows);
+  ASSERT_OK(builder.GetFreeMemory(&array_data, &num_rows));
   expected_row_size += item_desc->byte_size() * array_len;
 
   // Fill the array with pointers to our constant strings.
@@ -1457,6 +1457,6 @@ int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
   impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
   impala::InitFeSupport();
-  impala::LlvmCodeGen::InitializeLlvm();
+  ABORT_IF_ERROR(impala::LlvmCodeGen::InitializeLlvm());
   return RUN_ALL_TESTS();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 14446e7..2eff955 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -89,7 +89,7 @@ class BufferPoolTest : public ::testing::Test {
     for (string created_tmp_dir : created_tmp_dirs_) {
       chmod((created_tmp_dir + SCRATCH_SUFFIX).c_str(), S_IRWXU);
     }
-    FileSystemUtil::RemovePaths(created_tmp_dirs_);
+    ASSERT_OK(FileSystemUtil::RemovePaths(created_tmp_dirs_));
     created_tmp_dirs_.clear();
     CpuTestUtil::ResetAffinity(); // Some tests modify affinity.
   }
@@ -806,14 +806,15 @@ TEST_F(BufferPoolTest, PinWithoutReservation) {
       TEST_BUFFER_LEN, NewProfile(), &client));
 
   BufferPool::PageHandle handle;
-  IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle), "");
+  IMPALA_ASSERT_DEBUG_DEATH(
+      discard_result(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle)), "");
 
   // Should succeed after increasing reservation.
   ASSERT_TRUE(client.IncreaseReservationToFit(TEST_BUFFER_LEN));
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle));
 
   // But we can't pin again.
-  IMPALA_ASSERT_DEBUG_DEATH(pool.Pin(&client, &handle), "");
+  IMPALA_ASSERT_DEBUG_DEATH(discard_result(pool.Pin(&client, &handle)), "");
 
   pool.DestroyPage(&client, &handle);
   pool.DeregisterClient(&client);
@@ -866,7 +867,8 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
   // Test that ExtractBuffer() DCHECKs for unpinned pages.
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
   pool.Unpin(&client, &page);
-  IMPALA_ASSERT_DEBUG_DEATH((void)pool.ExtractBuffer(&client, &page, &buffer), "");
+  IMPALA_ASSERT_DEBUG_DEATH(
+      discard_result(pool.ExtractBuffer(&client, &page, &buffer)), "");
   pool.DestroyPage(&client, &page);
 
   pool.DeregisterClient(&client);
@@ -955,7 +957,8 @@ TEST_F(BufferPoolTest, EvictPageSameClient) {
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
 
   // Do not have enough reservations because we pinned the page.
-  IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2), "");
+  IMPALA_ASSERT_DEBUG_DEATH(
+      discard_result(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2)), "");
 
   // We should be able to create a new page after unpinned and evicting the first one.
   pool.Unpin(&client, &handle1);
@@ -1319,7 +1322,7 @@ void BufferPoolTest::TestQueryTeardown(bool write_error) {
     string tmp_file_path = TmpFilePath(pages.data());
     FreeBuffers(pool, &client, &tmp_buffers);
 
-    PinAll(pool, &client, &pages);
+    ASSERT_OK(PinAll(pool, &client, &pages));
     // Remove temporary file to force future writes to that file to fail.
     DisableBackingFile(tmp_file_path);
   }
@@ -1367,7 +1370,7 @@ void BufferPoolTest::TestWriteError(int write_delay_ms) {
   UnpinAll(&pool, &client, &pages);
   WaitForAllWrites(&client);
   // Repin the pages
-  PinAll(&pool, &client, &pages);
+  ASSERT_OK(PinAll(&pool, &client, &pages));
   // Remove permissions to the backing storage so that future writes will fail
   ASSERT_GT(RemoveScratchPerms(), 0);
   // Give the first write a chance to fail before the second write starts.
@@ -1480,7 +1483,9 @@ TEST_F(BufferPoolTest, WriteErrorBlacklist) {
   PageHandle* error_page = FindPageInDir(pages[ERROR_QUERY], error_dir);
   ASSERT_TRUE(error_page != NULL) << "Expected a tmp file in dir " << error_dir;
   const string& error_file_path = TmpFilePath(error_page);
-  for (int i = 0; i < INITIAL_QUERIES; ++i) PinAll(&pool, &clients[i], &pages[i]);
+  for (int i = 0; i < INITIAL_QUERIES; ++i) {
+    ASSERT_OK(PinAll(&pool, &clients[i], &pages[i]));
+  }
   DisableBackingFile(error_file_path);
   for (int i = 0; i < INITIAL_QUERIES; ++i) UnpinAll(&pool, &clients[i], &pages[i]);
 
@@ -1489,7 +1494,7 @@ TEST_F(BufferPoolTest, WriteErrorBlacklist) {
 
   // Both clients should still be usable - test the API.
   for (int i = 0; i < INITIAL_QUERIES; ++i) {
-    PinAll(&pool, &clients[i], &pages[i]);
+    ASSERT_OK(PinAll(&pool, &clients[i], &pages[i]));
     VerifyData(pages[i], 0);
     UnpinAll(&pool, &clients[i], &pages[i]);
     ASSERT_OK(AllocateAndFree(&pool, &clients[i], TEST_BUFFER_LEN));
@@ -1521,7 +1526,7 @@ TEST_F(BufferPoolTest, WriteErrorBlacklist) {
   }
   DestroyAll(&pool, &clients[ERROR_QUERY], &error_new_pages);
 
-  PinAll(&pool, &clients[NO_ERROR_QUERY], &pages[NO_ERROR_QUERY]);
+  ASSERT_OK(PinAll(&pool, &clients[NO_ERROR_QUERY], &pages[NO_ERROR_QUERY]));
   UnpinAll(&pool, &clients[NO_ERROR_QUERY], &pages[NO_ERROR_QUERY]);
   WaitForAllWrites(&clients[NO_ERROR_QUERY]);
   EXPECT_TRUE(FindPageInDir(pages[NO_ERROR_QUERY], good_dir) != NULL);
@@ -1929,7 +1934,7 @@ int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
   impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
   impala::InitFeSupport();
-  impala::LlvmCodeGen::InitializeLlvm();
+  ABORT_IF_ERROR(impala::LlvmCodeGen::InitializeLlvm());
   int result = 0;
   for (bool encryption : {false, true}) {
     for (bool numa : {false, true}) {
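
Note on discard_result() in the death tests above: GCC keeps warning about a warn_unused_result function even when the call is cast to void, so an explicit helper is needed to mark a result as intentionally ignored. The sketch below shows one way such a helper could look. The helper body, the macro definition, TryPin, and CancelPath are assumptions for illustration; the real definitions are not shown in this diff.

```cpp
#include <cassert>

// Assumption: a typical GCC/Clang definition of the annotation.
#define WARN_UNUSED_RESULT __attribute__((warn_unused_result))

// Assumption: sketch of a discard helper. Binding the result to a function
// parameter counts as a use, so the compiler no longer warns.
template <typename T>
inline void discard_result(const T&) {}

// Counts calls so the effect of the example can be observed.
int g_pin_calls = 0;

// Hypothetical function whose result callers are normally expected to check.
bool TryPin() WARN_UNUSED_RESULT;

bool TryPin() {
  ++g_pin_calls;
  return false;
}

// A path where the outcome really does not matter, for example teardown.
void CancelPath() {
  // Writing "(void)TryPin();" here would still warn under GCC.
  discard_result(TryPin());
}

// Small self-check: the call still happens, only its result is dropped.
inline void SelfCheck() {
  const int before = g_pin_calls;
  CancelPath();
  assert(g_pin_calls == before + 1);
}
```

The helper also documents intent at the call site: a reader can tell the result was dropped on purpose rather than forgotten.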

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/client-cache.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/client-cache.cc b/be/src/runtime/client-cache.cc
index 4f403ad..8c0b6aa 100644
--- a/be/src/runtime/client-cache.cc
+++ b/be/src/runtime/client-cache.cc
@@ -111,9 +111,9 @@ Status ClientCacheHelper::CreateClient(const TNetworkAddress& address,
   shared_ptr<ThriftClientImpl> client_impl(factory_method(address, client_key));
   VLOG(2) << "CreateClient(): creating new client for " << client_impl->address();
 
-  if (!client_impl->socket_create_status().ok()) {
+  if (!client_impl->init_status().ok()) {
     *client_key = nullptr;
-    return client_impl->socket_create_status();
+    return client_impl->init_status();
   }
 
   // Set the TSocket's send and receive timeouts.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/client-cache.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/client-cache.h b/be/src/runtime/client-cache.h
index 53bbabe..d0fd30e 100644
--- a/be/src/runtime/client-cache.h
+++ b/be/src/runtime/client-cache.h
@@ -80,14 +80,15 @@ class ClientCacheHelper {
   //
   /// If there is an error creating the new client, *client_key will be NULL.
   Status GetClient(const TNetworkAddress& address, ClientFactory factory_method,
-      ClientKey* client_key);
+      ClientKey* client_key) WARN_UNUSED_RESULT;
 
   /// Returns a newly-opened client in client_key. May reopen the existing client, or may
   /// replace it with a new one (created using 'factory_method').
   //
   /// Returns an error status and sets 'client_key' to NULL if a new client cannot
   /// created.
-  Status ReopenClient(ClientFactory factory_method, ClientKey* client_key);
+  Status ReopenClient(
+      ClientFactory factory_method, ClientKey* client_key) WARN_UNUSED_RESULT;
 
   /// Returns a client to the cache. Upon return, *client_key will be NULL, and the
   /// associated client will be available in the per-host cache.
@@ -190,7 +191,7 @@ class ClientCacheHelper {
 
   /// Create a new client for specific address in 'client' and put it in client_map_
   Status CreateClient(const TNetworkAddress& address, ClientFactory factory_method,
-      ClientKey* client_key);
+      ClientKey* client_key) WARN_UNUSED_RESULT;
 };
 
 /// A scoped client connection to help manage clients from a client cache. Clients of this
@@ -216,9 +217,7 @@ class ClientConnection {
     }
   }
 
-  Status Reopen() {
-    return client_cache_->ReopenClient(&client_);
-  }
+  Status Reopen() WARN_UNUSED_RESULT { return client_cache_->ReopenClient(&client_); }
 
   T* operator->() const { return client_; }
 
@@ -393,18 +392,18 @@ class ClientCache {
   /// Obtains a pointer to a Thrift interface object (of type T),
   /// backed by a live transport which is already open. Returns
   /// Status::OK unless there was an error opening the transport.
-  Status GetClient(const TNetworkAddress& address, T** iface) {
-    return client_cache_helper_.GetClient(address, client_factory_,
-        reinterpret_cast<ClientKey*>(iface));
+  Status GetClient(const TNetworkAddress& address, T** iface) WARN_UNUSED_RESULT {
+    return client_cache_helper_.GetClient(
+        address, client_factory_, reinterpret_cast<ClientKey*>(iface));
   }
 
   /// Close and delete the underlying transport. Return a new client connecting to the
   /// same host/port.
   /// Returns an error status if a new connection cannot be established and *client will
   /// be unaffected in that case.
-  Status ReopenClient(T** client) {
-    return client_cache_helper_.ReopenClient(client_factory_,
-        reinterpret_cast<ClientKey*>(client));
+  Status ReopenClient(T** client) WARN_UNUSED_RESULT {
+    return client_cache_helper_.ReopenClient(
+        client_factory_, reinterpret_cast<ClientKey*>(client));
   }
 
   /// Return the client to the cache and set *client to NULL.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/collection-value-builder.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/collection-value-builder.h b/be/src/runtime/collection-value-builder.h
index 1811a9a..1d20ef4 100644
--- a/be/src/runtime/collection-value-builder.h
+++ b/be/src/runtime/collection-value-builder.h
@@ -49,7 +49,7 @@ class CollectionValueBuilder {
   /// of tuples that may be written before calling CommitTuples() in 'num_tuples'. After
   /// calling CommitTuples(), GetMemory() can be called again. Allocates if there is no
   /// free tuple memory left. Returns error status if memory limit is exceeded.
-  Status GetFreeMemory(Tuple** tuple_mem, int* num_tuples) {
+  Status GetFreeMemory(Tuple** tuple_mem, int* num_tuples) WARN_UNUSED_RESULT {
     if (tuple_desc_.byte_size() == 0) {
       // No tuple memory necessary, so caller can write as many tuples as 'num_tuples'
       // field can count.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index 195880f..515b0b3 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -388,7 +388,10 @@ void Coordinator::BackendState::PublishFilter(
   TPublishFilterParams local_params(*rpc_params);
   local_params.__set_bloom_filter(rpc_params->bloom_filter);
   TPublishFilterResult res;
-  backend_client.DoRpc(&ImpalaBackendClient::PublishFilter, local_params, &res);
+  status = backend_client.DoRpc(&ImpalaBackendClient::PublishFilter, local_params, &res);
+  if (!status.ok()) {
+    LOG(WARNING) << "Error publishing filter, continuing..." << status.GetDetail();
+  }
   // TODO: switch back to the following once we fix the lifecycle
   // problems of Coordinator
   //std::cref(fragment_inst->impalad_address()),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index 0c6d98e..35076f5 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -344,8 +344,10 @@ void DataStreamRecvr::CancelStream() {
 
 void DataStreamRecvr::Close() {
   // Remove this receiver from the DataStreamMgr that created it.
-  // TODO: log error msg
-  mgr_->DeregisterRecvr(fragment_instance_id(), dest_node_id());
+  const Status status = mgr_->DeregisterRecvr(fragment_instance_id(), dest_node_id());
+  if (!status.ok()) {
+    LOG(WARNING) << "Error deregistering receiver: " << status.GetDetail();
+  }
   mgr_ = NULL;
   for (int i = 0; i < sender_queues_.size(); ++i) {
     sender_queues_[i]->Close();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 93862fd..5ea6756 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -113,7 +113,7 @@ class DataStreamTest : public testing::Test {
  protected:
   DataStreamTest() : next_val_(0) {
     // Initialize MemTrackers and RuntimeState for use by the data stream receiver.
-    exec_env_.InitForFeTests();
+    ABORT_IF_ERROR(exec_env_.InitForFeTests());
     runtime_state_.reset(new RuntimeState(TQueryCtx(), &exec_env_));
     mem_pool_.reset(new MemPool(&tracker_));
 
@@ -307,7 +307,7 @@ class DataStreamTest : public testing::Test {
     ordering_exprs_.push_back(lhs_slot);
     less_than_ = obj_pool_.Add(new TupleRowComparator(ordering_exprs_,
         is_asc_, nulls_first_));
-    less_than_->Open(&obj_pool_, runtime_state_.get(), mem_pool_.get());
+    ASSERT_OK(less_than_->Open(&obj_pool_, runtime_state_.get(), mem_pool_.get()));
   }
 
   // Create batch_, but don't fill it with data yet. Assumes we created row_desc_.
@@ -459,8 +459,9 @@ class DataStreamTest : public testing::Test {
     boost::shared_ptr<ImpalaTestBackend> handler(
         new ImpalaTestBackend(dynamic_cast<DataStreamMgr*>(stream_mgr_)));
     boost::shared_ptr<TProcessor> processor(new ImpalaInternalServiceProcessor(handler));
-    ThriftServerBuilder("DataStreamTest backend", processor, FLAGS_port).Build(&server_);
-    server_->Start();
+    ThriftServerBuilder builder("DataStreamTest backend", processor, FLAGS_port);
+    ASSERT_OK(builder.Build(&server_));
+    ASSERT_OK(server_->Start());
   }
 
   void StopBackend() {
@@ -514,7 +515,7 @@ class DataStreamTest : public testing::Test {
       if (!info.status.ok()) break;
     }
     VLOG_QUERY << "closing sender" << sender_num;
-    sender.FlushFinal(&state);
+    info.status.MergeStatus(sender.FlushFinal(&state));
     sender.Close(&state);
     info.num_bytes_sent = sender.GetNumDataBytesSent();
 
@@ -656,6 +657,6 @@ int main(int argc, char **argv) {
   ::testing::InitGoogleTest(&argc, argv);
   InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST);
   InitFeSupport();
-  impala::LlvmCodeGen::InitializeLlvm();
+  ABORT_IF_ERROR(impala::LlvmCodeGen::InitializeLlvm());
   return RUN_ALL_TESTS();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 21edbb2..0fdfb77 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -391,7 +391,7 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
     for (int j = 0; j < num_threads_per_disk; ++j) {
       stringstream ss;
       ss << "work-loop(Disk: " << i << ", Thread: " << j << ")";
-      disk_thread_group_.AddThread(new Thread("disk-io-mgr", ss.str(),
+      disk_thread_group_.AddThread(make_unique<Thread>("disk-io-mgr", ss.str(),
           &DiskIoMgr::WorkLoop, this, disk_queues_[i]));
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 4551232..7f62c96 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -265,7 +265,7 @@ Status ExecEnv::StartServices() {
   buffer_pool_limit = BitUtil::RoundDown(buffer_pool_limit, FLAGS_min_buffer_size);
   InitBufferPool(FLAGS_min_buffer_size, buffer_pool_limit);
 
-  metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr);
+  RETURN_IF_ERROR(metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr));
   impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends");
   catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
   RETURN_IF_ERROR(RegisterMemoryMetrics(
@@ -326,7 +326,7 @@ Status ExecEnv::StartServices() {
   TGetHadoopConfigRequest config_request;
   config_request.__set_name(DEFAULT_FS);
   TGetHadoopConfigResponse config_response;
-  frontend_->GetHadoopConfig(config_request, &config_response);
+  RETURN_IF_ERROR(frontend_->GetHadoopConfig(config_request, &config_response));
   if (config_response.__isset.value) {
     default_fs_ = config_response.value;
   } else {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 6293988..5d7a3d0 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -81,7 +81,7 @@ class ExecEnv {
   ~ExecEnv();
 
   /// Starts any dependent services in their correct order
-  Status StartServices();
+  Status StartServices() WARN_UNUSED_RESULT;
 
   /// TODO: Should ExecEnv own the ImpalaServer as well?
   void SetImpalaServer(ImpalaServer* server) { impala_server_ = server; }
@@ -127,7 +127,7 @@ class ExecEnv {
   const TNetworkAddress& backend_address() const { return backend_address_; }
 
   /// Initializes the exec env for running FE tests.
-  Status InitForFeTests();
+  Status InitForFeTests() WARN_UNUSED_RESULT;
 
   /// Returns true if this environment was created from the FE tests. This makes the
   /// environment special since the JVM is started first and libraries are loaded
@@ -135,14 +135,14 @@ class ExecEnv {
   bool is_fe_tests() { return is_fe_tests_; }
 
   /// Returns the configured defaultFs set in core-site.xml
-  string default_fs() { return default_fs_; }
+  const string& default_fs() { return default_fs_; }
 
   /// Gets a KuduClient for this list of master addresses. It will look up and share
   /// an existing KuduClient if possible. Otherwise, it will create a new KuduClient
   /// internally and return a pointer to it. All KuduClients accessed through this
   /// interface are owned by the ExecEnv. Thread safe.
-  Status GetKuduClient(
-      const std::vector<std::string>& master_addrs, kudu::client::KuduClient** client);
+  Status GetKuduClient(const std::vector<std::string>& master_addrs,
+      kudu::client::KuduClient** client) WARN_UNUSED_RESULT;
 
  private:
   boost::scoped_ptr<ObjectPool> obj_pool_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 99de62d..3f5a72f 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -97,7 +97,9 @@ done:
 }
 
 void FragmentInstanceState::Cancel() {
-  WaitForPrepare();  // make sure Prepare() finished
+  // Make sure Prepare() finished. We don't care about the status since the query is
+  // being cancelled.
+  discard_result(WaitForPrepare());
 
   // Ensure that the sink is closed from both sides. Although in ordinary executions we
   // rely on the consumer to do this, in error cases the consumer may not be able to send

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/hbase-table-factory.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/hbase-table-factory.cc b/be/src/runtime/hbase-table-factory.cc
index d0c97d8..7340473 100644
--- a/be/src/runtime/hbase-table-factory.cc
+++ b/be/src/runtime/hbase-table-factory.cc
@@ -92,7 +92,7 @@ HBaseTableFactory::~HBaseTableFactory() {
   lock_guard<mutex> lock(connection_lock_);
   if (connection_ != NULL) {
     env->CallObjectMethod(connection_, connection_close_id_);
-    JniUtil::FreeGlobalRef(env, connection_);
+    env->DeleteGlobalRef(connection_);
     connection_ = NULL;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/hbase-table.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/hbase-table.cc b/be/src/runtime/hbase-table.cc
index 14f402f..84d03eb 100644
--- a/be/src/runtime/hbase-table.cc
+++ b/be/src/runtime/hbase-table.cc
@@ -59,8 +59,7 @@ void HBaseTable::Close(RuntimeState* state) {
     env->CallObjectMethod(table_, table_close_id_);
     Status s = JniUtil::GetJniExceptionMsg(env, true, "HBaseTable::Close(): ");
     if (!s.ok()) state->LogError(s.msg());
-    s = JniUtil::FreeGlobalRef(env, table_);
-    if (!s.ok()) state->LogError(s.msg());
+    env->DeleteGlobalRef(table_);
   }
 
   table_ = NULL;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/parallel-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/parallel-executor.cc b/be/src/runtime/parallel-executor.cc
index 5b5d0e0..b7b3cc2 100644
--- a/be/src/runtime/parallel-executor.cc
+++ b/be/src/runtime/parallel-executor.cc
@@ -35,7 +35,7 @@ Status ParallelExecutor::Exec(Function function, void** args, int num_args,
   for (int i = 0; i < num_args; ++i) {
     stringstream ss;
     ss << "worker-thread(" << i << ")";
-    worker_threads.AddThread(new Thread("parallel-executor", ss.str(),
+    worker_threads.AddThread(make_unique<Thread>("parallel-executor", ss.str(),
         &ParallelExecutor::Worker, function, args[i], &lock, &status, latencies));
   }
   worker_threads.JoinAll();
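
Note on the hunk above: switching from `new Thread(...)` to `make_unique<Thread>(...)` suggests the group now takes ownership through a `std::unique_ptr`, although the `AddThread` signature is not shown in this diff. The following is a minimal sketch of that ownership hand-off. `Worker`, `WorkerGroup` and `MakeWorkers` are hypothetical stand-ins, not Impala classes, and no real threads are started so the sketch stays dependency-free.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a named worker; it does not start a real thread.
class Worker {
 public:
  explicit Worker(std::string name) : name_(std::move(name)) {}
  const std::string& name() const { return name_; }

 private:
  std::string name_;
};

// Hypothetical group that owns its workers. Taking std::unique_ptr by value
// makes the transfer of ownership visible at every call site.
class WorkerGroup {
 public:
  void AddWorker(std::unique_ptr<Worker> worker) {
    assert(worker != nullptr);
    workers_.push_back(std::move(worker));
  }
  std::size_t size() const { return workers_.size(); }
  const Worker& at(std::size_t i) const { return *workers_.at(i); }

 private:
  std::vector<std::unique_ptr<Worker>> workers_;
};

// Builds 'num_workers' workers named like the ones in the hunk above.
WorkerGroup MakeWorkers(int num_workers) {
  WorkerGroup group;
  for (int i = 0; i < num_workers; ++i) {
    // No raw 'new': the temporary unique_ptr is moved into the group.
    group.AddWorker(
        std::make_unique<Worker>("worker-thread(" + std::to_string(i) + ")"));
  }
  return group;
}
```

With a raw pointer parameter the caller cannot tell from the call alone who deletes the object; with `std::unique_ptr` the compiler rejects any call that does not hand ownership over.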

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 343ec93..c610529 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -17,6 +17,7 @@
 
 #include <cstdio>
 #include <cstdlib>
+#include <numeric>
 
 #include <boost/filesystem.hpp>
 #include <boost/scoped_ptr.hpp>
@@ -239,7 +240,7 @@ TEST_F(TmpFileMgrTest, TestOneDirPerDevice) {
   TmpFileMgr::File* file = files[0];
   // Check the prefix is the expected temporary directory.
   EXPECT_EQ(0, file->path().find(tmp_dirs[0]));
-  FileSystemUtil::RemovePaths(tmp_dirs);
+  ASSERT_OK(FileSystemUtil::RemovePaths(tmp_dirs));
   file_group.Close();
   CheckMetrics(&tmp_file_mgr);
 }
@@ -266,7 +267,7 @@ TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) {
     // Check the prefix is the expected temporary directory.
     EXPECT_EQ(0, files[i]->path().find(tmp_dirs[i]));
   }
-  FileSystemUtil::RemovePaths(tmp_dirs);
+  ASSERT_OK(FileSystemUtil::RemovePaths(tmp_dirs));
   file_group.Close();
   CheckMetrics(&tmp_file_mgr);
 }
@@ -312,7 +313,7 @@ TEST_F(TmpFileMgrTest, TestReportError) {
   // Attempts to allocate new files on bad device should succeed.
   unique_ptr<TmpFileMgr::File> bad_file2;
   ASSERT_OK(NewFile(&tmp_file_mgr, &file_group, bad_device, &bad_file2));
-  FileSystemUtil::RemovePaths(tmp_dirs);
+  ASSERT_OK(FileSystemUtil::RemovePaths(tmp_dirs));
   file_group.Close();
   CheckMetrics(&tmp_file_mgr);
 }
@@ -343,7 +344,7 @@ TEST_F(TmpFileMgrTest, TestAllocateNonWritable) {
   ASSERT_OK(FileAllocateSpace(allocated_files[1], 1, &offset));
 
   chmod(scratch_subdirs[0].c_str(), S_IRWXU);
-  FileSystemUtil::RemovePaths(tmp_dirs);
+  ASSERT_OK(FileSystemUtil::RemovePaths(tmp_dirs));
   file_group.Close();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index c99077f..1cf86b4 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -212,8 +212,7 @@ void TmpFileMgr::File::Blacklist(const ErrorMsg& msg) {
 
 Status TmpFileMgr::File::Remove() {
   // Remove the file if present (it may not be present if no writes completed).
-  FileSystemUtil::RemovePaths({path_});
-  return Status::OK();
+  return FileSystemUtil::RemovePaths({path_});
 }
 
 string TmpFileMgr::File::DebugString() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 1f233d2..b9b2c6e 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -509,7 +509,8 @@ void SchedulerWrapper::InitializeScheduler() {
 
   scheduler_.reset(new Scheduler(nullptr, scheduler_backend_id, scheduler_backend_address,
       &metrics_, nullptr, nullptr));
-  scheduler_->Init();
+  const Status status = scheduler_->Init();
+  DCHECK(status.ok()) << "Scheduler init failed in test";
   // Initialize the scheduler backend maps.
   SendFullMembershipMap();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/scheduling/scheduler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc
index c7b284b..c70e54f 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -43,7 +43,7 @@ TEST_F(SchedulerTest, SingleHostSingleFile) {
 
   Result result(plan);
   SchedulerWrapper scheduler(plan);
-  scheduler.Compute(&result);
+  ASSERT_OK(scheduler.Compute(&result));
 
   EXPECT_EQ(1, result.NumTotalAssignments());
   EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes());
@@ -67,7 +67,7 @@ TEST_F(SchedulerTest, SingleCoordinatorNoExecutor) {
 
   Result result(plan);
   SchedulerWrapper scheduler(plan);
-  scheduler.Compute(&result);
+  ASSERT_OK(scheduler.Compute(&result));
 
   EXPECT_EQ(2, result.NumDistinctBackends());
   EXPECT_EQ(0, result.NumDiskAssignments(0));
@@ -87,7 +87,7 @@ TEST_F(SchedulerTest, ExecAtCoord) {
   Result result(plan);
   SchedulerWrapper scheduler(plan);
   bool exec_at_coord = true;
-  scheduler.Compute(exec_at_coord, &result);
+  ASSERT_OK(scheduler.Compute(exec_at_coord, &result));
 
   EXPECT_EQ(3 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0));
   EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
@@ -108,7 +108,7 @@ TEST_F(SchedulerTest, ScanTableTwice) {
 
   Result result(plan);
   SchedulerWrapper scheduler(plan);
-  scheduler.Compute(&result);
+  ASSERT_OK(scheduler.Compute(&result));
 
   EXPECT_EQ(4 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes());
   EXPECT_EQ(4 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes());
@@ -130,7 +130,7 @@ TEST_F(SchedulerTest, RandomReads) {
 
   Result result(plan);
   SchedulerWrapper scheduler(plan);
-  for (int i = 0; i < 100; ++i) scheduler.Compute(&result);
+  for (int i = 0; i < 100; ++i) ASSERT_OK(scheduler.Compute(&result));
 
   ASSERT_EQ(100, result.NumAssignments());
   EXPECT_EQ(100, result.NumTotalAssignments());
@@ -154,7 +154,7 @@ TEST_F(SchedulerTest, LocalReadsPickFirstReplica) {
 
   Result result(plan);
   SchedulerWrapper scheduler(plan);
-  for (int i = 0; i < 3; ++i) scheduler.Compute(&result);
+  for (int i = 0; i < 3; ++i) ASSERT_OK(scheduler.Compute(&result));
 
   EXPECT_EQ(3, result.NumTotalAssignments());
   EXPECT_EQ(3, result.NumDiskAssignments(0));
@@ -179,7 +179,7 @@ TEST_F(SchedulerTest, TestMediumSizedCluster) {
 
   Result result(plan);
   SchedulerWrapper scheduler(plan);
-  scheduler.Compute(&result);
+  ASSERT_OK(scheduler.Compute(&result));
 
   EXPECT_EQ(16, result.NumTotalAssignments());
   EXPECT_EQ(16, result.NumDiskAssignments());
@@ -198,7 +198,7 @@ TEST_F(SchedulerTest, RemoteOnlyPlacement) {
 
   Result result(plan);
   SchedulerWrapper scheduler(plan);
-  scheduler.Compute(&result);
+  ASSERT_OK(scheduler.Compute(&result));
 
   EXPECT_EQ(10, result.NumTotalAssignments());
   EXPECT_EQ(10, result.NumRemoteAssignments());
@@ -219,7 +219,7 @@ TEST_F(SchedulerTest, ManyScanRanges) {
 
   Result result(plan);
   SchedulerWrapper scheduler(plan);
-  scheduler.Compute(&result);
+  ASSERT_OK(scheduler.Compute(&result));
 
   EXPECT_EQ(1000, result.NumTotalAssignments());
   EXPECT_EQ(1000, result.NumDiskAssignments());
@@ -245,7 +245,7 @@ TEST_F(SchedulerTest, DisjointClusterWithRemoteReads) {
 
   Result result(plan);
   SchedulerWrapper scheduler(plan);
-  scheduler.Compute(&result);
+  ASSERT_OK(scheduler.Compute(&result));
 
   EXPECT_EQ(10, result.NumTotalAssignments());
   EXPECT_EQ(10, result.NumRemoteAssignments());
@@ -267,14 +267,14 @@ TEST_F(SchedulerTest, TestCachedReadPreferred) {
 
   Result result(plan);
   SchedulerWrapper scheduler(plan);
-  scheduler.Compute(&result);
+  ASSERT_OK(scheduler.Compute(&result));
   EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumCachedAssignedBytes());
   EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumCachedAssignedBytes(1));
   EXPECT_EQ(0, result.NumDiskAssignedBytes());
   EXPECT_EQ(0, result.NumRemoteAssignedBytes());
 
   // Compute additional assignments.
-  for (int i = 0; i < 8; ++i) scheduler.Compute(&result);
+  for (int i = 0; i < 8; ++i) ASSERT_OK(scheduler.Compute(&result));
   EXPECT_EQ(9 * Block::DEFAULT_BLOCK_SIZE, result.NumCachedAssignedBytes());
   EXPECT_EQ(9 * Block::DEFAULT_BLOCK_SIZE, result.NumCachedAssignedBytes(1));
   EXPECT_EQ(0, result.NumDiskAssignedBytes());
@@ -296,13 +296,13 @@ TEST_F(SchedulerTest, TestDisableCachedReads) {
 
   Result result(plan);
   SchedulerWrapper scheduler(plan);
-  scheduler.Compute(&result);
+  ASSERT_OK(scheduler.Compute(&result));
   EXPECT_EQ(0, result.NumCachedAssignedBytes());
   EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes());
   EXPECT_EQ(0, result.NumRemoteAssignedBytes());
 
   // Compute additional assignments.
-  for (int i = 0; i < 8; ++i) scheduler.Compute(&result);
+  for (int i = 0; i < 8; ++i) ASSERT_OK(scheduler.Compute(&result));
   EXPECT_EQ(0, result.NumCachedAssignedBytes());
   EXPECT_EQ(9 * Block::DEFAULT_BLOCK_SIZE, result.NumDiskAssignedBytes());
   EXPECT_EQ(0, result.NumRemoteAssignedBytes());
@@ -326,7 +326,7 @@ TEST_F(SchedulerTest, EmptyStatestoreMessage) {
   Result result(plan);
   SchedulerWrapper scheduler(plan);
 
-  scheduler.Compute(&result);
+  ASSERT_OK(scheduler.Compute(&result));
   EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
   EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(1));
   EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
@@ -335,7 +335,7 @@ TEST_F(SchedulerTest, EmptyStatestoreMessage) {
   result.Reset();
 
   scheduler.SendEmptyUpdate();
-  scheduler.Compute(&result);
+  ASSERT_OK(scheduler.Compute(&result));
   EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0));
   EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
   EXPECT_EQ(0, result.NumTotalAssignedBytes(2));
@@ -359,7 +359,7 @@ TEST_F(SchedulerTest, TestSendUpdates) {
   Result result(plan);
   SchedulerWrapper scheduler(plan);
 
-  scheduler.Compute(&result);
+  ASSERT_OK(scheduler.Compute(&result));
   // Two backends are registered, so the scheduler will pick a random one.
   EXPECT_EQ(0, result.NumTotalAssignedBytes(0));
   EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(1));
@@ -368,7 +368,7 @@ TEST_F(SchedulerTest, TestSendUpdates) {
   scheduler.RemoveBackend(cluster.hosts()[1]);
   result.Reset();
 
-  scheduler.Compute(&result);
+  ASSERT_OK(scheduler.Compute(&result));
   EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0));
   EXPECT_EQ(0, result.NumTotalAssignedBytes(1));
 
@@ -376,7 +376,7 @@ TEST_F(SchedulerTest, TestSendUpdates) {
   scheduler.AddBackend(cluster.hosts()[1]);
   result.Reset();
 
-  scheduler.Compute(&result);
+  ASSERT_OK(scheduler.Compute(&result));
   // Two backends are registered, so the scheduler will pick a random one.
   EXPECT_EQ(1 * Block::DEFAULT_BLOCK_SIZE, result.NumTotalAssignedBytes(0));
   EXPECT_EQ(0, result.NumTotalAssignedBytes(1));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index d75e639..013e552 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -627,7 +627,7 @@ void ClientRequestState::Wait() {
     } else {
       query_events()->MarkEvent("Request finished");
     }
-    (void) UpdateQueryStatus(status);
+    discard_result(UpdateQueryStatus(status));
   }
   if (status.ok()) {
     UpdateNonErrorQueryState(beeswax::QueryState::FINISHED);
@@ -681,7 +681,7 @@ Status ClientRequestState::FetchRows(const int32_t max_rows,
   MarkActive();
 
   // ImpalaServer::FetchInternal has already taken our lock_
-  (void) UpdateQueryStatus(FetchRowsInternal(max_rows, fetched_rows));
+  discard_result(UpdateQueryStatus(FetchRowsInternal(max_rows, fetched_rows)));
 
   MarkInactive();
   return query_status_;
@@ -737,7 +737,7 @@ Status ClientRequestState::FetchRowsInternal(const int32_t max_rows,
     // max_rows <= 0 means no limit
     while ((num_rows < max_rows || max_rows <= 0)
         && num_rows_fetched_ < all_rows.size()) {
-      fetched_rows->AddOneRow(all_rows[num_rows_fetched_]);
+      RETURN_IF_ERROR(fetched_rows->AddOneRow(all_rows[num_rows_fetched_]));
       ++num_rows_fetched_;
       ++num_rows;
     }
@@ -867,7 +867,7 @@ Status ClientRequestState::Cancel(bool check_inflight, const Status* cause) {
     bool already_done = eos_ || query_state_ == beeswax::QueryState::EXCEPTION;
     if (!already_done && cause != NULL) {
       DCHECK(!cause->ok());
-      (void) UpdateQueryStatus(*cause);
+      discard_result(UpdateQueryStatus(*cause));
       query_events_->MarkEvent("Cancelled");
       DCHECK_EQ(query_state_, beeswax::QueryState::EXCEPTION);
     }
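
Note on the hunks above: `(void)` casts are replaced with `discard_result(...)`, and a previously ignored `AddOneRow()` result is now wrapped in `RETURN_IF_ERROR`. The sketch below shows how such helpers can work together with a `[[nodiscard]]` status type. It assumes a toy `Status` class, a simplified macro and a hypothetical row counter; Impala's real definitions are not part of this diff and may differ.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Toy status type (NOT Impala's Status). [[nodiscard]] asks the compiler to
// warn whenever a function's returned Status is silently dropped.
class [[nodiscard]] Status {
 public:
  Status() = default;
  explicit Status(std::string msg) : ok_(false), msg_(std::move(msg)) {}
  static Status OK() { return Status(); }
  bool ok() const { return ok_; }
  const std::string& msg() const { return msg_; }

 private:
  bool ok_ = true;
  std::string msg_;
};

// Assumed shape of a discard helper: binding the value to a parameter counts
// as a use, so the intent to ignore it is spelled out at the call site.
template <typename T>
inline void discard_result(const T&) {}

// Simplified early-return macro (assumed shape).
#define RETURN_IF_ERROR(expr)              \
  do {                                     \
    Status _status = (expr);               \
    if (!_status.ok()) return _status;     \
  } while (false)

// Hypothetical row sink: fails once 'capacity' rows have been added.
Status AddOneRow(int* num_rows, int capacity) {
  assert(num_rows != nullptr);
  if (*num_rows >= capacity) return Status("row buffer full");
  ++*num_rows;
  return Status::OK();
}

// Adds up to 'to_add' rows and stops at the first failure.
Status AddRows(int to_add, int capacity, int* num_rows) {
  for (int i = 0; i < to_add; ++i) {
    RETURN_IF_ERROR(AddOneRow(num_rows, capacity));
  }
  return Status::OK();
}
```

A caller that really does not care about the outcome writes `discard_result(AddRows(1, 5, &n));`. Unlike a bare `(void)` cast, the helper name is easy to search for when auditing ignored errors.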

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 379829d..2c38b30 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -69,9 +69,9 @@ Java_org_apache_impala_service_FeSupport_NativeFeTestInit(
   // Init the JVM to load the classes in JniUtil that are needed for returning
   // exceptions to the FE.
   InitCommonRuntime(1, &name, true, TestInfo::FE_TEST);
-  LlvmCodeGen::InitializeLlvm(true);
+  THROW_IF_ERROR(LlvmCodeGen::InitializeLlvm(true), env, JniUtil::internal_exc_class());
   ExecEnv* exec_env = new ExecEnv(); // This also caches it from the process.
-  exec_env->InitForFeTests();
+  THROW_IF_ERROR(exec_env->InitForFeTests(), env, JniUtil::internal_exc_class());
 }
 
 // Serializes expression value 'value' to thrift structure TColumnValue 'col_val'.
@@ -168,9 +168,12 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow(
   vector<TColumnValue> results;
   ObjectPool obj_pool;
 
-  DeserializeThriftMsg(env, thrift_expr_batch, &expr_batch);
-  DeserializeThriftMsg(env, thrift_query_ctx_bytes, &query_ctx);
+  THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_expr_batch, &expr_batch), env,
+     JniUtil::internal_exc_class(), nullptr);
+  THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_query_ctx_bytes, &query_ctx), env,
+     JniUtil::internal_exc_class(), nullptr);
   vector<TExpr>& texprs = expr_batch.exprs;
+
   // Disable codegen advisorily to avoid unnecessary latency. For testing purposes
   // (expr-test.cc), fe_support_disable_codegen may be set to false.
   query_ctx.disable_codegen_hint = fe_support_disable_codegen;
@@ -377,7 +380,8 @@ JNIEXPORT jbyteArray JNICALL
 Java_org_apache_impala_service_FeSupport_NativeCacheJar(
     JNIEnv* env, jclass caller_class, jbyteArray thrift_struct) {
   TCacheJarParams params;
-  DeserializeThriftMsg(env, thrift_struct, &params);
+  THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, &params), env,
+      JniUtil::internal_exc_class(), nullptr);
 
   TCacheJarResult result;
   string local_path;
@@ -397,7 +401,8 @@ JNIEXPORT jbyteArray JNICALL
 Java_org_apache_impala_service_FeSupport_NativeLookupSymbol(
     JNIEnv* env, jclass caller_class, jbyteArray thrift_struct) {
   TSymbolLookupParams lookup;
-  DeserializeThriftMsg(env, thrift_struct, &lookup);
+  THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, &lookup), env,
+      JniUtil::internal_exc_class(), nullptr);
 
   vector<ColumnType> arg_types;
   for (int i = 0; i < lookup.arg_types.size(); ++i) {
@@ -420,7 +425,8 @@ JNIEXPORT jbyteArray JNICALL
 Java_org_apache_impala_service_FeSupport_NativePrioritizeLoad(
     JNIEnv* env, jclass caller_class, jbyteArray thrift_struct) {
   TPrioritizeLoadRequest request;
-  DeserializeThriftMsg(env, thrift_struct, &request);
+  THROW_IF_ERROR_RET(DeserializeThriftMsg(env, thrift_struct, &request), env,
+      JniUtil::internal_exc_class(), nullptr);
 
   CatalogOpExecutor catalog_op_executor(ExecEnv::GetInstance(), NULL, NULL);
   TPrioritizeLoadResponse result;



[2/9] incubator-impala git commit: IMPALA-2615: support [[nodiscard]] on Status

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 026a06e..a170ea1 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -73,7 +73,7 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
   // set of in-flight queries.
   Status status = SetQueryInflight(session, request_state);
   if (!status.ok()) {
-    (void) UnregisterQuery(request_state->query_id(), false, &status);
+    discard_result(UnregisterQuery(request_state->query_id(), false, &status));
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
   TUniqueIdToQueryHandle(request_state->query_id(), &query_handle);
@@ -111,7 +111,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
   // set of in-flight queries.
   Status status = SetQueryInflight(session, request_state);
   if (!status.ok()) {
-    (void) UnregisterQuery(request_state->query_id(), false, &status);
+    discard_result(UnregisterQuery(request_state->query_id(), false, &status));
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
   // block until results are ready
@@ -121,7 +121,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
     status = request_state->query_status();
   }
   if (!status.ok()) {
-    (void) UnregisterQuery(request_state->query_id(), false, &status);
+    discard_result(UnregisterQuery(request_state->query_id(), false, &status));
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
 
@@ -171,7 +171,7 @@ void ImpalaServer::fetch(Results& query_results, const QueryHandle& query_handle
   VLOG_ROW << "fetch result: #results=" << query_results.data.size()
            << " has_more=" << (query_results.has_more ? "true" : "false");
   if (!status.ok()) {
-    (void) UnregisterQuery(query_id, false, &status);
+    discard_result(UnregisterQuery(query_id, false, &status));
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 2528349..6a1b5f4 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -154,7 +154,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
 
   Status exec_status = request_state->Exec(*request);
   if (!exec_status.ok()) {
-    (void) UnregisterQuery(request_state->query_id(), false, &exec_status);
+    discard_result(UnregisterQuery(request_state->query_id(), false, &exec_status));
     status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
     status->__set_errorMessage(exec_status.GetDetail());
     status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
@@ -165,7 +165,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
 
   Status inflight_status = SetQueryInflight(session, request_state);
   if (!inflight_status.ok()) {
-    (void) UnregisterQuery(request_state->query_id(), false, &inflight_status);
+    discard_result(UnregisterQuery(request_state->query_id(), false, &inflight_status));
     status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
     status->__set_errorMessage(inflight_status.GetDetail());
     status->__set_sqlState(SQLSTATE_GENERAL_ERROR);
@@ -340,8 +340,8 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
       } else {
         // Normal configuration key. Use it to set session default query options.
         // Ignore failure (failures will be logged in SetQueryOption()).
-        SetQueryOption(v.first, v.second, &state->default_query_options,
-            &state->set_query_options_mask);
+        discard_result(SetQueryOption(v.first, v.second, &state->default_query_options,
+            &state->set_query_options_mask));
       }
     }
   }
@@ -465,7 +465,7 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
             session->hs2_version, *request_state->result_metadata(), nullptr),
         cache_num_rows);
     if (!status.ok()) {
-      (void) UnregisterQuery(request_state->query_id(), false, &status);
+      discard_result(UnregisterQuery(request_state->query_id(), false, &status));
       HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
     }
   }
@@ -476,7 +476,7 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
   // set of in-flight queries.
   status = SetQueryInflight(session, request_state);
   if (!status.ok()) {
-    (void) UnregisterQuery(request_state->query_id(), false, &status);
+    discard_result(UnregisterQuery(request_state->query_id(), false, &status));
     HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
   return_val.__isset.operationHandle = true;
@@ -795,7 +795,7 @@ void ImpalaServer::FetchResults(TFetchResultsResp& return_val,
     if (status.IsRecoverableError()) {
       DCHECK(fetch_first);
     } else {
-      (void) UnregisterQuery(query_id, false, &status);
+      discard_result(UnregisterQuery(query_id, false, &status));
     }
     HS2_RETURN_ERROR(return_val, status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 5a79d9d..79903b4 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -547,11 +547,12 @@ void ImpalaHttpHandler::CatalogObjectsHandler(const Webserver::ArgumentMap& args
 
     // Get the object type and name from the topic entry key
     TCatalogObject request;
-    TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request);
-
-    // Get the object and dump its contents.
     TCatalogObject result;
-    Status status = server_->exec_env_->frontend()->GetCatalogObject(request, &result);
+    Status status = TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request);
+    if (status.ok()) {
+      // Get the object and dump its contents.
+      status = server_->exec_env_->frontend()->GetCatalogObject(request, &result);
+    }
     if (status.ok()) {
       Value debug_string(ThriftDebugString(result).c_str(), document->GetAllocator());
       document->AddMember("thrift_string", debug_string, document->GetAllocator());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 00a9d9a..e173c84 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -73,6 +73,7 @@
 #include "util/runtime-profile.h"
 #include "util/string-parser.h"
 #include "util/summary-util.h"
+#include "util/test-info.h"
 #include "util/uid-util.h"
 
 #include "gen-cpp/Types_types.h"
@@ -357,21 +358,22 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
 
   ABORT_IF_ERROR(ExternalDataSourceExecutor::InitJNI(exec_env->metrics()));
 
-  // Register the membership callback if required
-  if (exec_env->subscriber() != nullptr) {
+  // Register the membership callback if running in a real cluster.
+  if (!TestInfo::is_test()) {
     auto cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
          vector<TTopicDelta>* topic_updates) {
       this->MembershipCallback(state, topic_updates);
     };
-    exec_env->subscriber()->AddTopic(Scheduler::IMPALA_MEMBERSHIP_TOPIC, true, cb);
+    ABORT_IF_ERROR(
+        exec_env->subscriber()->AddTopic(Scheduler::IMPALA_MEMBERSHIP_TOPIC, true, cb));
 
     if (FLAGS_is_coordinator) {
       auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
           vector<TTopicDelta>* topic_updates) {
         this->CatalogUpdateCallback(state, topic_updates);
       };
-      exec_env->subscriber()->AddTopic(CatalogServer::IMPALA_CATALOG_TOPIC, true,
-          catalog_cb);
+      ABORT_IF_ERROR(exec_env->subscriber()->AddTopic(
+            CatalogServer::IMPALA_CATALOG_TOPIC, true, catalog_cb));
     }
   }
 
@@ -581,11 +583,11 @@ void ImpalaServer::LogQueryEvents(const ClientRequestState& request_state) {
   if (IsAuditEventLoggingEnabled() &&
       (Frontend::IsAuthorizationError(request_state.query_status()) || log_events)) {
     // TODO: deal with an error status
-    (void) LogAuditRecord(request_state, request_state.exec_request());
+    discard_result(LogAuditRecord(request_state, request_state.exec_request()));
   }
   if (IsLineageLoggingEnabled() && log_events) {
     // TODO: deal with an error status
-    (void) LogLineageRecord(request_state);
+    discard_result(LogLineageRecord(request_state));
   }
 }
 
@@ -622,7 +624,7 @@ Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id,
       RETURN_IF_ERROR(CheckProfileAccess(user, request_state->effective_user(),
           request_state->user_has_profile_access()));
       if (base64_encoded) {
-        request_state->profile().SerializeToArchiveString(output);
+        RETURN_IF_ERROR(request_state->profile().SerializeToArchiveString(output));
       } else {
         request_state->profile().PrettyPrint(output);
       }
@@ -703,7 +705,10 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
 [[noreturn]] void ImpalaServer::LogFileFlushThread() {
   while (true) {
     sleep(5);
-    profile_logger_->Flush();
+    const Status status = profile_logger_->Flush();
+    if (!status.ok()) {
+      LOG(WARNING) << "Error flushing profile log: " << status.GetDetail();
+    }
   }
 }
 
@@ -736,14 +741,21 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
 }
 
 void ImpalaServer::ArchiveQuery(const ClientRequestState& query) {
-  const string& encoded_profile_str = query.profile().SerializeToArchiveString();
+  string encoded_profile_str;
+  Status status = query.profile().SerializeToArchiveString(&encoded_profile_str);
+  if (!status.ok()) {
+    // Didn't serialize the string. Continue with empty string.
+    LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string "
+                               << status.GetDetail();
+    return;
+  }
 
   // If there was an error initialising archival (e.g. directory is not writeable),
   // FLAGS_log_query_to_file will have been set to false
   if (FLAGS_log_query_to_file) {
     stringstream ss;
     ss << UnixMillis() << " " << query.query_id() << " " << encoded_profile_str;
-    Status status = profile_logger_->AppendEntry(ss.str());
+    status = profile_logger_->AppendEntry(ss.str());
     if (!status.ok()) {
       LOG_EVERY_N(WARNING, 1000) << "Could not write to profile log file file ("
                                  << google::COUNTER << " attempts failed): "
@@ -826,7 +838,7 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx,
   Status status = ExecuteInternal(*query_ctx, session_state, &registered_request_state,
       request_state);
   if (!status.ok() && registered_request_state) {
-    (void) UnregisterQuery((*request_state)->query_id(), false, &status);
+    discard_result(UnregisterQuery((*request_state)->query_id(), false, &status));
   }
   return status;
 }
@@ -1105,7 +1117,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
   Status status("Session closed");
   for (const TUniqueId& query_id: inflight_queries) {
     // TODO: deal with an error status
-    (void) UnregisterQuery(query_id, false, &status);
+    discard_result(UnregisterQuery(query_id, false, &status));
   }
   // Reconfigure the poll period of session_timeout_thread_ if necessary.
   int32_t session_timeout = session_state->session_timeout;
@@ -1427,7 +1439,7 @@ void ImpalaServer::CatalogUpdateCallback(
       }
       ImpaladMetrics::CATALOG_READY->set_value(new_catalog_version > 0);
       // TODO: deal with an error status
-      (void) UpdateCatalogMetrics();
+      discard_result(UpdateCatalogMetrics());
       // Remove all dropped objects from the library cache.
       // TODO: is this expensive? We'd like to process heartbeats promptly.
       for (TCatalogObject& object: dropped_objects) {
@@ -1690,7 +1702,12 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& reque
     request_state.profile().PrettyPrint(&ss);
     profile_str = ss.str();
     if (encoded_profile.empty()) {
-      encoded_profile_str = request_state.profile().SerializeToArchiveString();
+      Status status =
+          request_state.profile().SerializeToArchiveString(&encoded_profile_str);
+      if (!status.ok()) {
+        LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string "
+                                   << status.GetDetail();
+      }
     } else {
       encoded_profile_str = encoded_profile;
     }
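
Note on the hunks above: the callers of `SerializeToArchiveString()` move from using a returned string to passing an output parameter and checking a returned `Status`. The sketch below illustrates that calling convention under stated assumptions: `SimpleStatus`, `SerializeProfile` and `EncodeOrEmpty` are hypothetical names, and the "archive:" prefix merely stands in for whatever encoding the real method performs.

```cpp
#include <cassert>
#include <string>

// Toy status type (hypothetical, not Impala's Status).
struct [[nodiscard]] SimpleStatus {
  bool ok;
  std::string detail;
};

// Hypothetical serializer: writes the encoded form to '*out' and reports
// failure through the return value instead of returning the string itself.
SimpleStatus SerializeProfile(const std::string& profile, std::string* out) {
  assert(out != nullptr);
  if (profile.empty()) return {false, "empty profile"};
  *out = "archive:" + profile;  // placeholder for a real encoding step
  return {true, ""};
}

// Caller pattern similar to the QueryStateRecord hunk: on failure the record
// is still built, but the encoded string stays empty (a real caller would
// also log status.detail).
std::string EncodeOrEmpty(const std::string& profile) {
  std::string encoded;
  const SimpleStatus status = SerializeProfile(profile, &encoded);
  if (!status.ok) encoded.clear();
  return encoded;
}
```

Returning the status and passing the string by pointer keeps the failure visible to the caller, which a plain `std::string` return value cannot express without a sentinel.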

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/impalad-main.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index 32fbcc8..8a7961c 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -77,7 +77,8 @@ int ImpaladMain(int argc, char** argv) {
 
   // start backend service for the coordinator on be_port
   ExecEnv exec_env;
-  StartThreadInstrumentation(exec_env.metrics(), exec_env.webserver(), true);
+  ABORT_IF_ERROR(
+      StartThreadInstrumentation(exec_env.metrics(), exec_env.webserver(), true));
   InitRpcEventTracing(exec_env.webserver());
 
   CommonMetrics::InitCommonMetrics(exec_env.metrics());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 9006947..397df5a 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -74,7 +74,8 @@ TEST(QueryOptions, SetFilterWait) {
   EXPECT_FALSE(SetQueryOption("RUNTIME_FILTER_WAIT_TIME_MS", "-1", &options, NULL).ok());
 
   EXPECT_FALSE(SetQueryOption("RUNTIME_FILTER_WAIT_TIME_MS",
-      lexical_cast<string>(numeric_limits<int32_t>::max() + 1), &options, NULL).ok());
+      lexical_cast<string>(static_cast<int64_t>(numeric_limits<int32_t>::max()) + 1),
+      &options, NULL).ok());
 
   EXPECT_OK(SetQueryOption("RUNTIME_FILTER_WAIT_TIME_MS", "0", &options, NULL));
   EXPECT_EQ(0, options.runtime_filter_wait_time_ms);
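
Note on the hunk above: `numeric_limits<int32_t>::max() + 1` is evaluated in 32-bit signed arithmetic, so the addition overflows (undefined behavior) before the value is converted to a string. Casting to `int64_t` first performs the addition in a wider type. A minimal sketch of the corrected expression follows; `std::to_string` is used here in place of `lexical_cast` to avoid a Boost dependency, and `OneAboveInt32Max` is a hypothetical helper name.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>
#include <string>

// Returns the decimal string of the smallest integer that no longer fits in
// int32_t. The cast widens the operand before the addition takes place.
std::string OneAboveInt32Max() {
  const int64_t widened =
      static_cast<int64_t>(std::numeric_limits<int32_t>::max()) + 1;
  assert(widened > std::numeric_limits<int32_t>::max());
  return std::to_string(widened);
}
```

The result is the string "2147483648", which a 32-bit option parser should reject as out of range, as the test expects.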

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 9f6097c..5d7738c 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -720,8 +720,13 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
       // Schedule the next message.
       VLOG(3) << "Next " << (is_heartbeat ? "heartbeat" : "update") << " deadline for: "
               << subscriber->id() << " is in " << deadline_ms << "ms";
-      OfferUpdate(make_pair(deadline_ms, subscriber->id()), is_heartbeat ?
+      status = OfferUpdate(make_pair(deadline_ms, subscriber->id()), is_heartbeat ?
           &subscriber_heartbeat_threadpool_ : &subscriber_topic_update_threadpool_);
+      if (!status.ok()) {
+        LOG(INFO) << "Unable to send next " << (is_heartbeat ? "heartbeat" : "update")
+                  << " message to subscriber '" << subscriber->id() << "': "
+                  << status.GetDetail();
+      }
     }
   }
 }
@@ -755,7 +760,6 @@ void Statestore::UnregisterSubscriber(Subscriber* subscriber) {
   subscribers_.erase(subscriber->id());
 }
 
-Status Statestore::MainLoop() {
+void Statestore::MainLoop() {
   subscriber_topic_update_threadpool_.Join();
-  return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 44d9792..b3ba315 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -108,14 +108,12 @@ class Statestore : public CacheLineAligned {
   Status RegisterSubscriber(const SubscriberId& subscriber_id,
       const TNetworkAddress& location,
       const std::vector<TTopicRegistration>& topic_registrations,
-      TUniqueId* registration_id);
+      TUniqueId* registration_id) WARN_UNUSED_RESULT;
 
   void RegisterWebpages(Webserver* webserver);
 
   /// The main processing loop. Blocks until the exit flag is set.
-  //
-  /// Returns OK unless there is an unrecoverable error.
-  Status MainLoop();
+  void MainLoop();
 
   /// Returns the Thrift API interface that proxies requests onto the local Statestore.
   const boost::shared_ptr<StatestoreServiceIf>& thrift_iface() const {
@@ -439,10 +437,10 @@ class Statestore : public CacheLineAligned {
   /// Utility method to add an update to the given thread pool, and to fail if the thread
   /// pool is already at capacity.
   Status OfferUpdate(const ScheduledSubscriberUpdate& update,
-      ThreadPool<ScheduledSubscriberUpdate>* thread_pool);
+      ThreadPool<ScheduledSubscriberUpdate>* thread_pool) WARN_UNUSED_RESULT;
 
-  /// Sends either a heartbeat or topic update message to the subscriber in 'update' at the
-  /// closest possible time to the first member of 'update'.  If is_heartbeat is true,
+  /// Sends either a heartbeat or topic update message to the subscriber in 'update' at
+  /// the closest possible time to the first member of 'update'. If is_heartbeat is true,
   /// sends a heartbeat update, otherwise the set of pending topic updates is sent. Once
   /// complete, the next update is scheduled and added to the appropriate queue.
   void DoSubscriberUpdate(bool is_heartbeat, int thread_id,
@@ -458,14 +456,15 @@ class Statestore : public CacheLineAligned {
   /// will return OK (since there was no error) and the output parameter update_skipped is
   /// set to true. Otherwise, any updates returned by the subscriber are applied to their
   /// target topics.
-  Status SendTopicUpdate(Subscriber* subscriber, bool* update_skipped);
+  Status SendTopicUpdate(Subscriber* subscriber, bool* update_skipped) WARN_UNUSED_RESULT;
 
   /// Sends a heartbeat message to subscriber. Returns false if there was some error
   /// performing the RPC.
-  Status SendHeartbeat(Subscriber* subscriber);
+  Status SendHeartbeat(Subscriber* subscriber) WARN_UNUSED_RESULT;
 
   /// Unregister a subscriber, removing all of its transient entries and evicting it from
-  /// the subscriber map. Callers must hold subscribers_lock_ prior to calling this method.
+  /// the subscriber map. Callers must hold subscribers_lock_ prior to calling this
+  /// method.
   void UnregisterSubscriber(Subscriber* subscriber);
 
   /// Populates a TUpdateStateRequest with the update state for this subscriber. Iterates

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/statestore/statestored-main.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestored-main.cc b/be/src/statestore/statestored-main.cc
index 1dcb682..1f06f04 100644
--- a/be/src/statestore/statestored-main.cc
+++ b/be/src/statestore/statestored-main.cc
@@ -67,10 +67,12 @@ int StatestoredMain(int argc, char** argv) {
     LOG(INFO) << "Not starting webserver";
   }
 
-  metrics->Init(FLAGS_enable_webserver ? webserver.get() : nullptr);
+  ABORT_IF_ERROR(
+      metrics->Init(FLAGS_enable_webserver ? webserver.get() : nullptr));
   ABORT_IF_ERROR(RegisterMemoryMetrics(metrics.get(), false, nullptr, nullptr));
   StartMemoryMaintenanceThread();
-  StartThreadInstrumentation(metrics.get(), webserver.get(), false);
+  ABORT_IF_ERROR(
+      StartThreadInstrumentation(metrics.get(), webserver.get(), false));
   InitRpcEventTracing(webserver.get());
   // TODO: Add a 'common metrics' method to add standard metrics to
   // both statestored and impalad

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/testutil/death-test-util.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/death-test-util.h b/be/src/testutil/death-test-util.h
index 6421fb7..474025b 100644
--- a/be/src/testutil/death-test-util.h
+++ b/be/src/testutil/death-test-util.h
@@ -25,10 +25,10 @@
 // Wrapper around gtest's ASSERT_DEBUG_DEATH that prevents coredumps and minidumps
 // being generated as the result of the death test.
 #ifndef NDEBUG
-#define IMPALA_ASSERT_DEBUG_DEATH(fn, msg)    \
-  do {                                        \
-    ScopedCoredumpDisabler disable_coredumps; \
-    ASSERT_DEBUG_DEATH((void)fn, msg);              \
+#define IMPALA_ASSERT_DEBUG_DEATH(fn, msg)      \
+  do {                                          \
+    ScopedCoredumpDisabler disable_coredumps;   \
+    ASSERT_DEBUG_DEATH((void)fn, msg);          \
   } while (false);
 #else
 // Gtest's ASSERT_DEBUG_DEATH macro has peculiar semantics where in debug builds it

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/testutil/fault-injection-util.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/fault-injection-util.cc b/be/src/testutil/fault-injection-util.cc
index e2c32b1..f378c48 100644
--- a/be/src/testutil/fault-injection-util.cc
+++ b/be/src/testutil/fault-injection-util.cc
@@ -19,6 +19,8 @@
 
 #include "testutil/fault-injection-util.h"
 
+#include <random>
+
 #include <thrift/transport/TSSLSocket.h>
 #include <thrift/transport/TTransportException.h>
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/testutil/impalad-query-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/impalad-query-executor.cc b/be/src/testutil/impalad-query-executor.cc
index db0aceb..111b2a6 100644
--- a/be/src/testutil/impalad-query-executor.cc
+++ b/be/src/testutil/impalad-query-executor.cc
@@ -44,7 +44,7 @@ ImpaladQueryExecutor::ImpaladQueryExecutor(const string& hostname, uint32_t port
 }
 
 ImpaladQueryExecutor::~ImpaladQueryExecutor() {
-  Close();
+  discard_result(Close());
 }
 
 Status ImpaladQueryExecutor::Setup() {
@@ -71,7 +71,7 @@ Status ImpaladQueryExecutor::Close() {
 Status ImpaladQueryExecutor::Exec(
     const string& query_string, vector<FieldSchema>* col_schema) {
   // close anything that ran previously
-  Close();
+  discard_result(Close());
   Query query;
   query.query = query_string;
   query.configuration = exec_options_;
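
Note on the hunks above: once a function is marked so that ignoring its result produces a warning, call sites that really do want to ignore it need an explicit way to say so; some compilers (GCC, for instance) keep warning even after a plain (void) cast. The sketch below illustrates the pattern under stated assumptions. SKETCH_WARN_UNUSED_RESULT, SketchStatus, DiscardResult, CloseConnection and Executor are all hypothetical names invented for this example; they are not Impala's WARN_UNUSED_RESULT, Status or discard_result definitions, which are not shown in this diff.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Assumption: GCC/Clang attribute syntax; expands to nothing elsewhere.
#if defined(__GNUC__) || defined(__clang__)
#define SKETCH_WARN_UNUSED_RESULT __attribute__((warn_unused_result))
#else
#define SKETCH_WARN_UNUSED_RESULT
#endif

// Simplified stand-in for a status type (not Impala's Status class).
class SketchStatus {
 public:
  static SketchStatus OK() { return SketchStatus(true, ""); }
  static SketchStatus Error(std::string msg) {
    return SketchStatus(false, std::move(msg));
  }
  bool ok() const { return ok_; }
  const std::string& detail() const { return msg_; }

 private:
  SketchStatus(bool ok, std::string msg) : ok_(ok), msg_(std::move(msg)) {}
  bool ok_;
  std::string msg_;
};

// Hypothetical helper: binds the result to a parameter, which counts as a
// use, so the caller states explicitly that the value is being ignored.
template <typename T>
inline void DiscardResult(const T&) {}

// Dropping the return value of this function triggers a compiler warning.
SKETCH_WARN_UNUSED_RESULT SketchStatus CloseConnection(bool already_closed) {
  if (already_closed) return SketchStatus::Error("connection already closed");
  return SketchStatus::OK();
}

// A destructor cannot propagate an error, so it discards the status on purpose.
struct Executor {
  ~Executor() { DiscardResult(CloseConnection(closed)); }
  bool closed = false;
};
```

The design point is that the discard is visible and searchable at the call site, whereas a silently ignored return value is not.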

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/testutil/in-process-servers.cc
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.cc b/be/src/testutil/in-process-servers.cc
index 7a81915..4fecfb3 100644
--- a/be/src/testutil/in-process-servers.cc
+++ b/be/src/testutil/in-process-servers.cc
@@ -68,7 +68,8 @@ InProcessImpalaServer* InProcessImpalaServer::StartWithEphemeralPorts(
     // pick a new set of ports
     Status started = impala->StartWithClientServers(beeswax_port, hs2_port);
     if (started.ok()) {
-      impala->SetCatalogInitialized();
+      const Status status = impala->SetCatalogInitialized();
+      if (!status.ok()) LOG(WARNING) << status.GetDetail();
       return impala;
     }
     delete impala;
@@ -88,13 +89,14 @@ InProcessImpalaServer::InProcessImpalaServer(const string& hostname, int backend
           statestore_host, statestore_port)) {
 }
 
-void InProcessImpalaServer::SetCatalogInitialized() {
+Status InProcessImpalaServer::SetCatalogInitialized() {
   DCHECK(impala_server_ != NULL) << "Call Start*() first.";
-  exec_env_->frontend()->SetCatalogInitialized();
+  return exec_env_->frontend()->SetCatalogInitialized();
 }
 
 Status InProcessImpalaServer::StartWithClientServers(int beeswax_port, int hs2_port) {
   RETURN_IF_ERROR(exec_env_->StartServices());
+
   beeswax_port_ = beeswax_port;
   hs2_port_ = hs2_port;
   ThriftServer* be_server;
@@ -158,7 +160,7 @@ InProcessStatestore::InProcessStatestore(int statestore_port, int webserver_port
 }
 
 Status InProcessStatestore::Start() {
-  webserver_->Start();
+  RETURN_IF_ERROR(webserver_->Start());
   boost::shared_ptr<TProcessor> processor(
       new StatestoreServiceProcessor(statestore_->thrift_iface()));
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/testutil/in-process-servers.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h
index 3f91b2a..03b02f3 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -74,7 +74,7 @@ class InProcessImpalaServer {
 
   /// Sets the catalog on this impalad to be initialized. If we don't
   /// start up a catalogd, then there is no one to initialize it otherwise.
-  void SetCatalogInitialized();
+  Status SetCatalogInitialized();
 
   uint32_t beeswax_port() const { return beeswax_port_; }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/util/benchmark.cc b/be/src/util/benchmark.cc
index 43570a6..6ffbf00 100644
--- a/be/src/util/benchmark.cc
+++ b/be/src/util/benchmark.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <cmath>
 #include <iomanip>
 #include <iostream>
 #include <sstream>

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/bit-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util-test.cc b/be/src/util/bit-util-test.cc
index 2c40e569..5f8d443 100644
--- a/be/src/util/bit-util-test.cc
+++ b/be/src/util/bit-util-test.cc
@@ -17,10 +17,12 @@
 
 #include <stdlib.h>
 #include <stdio.h>
-#include <iostream>
-#include <algorithm>
 #include <limits.h>
 
+#include <algorithm>
+#include <iostream>
+#include <numeric>
+
 #include <boost/utility.hpp>
 
 #include "testutil/gtest-util.h"

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/codec.h
----------------------------------------------------------------------
diff --git a/be/src/util/codec.h b/be/src/util/codec.h
index 9475ec1..b150b3c 100644
--- a/be/src/util/codec.h
+++ b/be/src/util/codec.h
@@ -63,7 +63,8 @@ class Codec {
   /// If mem_pool is nullptr, then the resulting codec will never allocate memory and
   /// the caller must be responsible for it.
   static Status CreateDecompressor(MemPool* mem_pool, bool reuse,
-    THdfsCompression::type format, boost::scoped_ptr<Codec>* decompressor);
+      THdfsCompression::type format,
+      boost::scoped_ptr<Codec>* decompressor) WARN_UNUSED_RESULT;
 
   /// Alternate factory method: takes a codec string and populates a scoped pointer.
   static Status CreateDecompressor(MemPool* mem_pool, bool reuse,
@@ -88,7 +89,8 @@ class Codec {
   /// Return the name of a compression algorithm.
   static std::string GetCodecName(THdfsCompression::type);
   /// Returns the java class name for the given compression type
-  static Status GetHadoopCodecClassName(THdfsCompression::type, std::string* out_name);
+  static Status GetHadoopCodecClassName(
+      THdfsCompression::type, std::string* out_name) WARN_UNUSED_RESULT;
 
   virtual ~Codec() {}
 
@@ -109,7 +111,8 @@ class Codec {
   ///   input_length: length of the data to process
   ///   input: data to process
   virtual Status ProcessBlock(bool output_preallocated, int64_t input_length,
-      const uint8_t* input, int64_t* output_length, uint8_t** output) = 0;
+      const uint8_t* input, int64_t* output_length,
+      uint8_t** output) WARN_UNUSED_RESULT = 0;
 
   /// Wrapper to the actual ProcessBlock() function. This wrapper uses lengths as ints and
   /// not int64_ts. We need to keep this interface because the Parquet thrift uses ints.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/filesystem-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/filesystem-util.h b/be/src/util/filesystem-util.h
index 3e824b8..1d497c3 100644
--- a/be/src/util/filesystem-util.h
+++ b/be/src/util/filesystem-util.h
@@ -31,22 +31,23 @@ class FileSystemUtil {
   /// Create the specified directory and any ancestor directories that do not exist yet.
   /// The directory and its contents are destroyed if it already exists.
   /// Returns Status::OK if successful, or a runtime error with a message otherwise.
-  static Status RemoveAndCreateDirectory(const std::string& directory);
+  static Status RemoveAndCreateDirectory(const std::string& directory) WARN_UNUSED_RESULT;
 
   /// Create a file at the specified path.
-  static Status CreateFile(const std::string& file_path);
+  static Status CreateFile(const std::string& file_path) WARN_UNUSED_RESULT;
 
   /// Remove the specified paths and their enclosing files/directories.
-  static Status RemovePaths(const std::vector<std::string>& directories);
+  static Status RemovePaths(
+      const std::vector<std::string>& directories) WARN_UNUSED_RESULT;
 
   /// Verify that the specified path is an existing directory.
   /// Returns Status::OK if it is, or a runtime error with a message otherwise.
-  static Status VerifyIsDirectory(const std::string& directory_path);
+  static Status VerifyIsDirectory(const std::string& directory_path) WARN_UNUSED_RESULT;
 
   /// Returns the space available on the file system containing 'directory_path'
   /// in 'available_bytes'
-  static Status GetSpaceAvailable(const std::string& directory_path,
-      uint64_t* available_bytes);
+  static Status GetSpaceAvailable(
+      const std::string& directory_path, uint64_t* available_bytes) WARN_UNUSED_RESULT;
 
   /// Returns the currently allowed maximum of possible file descriptors. In case of an
   /// error returns 0.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/hdfs-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/hdfs-util-test.cc b/be/src/util/hdfs-util-test.cc
index 89cc43a..b389864 100644
--- a/be/src/util/hdfs-util-test.cc
+++ b/be/src/util/hdfs-util-test.cc
@@ -34,7 +34,8 @@ TEST(HdfsUtilTest, CheckFilesystemsMatch) {
   ExecEnv* exec_env = new ExecEnv();
 
   // We do this to retrieve the default FS from the frontend.
-  exec_env->StartServices();
+  // It doesn't matter if starting the services fails.
+  discard_result(exec_env->StartServices());
 
   // Tests with both paths qualified.
   EXPECT_TRUE(FilesystemsMatch("s3a://dummybucket/temp_dir/temp_path",

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/jni-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/jni-util.cc b/be/src/util/jni-util.cc
index 78f67a7..220eb83 100644
--- a/be/src/util/jni-util.cc
+++ b/be/src/util/jni-util.cc
@@ -100,12 +100,6 @@ Status JniUtil::LocalToGlobalRef(JNIEnv* env, jobject local_ref, jobject* global
   return Status::OK();
 }
 
-Status JniUtil::FreeGlobalRef(JNIEnv* env, jobject global_ref) {
-  env->DeleteGlobalRef(global_ref);
-  RETURN_ERROR_IF_EXC(env);
-  return Status::OK();
-}
-
 Status JniUtil::Init() {
   // Get the JNIEnv* corresponding to current thread.
   JNIEnv* env = getJNIEnv();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/jni-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index 9a9cb15..77abb4d 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -124,7 +124,7 @@ class JniLocalFrame {
   /// The number of local references created inside the frame might exceed max_local_ref,
   /// but there is no guarantee that memory will be available.
   /// Push should be called at most once.
-  Status push(JNIEnv* env, int max_local_ref=10);
+  Status push(JNIEnv* env, int max_local_ref = 10) WARN_UNUSED_RESULT;
 
  private:
   JNIEnv* env_;
@@ -187,7 +187,7 @@ class JniUtil {
   static void InitLibhdfs();
 
   /// Find JniUtil class, and get JniUtil.throwableToString method id
-  static Status Init();
+  static Status Init() WARN_UNUSED_RESULT;
 
   /// Returns true if the given class could be found on the CLASSPATH in env.
   /// Returns false otherwise, or if any other error occurred (e.g. a JNI exception).
@@ -204,13 +204,15 @@ class JniUtil {
   /// The returned reference must eventually be freed by calling FreeGlobalRef() (or have
   /// the lifetime of the impalad process).
   /// Catches Java exceptions and converts their message into status.
-  static Status GetGlobalClassRef(JNIEnv* env, const char* class_str, jclass* class_ref);
+  static Status GetGlobalClassRef(
+      JNIEnv* env, const char* class_str, jclass* class_ref) WARN_UNUSED_RESULT;
 
   /// Creates a global reference from a local reference returned into global_ref.
   /// The returned reference must eventually be freed by calling FreeGlobalRef() (or have
   /// the lifetime of the impalad process).
   /// Catches Java exceptions and converts their message into status.
-  static Status LocalToGlobalRef(JNIEnv* env, jobject local_ref, jobject* global_ref);
+  static Status LocalToGlobalRef(JNIEnv* env, jobject local_ref,
+      jobject* global_ref) WARN_UNUSED_RESULT;
 
   /// Templated wrapper for jobject subclasses (e.g. jclass, jarray). This is necessary
   /// because according to
@@ -224,15 +226,11 @@ class JniUtil {
   /// to use a subclass like _jclass**. This is safe in this case because the returned
   /// subclass is known to be correct.
   template <typename jobject_subclass>
-  static Status LocalToGlobalRef(JNIEnv* env, jobject local_ref,
-      jobject_subclass* global_ref) {
+  static Status LocalToGlobalRef(
+      JNIEnv* env, jobject local_ref, jobject_subclass* global_ref) {
     return LocalToGlobalRef(env, local_ref, reinterpret_cast<jobject*>(global_ref));
   }
 
-  /// Deletes 'global_ref'. Catches Java exceptions and converts their message into
-  /// status.
-  static Status FreeGlobalRef(JNIEnv* env, jobject global_ref);
-
   static jmethodID throwable_to_string_id() { return throwable_to_string_id_; }
   static jmethodID throwable_to_stack_trace_id() { return throwable_to_stack_trace_id_; }
 
@@ -246,30 +244,31 @@ class JniUtil {
   /// log_stack determines if the stack trace is written to the log
   /// prefix, if non-empty will be prepended to the error message.
   static Status GetJniExceptionMsg(JNIEnv* env, bool log_stack = true,
-      const std::string& prefix = "");
+      const std::string& prefix = "") WARN_UNUSED_RESULT;
 
   /// Populates 'result' with a list of memory metrics from the Jvm. Returns Status::OK
   /// unless there is an exception.
   static Status GetJvmMetrics(const TGetJvmMetricsRequest& request,
-      TGetJvmMetricsResponse* result);
+      TGetJvmMetricsResponse* result) WARN_UNUSED_RESULT;
 
   // Populates 'result' with information about live JVM threads. Returns
   // Status::OK unless there is an exception.
   static Status GetJvmThreadsInfo(const TGetJvmThreadsInfoRequest& request,
-      TGetJvmThreadsInfoResponse* result);
+      TGetJvmThreadsInfoResponse* result) WARN_UNUSED_RESULT;
 
   /// Loads a method whose signature is in the supplied descriptor. Returns Status::OK
   /// and sets descriptor->method_id to a JNI method handle if successful, otherwise an
   /// error status is returned.
   static Status LoadJniMethod(JNIEnv* jni_env, const jclass& jni_class,
-      JniMethodDescriptor* descriptor);
+      JniMethodDescriptor* descriptor) WARN_UNUSED_RESULT;
 
   /// Same as LoadJniMethod(...), except that this loads a static method.
   static Status LoadStaticJniMethod(JNIEnv* jni_env, const jclass& jni_class,
-      JniMethodDescriptor* descriptor);
+      JniMethodDescriptor* descriptor) WARN_UNUSED_RESULT;
 
   /// Utility methods to avoid repeating lots of the JNI call boilerplate.
-  static Status CallJniMethod(const jobject& obj, const jmethodID& method) {
+  static Status CallJniMethod(
+      const jobject& obj, const jmethodID& method) WARN_UNUSED_RESULT {
     JNIEnv* jni_env = getJNIEnv();
     JniLocalFrame jni_frame;
     RETURN_IF_ERROR(jni_frame.push(jni_env));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index ffc3d20..6f306f7 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -155,7 +155,7 @@ class JvmMetric : public IntGauge {
  public:
   /// Registers many Jvm memory metrics: one for every member of JvmMetricType for each
   /// pool (usually ~5 pools plus a synthetic 'total' pool).
-  static Status InitMetrics(MetricGroup* metrics);
+  static Status InitMetrics(MetricGroup* metrics) WARN_UNUSED_RESULT;
 
  protected:
   /// Searches through jvm_metrics_response_ for a matching memory pool and pulls out the
@@ -193,7 +193,7 @@ class JvmMetric : public IntGauge {
 class BufferPoolMetric : public UIntGauge {
  public:
   static Status InitMetrics(MetricGroup* metrics, ReservationTracker* global_reservations,
-      BufferPool* buffer_pool);
+      BufferPool* buffer_pool) WARN_UNUSED_RESULT;
 
   /// Global metrics, initialized by CreateAndRegisterMetrics().
   static BufferPoolMetric* LIMIT;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/metrics-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/metrics-test.cc b/be/src/util/metrics-test.cc
index 543e0e3..f25d5ad 100644
--- a/be/src/util/metrics-test.cc
+++ b/be/src/util/metrics-test.cc
@@ -217,7 +217,7 @@ TEST_F(MetricsTest, StatsMetricsSingle) {
 TEST_F(MetricsTest, MemMetric) {
 #ifndef ADDRESS_SANITIZER
   MetricGroup metrics("MemMetrics");
-  RegisterMemoryMetrics(&metrics, false, nullptr, nullptr);
+  ASSERT_OK(RegisterMemoryMetrics(&metrics, false, nullptr, nullptr));
   // Smoke test to confirm that tcmalloc metrics are returning reasonable values.
   UIntGauge* bytes_in_use =
       metrics.FindMetricForTesting<UIntGauge>("tcmalloc.bytes-in-use");
@@ -249,7 +249,7 @@ TEST_F(MetricsTest, MemMetric) {
 
 TEST_F(MetricsTest, JvmMetrics) {
   MetricGroup metrics("JvmMetrics");
-  RegisterMemoryMetrics(&metrics, true, nullptr, nullptr);
+  ASSERT_OK(RegisterMemoryMetrics(&metrics, true, nullptr, nullptr));
   UIntGauge* jvm_total_used =
       metrics.GetOrCreateChildGroup("jvm")->FindMetricForTesting<UIntGauge>(
           "jvm.total.current-usage-bytes");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/network-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h
index 2615184..1783589 100644
--- a/be/src/util/network-util.h
+++ b/be/src/util/network-util.h
@@ -32,7 +32,7 @@ typedef std::string IpAddr;
 /// 'address'. If the IP addresses of a host don't change, then subsequent calls will
 /// always return the same address. Returns an error status if any system call failed,
 /// otherwise OK. Even if OK is returned, addresses may still be of zero length.
-Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip);
+Status HostnameToIpAddr(const Hostname& hostname, IpAddr* ip) WARN_UNUSED_RESULT;
 
 /// Finds the first non-localhost IP address in the given list. Returns
 /// true if such an address was found, false otherwise.
@@ -40,7 +40,7 @@ bool FindFirstNonLocalhost(const std::vector<std::string>& addresses, std::strin
 
 /// Sets the output argument to the system defined hostname.
 /// Returns OK if a hostname can be found, false otherwise.
-Status GetHostname(std::string* hostname);
+Status GetHostname(std::string* hostname) WARN_UNUSED_RESULT;
 
 /// Utility method because Thrift does not supply useful constructors
 TNetworkAddress MakeNetworkAddress(const std::string& hostname, int port);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/parquet-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index e21cff6..d5b0d01 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -136,6 +136,7 @@ class ParquetLevelReader : public impala::RleDecoder {
 //     were actually written if the final run is a literal run, only if the final run is
 //     a repeated run (see util/rle-encoding.h for more details).
 // Returns the number of rows specified by the header.
+// Aborts the process if reading the file fails.
 int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_t* page) {
   const uint8_t* data = page;
   std::vector<uint8_t> decompressed_buffer;
@@ -143,8 +144,8 @@ int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_
     decompressed_buffer.resize(header.uncompressed_page_size);
 
     boost::scoped_ptr<impala::Codec> decompressor;
-    impala::Codec::CreateDecompressor(
-        NULL, false, impala::PARQUET_TO_IMPALA_CODEC[col.meta_data.codec], &decompressor);
+    ABORT_IF_ERROR(impala::Codec::CreateDecompressor(NULL, false,
+        impala::PARQUET_TO_IMPALA_CODEC[col.meta_data.codec], &decompressor));
 
     uint8_t* buffer_ptr = decompressed_buffer.data();
     int uncompressed_page_size = header.uncompressed_page_size;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/runtime-profile.cc
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index ec31fc9..12f4e25 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -33,6 +33,7 @@
 #include "util/periodic-counter-updater.h"
 #include "util/pretty-printer.h"
 #include "util/redactor.h"
+#include "util/scope-exit-trigger.h"
 
 #include "common/names.h"
 
@@ -723,37 +724,39 @@ void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const {
   }
 }
 
-string RuntimeProfile::SerializeToArchiveString() const {
+Status RuntimeProfile::SerializeToArchiveString(string* out) const {
   stringstream ss;
-  SerializeToArchiveString(&ss);
-  return ss.str();
+  RETURN_IF_ERROR(SerializeToArchiveString(&ss));
+  *out = ss.str();
+  return Status::OK();
 }
 
-void RuntimeProfile::SerializeToArchiveString(stringstream* out) const {
+Status RuntimeProfile::SerializeToArchiveString(stringstream* out) const {
+  Status status;
   TRuntimeProfileTree thrift_object;
   const_cast<RuntimeProfile*>(this)->ToThrift(&thrift_object);
   ThriftSerializer serializer(true);
   vector<uint8_t> serialized_buffer;
-  Status status = serializer.Serialize(&thrift_object, &serialized_buffer);
-  if (!status.ok()) return;
+  RETURN_IF_ERROR(serializer.Serialize(&thrift_object, &serialized_buffer));
 
   // Compress the serialized thrift string.  This uses string keys and is very
   // easy to compress.
   scoped_ptr<Codec> compressor;
-  status = Codec::CreateCompressor(NULL, false, THdfsCompression::DEFAULT, &compressor);
-  DCHECK(status.ok()) << status.GetDetail();
-  if (!status.ok()) return;
+  RETURN_IF_ERROR(
+      Codec::CreateCompressor(NULL, false, THdfsCompression::DEFAULT, &compressor));
+  const auto close_compressor =
+      MakeScopeExitTrigger([&compressor]() { compressor->Close(); });
 
   vector<uint8_t> compressed_buffer;
   compressed_buffer.resize(compressor->MaxOutputLen(serialized_buffer.size()));
   int64_t result_len = compressed_buffer.size();
   uint8_t* compressed_buffer_ptr = compressed_buffer.data();
-  compressor->ProcessBlock(true, serialized_buffer.size(), serialized_buffer.data(),
-      &result_len, &compressed_buffer_ptr);
+  RETURN_IF_ERROR(compressor->ProcessBlock(true, serialized_buffer.size(),
+      serialized_buffer.data(), &result_len, &compressed_buffer_ptr));
   compressed_buffer.resize(result_len);
 
   Base64Encode(compressed_buffer, out);
-  compressor->Close();
+  return Status::OK();
 }
 
 void RuntimeProfile::ToThrift(TRuntimeProfileTree* tree) const {
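
Note on the hunk above: once the function returns early through RETURN_IF_ERROR, a trailing compressor->Close() call would be skipped on the error paths, so the cleanup is tied to scope exit instead. The sketch below shows the general scope-exit technique under stated assumptions. ScopeExit, MakeScopeExit and Process are hypothetical names written for this example; this is not the implementation in util/scope-exit-trigger.h, which is not shown in this diff.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical guard: runs 'fn' once when the guard goes out of scope.
template <typename Fn>
class ScopeExit {
 public:
  explicit ScopeExit(Fn fn) : fn_(std::move(fn)) {}
  // Moving transfers the duty to run 'fn'; the source guard is disarmed.
  ScopeExit(ScopeExit&& other)
    : fn_(std::move(other.fn_)), active_(other.active_) {
    other.active_ = false;
  }
  ScopeExit(const ScopeExit&) = delete;
  ScopeExit& operator=(const ScopeExit&) = delete;
  ~ScopeExit() {
    if (active_) fn_();
  }

 private:
  Fn fn_;
  bool active_ = true;
};

template <typename Fn>
ScopeExit<Fn> MakeScopeExit(Fn fn) {
  return ScopeExit<Fn>(std::move(fn));
}

// Records "open", optionally fails early, and always records "close".
bool Process(bool fail, std::vector<std::string>* log) {
  log->push_back("open");
  const auto cleanup = MakeScopeExit([log]() { log->push_back("close"); });
  if (fail) return false;  // Early return: the guard still runs the cleanup.
  log->push_back("work");
  return true;  // Normal return: the guard runs the cleanup here too.
}
```

With a guard like this, every return path added later gets the cleanup without further changes to the function body.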

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 244ab17..298c214 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -265,8 +265,8 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   /// object using thrift compact binary format, then gzip compresses it and
   /// finally encodes it as base64.  This is not a lightweight operation and
   /// should not be in the hot path.
-  std::string SerializeToArchiveString() const;
-  void SerializeToArchiveString(std::stringstream* out) const;
+  Status SerializeToArchiveString(std::string* out) const WARN_UNUSED_RESULT;
+  Status SerializeToArchiveString(std::stringstream* out) const WARN_UNUSED_RESULT;
 
   /// Divides all counters by n
   void Divide(int n);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/thread-pool.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread-pool.h b/be/src/util/thread-pool.h
index cbc0031..800f690 100644
--- a/be/src/util/thread-pool.h
+++ b/be/src/util/thread-pool.h
@@ -52,7 +52,7 @@ class ThreadPool : public CacheLineAligned {
     for (int i = 0; i < num_threads; ++i) {
       std::stringstream threadname;
       threadname << thread_prefix << "(" << i + 1 << ":" << num_threads << ")";
-      threads_.AddThread(new Thread(group, threadname.str(),
+      threads_.AddThread(std::make_unique<Thread>(group, threadname.str(),
           boost::bind<void>(boost::mem_fn(&ThreadPool<T>::WorkerThread), this, i)));
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/thread.cc
----------------------------------------------------------------------
diff --git a/be/src/util/thread.cc b/be/src/util/thread.cc
index c84ef0b..0e08ab1 100644
--- a/be/src/util/thread.cc
+++ b/be/src/util/thread.cc
@@ -34,7 +34,6 @@
 #include "common/names.h"
 
 namespace this_thread = boost::this_thread;
-using boost::ptr_vector;
 using namespace rapidjson;
 
 namespace impala {
@@ -331,13 +330,12 @@ void Thread::SuperviseThread(const string& name, const string& category,
   thread_mgr_ref->RemoveThread(this_thread::get_id(), category_copy);
 }
 
-Status ThreadGroup::AddThread(Thread* thread) {
-  threads_.push_back(thread);
-  return Status::OK();
+void ThreadGroup::AddThread(unique_ptr<Thread> thread) {
+  threads_.emplace_back(move(thread));
 }
 
 void ThreadGroup::JoinAll() {
-  for (const Thread& thread: threads_) thread.Join();
+  for (auto& thread : threads_) thread->Join();
 }
 
 int ThreadGroup::Size() const {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/86e88cad/be/src/util/thread.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread.h b/be/src/util/thread.h
index e21be7c..18f3a75 100644
--- a/be/src/util/thread.h
+++ b/be/src/util/thread.h
@@ -18,11 +18,13 @@
 #ifndef IMPALA_UTIL_THREAD_H
 #define IMPALA_UTIL_THREAD_H
 
+#include <memory>
+#include <vector>
+
 #include <boost/bind.hpp>
 #include <boost/function.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <boost/thread.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
 
 #include "common/status.h"
 #include "util/promise.h"
@@ -173,7 +175,7 @@ class ThreadGroup {
   /// will destroy it when the ThreadGroup is destroyed.  Threads will linger until that
   /// point (even if terminated), however, so callers should be mindful of the cost of
   /// placing very many threads in this set.
-  Status AddThread(Thread* thread);
+  void AddThread(std::unique_ptr<Thread> thread);
 
   /// Waits for all threads to finish. DO NOT call this from a thread inside this set;
   /// deadlock will predictably ensue.
@@ -184,7 +186,7 @@ class ThreadGroup {
 
  private:
   /// All the threads grouped by this set.
-  boost::ptr_vector<Thread> threads_;
+  std::vector<std::unique_ptr<Thread>> threads_;
 };
 
 /// Initialises the threading subsystem. Must be called before a Thread is created.


[7/9] incubator-impala git commit: IMPALA-5452: Rewrite test case to avoid 'pos'.

Posted by ta...@apache.org.
IMPALA-5452: Rewrite test case to avoid 'pos'.

The original test case accessed the 'pos' field of nested
collections. The query results could vary when reloading
the data because the order of items within a nested
collection is not necessarily the same across loads.

This patch reformulates the test to avoid 'pos'.
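
A minimal Python sketch of why results based on 'pos' are unstable. It uses
made-up item values, not the actual test data, and the helper name is
hypothetical: the same nested collection stored in two different orders gives
the same items different positions, while an order-insensitive result such as
a count stays the same.

```python
# Hypothetical data: the same nested collection as stored by two different loads.
load_a = ["item_x", "item_y", "item_z"]
load_b = ["item_z", "item_x", "item_y"]


def positions(items):
    """Map each item to its position, analogous to reading the 'pos' field."""
    return {item: pos for pos, item in enumerate(items)}


# Position-based results differ between the two loads ...
assert positions(load_a) != positions(load_b)
# ... while order-insensitive results are identical.
assert len(load_a) == len(load_b)
assert sorted(load_a) == sorted(load_b)
```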

Change-Id: I32e47f0845da8b27652faaceae834e025ecff42a
Reviewed-on: http://gerrit.cloudera.org:8080/7708
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/79fba276
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/79fba276
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/79fba276

Branch: refs/heads/master
Commit: 79fba2768768107e408644e0caea15f60b5f3354
Parents: 5c82f9d
Author: Alex Behm <al...@cloudera.com>
Authored: Wed Aug 16 17:25:06 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Aug 18 02:53:23 2017 +0000

----------------------------------------------------------------------
 .../queries/QueryTest/nested-types-subplan.test        | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/79fba276/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test b/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
index 205978d..82930ad 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/nested-types-subplan.test
@@ -657,3 +657,16 @@ where c_custkey % 100 = 0 and order_cnt != union_cnt - 3;
 ---- TYPES
 bigint,bigint,bigint
 ====
+---- QUERY
+# IMPALA-2368: Resetting nested subplans works correctly with an analytic sort.
+select count(o_orderkey)
+from tpch_nested_parquet.customer c
+inner join c.c_orders o
+where c_custkey < 10 and c_custkey in
+ (select lead(l.l_linenumber) over (partition by l.l_shipdate order by l.l_linenumber)
+  from o.o_lineitems l)
+---- RESULTS
+3
+---- TYPES
+bigint
+====


[9/9] incubator-impala git commit: IMPALA-5809: Relax max_minidumps in breakpad test

Posted by ta...@apache.org.
IMPALA-5809: Relax max_minidumps in breakpad test

The change to address IMPALA-5769 added periodic cleaning for minidumps,
which got in the way of the other minidump tests.

This change sets max_minidumps to the default value (9) for all tests to
keep the cleanup thread from interfering, and then sets a smaller limit
where needed.
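
The expectation checked by the startup-rotation test can be sketched as
follows. This is an illustrative Python model, not Impala's implementation:
the function name and file names are hypothetical, and it assumes that
rotation keeps the newest files.

```python
def rotate_minidumps(dumps, max_minidumps):
    """Return the dumps that survive rotation.

    dumps is a list of (name, mtime) pairs. Assumption for this sketch:
    the newest max_minidumps files are kept and the rest are removed.
    """
    newest_first = sorted(dumps, key=lambda d: d[1], reverse=True)
    return newest_first[:max_minidumps]


# One dump per impalad after the whole cluster is killed (hypothetical names).
cluster_size = 3
dumps = [("impalad-%d.dmp" % i, 100 + i) for i in range(cluster_size)]

# With the default limit of 9 nothing is removed, so other tests are unaffected.
assert len(rotate_minidumps(dumps, 9)) == cluster_size

# With a smaller limit, at most max_minidumps files remain.
max_minidumps = 2
kept = rotate_minidumps(dumps, max_minidumps)
assert len(kept) == min(cluster_size, max_minidumps)
```

The min() mirrors the assertion in the test: a cluster with fewer impalads than
the limit cannot produce more dumps than it has processes.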

Change-Id: I977930ae87b8d4671a89c1e07ba76b12eb92fa55
Reviewed-on: http://gerrit.cloudera.org:8080/7716
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/dc2f69e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/dc2f69e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/dc2f69e5

Branch: refs/heads/master
Commit: dc2f69e5a0b36ca721eeadeec661a251c957410b
Parents: 039255a
Author: Lars Volker <lv...@cloudera.com>
Authored: Thu Aug 17 14:22:48 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Aug 18 04:51:30 2017 +0000

----------------------------------------------------------------------
 tests/custom_cluster/test_breakpad.py | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dc2f69e5/tests/custom_cluster/test_breakpad.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_breakpad.py b/tests/custom_cluster/test_breakpad.py
index 424e1c0..87e3b51 100644
--- a/tests/custom_cluster/test_breakpad.py
+++ b/tests/custom_cluster/test_breakpad.py
@@ -37,9 +37,6 @@ class TestBreakpad(CustomClusterTestSuite):
   writing minidump files on unhandled signals and rotating old minidumps on startup. The
   tests kill the daemons by sending a SIGSEGV signal.
   """
-  # Limit for the number of minidumps that gets passed to the daemons as a startup flag.
-  MAX_MINIDUMPS = 2
-
   @classmethod
   def get_workload(cls):
     return 'functional-query'
@@ -80,8 +77,7 @@ class TestBreakpad(CustomClusterTestSuite):
     self._start_impala_cluster(cluster_options)
 
   def start_cluster(self):
-    self.start_cluster_with_args(minidump_path=self.tmp_dir,
-                                 max_minidumps=self.MAX_MINIDUMPS)
+    self.start_cluster_with_args(minidump_path=self.tmp_dir)
 
   def kill_cluster(self, signal):
     self.cluster.refresh()
@@ -219,11 +215,14 @@ class TestBreakpad(CustomClusterTestSuite):
     """Check that a limited number of minidumps is preserved during startup."""
     assert self.count_all_minidumps() == 0
     self.start_cluster()
+    cluster_size = self.get_num_processes('impalad')
     self.kill_cluster(SIGSEGV)
     self.assert_num_logfile_entries(1)
-    self.start_cluster()
-    expected_impalads = min(self.get_num_processes('impalad'), self.MAX_MINIDUMPS)
-    assert self.count_minidumps('impalad') == expected_impalads
+    # Maximum number of minidumps that the impalads should keep for this test.
+    max_minidumps = 2
+    self.start_cluster_with_args(minidump_path=self.tmp_dir,
+                                 max_minidumps=max_minidumps)
+    assert self.count_minidumps('impalad') == min(cluster_size, max_minidumps)
     assert self.count_minidumps('statestored') == 1
     assert self.count_minidumps('catalogd') == 1
 
@@ -231,7 +230,7 @@ class TestBreakpad(CustomClusterTestSuite):
   def test_minidump_cleanup_thread(self):
     """Check that periodic rotation preserves a limited number of minidumps."""
     assert self.count_all_minidumps() == 0
-    # Maximum number of minidump that the impalads should keep for this test.
+    # Maximum number of minidumps that the impalads should keep for this test.
     max_minidumps = 2
     # Sleep interval for the log rotation thread.
     rotation_interval = 1
@@ -267,7 +266,7 @@ class TestBreakpad(CustomClusterTestSuite):
     """Check that setting the minidump_path to an empty value disables minidump creation.
     """
     assert self.count_all_minidumps() == 0
-    self.start_cluster_with_args(minidump_path='', max_minidumps=self.MAX_MINIDUMPS)
+    self.start_cluster_with_args(minidump_path='')
     self.kill_cluster(SIGSEGV)
     self.assert_num_logfile_entries(0)