You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/05/03 15:28:33 UTC
[14/15] impala git commit: IMPALA-4835: switch I/O buffers to buffer pool
http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 2e3ab20..28138bd 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -324,42 +324,29 @@ class BaseScalarColumnReader : public ParquetColumnReader {
BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
const SlotDescriptor* slot_desc)
: ParquetColumnReader(parent, node, slot_desc),
- data_(NULL),
- data_end_(NULL),
- def_levels_(true),
- rep_levels_(false),
- page_encoding_(parquet::Encoding::PLAIN_DICTIONARY),
- num_buffered_values_(0),
- num_values_read_(0),
- metadata_(NULL),
- stream_(NULL),
data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
}
virtual ~BaseScalarColumnReader() { }
- /// This is called once for each row group in the file.
- Status Reset(const parquet::ColumnMetaData* metadata, ScannerContext::Stream* stream) {
- DCHECK(stream != NULL);
- DCHECK(metadata != NULL);
-
- num_buffered_values_ = 0;
- data_ = NULL;
- data_end_ = NULL;
- stream_ = stream;
- metadata_ = metadata;
- num_values_read_ = 0;
- def_level_ = HdfsParquetScanner::INVALID_LEVEL;
- // See ColumnReader constructor.
- rep_level_ = max_rep_level() == 0 ? 0 : HdfsParquetScanner::INVALID_LEVEL;
- pos_current_value_ = HdfsParquetScanner::INVALID_POS;
-
- if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
- RETURN_IF_ERROR(Codec::CreateDecompressor(
- NULL, false, ConvertParquetToImpalaCodec(metadata_->codec), &decompressor_));
- }
- ClearDictionaryDecoder();
+ /// Resets the reader for each row group in the file and creates the scan
+ /// range for the column, but does not start it. To start scanning,
+ /// set_io_reservation() must be called to assign reservation to this
+ /// column, followed by StartScan().
+ Status Reset(const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk,
+ int row_group_idx);
+
+ /// Starts the column scan range. The reader must be Reset() and have a
+ /// reservation assigned via set_io_reservation(). This must be called
+ /// before any of the column data can be read (including dictionary and
+ /// data pages). Returns an error status if there was an error starting the
+ /// scan or allocating buffers for it.
+ Status StartScan();
+
+ /// Helper to start scans for multiple columns at once. Returns the first error.
+ static Status StartScans(const std::vector<BaseScalarColumnReader*>& readers) {
+ for (BaseScalarColumnReader* reader : readers) RETURN_IF_ERROR(reader->StartScan());
return Status::OK();
}
@@ -374,22 +361,27 @@ class BaseScalarColumnReader : public ParquetColumnReader {
if (dict_decoder != nullptr) dict_decoder->Close();
}
+ io::ScanRange* scan_range() const { return scan_range_; }
int64_t total_len() const { return metadata_->total_compressed_size; }
int col_idx() const { return node_.col_idx; }
THdfsCompression::type codec() const {
if (metadata_ == NULL) return THdfsCompression::NONE;
return ConvertParquetToImpalaCodec(metadata_->codec);
}
+ void set_io_reservation(int64_t bytes) { io_reservation_ = bytes; }
/// Reads the next definition and repetition levels for this column. Initializes the
/// next data page if necessary.
virtual bool NextLevels() { return NextLevels<true>(); }
- // Check the data stream to see if there is a dictionary page. If there is,
- // use that page to initialize dict_decoder_ and advance the data stream
- // past the dictionary page.
+ /// Check the data stream to see if there is a dictionary page. If there is,
+ /// use that page to initialize dict_decoder_ and advance the data stream
+ /// past the dictionary page.
Status InitDictionary();
+ /// Convenience function to initialize multiple dictionaries.
+ static Status InitDictionaries(const std::vector<BaseScalarColumnReader*> readers);
+
// Returns the dictionary or NULL if the dictionary doesn't exist
virtual DictDecoderBase* GetDictionaryDecoder() { return nullptr; }
@@ -415,33 +407,45 @@ class BaseScalarColumnReader : public ParquetColumnReader {
// fit in as few cache lines as possible.
/// Pointer to start of next value in data page
- uint8_t* data_;
+ uint8_t* data_ = nullptr;
/// End of the data page.
- const uint8_t* data_end_;
+ const uint8_t* data_end_ = nullptr;
/// Decoder for definition levels.
- ParquetLevelDecoder def_levels_;
+ ParquetLevelDecoder def_levels_{true};
/// Decoder for repetition levels.
- ParquetLevelDecoder rep_levels_;
+ ParquetLevelDecoder rep_levels_{false};
/// Page encoding for values of the current data page. Cached here for perf. Set in
/// InitDataPage().
- parquet::Encoding::type page_encoding_;
+ parquet::Encoding::type page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
/// Num values remaining in the current data page
- int num_buffered_values_;
+ int num_buffered_values_ = 0;
// Less frequently used members that are not accessed in inner loop should go below
// here so they do not occupy precious cache line space.
/// The number of values seen so far. Updated per data page.
- int64_t num_values_read_;
+ int64_t num_values_read_ = 0;
+
+ /// Metadata for the column for the current row group.
+ const parquet::ColumnMetaData* metadata_ = nullptr;
- const parquet::ColumnMetaData* metadata_;
boost::scoped_ptr<Codec> decompressor_;
- ScannerContext::Stream* stream_;
+
+ /// The scan range for the column's data. Initialized for each row group by Reset().
+ io::ScanRange* scan_range_ = nullptr;
+
+ /// Stream used to read data from 'scan_range_'. Initialized by StartScan().
+ ScannerContext::Stream* stream_ = nullptr;
+
+ /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. Must be set
+ /// with set_io_reservation() before 'stream_' is initialized. Reset for each row group
+ /// by Reset().
+ int64_t io_reservation_ = 0;
/// Pool to allocate storage for data pages from - either decompression buffers for
/// compressed data pages or copies of the data page with var-len data to attach to
http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index abdde07..c669e65 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -41,14 +41,15 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
const int64_t ScannerContext::Stream::OUTPUT_BUFFER_BYTES_LEFT_INIT;
ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
- HdfsPartitionDescriptor* partition_desc, ScanRange* scan_range,
- const vector<FilterContext>& filter_ctxs, MemPool* expr_results_pool)
+ BufferPool::ClientHandle* bp_client, HdfsPartitionDescriptor* partition_desc,
+ const vector<FilterContext>& filter_ctxs,
+ MemPool* expr_results_pool)
: state_(state),
scan_node_(scan_node),
+ bp_client_(bp_client),
partition_desc_(partition_desc),
filter_ctxs_(filter_ctxs),
expr_results_pool_(expr_results_pool) {
- AddStream(scan_range);
}
ScannerContext::~ScannerContext() {
@@ -66,19 +67,20 @@ void ScannerContext::ClearStreams() {
}
ScannerContext::Stream::Stream(ScannerContext* parent, ScanRange* scan_range,
- const HdfsFileDesc* file_desc)
+ int64_t reservation, const HdfsFileDesc* file_desc)
: parent_(parent),
scan_range_(scan_range),
file_desc_(file_desc),
+ reservation_(reservation),
file_len_(file_desc->file_length),
next_read_past_size_bytes_(INIT_READ_PAST_SIZE_BYTES),
boundary_pool_(new MemPool(parent->scan_node_->mem_tracker())),
boundary_buffer_(new StringBuffer(boundary_pool_.get())) {
}
-ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range) {
- streams_.emplace_back(new Stream(
- this, range, scan_node_->GetFileDesc(partition_desc_->id(), range->file())));
+ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range, int64_t reservation) {
+ streams_.emplace_back(new Stream(this, range, reservation,
+ scan_node_->GetFileDesc(partition_desc_->id(), range->file())));
return streams_.back().get();
}
@@ -101,6 +103,7 @@ void ScannerContext::Stream::ReleaseCompletedResources(bool done) {
Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
DCHECK_EQ(0, io_buffer_bytes_left_);
+ DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
if (io_buffer_ != nullptr) ReturnIoBuffer();
@@ -121,7 +124,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
SCOPED_TIMER(parent_->state_->total_storage_wait_timer());
int64_t read_past_buffer_size = 0;
- int64_t max_buffer_size = parent_->state_->io_mgr()->max_read_buffer_size();
+ int64_t max_buffer_size = io_mgr->max_buffer_size();
if (!read_past_size_cb_.empty()) read_past_buffer_size = read_past_size_cb_(offset);
if (read_past_buffer_size <= 0) {
// Either no callback was set or the callback did not return an estimate. Use
@@ -133,6 +136,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
read_past_buffer_size = ::max(read_past_buffer_size, read_past_size);
read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining);
read_past_buffer_size = ::min(read_past_buffer_size, max_buffer_size);
+ read_past_buffer_size = ::min(read_past_buffer_size, reservation_);
// We're reading past the scan range. Be careful not to read past the end of file.
DCHECK_GE(read_past_buffer_size, 0);
if (read_past_buffer_size == 0) {
@@ -143,8 +147,23 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
ScanRange* range = parent_->scan_node_->AllocateScanRange(
scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
scan_range_->disk_id(), false, BufferOpts::Uncached());
- RETURN_IF_ERROR(parent_->state_->io_mgr()->Read(
- parent_->scan_node_->reader_context(), range, &io_buffer_));
+ bool needs_buffers;
+ RETURN_IF_ERROR(io_mgr->StartScanRange(
+ parent_->scan_node_->reader_context(), range, &needs_buffers));
+ if (needs_buffers) {
+ // Allocate fresh buffers. The buffers for 'scan_range_' should be released now
+ // since we hit EOS.
+ if (reservation_ < io_mgr->min_buffer_size()) {
+ return Status(Substitute("Could not read past end of scan range in file '$0'. "
+ "Reservation provided $1 was < the minimum I/O buffer size $2",
+ filename(), reservation_, io_mgr->min_buffer_size()));
+ }
+ RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
+ parent_->scan_node_->reader_context(), parent_->bp_client_, range,
+ reservation_));
+ }
+ RETURN_IF_ERROR(range->GetNext(&io_buffer_));
+ DCHECK(io_buffer_->eosr());
}
DCHECK(io_buffer_ != nullptr);
@@ -324,7 +343,8 @@ Status ScannerContext::Stream::CopyIoToBoundary(int64_t num_bytes) {
void ScannerContext::Stream::ReturnIoBuffer() {
DCHECK(io_buffer_ != nullptr);
- ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffer_));
+ ScanRange* range = io_buffer_->scan_range();
+ range->ReturnBuffer(move(io_buffer_));
io_buffer_pos_ = nullptr;
io_buffer_bytes_left_ = 0;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index a131d3f..6292486 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -27,6 +27,7 @@
#include "common/compiler-util.h"
#include "common/status.h"
#include "exec/filter-context.h"
+#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/io/request-ranges.h"
namespace impala {
@@ -84,10 +85,12 @@ class TupleRow;
class ScannerContext {
public:
/// Create a scanner context with the parent scan_node (where materialized row batches
- /// get pushed to) and the scan range to process.
- /// This context starts with 1 stream.
- ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
- io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs,
+ /// get pushed to) and the scan range to process. Buffers are allocated using
+ /// 'bp_client'.
+ ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
+ BufferPool::ClientHandle* bp_client,
+ HdfsPartitionDescriptor* partition_desc,
+ const std::vector<FilterContext>& filter_ctxs,
MemPool* expr_results_pool);
/// Destructor verifies that all stream objects have been released.
~ScannerContext();
@@ -150,6 +153,7 @@ class ScannerContext {
const char* filename() { return scan_range_->file(); }
const io::ScanRange* scan_range() { return scan_range_; }
const HdfsFileDesc* file_desc() { return file_desc_; }
+ int64_t reservation() const { return reservation_; }
/// Returns the buffer's current offset in the file.
int64_t file_offset() const { return scan_range_->offset() + total_bytes_returned_; }
@@ -211,9 +215,15 @@ class ScannerContext {
private:
friend class ScannerContext;
- ScannerContext* parent_;
- io::ScanRange* scan_range_;
- const HdfsFileDesc* file_desc_;
+ ScannerContext* const parent_;
+ io::ScanRange* const scan_range_;
+ const HdfsFileDesc* const file_desc_;
+
+ /// Reservation given to this stream for allocating I/O buffers. The reservation is
+ /// shared with 'scan_range_', so the context must be careful not to use this until
+ /// all of 'scan_range_'s buffers have been freed. Must be >= the minimum IoMgr
+ /// buffer size to allow reading past the end of 'scan_range_'.
+ const int64_t reservation_;
/// Total number of bytes returned from GetBytes()
int64_t total_bytes_returned_ = 0;
@@ -272,7 +282,8 @@ class ScannerContext {
/// output_buffer_bytes_left_ will be set to something else.
static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0;
- Stream(ScannerContext* parent, io::ScanRange* scan_range,
+ /// Private constructor. See AddStream() for public API.
+ Stream(ScannerContext* parent, io::ScanRange* scan_range, int64_t reservation,
const HdfsFileDesc* file_desc);
/// GetBytes helper to handle the slow path.
@@ -355,24 +366,37 @@ class ScannerContext {
/// size to 0.
void ClearStreams();
- /// Add a stream to this ScannerContext for 'range'. The stream is owned by this
- /// context.
- Stream* AddStream(io::ScanRange* range);
+ /// Add a stream to this ScannerContext for 'range'. 'range' must already have any
+ /// buffers that it needs allocated. 'reservation' is the amount of reservation that
+ /// is given to this stream for allocating I/O buffers. The reservation is shared with
+ /// 'range', so the context must be careful not to use this until all of 'range's
+ /// buffers have been freed. Must be >= the minimum IoMgr buffer size to allow reading
+ /// past the end of 'range'.
+ ///
+ /// Returns the added stream. The returned stream is owned by this context.
+ Stream* AddStream(io::ScanRange* range, int64_t reservation);
/// Returns true if RuntimeState::is_cancelled() is true, or if scan node is not
/// multi-threaded and is done (finished, cancelled or reached it's limit).
/// In all other cases returns false.
bool cancelled() const;
- HdfsPartitionDescriptor* partition_descriptor() { return partition_desc_; }
+ BufferPool::ClientHandle* bp_client() const { return bp_client_; }
+ HdfsPartitionDescriptor* partition_descriptor() const { return partition_desc_; }
const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; }
MemPool* expr_results_pool() const { return expr_results_pool_; }
private:
friend class Stream;
- RuntimeState* state_;
- HdfsScanNodeBase* scan_node_;
- HdfsPartitionDescriptor* partition_desc_;
+ RuntimeState* const state_;
+ HdfsScanNodeBase* const scan_node_;
+
+ /// Buffer pool client used to allocate I/O buffers. This is accessed by multiple
+ /// threads in the multi-threaded scan node, so those threads must take care to only
+ /// call thread-safe BufferPool methods with this client.
+ BufferPool::ClientHandle* const bp_client_;
+
+ HdfsPartitionDescriptor* const partition_desc_;
/// Vector of streams. Non-columnar formats will always have one stream per context.
std::vector<std::unique_ptr<Stream>> streams_;
http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 285aacb..d14da63 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -37,6 +37,7 @@
namespace impala {
+class MemTracker;
class ReservationTracker;
class RuntimeProfile;
class SystemAllocator;
http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index c46c5ea..e441402 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -522,15 +522,15 @@ TEST_F(ReservationTrackerTest, TransferReservation) {
TEST_F(ReservationTrackerTest, ReservationUtil) {
const int64_t MEG = 1024 * 1024;
const int64_t GIG = 1024 * 1024 * 1024;
- EXPECT_EQ(75 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING);
+ EXPECT_EQ(32 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING);
EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(0));
EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(-1));
- EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(75 * MEG));
+ EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(32 * MEG));
EXPECT_EQ(8 * GIG, ReservationUtil::GetReservationLimitFromMemLimit(10 * GIG));
- EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0));
- EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1));
+ EXPECT_EQ(32 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0));
+ EXPECT_EQ(32 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1));
EXPECT_EQ(500 * MEG, ReservationUtil::GetMinMemLimitFromReservation(400 * MEG));
EXPECT_EQ(5 * GIG, ReservationUtil::GetMinMemLimitFromReservation(4 * GIG));
http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/runtime/bufferpool/reservation-util.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-util.cc b/be/src/runtime/bufferpool/reservation-util.cc
index 85718ab..a27ab9d 100644
--- a/be/src/runtime/bufferpool/reservation-util.cc
+++ b/be/src/runtime/bufferpool/reservation-util.cc
@@ -24,7 +24,7 @@ namespace impala {
// Most operators that accumulate memory use reservations, so the majority of memory
// should be allocated to buffer reservations, as a heuristic.
const double ReservationUtil::RESERVATION_MEM_FRACTION = 0.8;
-const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 75 * 1024 * 1024;
+const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 32 * 1024 * 1024;
int64_t ReservationUtil::GetReservationLimitFromMemLimit(int64_t mem_limit) {
int64_t max_reservation = std::min<int64_t>(
http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 47bb0bd..1a38bc7 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -36,9 +36,9 @@
#include "runtime/client-cache.h"
#include "runtime/coordinator.h"
#include "runtime/data-stream-mgr.h"
-#include "runtime/io/disk-io-mgr.h"
#include "runtime/hbase-table-factory.h"
#include "runtime/hdfs-fs-cache.h"
+#include "runtime/io/disk-io-mgr.h"
#include "runtime/krpc-data-stream-mgr.h"
#include "runtime/lib-cache.h"
#include "runtime/mem-tracker.h"
@@ -366,10 +366,7 @@ Status ExecEnv::Init() {
LOG(INFO) << "Buffer pool limit: "
<< PrettyPrinter::Print(buffer_pool_limit, TUnit::BYTES);
- RETURN_IF_ERROR(disk_io_mgr_->Init(mem_tracker_.get()));
-
- mem_tracker_->AddGcFunction(
- [this](int64_t bytes_to_free) { disk_io_mgr_->GcIoBuffers(bytes_to_free); });
+ RETURN_IF_ERROR(disk_io_mgr_->Init());
// Start services in order to ensure that dependencies between them are met
if (enable_webserver_) {
http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/runtime/io/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-internal.h b/be/src/runtime/io/disk-io-mgr-internal.h
index e6962ea..292530f 100644
--- a/be/src/runtime/io/disk-io-mgr-internal.h
+++ b/be/src/runtime/io/disk-io-mgr-internal.h
@@ -34,9 +34,25 @@
#include "util/filesystem-util.h"
#include "util/hdfs-util.h"
#include "util/impalad-metrics.h"
+#include "util/runtime-profile-counters.h"
/// This file contains internal structures shared between submodules of the IoMgr. Users
/// of the IoMgr do not need to include this file.
+
+// Macros to work around counters sometimes not being provided; callers add the ';'.
+// TODO: fix things so that counters are always non-NULL.
+#define COUNTER_ADD_IF_NOT_NULL(c, v) \
+ do { \
+ ::impala::RuntimeProfile::Counter* __ctr__ = (c); \
+ if (__ctr__ != nullptr) __ctr__->Add(v); \
+ } while (false)
+
+#define COUNTER_BITOR_IF_NOT_NULL(c, v) \
+ do { \
+ ::impala::RuntimeProfile::Counter* __ctr__ = (c); \
+ if (__ctr__ != nullptr) __ctr__->BitOr(v); \
+ } while (false)
+
namespace impala {
namespace io {
http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/runtime/io/disk-io-mgr-stress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress-test.cc b/be/src/runtime/io/disk-io-mgr-stress-test.cc
index 45b36ed..2ec1d09 100644
--- a/be/src/runtime/io/disk-io-mgr-stress-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress-test.cc
@@ -15,8 +15,13 @@
// specific language governing permissions and limitations
// under the License.
+#include <gflags/gflags.h>
+
+#include "common/init.h"
#include "runtime/io/disk-io-mgr-stress.h"
-#include "util/cpu-info.h"
+#include "common/init.h"
+#include "runtime/test-env.h"
+#include "service/fe-support.h"
#include "util/string-parser.h"
#include "common/names.h"
@@ -28,34 +33,32 @@ using namespace impala::io;
// can be passed to control how long to run this test (0 for forever).
// TODO: make these configurable once we decide how to run BE tests with args
-const int DEFAULT_DURATION_SEC = 1;
+constexpr int DEFAULT_DURATION_SEC = 1;
const int NUM_DISKS = 5;
const int NUM_THREADS_PER_DISK = 5;
const int NUM_CLIENTS = 10;
const bool TEST_CANCELLATION = true;
+const int64_t BUFFER_POOL_CAPACITY = 4LL * 1024 * 1024 * 1024;
+
+DEFINE_int32(duration_sec, DEFAULT_DURATION_SEC,
+ "Disk I/O Manager stress test duration in seconds. 0 means run indefinitely.");
int main(int argc, char** argv) {
- google::InitGoogleLogging(argv[0]);
- CpuInfo::Init();
- OsInfo::Init();
- impala::InitThreading();
- int duration_sec = DEFAULT_DURATION_SEC;
-
- if (argc == 2) {
- StringParser::ParseResult status;
- duration_sec = StringParser::StringToInt<int>(argv[1], strlen(argv[1]), &status);
- if (status != StringParser::PARSE_SUCCESS) {
- printf("Invalid arg: %s\n", argv[1]);
- return 1;
- }
- }
- if (duration_sec != 0) {
- printf("Running stress test for %d seconds.\n", duration_sec);
+ impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+ impala::InitFeSupport();
+
+ if (FLAGS_duration_sec != 0) {
+ printf("Running stress test for %d seconds.\n", FLAGS_duration_sec);
} else {
printf("Running stress test indefinitely.\n");
}
- DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION);
- test.Run(duration_sec);
+ TestEnv test_env;
+ // Tests try to allocate arbitrarily small buffers. Ensure Buffer Pool allows it.
+ test_env.SetBufferPoolArgs(DiskIoMgrStress::MIN_READ_BUFFER_SIZE, BUFFER_POOL_CAPACITY);
+ Status status = test_env.Init();
+ CHECK(status.ok()) << status.GetDetail();
+ DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION);
+ test.Run(FLAGS_duration_sec);
return 0;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/runtime/io/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index 8815357..3fd33de 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -19,6 +19,8 @@
#include "runtime/io/disk-io-mgr-stress.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/exec-env.h"
#include "runtime/io/request-context.h"
#include "util/time.h"
@@ -27,18 +29,20 @@
using namespace impala;
using namespace impala::io;
-static const float ABORT_CHANCE = .10f;
-static const int MIN_READ_LEN = 1;
-static const int MAX_READ_LEN = 20;
+constexpr float DiskIoMgrStress::ABORT_CHANCE;
+const int DiskIoMgrStress::MIN_READ_LEN;
+const int DiskIoMgrStress::MAX_READ_LEN;
-static const int MIN_FILE_LEN = 10;
-static const int MAX_FILE_LEN = 1024;
+const int DiskIoMgrStress::MIN_FILE_LEN;
+const int DiskIoMgrStress::MAX_FILE_LEN;
// Make sure this is between MIN/MAX FILE_LEN to test more cases
-static const int MIN_READ_BUFFER_SIZE = 64;
-static const int MAX_READ_BUFFER_SIZE = 128;
+const int DiskIoMgrStress::MIN_READ_BUFFER_SIZE;
+const int DiskIoMgrStress::MAX_READ_BUFFER_SIZE;
-static const int CANCEL_READER_PERIOD_MS = 20; // in ms
+const int DiskIoMgrStress::MAX_BUFFER_BYTES_PER_SCAN_RANGE;
+
+const int DiskIoMgrStress::CANCEL_READER_PERIOD_MS;
static void CreateTempFile(const char* filename, const char* data) {
FILE* file = fopen(filename, "w");
@@ -47,7 +51,7 @@ static void CreateTempFile(const char* filename, const char* data) {
fclose(file);
}
-string GenerateRandomData() {
+string DiskIoMgrStress::GenerateRandomData() {
int rand_len = rand() % (MAX_FILE_LEN - MIN_FILE_LEN) + MIN_FILE_LEN;
stringstream ss;
for (int i = 0; i < rand_len; ++i) {
@@ -59,6 +63,8 @@ string GenerateRandomData() {
struct DiskIoMgrStress::Client {
boost::mutex lock;
+ /// Pool for objects. It is cleared when the client is (re-)initialized in NewClient().
+ ObjectPool obj_pool;
unique_ptr<RequestContext> reader;
int file_idx;
vector<ScanRange*> scan_ranges;
@@ -77,7 +83,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
io_mgr_.reset(new DiskIoMgr(num_disks, num_threads_per_disk, num_threads_per_disk,
MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE));
- Status status = io_mgr_->Init(&mem_tracker_);
+ Status status = io_mgr_->Init();
CHECK(status.ok());
// Initialize some data files. It doesn't really matter how many there are.
@@ -92,6 +98,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
clients_ = new Client[num_clients_];
client_mem_trackers_.resize(num_clients_);
+ buffer_pool_clients_.reset(new BufferPool::ClientHandle[num_clients_]);
for (int i = 0; i < num_clients_; ++i) {
NewClient(i);
}
@@ -110,9 +117,16 @@ void DiskIoMgrStress::ClientThread(int client_id) {
while (!eos) {
ScanRange* range;
- Status status = io_mgr_->GetNextRange(client->reader.get(), &range);
+ bool needs_buffers;
+ Status status =
+ io_mgr_->GetNextUnstartedRange(client->reader.get(), &range, &needs_buffers);
CHECK(status.ok() || status.IsCancelled());
if (range == NULL) break;
+ if (needs_buffers) {
+ status = io_mgr_->AllocateBuffersForRange(client->reader.get(),
+ &buffer_pool_clients_[client_id], range, MAX_BUFFER_BYTES_PER_SCAN_RANGE);
+ CHECK(status.ok()) << status.GetDetail();
+ }
while (true) {
unique_ptr<BufferDescriptor> buffer;
@@ -137,7 +151,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
// Copy the bytes from this read into the result buffer.
memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len());
- io_mgr_->ReturnBuffer(move(buffer));
+ range->ReturnBuffer(move(buffer));
bytes_read += len;
CHECK_GE(bytes_read, 0);
@@ -159,6 +173,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
// Unregister the old client and get a new one
unique_lock<mutex> lock(client->lock);
io_mgr_->UnregisterContext(client->reader.get());
+ client->reader.reset();
NewClient(client_id);
}
@@ -170,11 +185,9 @@ void DiskIoMgrStress::ClientThread(int client_id) {
// Cancel a random reader
void DiskIoMgrStress::CancelRandomReader() {
if (!includes_cancellation_) return;
-
- int rand_client = rand() % num_clients_;
-
- unique_lock<mutex> lock(clients_[rand_client].lock);
- io_mgr_->CancelContext(clients_[rand_client].reader.get());
+ Client* rand_client = &clients_[rand() % num_clients_];
+ unique_lock<mutex> lock(rand_client->lock);
+ rand_client->reader->Cancel();
}
void DiskIoMgrStress::Run(int sec) {
@@ -199,10 +212,18 @@ void DiskIoMgrStress::Run(int sec) {
for (int i = 0; i < num_clients_; ++i) {
unique_lock<mutex> lock(clients_[i].lock);
- if (clients_[i].reader != NULL) io_mgr_->CancelContext(clients_[i].reader.get());
+ if (clients_[i].reader != NULL) clients_[i].reader->Cancel();
}
-
readers_.join_all();
+
+ for (int i = 0; i < num_clients_; ++i) {
+ if (clients_[i].reader != nullptr) {
+ io_mgr_->UnregisterContext(clients_[i].reader.get());
+ }
+ ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]);
+ client_mem_trackers_[i]->Close();
+ }
+ mem_tracker_.Close();
}
// Initialize a client to read one of the files at random. The scan ranges are
@@ -223,25 +244,41 @@ void DiskIoMgrStress::NewClient(int i) {
}
}
- for (int i = 0; i < client.scan_ranges.size(); ++i) {
- delete client.scan_ranges[i];
- }
+ // Clean up leftover state from the previous client (if any).
client.scan_ranges.clear();
+ ExecEnv* exec_env = ExecEnv::GetInstance();
+ exec_env->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]);
+ if (client_mem_trackers_[i] != nullptr) client_mem_trackers_[i]->Close();
+ client.obj_pool.Clear();
int assigned_len = 0;
while (assigned_len < file_len) {
int range_len = rand() % (MAX_READ_LEN - MIN_READ_LEN) + MIN_READ_LEN;
range_len = min(range_len, file_len - assigned_len);
- ScanRange* range = new ScanRange();
+ ScanRange* range = client.obj_pool.Add(new ScanRange);
range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
0, false, BufferOpts::Uncached());
client.scan_ranges.push_back(range);
assigned_len += range_len;
}
- client_mem_trackers_[i].reset(new MemTracker(-1, "", &mem_tracker_));
- client.reader = io_mgr_->RegisterContext(client_mem_trackers_[i].get());
- Status status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);
+ string client_name = Substitute("Client $0", i);
+ client_mem_trackers_[i].reset(new MemTracker(-1, client_name, &mem_tracker_));
+ Status status = exec_env->buffer_pool()->RegisterClient(client_name, nullptr,
+ exec_env->buffer_reservation(), client_mem_trackers_[i].get(),
+ numeric_limits<int64_t>::max(), RuntimeProfile::Create(&client.obj_pool, client_name),
+ &buffer_pool_clients_[i]);
+ CHECK(status.ok());
+ // Reserve enough memory for 3 buffers per range, which should be enough to guarantee
+ // progress.
+ CHECK(buffer_pool_clients_[i].IncreaseReservationToFit(
+ MAX_BUFFER_BYTES_PER_SCAN_RANGE * client.scan_ranges.size()))
+ << buffer_pool_clients_[i].DebugString() << "\n"
+ << exec_env->buffer_pool()->DebugString() << "\n"
+ << exec_env->buffer_reservation()->DebugString();
+
+ client.reader = io_mgr_->RegisterContext();
+ status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);
CHECK(status.ok());
}
http://git-wip-us.apache.org/repos/asf/impala/blob/9bf324e7/be/src/runtime/io/disk-io-mgr-stress.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.h b/be/src/runtime/io/disk-io-mgr-stress.h
index b872694..574b58c 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.h
+++ b/be/src/runtime/io/disk-io-mgr-stress.h
@@ -22,8 +22,11 @@
#include <memory>
#include <vector>
#include <boost/scoped_ptr.hpp>
+#include <boost/thread/condition_variable.hpp>
#include <boost/thread/thread.hpp>
+#include "common/object-pool.h"
+#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/mem-tracker.h"
#include "runtime/thread-resource-mgr.h"
@@ -43,15 +46,29 @@ class DiskIoMgrStress {
/// Run the test for 'sec'. If 0, run forever
void Run(int sec);
+ static constexpr float ABORT_CHANCE = .10f;
+ static const int MIN_READ_LEN = 1;
+ static const int MAX_READ_LEN = 20;
+
+ static const int MIN_FILE_LEN = 10;
+ static const int MAX_FILE_LEN = 1024;
+
+ // Make sure this is between MIN/MAX FILE_LEN to test more cases
+ static const int MIN_READ_BUFFER_SIZE = 64;
+ static const int MAX_READ_BUFFER_SIZE = 128;
+
+ // Maximum bytes to allocate per scan range.
+ static const int MAX_BUFFER_BYTES_PER_SCAN_RANGE = MAX_READ_BUFFER_SIZE * 3;
+
+ static const int CANCEL_READER_PERIOD_MS = 20;
private:
struct Client;
struct File {
std::string filename;
- std::string data; // the data in the file, used to validate
+ std::string data; // the data in the file, used to validate
};
-
/// Files used for testing. These are created at startup and recycled
/// during the test
std::vector<File> files_;
@@ -72,6 +89,9 @@ class DiskIoMgrStress {
/// Client MemTrackers, one per client.
std::vector<std::unique_ptr<MemTracker>> client_mem_trackers_;
+ /// Buffer pool clients, one per client.
+ std::unique_ptr<BufferPool::ClientHandle[]> buffer_pool_clients_;
+
/// If true, tests cancelling readers
bool includes_cancellation_;
@@ -88,6 +108,8 @@ class DiskIoMgrStress {
/// Possibly cancels a random reader.
void CancelRandomReader();
+
+ static std::string GenerateRandomData();
};
}
}