Posted to commits@impala.apache.org by mj...@apache.org on 2017/07/12 15:17:07 UTC

[1/3] incubator-impala git commit: IMPALA-5240: Allow config of number of disk I/O threads per disk type

Repository: incubator-impala
Updated Branches:
  refs/heads/master 39e8cf313 -> 1d8274a89


IMPALA-5240: Allow config of number of disk I/O threads per disk type

Currently Impala defaults to 8 threads per flash disk and 1 thread per
rotational disk. This can be overridden with --num_threads_per_disk,
but that flag sets the same thread count regardless of disk type.

This change allows the number of disk I/O threads to be configured
independently per disk type, using the startup parameters
"--num_io_threads_per_solid_state_disk" for solid state disks and
"--num_io_threads_per_rotational_disk" for rotational disks.

Testing:
Added backend tests that verify that "num_threads_per_disk",
"num_io_threads_per_solid_state_disk" and
"num_io_threads_per_rotational_disk" are used to spawn threads
appropriately. Existing tests have been updated to use the new
DiskIoMgr test constructor, but retain the same behavior as before.
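
A minimal sketch of how the updated tests call the new test-only
constructor (argument values are placeholders taken from the existing
tests):

  // num_disks, threads per rotational disk, threads per solid state disk,
  // min buffer size, max buffer size.
  DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk,
      MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
  ASSERT_OK(io_mgr.Init(&mem_tracker));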

Additionally fixed a bug where Impala would crash if num_disks was set
to more than the number of logical disks on the system while
num_threads_per_disk was not set. The same bug also let the user create
more disk queues than the number of logical disks, queues that would
never be used by the actual code. After this fix, if num_disks is set
to more than the number of logical disks, it defaults to the number of
available disks and a warning is logged.
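
The fix clamps out-of-range values in the DiskIoMgr default constructor;
abbreviated from the change in disk-io-mgr.cc below:

  int num_local_disks = DiskInfo::num_disks();
  if (FLAGS_num_disks < 0 || FLAGS_num_disks > DiskInfo::num_disks()) {
    // Out of range: keep the system disk count and warn instead of crashing.
    LOG(WARNING) << "Number of disks specified should be between 0 and the number of "
        "logical disks on the system. Defaulting to system setting of "
        << DiskInfo::num_disks() << " disks";
  } else if (FLAGS_num_disks > 0) {
    num_local_disks = FLAGS_num_disks;
  }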

Change-Id: I094aff3747104104fe0465d73dcdbef5d9892f7c
Reviewed-on: http://gerrit.cloudera.org:8080/7232
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4655c45e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4655c45e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4655c45e

Branch: refs/heads/master
Commit: 4655c45e9a797ab48008c92f6be1fd4abc7c24a9
Parents: 39e8cf3
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Tue Jun 20 11:39:44 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Jul 12 04:29:02 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/disk-io-mgr-stress.cc            |  4 +-
 be/src/runtime/disk-io-mgr-test.cc              | 60 ++++++++++++-----
 be/src/runtime/disk-io-mgr.cc                   | 69 +++++++++++++++-----
 be/src/runtime/disk-io-mgr.h                    | 21 ++++--
 be/src/util/disk-info.h                         | 10 ++-
 be/src/util/thread.cc                           |  4 ++
 be/src/util/thread.h                            |  5 +-
 .../impala/testutil/SentryServicePinger.java    |  2 +-
 8 files changed, 125 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4655c45e/be/src/runtime/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-stress.cc b/be/src/runtime/disk-io-mgr-stress.cc
index af0c841..658b747 100644
--- a/be/src/runtime/disk-io-mgr-stress.cc
+++ b/be/src/runtime/disk-io-mgr-stress.cc
@@ -71,8 +71,8 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
   LOG(INFO) << "Running with rand seed: " << rand_seed;
   srand(rand_seed);
 
-  io_mgr_.reset(new DiskIoMgr(
-      num_disks, num_threads_per_disk, MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE));
+  io_mgr_.reset(new DiskIoMgr(num_disks, num_threads_per_disk, num_threads_per_disk,
+      MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE));
   Status status = io_mgr_->Init(&mem_tracker_);
   CHECK(status.ok());
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4655c45e/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index c66fbf1..05c99e7 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -33,6 +33,10 @@
 
 #include "common/names.h"
 
+DECLARE_int32(num_remote_hdfs_io_threads);
+DECLARE_int32(num_s3_io_threads);
+DECLARE_int32(num_adls_io_threads);
+
 using boost::condition_variable;
 
 const int MIN_BUFFER_SIZE = 512;
@@ -200,7 +204,7 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
     EXPECT_TRUE(false);
   }
 
-  scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 10));
+  scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
   MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
   ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
   DiskIoRequestContext* reader;
@@ -208,7 +212,7 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       pool_.reset(new ObjectPool);
-      DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
+      DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
       ASSERT_OK(io_mgr.Init(&mem_tracker));
       DiskIoRequestContext* writer;
       io_mgr.RegisterContext(&writer, &mem_tracker);
@@ -245,7 +249,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   MemTracker mem_tracker(LARGE_MEM_LIMIT);
   num_ranges_written_ = 0;
   string tmp_file = "/non-existent/file.txt";
-  DiskIoMgr io_mgr(1, 1, 1, 10);
+  DiskIoMgr io_mgr(1, 1, 1, 1, 10);
   ASSERT_OK(io_mgr.Init(&mem_tracker));
   DiskIoRequestContext* writer;
   io_mgr.RegisterContext(&writer, NULL);
@@ -306,7 +310,7 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
     EXPECT_TRUE(false);
   }
 
-  scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 10));
+  scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
   MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
   ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
   DiskIoRequestContext* reader;
@@ -314,7 +318,7 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       pool_.reset(new ObjectPool);
-      DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 10);
+      DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
       ASSERT_OK(io_mgr.Init(&mem_tracker));
       DiskIoRequestContext* writer;
       io_mgr.RegisterContext(&writer, &mem_tracker);
@@ -376,7 +380,7 @@ TEST_F(DiskIoMgrTest, SingleReader) {
                     << " num_read_threads=" << num_read_threads;
 
           if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-          DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 1);
+          DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
 
           ASSERT_OK(io_mgr.Init(&mem_tracker));
           MemTracker reader_mem_tracker;
@@ -430,7 +434,7 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
                   << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
 
         if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-        DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 1);
+        DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
 
         ASSERT_OK(io_mgr.Init(&mem_tracker));
         MemTracker reader_mem_tracker;
@@ -502,8 +506,8 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
                   << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
 
         if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-        DiskIoMgr io_mgr(
-            num_disks, num_threads_per_disk, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+        DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk,
+            MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
         ASSERT_OK(io_mgr.Init(&mem_tracker));
         MemTracker reader_mem_tracker;
@@ -573,7 +577,7 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
                   << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
 
         if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-        DiskIoMgr io_mgr(num_disks, num_threads_per_disk, 1, 1);
+        DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
 
         ASSERT_OK(io_mgr.Init(&mem_tracker));
         MemTracker reader_mem_tracker;
@@ -638,7 +642,7 @@ TEST_F(DiskIoMgrTest, MemLimits) {
     if (++iters % 1000 == 0) LOG(ERROR) << "Starting iteration " << iters;
 
     MemTracker root_mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE);
-    DiskIoMgr io_mgr(1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+    DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
     ASSERT_OK(io_mgr.Init(&root_mem_tracker));
     MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
@@ -713,7 +717,7 @@ TEST_F(DiskIoMgrTest, CachedReads) {
   {
     pool_.reset(new ObjectPool);
     if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-    DiskIoMgr io_mgr(num_disks, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+    DiskIoMgr io_mgr(num_disks, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
     ASSERT_OK(io_mgr.Init(&mem_tracker));
     MemTracker reader_mem_tracker;
@@ -787,7 +791,8 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
   for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
     for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
       for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
-        DiskIoMgr io_mgr(num_disks, threads_per_disk, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+        DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE,
+            MAX_BUFFER_SIZE);
         ASSERT_OK(io_mgr.Init(&mem_tracker));
         for (int file_index = 0; file_index < num_contexts; ++file_index) {
           io_mgr.RegisterContext(&contexts[file_index], &mem_tracker);
@@ -900,7 +905,8 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
                     << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
           if (++iters % 2500 == 0) LOG(ERROR) << "Starting iteration " << iters;
 
-          DiskIoMgr io_mgr(num_disks, threads_per_disk, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+          DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE,
+              MAX_BUFFER_SIZE);
           EXPECT_OK(io_mgr.Init(&mem_tracker));
 
           for (int i = 0; i < NUM_READERS; ++i) {
@@ -952,7 +958,7 @@ TEST_F(DiskIoMgrTest, Buffers) {
   int max_buffer_size = 8 * 1024 * 1024; // 8 MB
   MemTracker root_mem_tracker(max_buffer_size * 2);
 
-  DiskIoMgr io_mgr(1, 1, min_buffer_size, max_buffer_size);
+  DiskIoMgr io_mgr(1, 1, 1, min_buffer_size, max_buffer_size);
   ASSERT_OK(io_mgr.Init(&root_mem_tracker));
   ASSERT_EQ(root_mem_tracker.consumption(), 0);
 
@@ -1029,7 +1035,7 @@ TEST_F(DiskIoMgrTest, PartialRead) {
   struct stat stat_val;
   stat(tmp_file, &stat_val);
 
-  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, read_len, read_len));
+  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
 
   ASSERT_OK(io_mgr->Init(&mem_tracker));
   MemTracker reader_mem_tracker;
@@ -1061,7 +1067,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
   int read_len = 4; // Make buffer size smaller than client-provided buffer.
   CreateTempFile(tmp_file, data);
 
-  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, read_len, read_len));
+  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
 
   ASSERT_OK(io_mgr->Init(&mem_tracker));
   // Reader doesn't need to provide mem tracker if it's providing buffers.
@@ -1101,7 +1107,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
   const char* tmp_file = "/file/that/does/not/exist";
   const int SCAN_LEN = 128;
 
-  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, SCAN_LEN, SCAN_LEN));
+  scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, SCAN_LEN, SCAN_LEN));
 
   ASSERT_OK(io_mgr->Init(&mem_tracker));
   // Reader doesn't need to provide mem tracker if it's providing buffers.
@@ -1132,6 +1138,24 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
   io_mgr.reset();
   EXPECT_EQ(mem_tracker.consumption(), 0);
 }
+
+// Test to verify configuration parameters for number of I/O threads per disk.
+TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
+  const int num_io_threads_for_remote_disks = FLAGS_num_remote_hdfs_io_threads
+      + FLAGS_num_s3_io_threads + FLAGS_num_adls_io_threads;
+
+  // Verify num_io_threads_per_rotational_disk and num_io_threads_per_solid_state_disk.
+  // Since we do not have control over which disk is used, we check for either type
+  // (rotational/solid state)
+  MemTracker mem_tracker(LARGE_MEM_LIMIT);
+  const int num_io_threads_per_rotational_or_ssd = 2;
+  DiskIoMgr io_mgr(1, num_io_threads_per_rotational_or_ssd,
+      num_io_threads_per_rotational_or_ssd, 1, 10);
+  ASSERT_OK(io_mgr.Init(&mem_tracker));
+  const int num_io_threads = io_mgr.disk_thread_group_.Size();
+  ASSERT_TRUE(num_io_threads ==
+      num_io_threads_per_rotational_or_ssd + num_io_threads_for_remote_disks);
+}
 }
 
 int main(int argc, char** argv) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4655c45e/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index d78bad3..3393ab3 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -41,22 +41,43 @@ using namespace strings;
 DEFINE_int32(num_disks, 0, "Number of disks on data node.");
 // Default IoMgr configs:
 // The maximum number of the threads per disk is also the max queue depth per disk.
-DEFINE_int32(num_threads_per_disk, 0, "number of threads per disk");
+DEFINE_int32(num_threads_per_disk, 0, "Number of I/O threads per disk");
+
+// Rotational disks should have 1 thread per disk to minimize seeks.  Non-rotational
+// don't have this penalty and benefit from multiple concurrent IO requests.
+static const int THREADS_PER_ROTATIONAL_DISK = 1;
+static const int THREADS_PER_SOLID_STATE_DISK = 8;
+
+// The maximum number of the threads per rotational disk is also the max queue depth per
+// rotational disk.
+static const string num_io_threads_per_rotational_disk_help_msg = Substitute("Number of "
+    "I/O threads per rotational disk. Has priority over num_threads_per_disk. If neither"
+    " is set, defaults to $0 thread(s) per rotational disk", THREADS_PER_ROTATIONAL_DISK);
+DEFINE_int32(num_io_threads_per_rotational_disk, 0,
+    num_io_threads_per_rotational_disk_help_msg.c_str());
+// The maximum number of the threads per solid state disk is also the max queue depth per
+// solid state disk.
+static const string num_io_threads_per_solid_state_disk_help_msg = Substitute("Number of"
+    " I/O threads per solid state disk. Has priority over num_threads_per_disk. If "
+    "neither is set, defaults to $0 thread(s) per solid state disk",
+    THREADS_PER_SOLID_STATE_DISK);
+DEFINE_int32(num_io_threads_per_solid_state_disk, 0,
+    num_io_threads_per_solid_state_disk_help_msg.c_str());
 // The maximum number of remote HDFS I/O threads.  HDFS access that are expected to be
 // remote are placed on a separate remote disk queue.  This is the queue depth for that
 // queue.  If 0, then the remote queue is not used and instead ranges are round-robined
 // across the local disk queues.
-DEFINE_int32(num_remote_hdfs_io_threads, 8, "number of remote HDFS I/O threads");
+DEFINE_int32(num_remote_hdfs_io_threads, 8, "Number of remote HDFS I/O threads");
 // The maximum number of S3 I/O threads. The default value of 16 was chosen emperically
 // to maximize S3 throughput. Maximum throughput is achieved with multiple connections
 // open to S3 and use of multiple CPU cores since S3 reads are relatively compute
 // expensive (SSL and JNI buffer overheads).
-DEFINE_int32(num_s3_io_threads, 16, "number of S3 I/O threads");
+DEFINE_int32(num_s3_io_threads, 16, "Number of S3 I/O threads");
 // The maximum number of ADLS I/O threads. This number is a good default to have for
 // clusters that may vary widely in size, due to an undocumented concurrency limit
 // enforced by ADLS for a cluster, which spans between 500-700. For smaller clusters
 // (~10 nodes), 64 threads would be more ideal.
-DEFINE_int32(num_adls_io_threads, 16, "number of ADLS I/O threads");
+DEFINE_int32(num_adls_io_threads, 16, "Number of ADLS I/O threads");
 // The read size is the size of the reads sent to hdfs/os.
 // There is a trade off of latency and throughout, trying to keep disks busy but
 // not introduce seeks.  The literature seems to agree that with 8 MB reads, random
@@ -76,11 +97,6 @@ DEFINE_int32(max_free_io_buffers, 128,
 DEFINE_uint64(max_cached_file_handles, 20000, "Maximum number of HDFS file handles "
     "that will be cached. Disabled if set to 0.");
 
-// Rotational disks should have 1 thread per disk to minimize seeks.  Non-rotational
-// don't have this penalty and benefit from multiple concurrent IO requests.
-static const int THREADS_PER_ROTATIONAL_DISK = 1;
-static const int THREADS_PER_FLASH_DISK = 8;
-
 // The IoMgr is able to run with a wide range of memory usage. If a query has memory
 // remaining less than this value, the IoMgr will stop all buffering regardless of the
 // current queue size.
@@ -251,8 +267,19 @@ static void CheckSseSupport() {
   }
 }
 
+// Utility function to select flag that is set (has a positive value) based on precedence
+static inline int GetFirstPositiveVal(const int first_val, const int second_val,
+    const int default_val) {
+  return first_val > 0 ? first_val : (second_val > 0 ? second_val : default_val);
+}
+
 DiskIoMgr::DiskIoMgr() :
-    num_threads_per_disk_(FLAGS_num_threads_per_disk),
+    num_io_threads_per_rotational_disk_(GetFirstPositiveVal(
+        FLAGS_num_io_threads_per_rotational_disk, FLAGS_num_threads_per_disk,
+        THREADS_PER_ROTATIONAL_DISK)),
+    num_io_threads_per_solid_state_disk_(GetFirstPositiveVal(
+        FLAGS_num_io_threads_per_solid_state_disk, FLAGS_num_threads_per_disk,
+        THREADS_PER_SOLID_STATE_DISK)),
     max_buffer_size_(FLAGS_read_size),
     min_buffer_size_(FLAGS_min_buffer_size),
     shut_down_(false),
@@ -262,14 +289,22 @@ DiskIoMgr::DiskIoMgr() :
         FileSystemUtil::MaxNumFileHandles())) {
   int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
   free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
-  int num_local_disks = FLAGS_num_disks == 0 ? DiskInfo::num_disks() : FLAGS_num_disks;
+  int num_local_disks = DiskInfo::num_disks();
+  if (FLAGS_num_disks < 0 || FLAGS_num_disks > DiskInfo::num_disks()) {
+    LOG(WARNING) << "Number of disks specified should be between 0 and the number of "
+        "logical disks on the system. Defaulting to system setting of " <<
+        DiskInfo::num_disks() << " disks";
+  } else if (FLAGS_num_disks > 0) {
+    num_local_disks = FLAGS_num_disks;
+  }
   disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS);
   CheckSseSupport();
 }
 
-DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_disk, int min_buffer_size,
-                     int max_buffer_size) :
-    num_threads_per_disk_(threads_per_disk),
+DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
+    int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size) :
+    num_io_threads_per_rotational_disk_(threads_per_rotational_disk),
+    num_io_threads_per_solid_state_disk_(threads_per_solid_state_disk),
     max_buffer_size_(max_buffer_size),
     min_buffer_size_(min_buffer_size),
     shut_down_(false),
@@ -346,12 +381,10 @@ Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
       num_threads_per_disk = FLAGS_num_s3_io_threads;
     } else if (i == RemoteAdlsDiskId()) {
       num_threads_per_disk = FLAGS_num_adls_io_threads;
-    } else if (num_threads_per_disk_ != 0) {
-      num_threads_per_disk = num_threads_per_disk_;
     } else if (DiskInfo::is_rotational(i)) {
-      num_threads_per_disk = THREADS_PER_ROTATIONAL_DISK;
+      num_threads_per_disk = num_io_threads_per_rotational_disk_;
     } else {
-      num_threads_per_disk = THREADS_PER_FLASH_DISK;
+      num_threads_per_disk = num_io_threads_per_solid_state_disk_;
     }
     for (int j = 0; j < num_threads_per_disk; ++j) {
       stringstream ss;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4655c45e/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index 0212f8d..138f973 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -628,15 +628,17 @@ class DiskIoMgr : public CacheLineAligned {
     WriteDoneCallback callback_;
   };
 
-  /// Create a DiskIoMgr object.
+  /// Create a DiskIoMgr object. This constructor is only used for testing.
   ///  - num_disks: The number of disks the IoMgr should use. This is used for testing.
   ///    Specify 0, to have the disk IoMgr query the os for the number of disks.
-  ///  - threads_per_disk: number of read threads to create per disk. This is also
-  ///    the max queue depth.
+  ///  - threads_per_rotational_disk: number of read threads to create per rotational
+  ///    disk. This is also the max queue depth.
+  ///  - threads_per_solid_state_disk: number of read threads to create per solid state
+  ///    disk. This is also the max queue depth.
   ///  - min_buffer_size: minimum io buffer size (in bytes)
   ///  - max_buffer_size: maximum io buffer size (in bytes). Also the max read size.
-  DiskIoMgr(int num_disks, int threads_per_disk, int min_buffer_size,
-      int max_buffer_size);
+  DiskIoMgr(int num_disks, int threads_per_rotational_disk,
+      int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size);
 
   /// Create DiskIoMgr with default configs.
   DiskIoMgr();
@@ -817,6 +819,7 @@ class DiskIoMgr : public CacheLineAligned {
   class RequestContextCache;
 
   friend class DiskIoMgrTest_Buffers_Test;
+  friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test;
 
   /// Memory tracker for unused I/O buffers owned by DiskIoMgr.
   boost::scoped_ptr<MemTracker> free_buffer_mem_tracker_;
@@ -826,9 +829,13 @@ class DiskIoMgr : public CacheLineAligned {
   /// provide a MemTracker.
   boost::scoped_ptr<MemTracker> unowned_buffer_mem_tracker_;
 
-  /// Number of worker(read) threads per disk. Also the max depth of queued
+  /// Number of worker(read) threads per rotational disk. Also the max depth of queued
   /// work to the disk.
-  const int num_threads_per_disk_;
+  const int num_io_threads_per_rotational_disk_;
+
+  /// Number of worker(read) threads per solid state disk. Also the max depth of queued
+  /// work to the disk.
+  const int num_io_threads_per_solid_state_disk_;
 
   /// Maximum read size. This is also the maximum size of each allocated buffer.
   const int max_buffer_size_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4655c45e/be/src/util/disk-info.h
----------------------------------------------------------------------
diff --git a/be/src/util/disk-info.h b/be/src/util/disk-info.h
index 323a265..edfea21 100644
--- a/be/src/util/disk-info.h
+++ b/be/src/util/disk-info.h
@@ -54,12 +54,16 @@ class DiskInfo {
     return disks_[disk_id].name;
   }
 
+  /// Returns true if the disk with disk_id exists and is a rotational disk, is false
+  /// otherwise
   static bool is_rotational(int disk_id) {
     DCHECK_GE(disk_id, 0);
-    DCHECK_LT(disk_id, disks_.size());
+    // TODO: temporarily removed DCHECK due to an issue tracked in IMPALA-5574, put it
+    // back after its resolved
+    if (disk_id >= disks_.size()) return false;
     return disks_[disk_id].is_rotational;
   }
-  
+
   static std::string DebugString();
 
  private:
@@ -75,7 +79,7 @@ class DiskInfo {
 
     bool is_rotational;
 
-    Disk(const std::string& name = "", int id = -1, bool is_rotational = true) 
+    Disk(const std::string& name = "", int id = -1, bool is_rotational = true)
       : name(name), id(id), is_rotational(is_rotational) {}
   };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4655c45e/be/src/util/thread.cc
----------------------------------------------------------------------
diff --git a/be/src/util/thread.cc b/be/src/util/thread.cc
index 39a344f..c84ef0b 100644
--- a/be/src/util/thread.cc
+++ b/be/src/util/thread.cc
@@ -340,6 +340,10 @@ void ThreadGroup::JoinAll() {
   for (const Thread& thread: threads_) thread.Join();
 }
 
+int ThreadGroup::Size() const {
+  return threads_.size();
+}
+
 namespace {
 
 void RegisterUrlCallbacks(bool include_jvm_threads, Webserver* webserver) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4655c45e/be/src/util/thread.h
----------------------------------------------------------------------
diff --git a/be/src/util/thread.h b/be/src/util/thread.h
index 3a96233..e21be7c 100644
--- a/be/src/util/thread.h
+++ b/be/src/util/thread.h
@@ -164,7 +164,7 @@ class Thread {
 };
 
 /// Utility class to group together a set of threads. A replacement for
-/// boost::thread_group.
+/// boost::thread_group. Not thread safe.
 class ThreadGroup {
  public:
   ThreadGroup() {}
@@ -179,6 +179,9 @@ class ThreadGroup {
   /// deadlock will predictably ensue.
   void JoinAll();
 
+  /// Returns the number of threads in the group
+  int Size() const;
+
  private:
   /// All the threads grouped by this set.
   boost::ptr_vector<Thread> threads_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4655c45e/fe/src/test/java/org/apache/impala/testutil/SentryServicePinger.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/SentryServicePinger.java b/fe/src/test/java/org/apache/impala/testutil/SentryServicePinger.java
index 9bb3a5d..96a849b 100644
--- a/fe/src/test/java/org/apache/impala/testutil/SentryServicePinger.java
+++ b/fe/src/test/java/org/apache/impala/testutil/SentryServicePinger.java
@@ -76,7 +76,7 @@ public class SentryServicePinger {
         LOG.info("Sentry Service ping succeeded.");
         System.exit(0);
       } catch (Exception e) {
-        LOG.error(String.format("Error issing RPC to Sentry Service (attempt %d/%d): ",
+        LOG.error(String.format("Error issuing RPC to Sentry Service (attempt %d/%d): ",
             maxPings - numPings + 1, maxPings), e);
         Thread.sleep(sleepSecs * 1000);
       }


[2/3] incubator-impala git commit: IMPALA-5104: Admit queries with mem equal to proc mem_limit

Posted by mj...@apache.org.
IMPALA-5104: Admit queries with mem equal to proc mem_limit

This allows queries to be admitted with estimated or requested memory
equal to the process memory limit.

Added a corresponding test case.
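
The change itself is the comparison in
AdmissionController::RejectImmediately(): a per-host memory estimate
equal to the process limit no longer triggers rejection. A minimal
before/after sketch (names as in the patch):

  // Before the fix, estimates equal to the process mem limit were rejected:
  //   schedule->GetPerHostMemoryEstimate() >= GetProcMemLimit()
  // After the fix, only estimates strictly above the limit are rejected:
  if (pool_cfg.max_mem_resources > 0 &&
      schedule->GetPerHostMemoryEstimate() > GetProcMemLimit()) {
    reject_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
        PrintBytes(schedule->GetPerHostMemoryEstimate()), PrintBytes(GetProcMemLimit()));
  }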

Change-Id: I197648f4162f2057141517b4b42ab5196884a65a
Reviewed-on: http://gerrit.cloudera.org:8080/7401
Reviewed-by: Matthew Jacobs <mj...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/67bc7a77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/67bc7a77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/67bc7a77

Branch: refs/heads/master
Commit: 67bc7a774c778c59cdaf7be39ceb5620f75e4f34
Parents: 4655c45
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Tue Jul 11 16:39:09 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Jul 12 04:40:51 2017 +0000

----------------------------------------------------------------------
 be/src/scheduling/admission-controller.cc         |  2 +-
 tests/custom_cluster/test_admission_controller.py | 16 ++++++++++++++++
 2 files changed, 17 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/67bc7a77/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 73169f9..543fdf4 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -397,7 +397,7 @@ Status AdmissionController::RejectImmediately(QuerySchedule* schedule,
     reject_reason = Substitute(REASON_REQ_OVER_POOL_MEM, PrintBytes(cluster_mem_needed),
         PrintBytes(pool_cfg.max_mem_resources));
   } else if (pool_cfg.max_mem_resources > 0 &&
-      schedule->GetPerHostMemoryEstimate() >= GetProcMemLimit()) {
+      schedule->GetPerHostMemoryEstimate() > GetProcMemLimit()) {
     reject_reason = Substitute(REASON_REQ_OVER_NODE_MEM,
         PrintBytes(schedule->GetPerHostMemoryEstimate()), PrintBytes(GetProcMemLimit()));
   } else if (stats->agg_num_queued() >= pool_cfg.max_queued) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/67bc7a77/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 5201c75..e84db52 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -315,6 +315,22 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
       assert re.search("Rejected query from pool default-pool : request memory needed "
           ".* is greater than pool max mem resources 10.00 MB", str(ex))
 
+  # Process mem_limit used in test_mem_limit_upper_bound
+  PROC_MEM_TEST_LIMIT = 1024 * 1024 * 1024
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+      impalad_args=impalad_admission_ctrl_flags(1, 1, 10 * PROC_MEM_TEST_LIMIT,
+          PROC_MEM_TEST_LIMIT))
+  def test_mem_limit_upper_bound(self, vector):
+    """ Test to ensure that a query is admitted if the requested memory is equal to the
+    process mem limit"""
+    query = "select * from functional.alltypesagg limit 1"
+    exec_options = vector.get_value('exec_option')
+    # Setting requested memory equal to process memory limit
+    exec_options['mem_limit'] = self.PROC_MEM_TEST_LIMIT
+    self.execute_query_expect_success(self.client, query, exec_options)
+
 class TestAdmissionControllerStress(TestAdmissionControllerBase):
   """Submits a number of queries (parameterized) with some delay between submissions
   (parameterized) and the ability to submit to one impalad or many in a round-robin


[3/3] incubator-impala git commit: IMPALA-5585: Fix expr-test to call last_day() tests

Posted by mj...@apache.org.
IMPALA-5585: Fix expr-test to call last_day() tests

The tests for last_day() were not being called.

Change-Id: Idf989c8f490385daff1fa892a3f6cfe5724e164f
Reviewed-on: http://gerrit.cloudera.org:8080/7403
Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/1d8274a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/1d8274a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/1d8274a8

Branch: refs/heads/master
Commit: 1d8274a89230cf0a7e3d59268431fa3fa582bfd4
Parents: 67bc7a7
Author: Matthew Jacobs <mj...@cloudera.com>
Authored: Tue Jul 11 17:58:03 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Jul 12 05:18:53 2017 +0000

----------------------------------------------------------------------
 be/src/exprs/expr-test.cc | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/1d8274a8/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index a1fd9a1..45815dd 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -5788,6 +5788,9 @@ TEST_F(ExprTest, TimestampFunctions) {
 
   // next_day udf test for IMPALA-2459
   TestNextDayFunction();
+
+  // last_day udf test for IMPALA-5316
+  TestLastDayFunction();
 }
 
 TEST_F(ExprTest, ConditionalFunctions) {