You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by he...@apache.org on 2016/09/26 22:08:35 UTC

[1/3] incubator-impala git commit: IMPALA-3671: Add query option to limit scratch space usage

Repository: incubator-impala
Updated Branches:
  refs/heads/master d83783e24 -> 872d5462b


IMPALA-3671: Add query option to limit scratch space usage

Currently spilling can only be disabled via a startup option, which means
the cluster must be restarted to change the setting.
This patch adds a new query option 'SCRATCH_LIMIT' that limits the amount of
scratch directory space that can be used. This would be useful to prevent
runaway queries or to prevent queries from spilling when that is not desired.
This also adds a 'ScratchFileUsedBytes' counter to the runtime profile of the
BlockMgr that keeps track of the scratch space allocated.

Valid values for the SCRATCH_LIMIT query option are:
- unspecified or a limit of -1 means no limit
- a limit of 0 (zero) means spilling is disabled
- an int (= number of bytes)
- a float followed by "M" (MB) or "G" (GB)

Testing:
A new test file "test_scratch_limit.py" was added for testing functionality.

Change-Id: Ibf8842626ded1345b632a0ccdb9a580e6a0ad470
Reviewed-on: http://gerrit.cloudera.org:8080/4497
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/9313dcdb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/9313dcdb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/9313dcdb

Branch: refs/heads/master
Commit: 9313dcdb830b0cd24479ca892988d17defc9ca19
Parents: d83783e
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Wed Aug 3 11:11:54 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Sep 24 02:48:46 2016 +0000

----------------------------------------------------------------------
 be/src/runtime/buffered-block-mgr-test.cc  |  24 ++++-
 be/src/runtime/buffered-block-mgr.cc       |  52 ++++++-----
 be/src/runtime/buffered-block-mgr.h        |  19 ++--
 be/src/runtime/test-env.cc                 |   8 +-
 be/src/runtime/test-env.h                  |  11 ++-
 be/src/runtime/tmp-file-mgr-test.cc        | 117 ++++++++++++++++--------
 be/src/runtime/tmp-file-mgr.cc             |  45 ++++++++-
 be/src/runtime/tmp-file-mgr.h              |  98 ++++++++++++++++----
 be/src/service/query-options.cc            |  12 +++
 be/src/service/query-options.h             |   5 +-
 common/thrift/ImpalaInternalService.thrift |   3 +
 common/thrift/ImpalaService.thrift         |   7 +-
 common/thrift/generate_error_codes.py      |   5 +-
 tests/query_test/test_scratch_limit.py     | 105 +++++++++++++++++++++
 14 files changed, 407 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/runtime/buffered-block-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc
index 169baf2..5eb1f8c 100644
--- a/be/src/runtime/buffered-block-mgr-test.cc
+++ b/be/src/runtime/buffered-block-mgr-test.cc
@@ -141,19 +141,21 @@ class BufferedBlockMgrTest : public ::testing::Test {
 
   /// Helper to create a simple block manager.
   BufferedBlockMgr* CreateMgr(int64_t query_id, int max_buffers, int block_size,
-      RuntimeState** query_state = NULL) {
+      RuntimeState** query_state = NULL, TQueryOptions* query_options = NULL) {
     RuntimeState* state;
     EXPECT_TRUE(test_env_->CreateQueryState(query_id, max_buffers, block_size,
-        &state).ok());
+        &state, query_options).ok());
     if (query_state != NULL) *query_state = state;
     return state->block_mgr();
   }
 
   BufferedBlockMgr* CreateMgrAndClient(int64_t query_id, int max_buffers, int block_size,
       int reserved_blocks, bool tolerates_oversubscription, MemTracker* tracker,
-      BufferedBlockMgr::Client** client, RuntimeState** query_state = NULL) {
+      BufferedBlockMgr::Client** client, RuntimeState** query_state = NULL,
+      TQueryOptions* query_options = NULL) {
     RuntimeState* state;
-    BufferedBlockMgr* mgr = CreateMgr(query_id, max_buffers, block_size, &state);
+    BufferedBlockMgr* mgr = CreateMgr(query_id, max_buffers, block_size, &state,
+        query_options);
     EXPECT_TRUE(mgr->RegisterClient(Substitute("Client for query $0", query_id),
         reserved_blocks, tolerates_oversubscription, tracker, state, client).ok());
     EXPECT_TRUE(client != NULL);
@@ -1188,6 +1190,20 @@ TEST_F(BufferedBlockMgrTest, NoTmpDirs) {
   DeleteBlocks(blocks);
 }
 
+// Test that block manager can still allocate buffers when spilling is disabled by
+// setting scratch_limit = 0.
+TEST_F(BufferedBlockMgrTest, ScratchLimitZero) {
+  int max_num_buffers = 3;
+  BufferedBlockMgr::Client* client;
+  TQueryOptions query_options;
+  query_options.scratch_limit = 0;
+  BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_,
+      0, false, client_tracker_.get(), &client, NULL, &query_options);
+  vector<BufferedBlockMgr::Block*> blocks;
+  AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
+  DeleteBlocks(blocks);
+}
+
 // Create two clients with different number of reserved buffers.
 TEST_F(BufferedBlockMgrTest, MultipleClients) {
   int client1_buffers = 3;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index d582c31..b2c3973 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -209,17 +209,19 @@ string BufferedBlockMgr::Block::DebugString() const {
 }
 
 BufferedBlockMgr::BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr,
-    int64_t block_size)
+    int64_t block_size, int64_t scratch_limit)
   : max_block_size_(block_size),
     // Keep two writes in flight per scratch disk so the disks can stay busy.
     block_write_threshold_(tmp_file_mgr->num_active_tmp_devices() * 2),
-    disable_spill_(state->query_ctx().disable_spilling || block_write_threshold_ == 0),
+    disable_spill_(state->query_ctx().disable_spilling || block_write_threshold_ == 0 ||
+        scratch_limit == 0),
     query_id_(state->query_id()),
     tmp_file_mgr_(tmp_file_mgr),
     initialized_(false),
     unfullfilled_reserved_buffers_(0),
     total_pinned_buffers_(0),
     non_local_outstanding_writes_(0),
+    tmp_file_group(new TmpFileMgr::FileGroup(tmp_file_mgr, scratch_limit)),
     io_mgr_(state->io_mgr()),
     is_cancelled_(false),
     writes_issued_(0),
@@ -242,7 +244,8 @@ Status BufferedBlockMgr::Create(RuntimeState* state, MemTracker* parent,
       // all shared_ptr references have gone to 0 and it is in the process of
       // being deleted. This can happen if the last shared reference is released
       // but before the weak ptr is removed from the map.
-      block_mgr->reset(new BufferedBlockMgr(state, tmp_file_mgr, block_size));
+      block_mgr->reset(new BufferedBlockMgr(state, tmp_file_mgr, block_size,
+          state->query_options().scratch_limit));
       query_to_block_mgrs_[state->query_id()] = *block_mgr;
     }
   }
@@ -557,10 +560,7 @@ BufferedBlockMgr::~BufferedBlockMgr() {
   // See IMPALA-1890.
   DCHECK_EQ(non_local_outstanding_writes_, 0) << endl << DebugInternal();
   // Delete tmp files.
-  for (TmpFileMgr::File& file: tmp_files_) {
-    file.Remove();
-  }
-  tmp_files_.clear();
+  tmp_file_group->Close();
 
   // Validate that clients deleted all of their blocks. Since all writes have
   // completed at this point, any deleted blocks should be in unused_blocks_.
@@ -763,7 +763,7 @@ Status BufferedBlockMgr::WriteUnpinnedBlock(Block* block) {
   DCHECK_EQ(block->buffer_desc_->len, max_block_size_);
 
   if (block->write_range_ == NULL) {
-    if (tmp_files_.empty()) RETURN_IF_ERROR(InitTmpFiles());
+    if (tmp_file_group->NumFiles() == 0) RETURN_IF_ERROR(InitTmpFiles());
 
     // First time the block is being persisted - need to allocate tmp file space.
     TmpFileMgr::File* tmp_file;
@@ -823,12 +823,18 @@ Status BufferedBlockMgr::AllocateScratchSpace(int64_t block_size,
   // Assumes block manager lock is already taken.
   vector<Status> errs;
   // Find the next physical file in round-robin order and create a write range for it.
-  for (int attempt = 0; attempt < tmp_files_.size(); ++attempt) {
-    *tmp_file = &tmp_files_[next_block_index_];
-    next_block_index_ = (next_block_index_ + 1) % tmp_files_.size();
+  for (int attempt = 0; attempt < tmp_file_group->NumFiles(); ++attempt) {
+    *tmp_file = tmp_file_group->GetFileAt(next_block_index_);
+    next_block_index_ = (next_block_index_ + 1) % tmp_file_group->NumFiles();
     if ((*tmp_file)->is_blacklisted()) continue;
-    Status status = (*tmp_file)->AllocateSpace(max_block_size_, file_offset);
-    if (status.ok()) return Status::OK();
+    Status status = (*tmp_file)->AllocateSpace(block_size, file_offset);
+    if (status.ok()) {
+      scratch_space_bytes_used_counter_->Add(block_size);
+      return Status::OK();
+    } else if (status.code() == TErrorCode::SCRATCH_LIMIT_EXCEEDED) {
+      // We cannot allocate from any files if we're at the scratch limit.
+      return status;
+    }
     // Log error and try other files if there was a problem. Problematic files will be
     // blacklisted so we will not repeatedly log the same error.
     LOG(WARNING) << "Error while allocating range in scratch file '"
@@ -1315,6 +1321,8 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, RuntimeProfile* parent_profile,
   buffer_wait_timer_ = ADD_TIMER(profile_.get(), "TotalBufferWaitTime");
   encryption_timer_ = ADD_TIMER(profile_.get(), "TotalEncryptionTime");
   integrity_check_timer_ = ADD_TIMER(profile_.get(), "TotalIntegrityCheckTime");
+  scratch_space_bytes_used_counter_ =
+    ADD_COUNTER(profile_.get(), "ScratchFileUsedBytes", TUnit::BYTES);
 
   // Create a new mem_tracker and allocate buffers.
   mem_tracker_.reset(
@@ -1324,25 +1332,25 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, RuntimeProfile* parent_profile,
 }
 
 Status BufferedBlockMgr::InitTmpFiles() {
-  DCHECK(tmp_files_.empty());
+  DCHECK(tmp_file_group->NumFiles() == 0);
   DCHECK(tmp_file_mgr_ != NULL);
 
   vector<TmpFileMgr::DeviceId> tmp_devices = tmp_file_mgr_->active_tmp_devices();
+  int files_allocated = 0;
   // Initialize the tmp files and the initial file to use.
-  tmp_files_.reserve(tmp_devices.size());
   for (int i = 0; i < tmp_devices.size(); ++i) {
-    TmpFileMgr::File* tmp_file;
     TmpFileMgr::DeviceId tmp_device_id = tmp_devices[i];
-    // It is possible for a device to be blacklisted after it was returned
-    // by active_tmp_devices() - handle this gracefully.
-    Status status = tmp_file_mgr_->GetFile(tmp_device_id, query_id_, &tmp_file);
-    if (status.ok()) tmp_files_.push_back(tmp_file);
+    // It is possible for a device to be blacklisted after it was returned by
+    // active_tmp_devices(), handle this gracefully by ignoring the return status of
+    // NewFile().
+    if (tmp_file_group->NewFile(tmp_device_id, query_id_).ok()) ++files_allocated;
   }
-  if (tmp_files_.empty()) {
+  DCHECK_EQ(tmp_file_group->NumFiles(), files_allocated);
+  if (tmp_file_group->NumFiles() == 0) {
     return Status("No spilling directories configured. Cannot spill. Set --scratch_dirs"
         " or see log for previous errors that prevented use of provided directories");
   }
-  next_block_index_ = rand() % tmp_files_.size();
+  next_block_index_ = rand() % tmp_file_group->NumFiles();
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/runtime/buffered-block-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.h b/be/src/runtime/buffered-block-mgr.h
index ac707bc..ad8ad85 100644
--- a/be/src/runtime/buffered-block-mgr.h
+++ b/be/src/runtime/buffered-block-mgr.h
@@ -423,7 +423,8 @@ class BufferedBlockMgr {
     }
   };
 
-  BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr, int64_t block_size);
+  BufferedBlockMgr(RuntimeState* state, TmpFileMgr* tmp_file_mgr,
+      int64_t block_size, int64_t scratch_limit);
 
   /// Initializes the block mgr. Idempotent and thread-safe.
   void Init(DiskIoMgr* io_mgr, RuntimeProfile* profile,
@@ -492,7 +493,8 @@ class BufferedBlockMgr {
   void WaitForWrite(boost::unique_lock<boost::mutex>& lock, Block* block);
 
   /// Allocate block_size bytes in a temporary file. Try multiple disks if error occurs.
-  /// Returns an error only if no temporary files are usable.
+  /// Returns an error only if no temporary files are usable or the scratch limit is
+  /// exceeded.
   Status AllocateScratchSpace(int64_t block_size, TmpFileMgr::File** tmp_file,
       int64_t* file_offset);
 
@@ -590,12 +592,12 @@ class BufferedBlockMgr {
   /// All allocated io-sized buffers.
   std::list<BufferDescriptor*> all_io_buffers_;
 
-  /// Temporary physical file handle, (one per tmp device) to which blocks may be written.
-  /// Blocks are round-robined across these files.
-  boost::ptr_vector<TmpFileMgr::File> tmp_files_;
+  /// Group of temporary physical files, (one per tmp device) to which
+  /// blocks may be written. Blocks are round-robined across these files.
+  boost::scoped_ptr<TmpFileMgr::FileGroup> tmp_file_group;
 
-  /// Index into tmp_files_ denoting the file to which the next block to be persisted will
-  /// be written.
+  /// Index into 'tmp_file_group' denoting the file to which the next block will be
+  /// written.
   int next_block_index_;
 
   /// DiskIoMgr handles to read and write blocks.
@@ -641,6 +643,9 @@ class BufferedBlockMgr {
   /// Time spent in disk spill integrity generation and checking.
   RuntimeProfile::Counter* integrity_check_timer_;
 
+  /// Amount of scratch space allocated in bytes.
+  RuntimeProfile::Counter* scratch_space_bytes_used_counter_;
+
   /// Number of writes issued.
   int writes_issued_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 8690e39..c5a9a41 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -66,16 +66,18 @@ TestEnv::~TestEnv() {
   metrics_.reset();
 }
 
-RuntimeState* TestEnv::CreateRuntimeState(int64_t query_id) {
+RuntimeState* TestEnv::CreateRuntimeState(int64_t query_id,
+    TQueryOptions* query_options) {
   TExecPlanFragmentParams plan_params = TExecPlanFragmentParams();
+  if (query_options != NULL) plan_params.query_ctx.request.query_options = *query_options;
   plan_params.query_ctx.query_id.hi = 0;
   plan_params.query_ctx.query_id.lo = query_id;
   return new RuntimeState(plan_params, exec_env_.get());
 }
 
 Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_size,
-    RuntimeState** runtime_state) {
-  *runtime_state = CreateRuntimeState(query_id);
+    RuntimeState** runtime_state, TQueryOptions* query_options) {
+  *runtime_state = CreateRuntimeState(query_id, query_options);
   if (*runtime_state == NULL) {
     return Status("Unexpected error creating RuntimeState");
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/runtime/test-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index bfbe935..a3ab29a 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -37,10 +37,10 @@ class TestEnv {
   /// query states have been created.
   void InitTmpFileMgr(const std::vector<std::string>& tmp_dirs, bool one_dir_per_device);
 
-  /// Create a RuntimeState for a query with a new block manager. The RuntimeState is
-  /// owned by the TestEnv.
+  /// Create a RuntimeState for a query with a new block manager and the given query
+  /// options. The RuntimeState is owned by the TestEnv.
   Status CreateQueryState(int64_t query_id, int max_buffers, int block_size,
-      RuntimeState** runtime_state);
+      RuntimeState** runtime_state, TQueryOptions* query_options = NULL);
 
   /// Create multiple separate RuntimeStates with associated block managers, e.g. as if
   /// multiple queries were executing. The RuntimeStates are owned by TestEnv.
@@ -65,8 +65,9 @@ class TestEnv {
   /// Recreate global metric groups.
   void InitMetrics();
 
-  /// Create a new RuntimeState sharing global environment.
-  RuntimeState* CreateRuntimeState(int64_t query_id);
+  /// Create a new RuntimeState sharing global environment with the given query options.
+  RuntimeState* CreateRuntimeState(int64_t query_id,
+      TQueryOptions* query_options = NULL);
 
   /// Global state for test environment.
   static boost::scoped_ptr<MetricGroup> static_metrics_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 9f47018..523dcff 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -24,6 +24,7 @@
 #include "common/init.h"
 #include "runtime/tmp-file-mgr.h"
 #include "service/fe-support.h"
+#include "testutil/test-macros.h"
 #include "util/filesystem-util.h"
 #include "util/metrics.h"
 
@@ -62,6 +63,12 @@ class TmpFileMgrTest : public ::testing::Test {
     }
   }
 
+  void RemoveAndCreateDirs(const vector<string>& dirs) {
+    for (const string& dir: dirs) {
+      ASSERT_OK(FileSystemUtil::RemoveAndCreateDirectory(dir));
+    }
+  }
+
   scoped_ptr<MetricGroup> metrics_;
 };
 
@@ -69,15 +76,15 @@ class TmpFileMgrTest : public ::testing::Test {
 /// at the expected file offsets and expands the temporary file to the correct size.
 TEST_F(TmpFileMgrTest, TestFileAllocation) {
   TmpFileMgr tmp_file_mgr;
-  EXPECT_TRUE(tmp_file_mgr.Init(metrics_.get()).ok());
+  ASSERT_OK(tmp_file_mgr.Init(metrics_.get()));
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr);
   // Default configuration should give us one temporary device.
   EXPECT_EQ(1, tmp_file_mgr.num_active_tmp_devices());
   vector<TmpFileMgr::DeviceId> tmp_devices = tmp_file_mgr.active_tmp_devices();
   EXPECT_EQ(1, tmp_devices.size());
   TUniqueId id;
   TmpFileMgr::File *file;
-  Status status = tmp_file_mgr.GetFile(tmp_devices[0], id, &file);
-  EXPECT_TRUE(status.ok());
+  ASSERT_OK(file_group.NewFile(tmp_devices[0], id, &file));
   EXPECT_TRUE(file != NULL);
   // Apply writes of variable sizes and check space was allocated correctly.
   int64_t write_sizes[] = {
@@ -87,15 +94,13 @@ TEST_F(TmpFileMgrTest, TestFileAllocation) {
   int64_t next_offset = 0;
   for (int i = 0; i < num_write_sizes; ++i) {
     int64_t offset;
-    status = file->AllocateSpace(write_sizes[i], &offset);
-    EXPECT_TRUE(status.ok());
+    ASSERT_OK(file->AllocateSpace(write_sizes[i], &offset));
     EXPECT_EQ(next_offset, offset);
     next_offset = offset + write_sizes[i];
     EXPECT_EQ(next_offset, boost::filesystem::file_size(file->path()));
   }
   // Check that cleanup is correct.
-  status = file->Remove();
-  EXPECT_TRUE(status.ok());
+  file_group.Close();
   EXPECT_FALSE(boost::filesystem::exists(file->path()));
   CheckMetrics(&tmp_file_mgr);
 }
@@ -103,14 +108,11 @@ TEST_F(TmpFileMgrTest, TestFileAllocation) {
 /// Test that we can do initialization with two directories on same device and
 /// that validations prevents duplication of directories.
 TEST_F(TmpFileMgrTest, TestOneDirPerDevice) {
-  vector<string> tmp_dirs;
-  tmp_dirs.push_back("/tmp/tmp-file-mgr-test.1");
-  tmp_dirs.push_back("/tmp/tmp-file-mgr-test.2");
-  for (int i = 0; i < tmp_dirs.size(); ++i) {
-    EXPECT_TRUE(FileSystemUtil::RemoveAndCreateDirectory(tmp_dirs[i]).ok());
-  }
+  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
+  RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, true, metrics_.get());
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr);
 
   // Only the first directory should be used.
   EXPECT_EQ(1, tmp_file_mgr.num_active_tmp_devices());
@@ -118,23 +120,21 @@ TEST_F(TmpFileMgrTest, TestOneDirPerDevice) {
   EXPECT_EQ(1, devices.size());
   TUniqueId id;
   TmpFileMgr::File *file;
-  EXPECT_TRUE(tmp_file_mgr.GetFile(devices[0], id, &file).ok());
+  ASSERT_OK(file_group.NewFile(devices[0], id, &file));
   // Check the prefix is the expected temporary directory.
   EXPECT_EQ(0, file->path().find(tmp_dirs[0]));
   FileSystemUtil::RemovePaths(tmp_dirs);
+  file_group.Close();
   CheckMetrics(&tmp_file_mgr);
 }
 
 /// Test that we can do custom initialization with two dirs on same device.
 TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) {
-  vector<string> tmp_dirs;
-  tmp_dirs.push_back("/tmp/tmp-file-mgr-test.1");
-  tmp_dirs.push_back("/tmp/tmp-file-mgr-test.2");
-  for (int i = 0; i < tmp_dirs.size(); ++i) {
-    EXPECT_TRUE(FileSystemUtil::RemoveAndCreateDirectory(tmp_dirs[i]).ok());
-  }
+  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
+  RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr);
 
   // Both directories should be used.
   EXPECT_EQ(2, tmp_file_mgr.num_active_tmp_devices());
@@ -144,25 +144,23 @@ TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) {
     EXPECT_EQ(0, tmp_file_mgr.GetTmpDirPath(devices[i]).find(tmp_dirs[i]));
     TUniqueId id;
     TmpFileMgr::File *file;
-    EXPECT_TRUE(tmp_file_mgr.GetFile(devices[i], id, &file).ok());
+    ASSERT_OK(file_group.NewFile(devices[i], id, &file));
     // Check the prefix is the expected temporary directory.
     EXPECT_EQ(0, file->path().find(tmp_dirs[i]));
   }
   FileSystemUtil::RemovePaths(tmp_dirs);
+  file_group.Close();
   CheckMetrics(&tmp_file_mgr);
 }
 
 /// Test that reporting a write error is possible but does not result in
 /// blacklisting, which is disabled.
 TEST_F(TmpFileMgrTest, TestReportError) {
-  vector<string> tmp_dirs;
-  tmp_dirs.push_back("/tmp/tmp-file-mgr-test.1");
-  tmp_dirs.push_back("/tmp/tmp-file-mgr-test.2");
-  for (int i = 0; i < tmp_dirs.size(); ++i) {
-    EXPECT_TRUE(FileSystemUtil::RemoveAndCreateDirectory(tmp_dirs[i]).ok());
-  }
+  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
+  RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr);
 
   // Both directories should be used.
   vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.active_tmp_devices();
@@ -173,7 +171,7 @@ TEST_F(TmpFileMgrTest, TestReportError) {
   TUniqueId id;
   int good_device = 0, bad_device = 1;
   TmpFileMgr::File* bad_file;
-  EXPECT_TRUE(tmp_file_mgr.GetFile(devices[bad_device], id, &bad_file).ok());
+  ASSERT_OK(file_group.NewFile(devices[bad_device], id, &bad_file));
   ErrorMsg errmsg(TErrorCode::GENERAL, "A fake error");
   bad_file->ReportIOError(errmsg);
 
@@ -187,34 +185,35 @@ TEST_F(TmpFileMgrTest, TestReportError) {
 
   // Attempts to expand bad file should succeed.
   int64_t offset;
-  EXPECT_TRUE(bad_file->AllocateSpace(128, &offset).ok());
-  EXPECT_TRUE(bad_file->Remove().ok());
+  ASSERT_OK(bad_file->AllocateSpace(128, &offset));
   // The good device should still be usable.
   TmpFileMgr::File* good_file;
-  EXPECT_TRUE(tmp_file_mgr.GetFile(devices[good_device], id, &good_file).ok());
+  ASSERT_OK(file_group.NewFile(devices[good_device], id, &good_file));
   EXPECT_TRUE(good_file != NULL);
-  EXPECT_TRUE(good_file->AllocateSpace(128, &offset).ok());
+  ASSERT_OK(good_file->AllocateSpace(128, &offset));
   // Attempts to allocate new files on bad device should succeed.
-  EXPECT_TRUE(tmp_file_mgr.GetFile(devices[bad_device], id, &bad_file).ok());
+  ASSERT_OK(file_group.NewFile(devices[bad_device], id, &bad_file));
   FileSystemUtil::RemovePaths(tmp_dirs);
+  file_group.Close();
   CheckMetrics(&tmp_file_mgr);
 }
 
 TEST_F(TmpFileMgrTest, TestAllocateFails) {
   string tmp_dir("/tmp/tmp-file-mgr-test.1");
   string scratch_subdir = tmp_dir + "/impala-scratch";
-  vector<string> tmp_dirs(1, tmp_dir);
-  EXPECT_TRUE(FileSystemUtil::RemoveAndCreateDirectory(tmp_dir).ok());
+  vector<string> tmp_dirs({tmp_dir});
+  RemoveAndCreateDirs(tmp_dirs);
   TmpFileMgr tmp_file_mgr;
   tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr);
 
   TUniqueId id;
   TmpFileMgr::File* allocated_file1;
   TmpFileMgr::File* allocated_file2;
   int64_t offset;
-  EXPECT_TRUE(tmp_file_mgr.GetFile(0, id, &allocated_file1).ok());
-  EXPECT_TRUE(tmp_file_mgr.GetFile(0, id, &allocated_file2).ok());
-  EXPECT_TRUE(allocated_file1->AllocateSpace(1, &offset).ok());
+  ASSERT_OK(file_group.NewFile(0, id, &allocated_file1));
+  ASSERT_OK(file_group.NewFile(0, id, &allocated_file2));
+  ASSERT_OK(allocated_file1->AllocateSpace(1, &offset));
 
   // Make scratch non-writable and test for allocation errors at different stages:
   // new file creation, files with no allocated blocks. files with allocated space.
@@ -225,10 +224,50 @@ TEST_F(TmpFileMgrTest, TestAllocateFails) {
   EXPECT_FALSE(allocated_file2->AllocateSpace(1, &offset).ok());
   // Creating a new File object can succeed because it is not immediately created on disk.
   TmpFileMgr::File* unallocated_file;
-  EXPECT_TRUE(tmp_file_mgr.GetFile(0, id, &unallocated_file).ok());
+  ASSERT_OK(file_group.NewFile(0, id, &unallocated_file));
 
   chmod(scratch_subdir.c_str(), S_IRWXU);
   FileSystemUtil::RemovePaths(tmp_dirs);
+  file_group.Close();
+}
+
+// Test scratch limit is applied correctly to group of files.
+TEST_F(TmpFileMgrTest, TestScratchLimit) {
+  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
+  RemoveAndCreateDirs(tmp_dirs);
+  TmpFileMgr tmp_file_mgr;
+  tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get());
+
+  const int64_t LIMIT = 100;
+  const int64_t FILE1_ALLOC = 25;
+  const int64_t FILE2_ALLOC = LIMIT - FILE1_ALLOC;
+  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, LIMIT);
+  TmpFileMgr::File* file1;
+  TmpFileMgr::File* file2;
+  TUniqueId id;
+  ASSERT_OK(file_group.NewFile(0, id, &file1));
+  ASSERT_OK(file_group.NewFile(1, id, &file2));
+
+  // Test individual limit is enforced.
+  Status status;
+  int64_t offset;
+  status = file1->AllocateSpace(LIMIT + 1, &offset);
+  ASSERT_FALSE(status.ok());
+  ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED);
+  ASSERT_OK(file1->AllocateSpace(FILE1_ALLOC, &offset));
+  ASSERT_EQ(0, offset);
+
+  // Test aggregate limit is enforced.
+  status = file2->AllocateSpace(FILE2_ALLOC + 1, &offset);
+  ASSERT_FALSE(status.ok());
+  ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED);
+  ASSERT_OK(file2->AllocateSpace(FILE2_ALLOC, &offset));
+  ASSERT_EQ(0, offset);
+  status = file2->AllocateSpace(1, &offset);
+  ASSERT_FALSE(status.ok());
+  ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED);
+
+  file_group.Close();
 }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 73596ab..b616d18 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -136,11 +136,12 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dirs, bool one_dir_per_d
   return Status::OK();
 }
 
-Status TmpFileMgr::GetFile(const DeviceId& device_id, const TUniqueId& query_id,
-    File** new_file) {
+Status TmpFileMgr::NewFile(FileGroup* file_group, const DeviceId& device_id,
+    const TUniqueId& query_id, File** new_file) {
   DCHECK(initialized_);
   DCHECK_GE(device_id, 0);
   DCHECK_LT(device_id, tmp_dirs_.size());
+  DCHECK(file_group != NULL);
   if (IsBlacklisted(device_id)) {
     return Status(TErrorCode::TMP_DEVICE_BLACKLISTED, tmp_dirs_[device_id].path());
   }
@@ -152,7 +153,7 @@ Status TmpFileMgr::GetFile(const DeviceId& device_id, const TUniqueId& query_id,
   path new_file_path(tmp_dirs_[device_id].path());
   new_file_path /= file_name.str();
 
-  *new_file = new File(this, device_id, new_file_path.string());
+  *new_file = new File(this, file_group, device_id, new_file_path.string());
   return Status::OK();
 }
 
@@ -209,16 +210,24 @@ vector<TmpFileMgr::DeviceId> TmpFileMgr::active_tmp_devices() {
   return devices;
 }
 
-TmpFileMgr::File::File(TmpFileMgr* mgr, DeviceId device_id, const string& path)
+TmpFileMgr::File::File(TmpFileMgr* mgr, FileGroup* file_group, DeviceId device_id,
+    const string& path)
   : mgr_(mgr),
+    file_group_(file_group),
     path_(path),
     device_id_(device_id),
     current_size_(0),
     blacklisted_(false) {
+  DCHECK(file_group != NULL);
 }
 
 Status TmpFileMgr::File::AllocateSpace(int64_t write_size, int64_t* offset) {
   DCHECK_GT(write_size, 0);
+  if (file_group_->bytes_limit_ != -1 &&
+      file_group_->current_bytes_allocated_ + write_size
+      > file_group_->bytes_limit_) {
+    return Status(TErrorCode::SCRATCH_LIMIT_EXCEEDED, file_group_->bytes_limit_);
+  }
   Status status;
   if (mgr_->IsBlacklisted(device_id_)) {
     blacklisted_ = true;
@@ -240,6 +249,7 @@ Status TmpFileMgr::File::AllocateSpace(int64_t write_size, int64_t* offset) {
     return status;
   }
   *offset = current_size_;
+  file_group_->current_bytes_allocated_ += write_size;
   current_size_ = new_size;
   return Status::OK();
 }
@@ -256,4 +266,31 @@ Status TmpFileMgr::File::Remove() {
   return Status::OK();
 }
 
+TmpFileMgr::FileGroup::FileGroup(TmpFileMgr* tmp_file_mgr, int64_t bytes_limit)
+  : tmp_file_mgr_(tmp_file_mgr),
+    current_bytes_allocated_(0),
+    bytes_limit_(bytes_limit) {
+  DCHECK(tmp_file_mgr != NULL);
+}
+
+Status TmpFileMgr::FileGroup::NewFile(const DeviceId& device_id,
+    const TUniqueId& query_id, File** new_file) {
+  TmpFileMgr::File* tmp_file;
+  RETURN_IF_ERROR(tmp_file_mgr_->NewFile(this, device_id, query_id, &tmp_file));
+  tmp_files_.emplace_back(tmp_file);
+  if (new_file != NULL) *new_file = tmp_file;
+  return Status::OK();
+}
+
+void TmpFileMgr::FileGroup::Close() {
+  for (std::unique_ptr<TmpFileMgr::File>& file: tmp_files_) {
+    Status status = file->Remove();
+    if (!status.ok()) {
+      LOG(WARNING) << "Error removing scratch file '" << file->path() << "': "
+                   << status.msg().msg();
+    }
+  }
+  tmp_files_.clear();
+}
+
 } //namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index d085f2c..bd6c366 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -28,12 +28,20 @@ namespace impala {
 /// TmpFileMgr creates and manages temporary files and directories on the local
 /// filesystem. It can manage multiple temporary directories across multiple devices.
 /// TmpFileMgr ensures that at most one directory per device is used unless overridden
-/// for testing. GetFile() returns a File handle with a unique filename on a device. The
-/// client owns the File handle and can use it to expand the file.
+/// for testing.
+///
+/// Every temporary File belongs to a FileGroup: to allocate temporary files, first a
+/// FileGroup is created, then FileGroup::NewFile() is called to create a new File with
+/// a unique filename on the specified temporary device. The client can use the File
+/// handle to allocate space in the file. FileGroups can be created with a limit on
+/// the total number of bytes allocated across all files in the group.
+///
 /// TODO: we could notify block managers about the failure so they can take more
 /// proactive action to avoid using the device.
 class TmpFileMgr {
  public:
+  class FileGroup;
+
   /// DeviceId is a unique identifier for a temporary device managed by TmpFileMgr.
   /// It is used as a handle for external classes to identify devices.
   typedef int DeviceId;
@@ -43,28 +51,31 @@ class TmpFileMgr {
   /// Creation of the file is deferred until the first call to AllocateSpace().
   class File {
    public:
-    /// Allocates 'write_size' bytes in this file for a new block of data.
+    /// Allocates 'write_size' bytes in this file for a new block of data only if it
+    /// does not cross the allocation limit of its associated FileGroup.
     /// The file size is increased by a call to truncate() if necessary.
     /// The physical file is created on the first call to AllocateSpace().
     /// Returns Status::OK() and sets offset on success.
-    /// Returns an error status if an unexpected error occurs.
+    /// Returns an error status if an unexpected error occurs or if allowing the
+    /// allocation would exceed the allocation limit of its associated FileGroup.
     /// If an error status is returned, the caller can try a different temporary file.
     Status AllocateSpace(int64_t write_size, int64_t* offset);
 
     /// Called to notify TmpFileMgr that an IO error was encountered for this file
     void ReportIOError(const ErrorMsg& msg);
 
-    /// Delete the physical file on disk, if one was created.
-    /// It is not valid to read or write to a file after calling Remove().
-    Status Remove();
-
     const std::string& path() const { return path_; }
     int disk_id() const { return disk_id_; }
     bool is_blacklisted() const { return blacklisted_; }
 
    private:
+    friend class FileGroup;
     friend class TmpFileMgr;
 
+    /// Delete the physical file on disk, if one was created.
+    /// It is not valid to read or write to a file after calling Remove().
+    Status Remove();
+
     /// The name of the sub-directory that Impala created within each configured scratch
     /// directory.
     const static std::string TMP_SUB_DIR_NAME;
@@ -73,11 +84,15 @@ class TmpFileMgr {
     /// directory. A warning is issued if available space is less than this threshold.
     const static uint64_t AVAILABLE_SPACE_THRESHOLD_MB;
 
-    File(TmpFileMgr* mgr, DeviceId device_id, const std::string& path);
+    File(TmpFileMgr* mgr, FileGroup* file_group, DeviceId device_id,
+        const std::string& path);
 
     /// TmpFileMgr this belongs to.
     TmpFileMgr* mgr_;
 
+    /// The FileGroup this belongs to. Cannot be null.
+    FileGroup* file_group_;
+
     /// Path of the physical file in the filesystem.
     std::string path_;
 
@@ -97,6 +112,56 @@ class TmpFileMgr {
     bool blacklisted_;
   };
 
+  /// Represents a group of files. The total allocated bytes of the group can be bounded
+  /// by setting the space allocation limit. The owner of the FileGroup object is
+  /// responsible for calling the Close method to delete all the files in the group.
+  class FileGroup {
+  public:
+    FileGroup(TmpFileMgr* tmp_file_mgr, int64_t bytes_limit = -1);
+
+    ~FileGroup(){
+      DCHECK_EQ(NumFiles(), 0);
+    }
+
+    /// Creates a new File with a unique path for a query instance, adds it to the
+    /// group and returns a handle for that file. The file path is within the (single)
+    /// tmp directory on the specified device id.
+    /// If an error is encountered, e.g. the device is blacklisted, the file is not
+    /// added to this group and a non-ok status is returned.
+    Status NewFile(const DeviceId& device_id, const TUniqueId& query_id,
+        File** new_file = NULL);
+
+    /// Returns a file handle at the specified index in the group.
+    File* GetFileAt(int index) {
+      DCHECK_GE(index, 0);
+      DCHECK_LT(index, NumFiles());
+      return tmp_files_[index].get();
+    }
+
+    /// Calls Remove() on all the files in the group and deletes them.
+    void Close();
+
+    /// Returns the number of files that are a part of the group.
+    int NumFiles() {
+      return tmp_files_.size();
+    }
+
+  private:
+    friend class File;
+
+    /// The TmpFileMgr it is associated with.
+    TmpFileMgr* tmp_file_mgr_;
+
+    /// List of files representing the FileGroup.
+    std::vector<std::unique_ptr<File>> tmp_files_;
+
+    /// Total space allocated in this group's files.
+    int64_t current_bytes_allocated_;
+
+    /// Max write space allowed (-1 means no limit).
+    int64_t bytes_limit_;
+  };
+
   TmpFileMgr();
 
   /// Creates the configured tmp directories. If multiple directories are specified per
@@ -109,13 +174,6 @@ class TmpFileMgr {
   Status InitCustom(const std::vector<std::string>& tmp_dirs, bool one_dir_per_device,
       MetricGroup* metrics);
 
-  /// Return a new File handle with a unique path for a query instance. The file path
-  /// is within the (single) tmp directory on the specified device id. The caller owns
-  /// the returned handle and is responsible for deleting it. The file is not created -
-  /// creation is deferred until the first call to File::AllocateSpace().
-  Status GetFile(const DeviceId& device_id, const TUniqueId& query_id,
-      File** new_file);
-
   /// Return the scratch directory path for the device.
   std::string GetTmpDirPath(DeviceId device_id) const;
 
@@ -128,6 +186,14 @@ class TmpFileMgr {
   std::vector<DeviceId> active_tmp_devices();
 
  private:
+  /// Return a new File handle with a unique path for a query instance. The file is
+  /// associated with the file_group and the file path is within the (single) tmp
+  /// directory on the specified device id. The caller owns the returned handle and is
+  /// responsible for deleting it. The file is not created - creation is deferred until
+  /// the first call to File::AllocateSpace().
+  Status NewFile(FileGroup* file_group, const DeviceId& device_id,
+      const TUniqueId& query_id, File** new_file);
+
   /// Dir stores information about a temporary directory.
   class Dir {
    public:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index ec1e632..d0e4275 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -428,6 +428,18 @@ Status impala::SetQueryOption(const string& key, const string& value,
             iequals(value, "true") || iequals(value, "1"));
         break;
       }
+      case TImpalaQueryOptions::SCRATCH_LIMIT: {
+        // Parse the scratch limit spec and validate it.
+        if (iequals(value, "-1")) {
+          query_options->__set_scratch_limit(-1);
+        } else {
+          int64_t bytes_limit;
+          RETURN_IF_ERROR(ParseMemValue(value, "Scratch space memory limit",
+              &bytes_limit));
+          query_options->__set_scratch_limit(bytes_limit);
+        }
+        break;
+      }
       default:
         // We hit this DCHECK(false) if we forgot to add the corresponding entry here
         // when we add a new query option.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 4a3b199..2c25700 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -35,7 +35,7 @@ class TQueryOptions;
 // the DCHECK.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::STRICT_MODE + 1);\
+      TImpalaQueryOptions::SCRATCH_LIMIT + 1);\
   QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR)\
   QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -84,7 +84,8 @@ class TQueryOptions;
   QUERY_OPT_FN(runtime_filter_min_size, RUNTIME_FILTER_MIN_SIZE)\
   QUERY_OPT_FN(runtime_filter_max_size, RUNTIME_FILTER_MAX_SIZE)\
   QUERY_OPT_FN(prefetch_mode, PREFETCH_MODE)\
-  QUERY_OPT_FN(strict_mode, STRICT_MODE);
+  QUERY_OPT_FN(strict_mode, STRICT_MODE)\
+  QUERY_OPT_FN(scratch_limit, SCRATCH_LIMIT);
 
 
 /// Converts a TQueryOptions struct into a map of key, value pairs.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 003a618..fdb8aa8 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -203,6 +203,9 @@ struct TQueryOptions {
 
   // Additional strict handling of invalid data parsing and type conversions.
   49: optional bool strict_mode = false
+
+  // A limit on the amount of scratch directory space that can be used (-1: no limit).
+  50: optional i64 scratch_limit = -1
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 25dfddf..ae00ea2 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -233,7 +233,12 @@ enum TImpalaQueryOptions {
   PREFETCH_MODE,
 
   // Additional strict handling of invalid data parsing and type conversions.
-  STRICT_MODE
+  STRICT_MODE,
+
+  // A limit on the amount of scratch directory space that can be used;
+  // Unspecified or a limit of -1 means no limit;
+  // Otherwise specified in the same way as MEM_LIMIT.
+  SCRATCH_LIMIT
 }
 
 // The summary of an insert.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 61b478b..3d48005 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -280,7 +280,10 @@ error_codes = (
    "data of type $1: $2"),
 
   ("TEXT_PARSER_TRUNCATED_COLUMN", 90, "Length of column is $0 which exceeds maximum "
-   "supported length of 2147483647 bytes.")
+   "supported length of 2147483647 bytes."),
+
+  ("SCRATCH_LIMIT_EXCEEDED", 91, "Scratch space limit of $0 bytes exceeded for query "
+   "while spilling data to disk.")
 )
 
 import sys

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/9313dcdb/tests/query_test/test_scratch_limit.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scratch_limit.py b/tests/query_test/test_scratch_limit.py
new file mode 100644
index 0000000..f2cec78
--- /dev/null
+++ b/tests/query_test/test_scratch_limit.py
@@ -0,0 +1,105 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_dimensions import create_single_exec_option_dimension
+from tests.common.test_dimensions import create_uncompressed_text_dimension
+
+class TestScratchLimit(ImpalaTestSuite):
+  """
+  This class tests the functionality of setting the scratch limit as a query option
+  """
+
+  spill_query = """
+      select o_orderdate, o_custkey, o_comment
+      from tpch.orders
+      order by o_orderdate
+      """
+
+  # Block manager memory limit that is low enough to
+  # force Impala to spill to disk when executing 'spill_query'
+  max_block_mgr_memory = "64m"
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestScratchLimit, cls).add_test_dimensions()
+    # There is no reason to run these tests using all dimensions.
+    cls.TestMatrix.add_dimension(create_single_exec_option_dimension())
+    cls.TestMatrix.add_dimension(
+        create_uncompressed_text_dimension(cls.get_workload()))
+
+  def test_with_high_scratch_limit(self, vector):
+    """
+    Query runs to completion with a scratch limit well above
+    its required scratch space which in this case is 128m.
+    """
+    exec_option = vector.get_value('exec_option')
+    exec_option['max_block_mgr_memory'] = self.max_block_mgr_memory
+    exec_option['scratch_limit'] = '500m'
+    self.execute_query_expect_success(self.client, self.spill_query, exec_option)
+
+  def test_with_low_scratch_limit(self, vector):
+    """
+    Query throws the appropriate exception with a scratch limit well below
+    its required scratch space which in this case is 128m.
+    """
+    exec_option = vector.get_value('exec_option')
+    exec_option['max_block_mgr_memory'] = self.max_block_mgr_memory
+    exec_option['scratch_limit'] = '50m'
+    expected_error = 'Scratch space limit of %s bytes exceeded'
+    scratch_limit_in_bytes = 50 * 1024 * 1024
+    try:
+      self.execute_query(self.spill_query, exec_option)
+      assert False, "Query was expected to fail"
+    except ImpalaBeeswaxException as e:
+      assert expected_error % scratch_limit_in_bytes in str(e)
+
+  def test_with_zero_scratch_limit(self, vector):
+    """
+    Query throws the appropriate exception with a scratch limit of
+    zero which means no scratch space can be allocated.
+    """
+    exec_option = vector.get_value('exec_option')
+    exec_option['max_block_mgr_memory'] = self.max_block_mgr_memory
+    exec_option['scratch_limit'] = '0'
+    self.execute_query_expect_failure(self.spill_query, exec_option)
+
+  def test_with_unlimited_scratch_limit(self, vector):
+    """
+    Query runs to completion with a scratch limit of -1, which means default/no limit.
+    """
+    exec_option = vector.get_value('exec_option')
+    exec_option['max_block_mgr_memory'] = self.max_block_mgr_memory
+    exec_option['scratch_limit'] = '-1'
+    self.execute_query_expect_success(self.client, self.spill_query, exec_option)
+
+  def test_without_specifying_scratch_limit(self, vector):
+    """
+    Query runs to completion with the default setting of no scratch limit.
+    """
+    exec_option = vector.get_value('exec_option')
+    exec_option['max_block_mgr_memory'] = self.max_block_mgr_memory
+    self.execute_query_expect_success(self.client, self.spill_query, exec_option)
+
+  def test_with_zero_scratch_limit_no_memory_limit(self, vector):
+    """
+    Query runs to completion without spilling as there is no limit on block manager memory.
+    Scratch limit of zero ensures spilling is disabled.
+    """
+    exec_option = vector.get_value('exec_option')
+    exec_option['scratch_limit'] = '0'
+    self.execute_query_expect_success(self.client, self.spill_query, exec_option)



[2/3] incubator-impala git commit: IMPALA-4193: Warn when benchmarks run with sub-optimal CPU settings

Posted by he...@apache.org.
IMPALA-4193: Warn when benchmarks run with sub-optimal CPU settings

Change-Id: I5e879cb35cf736f6112c1caed829722a38849794
Reviewed-on: http://gerrit.cloudera.org:8080/4528
Reviewed-by: Jim Apple <jb...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/435aec54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/435aec54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/435aec54

Branch: refs/heads/master
Commit: 435aec5489338d35c64e6aaa748fd6cad8283215
Parents: 9313dcd
Author: Lars Volker <lv...@cloudera.com>
Authored: Fri Sep 23 12:24:30 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sat Sep 24 03:58:38 2016 +0000

----------------------------------------------------------------------
 be/src/util/benchmark.cc |  2 ++
 be/src/util/cpu-info.cc  | 39 +++++++++++++++++++++++++++++++++++++--
 be/src/util/cpu-info.h   |  7 +++++++
 3 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/435aec54/be/src/util/benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/util/benchmark.cc b/be/src/util/benchmark.cc
index 6f7901f..0001398 100644
--- a/be/src/util/benchmark.cc
+++ b/be/src/util/benchmark.cc
@@ -68,6 +68,8 @@ Benchmark::Benchmark(const string& name) : name_(name) {
 #ifndef NDEBUG
   LOG(ERROR) << "WARNING: Running benchmark in DEBUG mode.";
 #endif
+  CpuInfo::VerifyPerformanceGovernor();
+  CpuInfo::VerifyTurboDisabled();
 }
 
 int Benchmark::AddBenchmark(const string& name, BenchmarkFunction fn, void* args,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/435aec54/be/src/util/cpu-info.cc
----------------------------------------------------------------------
diff --git a/be/src/util/cpu-info.cc b/be/src/util/cpu-info.cc
index 2a98102..ec667c4 100644
--- a/be/src/util/cpu-info.cc
+++ b/be/src/util/cpu-info.cc
@@ -45,6 +45,21 @@ DEFINE_int32(num_cores, 0, "(Advanced) If > 0, it sets the number of cores avail
     " Impala. Setting it to 0 means Impala will use all available cores on the machine"
     " according to /proc/cpuinfo.");
 
+namespace {
+// Helper function to warn if a given file does not contain an expected string as its
+// first line. If the file cannot be opened, no error is reported.
+void WarnIfFileNotEqual(
+    const string& filename, const string& expected, const string& warning_text) {
+  ifstream file(filename);
+  if (!file) return;
+  string line;
+  getline(file, line);
+  if (line != expected) {
+    LOG(ERROR) << "Expected " << expected << ", actual " << line << endl << warning_text;
+  }
+}
+} // end anonymous namespace
+
 namespace impala {
 
 bool CpuInfo::initialized_ = false;
@@ -90,7 +105,7 @@ void CpuInfo::Init() {
   int num_cores = 0;
 
   // Read from /proc/cpuinfo
-  ifstream cpuinfo("/proc/cpuinfo", ios::in);
+  ifstream cpuinfo("/proc/cpuinfo");
   while (cpuinfo) {
     getline(cpuinfo, line);
     size_t colon = line.find(':');
@@ -115,7 +130,6 @@ void CpuInfo::Init() {
       }
     }
   }
-  if (cpuinfo.is_open()) cpuinfo.close();
 
   if (max_mhz != 0) {
     cycles_per_ms_ = max_mhz * 1000;
@@ -142,6 +156,27 @@ void CpuInfo::VerifyCpuRequirements() {
   }
 }
 
+void CpuInfo::VerifyPerformanceGovernor() {
+  for (int cpu_id = 0; cpu_id < CpuInfo::num_cores(); ++cpu_id) {
+    const string governor_file =
+        Substitute("/sys/devices/system/cpu/cpu$0/cpufreq/scaling_governor", cpu_id);
+    const string warning_text = Substitute(
+        "WARNING: CPU $0 is not using 'performance' governor. Note that changing the "
+        "governor to 'performance' will reset the no_turbo setting to 0.",
+        cpu_id);
+    WarnIfFileNotEqual(governor_file, "performance", warning_text);
+  }
+}
+
+void CpuInfo::VerifyTurboDisabled() {
+  WarnIfFileNotEqual("/sys/devices/system/cpu/intel_pstate/no_turbo", "1",
+      "WARNING: CPU turbo is enabled. This setting can change the clock frequency of CPU "
+      "cores during the benchmark run, which can lead to inaccurate results. You can "
+      "disable CPU turbo by writing a 1 to "
+      "/sys/devices/system/cpu/intel_pstate/no_turbo. Note that changing the governor to "
+      "'performance' will reset this to 0.");
+}
+
 void CpuInfo::EnableFeature(long flag, bool enable) {
   DCHECK(initialized_);
   if (!enable) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/435aec54/be/src/util/cpu-info.h
----------------------------------------------------------------------
diff --git a/be/src/util/cpu-info.h b/be/src/util/cpu-info.h
index ac7f0c1..cb577c2 100644
--- a/be/src/util/cpu-info.h
+++ b/be/src/util/cpu-info.h
@@ -53,6 +53,13 @@ class CpuInfo {
   /// and terminate.
   static void VerifyCpuRequirements();
 
+  /// Determine if the CPU scaling governor is set to 'performance' and if not, issue an
+  /// error.
+  static void VerifyPerformanceGovernor();
+
+  /// Determine if CPU turbo is disabled and if not, issue an error.
+  static void VerifyTurboDisabled();
+
   /// Returns all the flags for this cpu
   static int64_t hardware_flags() {
     DCHECK(initialized_);


[3/3] incubator-impala git commit: IMPALA-4187: Switch RPC latency metrics to histograms

Posted by he...@apache.org.
IMPALA-4187: Switch RPC latency metrics to histograms

It's usually better to measure latency distributions with histograms,
not avg / min / max. This change switches out the metrics that measure
the latency of RPC processing time to histograms.

Add HistogramMetric::Reset() to allow histogram to remove all
entries. Added spinlock around histogram access.

On an 8-core i7 @ 3.4GHz, the following throughputs were observed:

1 thread -> 25M updates/sec
4 threads -> 7M updates/sec
16 threads -> 5M updates/sec

Each histogram takes ~108KB of storage for its buckets. This can be
reduced by reducing the maximum value, currently 60 minutes.

The new metrics have the following text representation:

Count: 148, 25th %-ile: 0, 50th %-ile: 0, 75th %-ile: 0, 90th %-ile: 0,
95th %-ile: 0, 99.9th %-ile: 1ms

Which changes from the old:

count: 345, last: 6ms, min: 0, max: 12ms, mean: 1ms, stddev: 1ms

Change-Id: I9ba6d4270dd5676eeeff35ad8d9dc5dcddd95e34
Reviewed-on: http://gerrit.cloudera.org:8080/4516
Reviewed-by: Henry Robinson <he...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/872d5462
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/872d5462
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/872d5462

Branch: refs/heads/master
Commit: 872d5462b9a17d9f2d9edeb08728e29d6eb6eca6
Parents: 435aec5
Author: Henry Robinson <he...@cloudera.com>
Authored: Wed Sep 21 16:29:57 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Mon Sep 26 22:07:17 2016 +0000

----------------------------------------------------------------------
 be/src/rpc/rpc-trace.cc        | 22 +++++++----
 be/src/rpc/rpc-trace.h         |  6 +--
 be/src/util/histogram-metric.h | 73 ++++++++++++++++++++++++-------------
 common/thrift/metrics.json     |  2 +-
 4 files changed, 65 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/872d5462/be/src/rpc/rpc-trace.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-trace.cc b/be/src/rpc/rpc-trace.cc
index 407c570..f4da19b 100644
--- a/be/src/rpc/rpc-trace.cc
+++ b/be/src/rpc/rpc-trace.cc
@@ -33,7 +33,7 @@ using namespace rapidjson;
 using namespace strings;
 
 // Metric key format for rpc call duration metrics.
-const string RPC_TIME_STATS_METRIC_KEY = "rpc-method.$0.call_duration";
+const string RPC_PROCESSING_TIME_DISTRIBUTION_METRIC_KEY = "rpc-method.$0.call_duration";
 
 // Singleton class to keep track of all RpcEventHandlers, and to render them to a
 // web-based summary page.
@@ -123,14 +123,14 @@ void RpcEventHandler::Reset(const string& method_name) {
   lock_guard<mutex> l(method_map_lock_);
   MethodMap::iterator it = method_map_.find(method_name);
   if (it == method_map_.end()) return;
-  it->second->time_stats->Reset();
+  it->second->processing_time_distribution->Reset();
   it->second->num_in_flight.Store(0L);
 }
 
 void RpcEventHandler::ResetAll() {
   lock_guard<mutex> l(method_map_lock_);
   for (const MethodMap::value_type& method: method_map_) {
-    method.second->time_stats->Reset();
+    method.second->processing_time_distribution->Reset();
     method.second->num_in_flight.Store(0L);
   }
 }
@@ -149,7 +149,8 @@ void RpcEventHandler::ToJson(Value* server, Document* document) {
     Value method(kObjectType);
     Value method_name(rpc.first.c_str(), document->GetAllocator());
     method.AddMember("name", method_name, document->GetAllocator());
-    const string& human_readable = rpc.second->time_stats->ToHumanReadable();
+    const string& human_readable =
+        rpc.second->processing_time_distribution->ToHumanReadable();
     Value summary(human_readable.c_str(), document->GetAllocator());
     method.AddMember("summary", summary, document->GetAllocator());
     method.AddMember("in_flight", rpc.second->num_in_flight.Load(),
@@ -171,9 +172,14 @@ void* RpcEventHandler::getContext(const char* fn_name, void* server_context) {
     if (it == method_map_.end()) {
       MethodDescriptor* descriptor = new MethodDescriptor();
       descriptor->name = fn_name;
-      const string& rpc_name = Substitute("$0.$1", server_name_, descriptor->name);
-      descriptor->time_stats = StatsMetric<double>::CreateAndRegister(metrics_,
-          RPC_TIME_STATS_METRIC_KEY, rpc_name);
+      const string& rpc_name = Substitute(RPC_PROCESSING_TIME_DISTRIBUTION_METRIC_KEY,
+          Substitute("$0.$1", server_name_, descriptor->name));
+      const TMetricDef& def =
+          MakeTMetricDef(rpc_name, TMetricKind::HISTOGRAM, TUnit::TIME_MS);
+      constexpr int32_t SIXTY_MINUTES_IN_MS = 60 * 1000 * 60;
+      // Store processing times of up to 60 minutes with 3 sig. fig.
+      descriptor->processing_time_distribution =
+          metrics_->RegisterMetric(new HistogramMetric(def, SIXTY_MINUTES_IN_MS, 3));
       it = method_map_.insert(make_pair(descriptor->name, descriptor)).first;
     }
   }
@@ -197,5 +203,5 @@ void RpcEventHandler::postWrite(void* ctx, const char* fn_name, uint32_t bytes)
   MethodDescriptor* descriptor = rpc_ctx->method_descriptor;
   delete rpc_ctx;
   descriptor->num_in_flight.Add(-1);
-  descriptor->time_stats->Update(elapsed_time);
+  descriptor->processing_time_distribution->Update(elapsed_time);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/872d5462/be/src/rpc/rpc-trace.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-trace.h b/be/src/rpc/rpc-trace.h
index 3ee2c6e..b2861af 100644
--- a/be/src/rpc/rpc-trace.h
+++ b/be/src/rpc/rpc-trace.h
@@ -18,8 +18,8 @@
 #ifndef IMPALA_RPC_RPC_TRACE_H
 #define IMPALA_RPC_RPC_TRACE_H
 
-#include "util/collection-metrics.h"
 #include "rpc/thrift-server.h"
+#include "util/histogram-metric.h"
 #include "util/internal-queue.h"
 
 #include <thrift/TProcessor.h>
@@ -83,8 +83,8 @@ class RpcEventHandler : public apache::thrift::TProcessorEventHandler {
     /// Name of the method
     std::string name;
 
-    /// Summary statistics for the time taken to respond to this method
-    StatsMetric<double>* time_stats;
+    /// Distribution of the time taken to process this RPC.
+    HistogramMetric* processing_time_distribution;
 
     /// Number of invocations in flight
     AtomicInt32 num_in_flight;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/872d5462/be/src/util/histogram-metric.h
----------------------------------------------------------------------
diff --git a/be/src/util/histogram-metric.h b/be/src/util/histogram-metric.h
index 211deee..5a61648 100644
--- a/be/src/util/histogram-metric.h
+++ b/be/src/util/histogram-metric.h
@@ -20,10 +20,12 @@
 
 #include "util/hdr-histogram.h"
 #include "util/metrics.h"
+#include "util/spinlock.h"
 
 namespace impala {
 
 /// Metric which constructs (using HdrHistogram) a histogram of a set of values.
+/// Thread-safe: histogram access protected by a spin lock.
 class HistogramMetric : public Metric {
  public:
   /// Constructs a new histogram metric. `highest_trackable_value` is the maximum value
@@ -31,27 +33,33 @@ class HistogramMetric : public Metric {
   /// that values must be stored with.
   HistogramMetric(
       const TMetricDef& def, uint64_t highest_trackable_value, int num_significant_digits)
-      : Metric(def), histogram_(highest_trackable_value, num_significant_digits),
-        unit_(def.units) { }
+    : Metric(def),
+      histogram_(new HdrHistogram(highest_trackable_value, num_significant_digits)),
+      unit_(def.units) {
+    DCHECK_EQ(TMetricKind::HISTOGRAM, def.kind);
+  }
 
   virtual void ToJson(rapidjson::Document* document, rapidjson::Value* value) {
     rapidjson::Value container(rapidjson::kObjectType);
     AddStandardFields(document, &container);
 
-    container.AddMember("25th %-ile", histogram_.ValueAtPercentile(25),
-        document->GetAllocator());
-    container.AddMember("50th %-ile", histogram_.ValueAtPercentile(50),
-        document->GetAllocator());
-    container.AddMember("75th %-ile", histogram_.ValueAtPercentile(75),
-        document->GetAllocator());
-    container.AddMember("90th %-ile", histogram_.ValueAtPercentile(90),
-        document->GetAllocator());
-    container.AddMember("95th %-ile", histogram_.ValueAtPercentile(95),
-        document->GetAllocator());
-    container.AddMember("99.9th %-ile", histogram_.ValueAtPercentile(99.9),
-        document->GetAllocator());
-    container.AddMember("count", histogram_.TotalCount(), document->GetAllocator());
-
+    {
+      boost::lock_guard<SpinLock> l(lock_);
+
+      container.AddMember(
+          "25th %-ile", histogram_->ValueAtPercentile(25), document->GetAllocator());
+      container.AddMember(
+          "50th %-ile", histogram_->ValueAtPercentile(50), document->GetAllocator());
+      container.AddMember(
+          "75th %-ile", histogram_->ValueAtPercentile(75), document->GetAllocator());
+      container.AddMember(
+          "90th %-ile", histogram_->ValueAtPercentile(90), document->GetAllocator());
+      container.AddMember(
+          "95th %-ile", histogram_->ValueAtPercentile(95), document->GetAllocator());
+      container.AddMember(
+          "99.9th %-ile", histogram_->ValueAtPercentile(99.9), document->GetAllocator());
+      container.AddMember("count", histogram_->TotalCount(), document->GetAllocator());
+    }
     rapidjson::Value type_value(PrintTMetricKind(TMetricKind::HISTOGRAM).c_str(),
         document->GetAllocator());
     container.AddMember("kind", type_value, document->GetAllocator());
@@ -61,33 +69,46 @@ class HistogramMetric : public Metric {
     *value = container;
   }
 
-  void Update(int64_t val) { histogram_.Increment(val); }
+  void Update(int64_t val) {
+    boost::lock_guard<SpinLock> l(lock_);  // Reset() may replace histogram_.
+    histogram_->Increment(val);
+  }
+
+  /// Reset the histogram by removing all previous entries.
+  void Reset() {
+    boost::lock_guard<SpinLock> l(lock_);
+    uint64_t highest = histogram_->highest_trackable_value();  // Carried over.
+    int digits = histogram_->num_significant_digits();  // Carried over.
+    histogram_.reset(new HdrHistogram(highest, digits));  // New instance, same params.
+  }
 
   virtual void ToLegacyJson(rapidjson::Document*) { }
 
   const TUnit::type& unit() const { return unit_; }
 
   virtual std::string ToHumanReadable() {
+    boost::lock_guard<SpinLock> l(lock_);  // Held for every histogram_ read below.
     std::stringstream out;
-    out <<"Count: " << histogram_.TotalCount() << ", "
+    out << "Count: " << histogram_->TotalCount() << ", "
         << "25th %-ile: "
-        << PrettyPrinter::Print(histogram_.ValueAtPercentile(25), unit_) << ", "
+        << PrettyPrinter::Print(histogram_->ValueAtPercentile(25), unit_) << ", "
         << "50th %-ile: "
-        << PrettyPrinter::Print(histogram_.ValueAtPercentile(50), unit_) << ", "
+        << PrettyPrinter::Print(histogram_->ValueAtPercentile(50), unit_) << ", "
         << "75th %-ile: "
-        << PrettyPrinter::Print(histogram_.ValueAtPercentile(75), unit_) << ", "
+        << PrettyPrinter::Print(histogram_->ValueAtPercentile(75), unit_) << ", "
         << "90th %-ile: "
-        << PrettyPrinter::Print(histogram_.ValueAtPercentile(90), unit_) << ", "
+        << PrettyPrinter::Print(histogram_->ValueAtPercentile(90), unit_) << ", "
         << "95th %-ile: "
-        << PrettyPrinter::Print(histogram_.ValueAtPercentile(95), unit_) << ", "
+        << PrettyPrinter::Print(histogram_->ValueAtPercentile(95), unit_) << ", "
         << "99.9th %-ile: "
-        << PrettyPrinter::Print(histogram_.ValueAtPercentile(99.9), unit_);
+        << PrettyPrinter::Print(histogram_->ValueAtPercentile(99.9), unit_);
     return out.str();
   }
 
  private:
-  HdrHistogram histogram_;
-
+  // Protects the histogram_ pointer itself, which Reset() replaces.
+  SpinLock lock_;
+  boost::scoped_ptr<HdrHistogram> histogram_;
   const TUnit::type unit_;
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/872d5462/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index b600f80..70dbb12 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -748,7 +748,7 @@
     ],
     "label": "$0 RPC Call Duration",
     "units": "TIME_MS",
-    "kind": "STATS",
+    "kind": "HISTOGRAM",
     "key": "rpc-method.$0.call_duration"
   },
   {