You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/23 04:18:14 UTC
[09/11] impala git commit: IMPALA-4835: Part 3: switch I/O buffers to
buffer pool
IMPALA-4835: Part 3: switch I/O buffers to buffer pool
This is the final patch to switch the Disk I/O manager to allocate all
buffers from the buffer pool and to reserve the buffers required for
a query upfront.
* The planner reserves enough memory to run a single scanner per
scan node.
* The multi-threaded scan node must increase reservation before
spinning up more threads.
* The scanner implementations must be careful to stay within their
assigned reservation.
The row-oriented scanners were the most straightforward, since they only
have a single scan range active at a time. A single I/O buffer is
sufficient to scan the whole file but more I/O buffers can improve I/O
throughput.
Parquet is more complex because it issues a scan range per column and
the sizes of the columns on disk are not known during planning. To
deal with this, the reservation in the frontend is based on a
heuristic involving the file size and number of columns. The Parquet
scanner can then divvy up reservation to columns based on the size of
column data on disk.
I adjusted how the 'mem_limit' is divided between buffer pool and non
buffer pool memory for low mem_limits to account for the increase in
buffer pool memory.
Testing:
* Added more planner tests to cover reservation calcs for scan node.
* Test scanners for all file formats with the reservation denial debug
action, to test behaviour when the scanners hit reservation limits.
* Updated memory and buffer pool limits for tests.
* Added unit tests for dividing reservation between columns in parquet,
since the algorithm is non-trivial.
Perf:
I ran TPC-H and targeted perf locally comparing with master. Both
showed small improvements of a few percent and no regressions of
note. Cluster perf tests showed no significant change.
Change-Id: Ic09c6196b31e55b301df45cc56d0b72cfece6786
Reviewed-on: http://gerrit.cloudera.org:8080/8966
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/24b4ed0b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/24b4ed0b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/24b4ed0b
Branch: refs/heads/master
Commit: 24b4ed0b29a44090350e630d625291c01b753a36
Parents: 5699b59
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Jan 5 16:47:03 2018 -0800
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Fri Feb 23 04:17:41 2018 +0000
----------------------------------------------------------------------
be/src/exec/CMakeLists.txt | 1 +
be/src/exec/hdfs-parquet-scanner-test.cc | 96 +
be/src/exec/hdfs-parquet-scanner.cc | 218 ++-
be/src/exec/hdfs-parquet-scanner.h | 38 +-
be/src/exec/hdfs-scan-node-base.cc | 27 +-
be/src/exec/hdfs-scan-node-base.h | 3 +
be/src/exec/hdfs-scan-node-mt.cc | 11 +-
be/src/exec/hdfs-scan-node.cc | 143 +-
be/src/exec/hdfs-scan-node.h | 66 +-
be/src/exec/parquet-column-readers.cc | 108 +-
be/src/exec/parquet-column-readers.h | 88 +-
be/src/exec/scanner-context.cc | 28 +-
be/src/exec/scanner-context.h | 54 +-
.../bufferpool/reservation-tracker-test.cc | 8 +-
be/src/runtime/bufferpool/reservation-util.cc | 2 +-
be/src/runtime/io/disk-io-mgr-stress-test.cc | 38 +-
be/src/runtime/io/disk-io-mgr-stress.cc | 69 +-
be/src/runtime/io/disk-io-mgr-stress.h | 26 +-
be/src/runtime/io/disk-io-mgr-test.cc | 542 +++---
be/src/runtime/io/disk-io-mgr.cc | 34 +-
be/src/runtime/io/disk-io-mgr.h | 35 +-
be/src/runtime/io/request-context.cc | 29 +-
be/src/runtime/io/request-context.h | 14 +-
be/src/runtime/io/request-ranges.h | 15 +-
be/src/runtime/io/scan-range.cc | 6 +-
be/src/runtime/tmp-file-mgr.cc | 2 +-
common/thrift/PlanNodes.thrift | 3 +
.../apache/impala/analysis/SlotDescriptor.java | 19 +
.../org/apache/impala/analysis/SlotRef.java | 20 -
.../org/apache/impala/planner/HdfsScanNode.java | 167 +-
.../java/org/apache/impala/util/BitUtil.java | 6 +
.../org/apache/impala/util/BitUtilTest.java | 6 +
.../queries/PlannerTest/constant-folding.test | 42 +-
.../queries/PlannerTest/disable-codegen.test | 24 +-
.../PlannerTest/fk-pk-join-detection.test | 78 +-
.../queries/PlannerTest/max-row-size.test | 80 +-
.../PlannerTest/min-max-runtime-filters.test | 6 +-
.../queries/PlannerTest/mt-dop-validation.test | 40 +-
.../queries/PlannerTest/parquet-filtering.test | 42 +-
.../queries/PlannerTest/partition-pruning.test | 4 +-
.../PlannerTest/resource-requirements.test | 1814 ++++++++++++++----
.../PlannerTest/sort-expr-materialization.test | 32 +-
.../PlannerTest/spillable-buffer-sizing.test | 192 +-
.../queries/PlannerTest/tablesample.test | 44 +-
.../queries/PlannerTest/union.test | 8 +-
.../admission-reject-min-reservation.test | 12 +-
.../queries/QueryTest/analytic-fns.test | 5 +-
.../queries/QueryTest/codegen-mem-limit.test | 5 +-
.../QueryTest/disk-spill-encryption.test | 2 +-
.../queries/QueryTest/explain-level0.test | 4 +-
.../queries/QueryTest/explain-level1.test | 4 +-
.../queries/QueryTest/explain-level2.test | 14 +-
.../queries/QueryTest/explain-level3.test | 14 +-
.../queries/QueryTest/nested-types-tpch.test | 6 +-
.../queries/QueryTest/runtime_row_filters.test | 11 +-
.../queries/QueryTest/scanners.test | 7 +
.../functional-query/queries/QueryTest/set.test | 2 +-
.../queries/QueryTest/spilling-aggs.test | 19 +-
.../spilling-naaj-no-deny-reservation.test | 7 +-
.../queries/QueryTest/spilling-naaj.test | 8 +-
.../QueryTest/spilling-no-debug-action.test | 66 +
.../QueryTest/spilling-sorts-exhaustive.test | 10 +-
.../queries/QueryTest/spilling.test | 76 +-
.../queries/QueryTest/stats-extrapolation.test | 52 +-
.../tpch/queries/sort-reservation-usage.test | 9 +-
tests/common/test_dimensions.py | 16 +-
tests/custom_cluster/test_scratch_disk.py | 2 +-
tests/query_test/test_mem_usage_scaling.py | 11 +-
tests/query_test/test_query_mem_limit.py | 4 +-
tests/query_test/test_scanners.py | 25 +-
tests/query_test/test_scanners_fuzz.py | 9 +-
tests/query_test/test_sort.py | 16 +-
tests/query_test/test_spilling.py | 5 +
73 files changed, 3204 insertions(+), 1545 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index aab1383..c1f91d6 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -108,3 +108,4 @@ ADD_BE_TEST(parquet-version-test)
ADD_BE_TEST(row-batch-list-test)
ADD_BE_TEST(incr-stats-util-test)
ADD_BE_TEST(hdfs-avro-scanner-test)
+ADD_BE_TEST(hdfs-parquet-scanner-test)
http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/hdfs-parquet-scanner-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner-test.cc b/be/src/exec/hdfs-parquet-scanner-test.cc
new file mode 100644
index 0000000..cbc6e76
--- /dev/null
+++ b/be/src/exec/hdfs-parquet-scanner-test.cc
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/hdfs-parquet-scanner.h"
+#include "testutil/gtest-util.h"
+
+#include "common/names.h"
+
+static const int64_t MIN_BUFFER_SIZE = 64 * 1024;
+static const int64_t MAX_BUFFER_SIZE = 8 * 1024 * 1024;
+
+namespace impala {
+
+class HdfsParquetScannerTest : public testing::Test {
+ protected:
+ void TestDivideReservation(const vector<int64_t>& col_range_lengths,
+ int64_t total_col_reservation, const vector<int64_t>& expected_reservations);
+};
+
+/// Test that DivideReservationBetweenColumnsHelper() returns 'expected_reservations'
+/// for 'col_range_lengths' and 'total_col_reservation' with MIN/MAX_BUFFER_SIZE bounds.
+void HdfsParquetScannerTest::TestDivideReservation(const vector<int64_t>& col_range_lengths,
+ int64_t total_col_reservation, const vector<int64_t>& expected_reservations) {
+ vector<pair<int, int64_t>> reservations =
+ HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
+ MIN_BUFFER_SIZE, MAX_BUFFER_SIZE, col_range_lengths, total_col_reservation);
+ for (int i = 0; i < reservations.size(); ++i) {
+ LOG(INFO) << i << " " << reservations[i].first << " " << reservations[i].second;
+ }
+ EXPECT_EQ(reservations.size(), expected_reservations.size());
+ vector<bool> present(expected_reservations.size(), false);
+ for (auto& reservation: reservations) {
+ // Ensure that each column index appears exactly once in the returned pairs.
+ EXPECT_FALSE(present[reservation.first]);
+ present[reservation.first] = true;
+ EXPECT_EQ(expected_reservations[reservation.first], reservation.second)
+ << reservation.first;
+ }
+}
+
+TEST_F(HdfsParquetScannerTest, DivideReservation) {
+ // Test long scan ranges with lots of memory - should allocate 3 max-size
+ // buffers per range.
+ TestDivideReservation({100 * 1024 * 1024}, 50 * 1024 * 1024, {3 * MAX_BUFFER_SIZE});
+ TestDivideReservation({100 * 1024 * 1024, 50 * 1024 * 1024}, 100 * 1024 * 1024,
+ {3 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
+
+ // Long scan ranges, not enough memory for 3 buffers each. Should only allocate
+ // max-sized buffers, preferring the longer scan range.
+ TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, 5 * MAX_BUFFER_SIZE,
+ {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
+ TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024},
+ 5 * MAX_BUFFER_SIZE + MIN_BUFFER_SIZE,
+ {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
+ TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, 6 * MAX_BUFFER_SIZE - 1,
+ {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
+
+ // Test a short range with lots of memory - should round buffer size up to a power of 2.
+ TestDivideReservation({100 * 1024}, 50 * 1024 * 1024, {128 * 1024});
+
+ // Test a range << MIN_BUFFER_SIZE - should round up to the min buffer size.
+ TestDivideReservation({13}, 50 * 1024 * 1024, {MIN_BUFFER_SIZE});
+
+ // Test long ranges with limited memory.
+ TestDivideReservation({100 * 1024 * 1024}, 100 * 1024, {MIN_BUFFER_SIZE});
+ TestDivideReservation({100 * 1024 * 1024}, MIN_BUFFER_SIZE, {MIN_BUFFER_SIZE});
+ TestDivideReservation({100 * 1024 * 1024}, 2 * MIN_BUFFER_SIZE, {2 * MIN_BUFFER_SIZE});
+ TestDivideReservation({100 * 1024 * 1024}, MAX_BUFFER_SIZE - 1, {MAX_BUFFER_SIZE / 2});
+ TestDivideReservation({100 * 1024 * 1024, 1024 * 1024, MIN_BUFFER_SIZE},
+ 3 * MIN_BUFFER_SIZE, {MIN_BUFFER_SIZE, MIN_BUFFER_SIZE, MIN_BUFFER_SIZE});
+
+ // Test a mix of scan range lengths larger than and smaller than the max I/O buffer
+ // size. Long ranges get allocated most memory.
+ TestDivideReservation(
+ {15145047, 5019635, 5019263, 15145047, 15145047, 5019635, 5019263, 317304},
+ 25165824,
+ {8388608, 2097152, 524288, 8388608, 4194304, 1048576, 262144, 262144});
+}
+
+}
+
+IMPALA_TEST_MAIN();
http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 0188f08..51a39be 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -17,6 +17,7 @@
#include "exec/hdfs-parquet-scanner.h"
+#include <algorithm>
#include <queue>
#include <gutil/strings/substitute.h>
@@ -27,6 +28,7 @@
#include "exec/parquet-column-stats.h"
#include "exec/scanner-context.inline.h"
#include "runtime/collection-value-builder.h"
+#include "runtime/exec-env.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/runtime-state.h"
#include "runtime/runtime-filter.inline.h"
@@ -35,6 +37,7 @@
#include "common/names.h"
using std::move;
+using std::sort;
using namespace impala;
using namespace impala::io;
@@ -47,10 +50,6 @@ constexpr int BATCHES_PER_FILTER_SELECTIVITY_CHECK = 16;
static_assert(BitUtil::IsPowerOf2(BATCHES_PER_FILTER_SELECTIVITY_CHECK),
"BATCHES_PER_FILTER_SELECTIVITY_CHECK must be a power of two");
-// Max dictionary page header size in bytes. This is an estimate and only needs to be an
-// upper bound.
-const int MAX_DICT_HEADER_SIZE = 100;
-
// Max entries in the dictionary before switching to PLAIN encoding. If a dictionary
// has fewer entries, then the entire column is dictionary encoded. This threshold
// is guaranteed to be true for Impala versions 2.9 or below.
@@ -99,7 +98,7 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
static_cast<ScanRangeMetadata*>(split->meta_data());
// Each split is processed by first issuing a scan range for the file footer, which
// is done here, followed by scan ranges for the columns of each row group within
- // the actual split (in InitColumns()). The original split is stored in the
+ // the actual split (see InitScalarColumns()). The original split is stored in the
// metadata associated with the footer range.
ScanRange* footer_range;
if (footer_split != nullptr) {
@@ -229,9 +228,14 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
// Release I/O buffers immediately to make sure they are cleaned up
// in case we return a non-OK status anywhere below.
+ int64_t stream_reservation = stream_->reservation();
+ stream_ = nullptr;
context_->ReleaseCompletedResources(true);
context_->ClearStreams();
RETURN_IF_ERROR(footer_status);
+ // The scanner-wide stream was used only to read the file footer. Each column has added
+ // its own stream. We can use the reservation from 'stream_' for the columns now.
+ total_col_reservation_ = stream_reservation;
// Parse the file schema into an internal representation for schema resolution.
schema_resolver_.reset(new ParquetSchemaResolver(*scan_node_->hdfs_table(),
@@ -248,10 +252,6 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
RETURN_IF_ERROR(InitDictFilterStructures());
-
- // The scanner-wide stream was used only to read the file footer. Each column has added
- // its own stream.
- stream_ = nullptr;
return Status::OK();
}
@@ -675,15 +675,13 @@ Status HdfsParquetScanner::NextRowGroup() {
}
InitCollectionColumns();
+ RETURN_IF_ERROR(InitScalarColumns());
- // Prepare dictionary filtering columns for first read
- // This must be done before dictionary filtering, because this code initializes
- // the column offsets and streams needed to read the dictionaries.
- // TODO: Restructure the code so that the dictionary can be read without the rest
- // of the column.
- RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, dict_filterable_readers_));
+ // Start scanning dictionary filtering column readers, so we can read the dictionary
+ // pages in EvalDictionaryFilters().
+ RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(dict_filterable_readers_));
- // InitColumns() may have allocated resources to scan columns. If we skip this row
+ // StartScans() may have allocated resources to scan columns. If we skip this row
// group below, we must call ReleaseSkippedRowGroupResources() before continuing.
// If there is a dictionary-encoded column where every value is eliminated
@@ -704,10 +702,10 @@ Status HdfsParquetScanner::NextRowGroup() {
}
// At this point, the row group has passed any filtering criteria
- // Prepare non-dictionary filtering column readers for first read and
- // initialize their dictionaries.
- RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, non_dict_filterable_readers_));
- status = InitDictionaries(non_dict_filterable_readers_);
+ // Start scanning non-dictionary filtering column readers and initialize their
+ // dictionaries.
+ RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(non_dict_filterable_readers_));
+ status = BaseScalarColumnReader::InitDictionaries(non_dict_filterable_readers_);
if (!status.ok()) {
// Either return an error or skip this row group if it is ok to ignore errors
RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
@@ -742,7 +740,6 @@ Status HdfsParquetScanner::NextRowGroup() {
break;
}
}
-
DCHECK(parse_status_.ok());
return Status::OK();
}
@@ -800,6 +797,7 @@ void HdfsParquetScanner::PartitionReaders(
} else {
BaseScalarColumnReader* scalar_reader =
static_cast<BaseScalarColumnReader*>(reader);
+ scalar_readers_.push_back(scalar_reader);
if (can_eval_dict_filters && IsDictFilterable(scalar_reader)) {
dict_filterable_readers_.push_back(scalar_reader);
} else {
@@ -991,7 +989,7 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
// Any columns that were not 100% dictionary encoded need to initialize
// their dictionaries here.
- RETURN_IF_ERROR(InitDictionaries(deferred_dict_init_list));
+ RETURN_IF_ERROR(BaseScalarColumnReader::InitDictionaries(deferred_dict_init_list));
return Status::OK();
}
@@ -1648,23 +1646,16 @@ void HdfsParquetScanner::InitCollectionColumns() {
}
}
-Status HdfsParquetScanner::InitScalarColumns(
- int row_group_idx, const vector<BaseScalarColumnReader*>& column_readers) {
+Status HdfsParquetScanner::InitScalarColumns() {
int64_t partition_id = context_->partition_descriptor()->id();
const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
DCHECK(file_desc != nullptr);
- parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx];
+ parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
- // All the scan ranges (one for each column).
- vector<ScanRange*> col_ranges;
// Used to validate that the number of values in each reader in column_readers_ at the
// same SchemaElement is the same.
unordered_map<const parquet::SchemaElement*, int> num_values_map;
- // Used to validate we issued the right number of scan ranges
- int num_scalar_readers = 0;
-
- for (BaseScalarColumnReader* scalar_reader: column_readers) {
- ++num_scalar_readers;
+ for (BaseScalarColumnReader* scalar_reader : scalar_readers_) {
const parquet::ColumnChunk& col_chunk = row_group.columns[scalar_reader->col_idx()];
auto num_values_it = num_values_map.find(&scalar_reader->schema_element());
int num_values = -1;
@@ -1673,84 +1664,115 @@ Status HdfsParquetScanner::InitScalarColumns(
} else {
num_values_map[&scalar_reader->schema_element()] = col_chunk.meta_data.num_values;
}
- int64_t col_start = col_chunk.meta_data.data_page_offset;
-
if (num_values != -1 && col_chunk.meta_data.num_values != num_values) {
// TODO: improve this error message by saying which columns are different,
// and also specify column in other error messages as appropriate
return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(),
col_chunk.meta_data.num_values, num_values, filename());
}
+ RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, row_group_idx_));
+ }
+ RETURN_IF_ERROR(
+ DivideReservationBetweenColumns(scalar_readers_, total_col_reservation_));
+ return Status::OK();
+}
- RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(file_metadata_,
- filename(), row_group_idx, scalar_reader->col_idx(),
- scalar_reader->schema_element(), state_));
-
- if (col_chunk.meta_data.__isset.dictionary_page_offset) {
- // Already validated in ValidateColumnOffsets()
- DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
- col_start = col_chunk.meta_data.dictionary_page_offset;
- }
- int64_t col_len = col_chunk.meta_data.total_compressed_size;
- if (col_len <= 0) {
- return Status(Substitute("File '$0' contains invalid column chunk size: $1",
- filename(), col_len));
- }
- int64_t col_end = col_start + col_len;
-
- // Already validated in ValidateColumnOffsets()
- DCHECK_GT(col_end, 0);
- DCHECK_LT(col_end, file_desc->file_length);
- if (file_version_.application == "parquet-mr" && file_version_.VersionLt(1, 2, 9)) {
- // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
- // dictionary page header size in total_compressed_size and total_uncompressed_size
- // (see IMPALA-694). We pad col_len to compensate.
- int64_t bytes_remaining = file_desc->file_length - col_end;
- int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
- col_len += pad;
- }
-
- // TODO: this will need to change when we have co-located files and the columns
- // are different files.
- if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
- return Status(Substitute("Expected parquet column file path '$0' to match "
- "filename '$1'", col_chunk.file_path, filename()));
- }
-
- const ScanRange* split_range =
- static_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split;
-
- // Determine if the column is completely contained within a local split.
- bool col_range_local = split_range->expected_local()
- && col_start >= split_range->offset()
- && col_end <= split_range->offset() + split_range->len();
- ScanRange* col_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
- filename(), col_len, col_start, partition_id, split_range->disk_id(),
- col_range_local,
- BufferOpts(split_range->try_cache(), file_desc->mtime));
- col_ranges.push_back(col_range);
-
- // Get the stream that will be used for this column
- ScannerContext::Stream* stream = context_->AddStream(col_range);
- DCHECK(stream != nullptr);
+Status HdfsParquetScanner::DivideReservationBetweenColumns(
+ const vector<BaseScalarColumnReader*>& column_readers,
+ int64_t reservation_to_distribute) {
+ DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+ const int64_t min_buffer_size = io_mgr->min_buffer_size();
+ const int64_t max_buffer_size = io_mgr->max_buffer_size();
+ // The HdfsScanNode reservation calculation in the planner ensures that we have
+ // reservation for at least one buffer per column.
+ if (reservation_to_distribute < min_buffer_size * column_readers.size()) {
+ return Status(TErrorCode::INTERNAL_ERROR,
+ Substitute("Not enough reservation in Parquet scanner for file '$0'. Need at "
+ "least $1 bytes per column for $2 columns but had $3 bytes",
+ filename(), min_buffer_size, column_readers.size(),
+ reservation_to_distribute));
+ }
- RETURN_IF_ERROR(scalar_reader->Reset(&col_chunk.meta_data, stream));
+ vector<int64_t> col_range_lengths(column_readers.size());
+ for (int i = 0; i < column_readers.size(); ++i) {
+ col_range_lengths[i] = column_readers[i]->scan_range()->len();
}
- DCHECK_EQ(col_ranges.size(), num_scalar_readers);
+ vector<pair<int, int64_t>> tmp_reservations = DivideReservationBetweenColumnsHelper(
+ min_buffer_size, max_buffer_size, col_range_lengths, reservation_to_distribute);
+ for (auto& tmp_reservation : tmp_reservations) {
+ column_readers[tmp_reservation.first]->set_io_reservation(tmp_reservation.second);
+ }
+ return Status::OK();
+}
- DiskIoMgr* io_mgr = scan_node_->runtime_state()->io_mgr();
- // Issue all the column chunks to the IoMgr. We scan through all columns at the same
- // time so need to read from all of them concurrently.
- for (ScanRange* col_range : col_ranges) {
- bool needs_buffers;
- RETURN_IF_ERROR(io_mgr->StartScanRange(
- scan_node_->reader_context(), col_range, &needs_buffers));
- if (needs_buffers) {
- RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
- scan_node_->reader_context(), col_range, 3 * io_mgr->max_buffer_size()));
+vector<pair<int, int64_t>> HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
+ int64_t min_buffer_size, int64_t max_buffer_size,
+ const vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute) {
+ // Pair of (column index, reservation allocated).
+ vector<pair<int, int64_t>> tmp_reservations;
+ for (int i = 0; i < col_range_lengths.size(); ++i) tmp_reservations.emplace_back(i, 0);
+
+ // Sort in descending order of length, breaking ties by index so that larger columns
+ // get allocated reservation first. It is common to have dramatically different column
+ // sizes in a single file because of different value sizes and compressibility. E.g.
+ // consider a large STRING "comment" field versus a highly compressible
+ // dictionary-encoded column with only a few distinct values. We want to give max-sized
+ // buffers to large columns first to maximize the size of I/Os that we do while reading
+ // this row group.
+ sort(tmp_reservations.begin(), tmp_reservations.end(),
+ [&col_range_lengths](const pair<int, int64_t>& left, const pair<int, int64_t>& right) {
+ int64_t left_len = col_range_lengths[left.first];
+ int64_t right_len = col_range_lengths[right.first];
+ return left_len != right_len ? left_len > right_len : left.first < right.first;
+ });
+
+ // Set aside the minimum reservation per column.
+ reservation_to_distribute -= min_buffer_size * col_range_lengths.size();
+
+ // Allocate reservations to columns by repeatedly allocating either a max-sized buffer
+ // or a large enough buffer to fit the remaining data for each column. Do this
+ // round-robin up to the ideal number of I/O buffers.
+ for (int i = 0; i < DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; ++i) {
+ for (auto& tmp_reservation : tmp_reservations) {
+ // Add back the reservation we set aside above.
+ if (i == 0) reservation_to_distribute += min_buffer_size;
+
+ int64_t bytes_left_in_range =
+ col_range_lengths[tmp_reservation.first] - tmp_reservation.second;
+ int64_t bytes_to_add;
+ if (bytes_left_in_range >= max_buffer_size) {
+ if (reservation_to_distribute >= max_buffer_size) {
+ bytes_to_add = max_buffer_size;
+ } else if (i == 0) {
+ DCHECK_EQ(0, tmp_reservation.second);
+ // Ensure this range gets at least one buffer on the first iteration.
+ bytes_to_add = BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute);
+ } else {
+ DCHECK_GT(tmp_reservation.second, 0);
+ // We need to read more than the max buffer size, but can't allocate a
+ // max-sized buffer. Stop adding buffers to this column: we prefer to use
+ // the existing max-sized buffers without small buffers mixed in so that
+ // we will always do max-sized I/Os, which make efficient use of I/O devices.
+ bytes_to_add = 0;
+ }
+ } else if (bytes_left_in_range > 0 &&
+ reservation_to_distribute >= min_buffer_size) {
+ // Choose a buffer size that will fit the rest of the bytes left in the range.
+ bytes_to_add = max(min_buffer_size, BitUtil::RoundUpToPowerOfTwo(bytes_left_in_range));
+ // But don't add more reservation than is available.
+ bytes_to_add =
+ min(bytes_to_add, BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute));
+ } else {
+ bytes_to_add = 0;
+ }
+ DCHECK(bytes_to_add == 0 || bytes_to_add >= min_buffer_size) << bytes_to_add;
+ reservation_to_distribute -= bytes_to_add;
+ tmp_reservation.second += bytes_to_add;
+ DCHECK_GE(reservation_to_distribute, 0);
+ DCHECK_GT(tmp_reservation.second, 0);
}
}
- return Status::OK();
+ return tmp_reservations;
}
Status HdfsParquetScanner::InitDictionaries(
http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 1fc3239..92f2550 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -34,7 +34,7 @@ class CollectionValueBuilder;
struct HdfsFileDesc;
/// Internal schema representation and resolution.
-class SchemaNode;
+struct SchemaNode;
/// Class that implements Parquet definition and repetition level decoding.
class ParquetLevelDecoder;
@@ -361,6 +361,7 @@ class HdfsParquetScanner : public HdfsScanner {
template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
friend class ScalarColumnReader;
friend class BoolColumnReader;
+ friend class HdfsParquetScannerTest;
/// Size of the file footer. This is a guess. If this value is too little, we will
/// need to issue another read.
@@ -429,7 +430,7 @@ class HdfsParquetScanner : public HdfsScanner {
/// Number of scratch batches processed so far.
int64_t row_batches_produced_;
- /// Column reader for each materialized columns for this file.
+ /// Column reader for each top-level materialized slot in the output tuple.
std::vector<ParquetColumnReader*> column_readers_;
/// Column readers will write slot values into this scratch batch for
@@ -445,6 +446,9 @@ class HdfsParquetScanner : public HdfsScanner {
/// Scan range for the metadata.
const io::ScanRange* metadata_range_;
+ /// Reservation available for scanning columns, in bytes.
+ int64_t total_col_reservation_ = 0;
+
/// Pool to copy dictionary page buffer into. This pool is shared across all the
/// pages in a column chunk.
boost::scoped_ptr<MemPool> dictionary_pool_;
@@ -461,6 +465,9 @@ class HdfsParquetScanner : public HdfsScanner {
/// or nested within a collection.
std::vector<BaseScalarColumnReader*> non_dict_filterable_readers_;
+ /// Flattened list of all scalar column readers in column_readers_.
+ std::vector<BaseScalarColumnReader*> scalar_readers_;
+
/// Flattened collection column readers that point to readers in column_readers_.
std::vector<CollectionColumnReader*> collection_readers_;
@@ -627,12 +634,24 @@ class HdfsParquetScanner : public HdfsScanner {
WARN_UNUSED_RESULT;
/// Walks file_metadata_ and initiates reading the materialized columns. This
- /// initializes 'column_readers' and issues the reads for the columns. 'column_readers'
- /// includes a mix of scalar readers from multiple schema nodes (i.e., readers of
- /// top-level scalar columns and readers of scalar columns within a collection node).
- Status InitScalarColumns(
- int row_group_idx, const std::vector<BaseScalarColumnReader*>& column_readers)
- WARN_UNUSED_RESULT;
+ /// initializes 'scalar_readers_' and divides reservation between the columns but
+ /// does not start any scan ranges.
+ Status InitScalarColumns() WARN_UNUSED_RESULT;
+
+ /// Decides how to divide 'reservation_to_distribute' bytes of reservation between the
+ /// columns. Sets the reservation on each corresponding reader in 'column_readers'.
+ Status DivideReservationBetweenColumns(
+ const std::vector<BaseScalarColumnReader*>& column_readers,
+ int64_t reservation_to_distribute);
+
+ /// Helper for DivideReservationBetweenColumns. Implements the core algorithm for
+ /// dividing a reservation of 'reservation_to_distribute' bytes between columns with
+ /// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns
+ /// a vector with an entry per column with the index into 'col_range_lengths' and the
+ /// amount of reservation in bytes to give to that column.
+ static std::vector<std::pair<int, int64_t>> DivideReservationBetweenColumnsHelper(
+ int64_t min_buffer_size, int64_t max_buffer_size,
+ const std::vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute);
/// Initializes the column readers in collection_readers_.
void InitCollectionColumns();
@@ -668,7 +687,8 @@ class HdfsParquetScanner : public HdfsScanner {
/// Partitions the readers into scalar and collection readers. The collection readers
/// are flattened into collection_readers_. The scalar readers are partitioned into
/// dict_filterable_readers_ and non_dict_filterable_readers_ depending on whether
- /// dictionary filtering is enabled and the reader can be dictionary filtered.
+ /// dictionary filtering is enabled and the reader can be dictionary filtered. All
+ /// scalar readers are also flattened into scalar_readers_.
void PartitionReaders(const vector<ParquetColumnReader*>& readers,
bool can_eval_dict_filters);
http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 94af63b..b6a51f6 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -35,6 +35,7 @@
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/scalar-expr.h"
#include "runtime/descriptors.h"
+#include "runtime/exec-env.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/io/request-context.h"
@@ -64,6 +65,7 @@ const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: ScanNode(pool, tnode, descs),
+ ideal_scan_range_reservation_(tnode.hdfs_scan_node.ideal_scan_range_reservation),
min_max_tuple_id_(tnode.hdfs_scan_node.__isset.min_max_tuple_id ?
tnode.hdfs_scan_node.min_max_tuple_id : -1),
skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
@@ -80,6 +82,7 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
&tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr),
disks_accessed_bitmap_(TUnit::UNIT, 0),
active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {
+ DCHECK_GE(ideal_scan_range_reservation_, resource_profile_.min_reservation);
}
HdfsScanNodeBase::~HdfsScanNodeBase() {
@@ -109,7 +112,6 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.min_max_conjuncts,
*min_max_row_desc, state, &min_max_conjuncts_));
}
-
return Status::OK();
}
@@ -240,6 +242,16 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size());
ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id);
+ // Check if reservation was enough to allocate at least one buffer. The
+ // reservation calculation in HdfsScanNode.java should guarantee this.
+ // Hitting this error indicates a misconfiguration or bug.
+ int64_t min_buffer_size = ExecEnv::GetInstance()->disk_io_mgr()->min_buffer_size();
+ if (scan_range_params_->size() > 0
+ && resource_profile_.min_reservation < min_buffer_size) {
+ return Status(TErrorCode::INTERNAL_ERROR,
+ Substitute("HDFS scan min reservation $0 must be >= min buffer size $1",
+ resource_profile_.min_reservation, min_buffer_size));
+ }
// Add per volume stats to the runtime profile
PerVolumeStats per_volume_stats;
stringstream str;
@@ -326,7 +338,18 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
partition_desc->partition_key_value_evals(), scan_node_pool_.get(), state);
}
- reader_context_ = runtime_state_->io_mgr()->RegisterContext(mem_tracker());
+ RETURN_IF_ERROR(ClaimBufferReservation(state));
+ // We got the minimum reservation. Now try to get ideal reservation.
+ if (resource_profile_.min_reservation != ideal_scan_range_reservation_) {
+ bool increased = buffer_pool_client_.IncreaseReservation(
+ ideal_scan_range_reservation_ - resource_profile_.min_reservation);
+ VLOG_FILE << "Increasing reservation from minimum "
+ << resource_profile_.min_reservation << "B to ideal "
+ << ideal_scan_range_reservation_ << "B "
+ << (increased ? "succeeded" : "failed");
+ }
+
+ reader_context_ = runtime_state_->io_mgr()->RegisterContext();
// Initialize HdfsScanNode specific counters
// TODO: Revisit counters and move the counters specific to multi-threaded scans
http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 70fbac2..3a9c37f 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -322,6 +322,9 @@ class HdfsScanNodeBase : public ScanNode {
friend class ScannerContext;
friend class HdfsScanner;
+ /// Ideal reservation to process each input split, computed by the planner.
+ const int64_t ideal_scan_range_reservation_;
+
/// Tuple id of the tuple used to evaluate conjuncts on parquet::Statistics.
const int min_max_tuple_id_;
http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index be75677..f4948d9 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -86,17 +86,18 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
StopAndFinalizeCounters();
return Status::OK();
}
+ int64_t scanner_reservation = buffer_pool_client_.GetReservation();
if (needs_buffers) {
- RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(reader_context_.get(), scan_range_,
- 3 * io_mgr->max_buffer_size()));
+ RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(reader_context_.get(),
+ &buffer_pool_client_, scan_range_, scanner_reservation));
}
ScanRangeMetadata* metadata =
static_cast<ScanRangeMetadata*>(scan_range_->meta_data());
int64_t partition_id = metadata->partition_id;
HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
- scanner_ctx_.reset(new ScannerContext(
- runtime_state_, this, partition, scan_range_, filter_ctxs(),
- expr_results_pool()));
+ scanner_ctx_.reset(new ScannerContext(runtime_state_, this, &buffer_pool_client_,
+ partition, filter_ctxs(), expr_results_pool()));
+ scanner_ctx_->AddStream(scan_range_, scanner_reservation);
Status status = CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_);
if (!status.ok()) {
DCHECK(scanner_ == NULL);
http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index f9d71e9..a95e47a 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -46,16 +46,6 @@ DECLARE_bool(skip_file_runtime_filtering);
using namespace impala;
using namespace impala::io;
-// Amount of memory that we approximate a scanner thread will use not including IoBuffers.
-// The memory used does not vary considerably between file formats (just a couple of MBs).
-// This value is conservative and taken from running against the tpch lineitem table.
-// TODO: revisit how we do this.
-const int SCANNER_THREAD_MEM_USAGE = 32 * 1024 * 1024;
-
-// Estimated upper bound on the compression ratio of compressed text files. Used to
-// estimate scanner thread memory usage.
-const int COMPRESSED_TEXT_COMPRESSION_RATIO = 11;
-
// Amount of time to block waiting for GetNext() to release scanner threads between
// checking if a scanner thread should yield itself back to the global thread pool.
const int SCANNER_THREAD_WAIT_TIME_MS = 20;
@@ -63,11 +53,6 @@ const int SCANNER_THREAD_WAIT_TIME_MS = 20;
HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: HdfsScanNodeBase(pool, tnode, descs),
- ranges_issued_barrier_(1),
- scanner_thread_bytes_required_(0),
- done_(false),
- all_ranges_started_(false),
- thread_avail_cb_id_(-1),
max_num_scanner_threads_(CpuInfo::num_cores()) {
}
@@ -168,36 +153,6 @@ Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
Status HdfsScanNode::Prepare(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
-
- // Compute the minimum bytes required to start a new thread. This is based on the
- // file format.
- // The higher the estimate, the less likely it is the query will fail but more likely
- // the query will be throttled when it does not need to be.
- // TODO: how many buffers should we estimate per range. The IoMgr will throttle down to
- // one but if there are already buffers queued before memory pressure was hit, we can't
- // reclaim that memory.
- if (per_type_files_[THdfsFileFormat::PARQUET].size() > 0) {
- // Parquet files require buffers per column
- scanner_thread_bytes_required_ =
- materialized_slots_.size() * 3 * runtime_state_->io_mgr()->max_buffer_size();
- } else {
- scanner_thread_bytes_required_ =
- 3 * runtime_state_->io_mgr()->max_buffer_size();
- }
- // scanner_thread_bytes_required_ now contains the IoBuffer requirement.
- // Next we add in the other memory the scanner thread will use.
- // e.g. decompression buffers, tuple buffers, etc.
- // For compressed text, we estimate this based on the file size (since the whole file
- // will need to be decompressed at once). For all other formats, we use a constant.
- // TODO: can we do something better?
- int64_t scanner_thread_mem_usage = SCANNER_THREAD_MEM_USAGE;
- for (HdfsFileDesc* file: per_type_files_[THdfsFileFormat::TEXT]) {
- if (file->file_compression != THdfsCompression::NONE) {
- int64_t bytes_required = file->file_length * COMPRESSED_TEXT_COMPRESSION_RATIO;
- scanner_thread_mem_usage = ::max(bytes_required, scanner_thread_mem_usage);
- }
- }
- scanner_thread_bytes_required_ += scanner_thread_mem_usage;
row_batches_get_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueueGetWaitTime");
row_batches_put_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueuePutWaitTime");
return Status::OK();
@@ -219,10 +174,9 @@ Status HdfsScanNode::Open(RuntimeState* state) {
max_num_scanner_threads_ = runtime_state_->query_options().num_scanner_threads;
}
DCHECK_GT(max_num_scanner_threads_, 0);
-
+ spare_reservation_.Store(buffer_pool_client_.GetReservation());
thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb(
bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1));
-
return Status::OK();
}
@@ -263,37 +217,28 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges,
return Status::OK();
}
-// For controlling the amount of memory used for scanners, we approximate the
-// scanner mem usage based on scanner_thread_bytes_required_, rather than the
-// consumption in the scan node's mem tracker. The problem with the scan node
-// trackers is that it does not account for the memory the scanner will use.
-// For example, if there is 110 MB of memory left (based on the mem tracker)
-// and we estimate that a scanner will use 100MB, we want to make sure to only
-// start up one additional thread. However, after starting the first thread, the
-// mem tracker value will not change immediately (it takes some time before the
-// scanner is running and starts using memory). Therefore we just use the estimate
-// based on the number of running scanner threads.
-bool HdfsScanNode::EnoughMemoryForScannerThread(bool new_thread) {
- int64_t committed_scanner_mem =
- active_scanner_thread_counter_.value() * scanner_thread_bytes_required_;
- int64_t tracker_consumption = mem_tracker()->consumption();
- int64_t est_additional_scanner_mem = committed_scanner_mem - tracker_consumption;
- if (est_additional_scanner_mem < 0) {
- // This is the case where our estimate was too low. Expand the estimate based
- // on the usage.
- int64_t avg_consumption =
- tracker_consumption / active_scanner_thread_counter_.value();
- // Take the average and expand it by 50%. Some scanners will not have hit their
- // steady state mem usage yet.
- // TODO: how can we scale down if we've overestimated.
- // TODO: better heuristic?
- scanner_thread_bytes_required_ = static_cast<int64_t>(avg_consumption * 1.5);
- est_additional_scanner_mem = 0;
- }
+bool HdfsScanNode::EnoughReservationForExtraThread(const unique_lock<mutex>& lock) {
+ DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+ if (spare_reservation_.Load() >= ideal_scan_range_reservation_) return true;
+ int64_t increase = ideal_scan_range_reservation_ - spare_reservation_.Load();
+ if (!buffer_pool_client_.IncreaseReservation(increase)) return false;
+ spare_reservation_.Add(increase);
+ return true;
+}
- // If we are starting a new thread, take that into account now.
- if (new_thread) est_additional_scanner_mem += scanner_thread_bytes_required_;
- return est_additional_scanner_mem < mem_tracker()->SpareCapacity();
+int64_t HdfsScanNode::DeductReservationForScannerThread(const unique_lock<mutex>& lock,
+ bool first_thread) {
+ DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+ int64_t amount;
+ if (first_thread) {
+ amount = spare_reservation_.Load() >= ideal_scan_range_reservation_ ?
+ ideal_scan_range_reservation_ : resource_profile_.min_reservation;
+ } else {
+ amount = ideal_scan_range_reservation_;
+ }
+ int64_t remainder = spare_reservation_.Add(-amount);
+ DCHECK_GE(remainder, 0);
+ return amount;
}
void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
@@ -323,36 +268,45 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
// TODO: This still leans heavily on starvation-free locks, come up with a more
// correct way to communicate between this method and ScannerThreadHelper
unique_lock<mutex> lock(lock_);
+
+ const int64_t num_active_scanner_threads = active_scanner_thread_counter_.value();
+ const bool first_thread = num_active_scanner_threads == 0;
// Cases 1, 2, 3.
if (done_ || all_ranges_started_ ||
- active_scanner_thread_counter_.value() >= progress_.remaining()) {
+ num_active_scanner_threads >= progress_.remaining()) {
break;
}
// Cases 5 and 6.
- if (active_scanner_thread_counter_.value() > 0 &&
+ if (!first_thread &&
(materialized_row_batches_->Size() >= max_materialized_row_batches_ ||
- !EnoughMemoryForScannerThread(true))) {
+ !EnoughReservationForExtraThread(lock))) {
break;
}
// Case 7 and 8.
- bool is_reserved = false;
- if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_ ||
- !pool->TryAcquireThreadToken(&is_reserved)) {
+ if (num_active_scanner_threads >= max_num_scanner_threads_ ||
+ !pool->TryAcquireThreadToken()) {
break;
}
+ // Deduct the reservation. We haven't dropped the lock since the
+ // first_thread/EnoughReservationForExtraThread() checks so spare reservation
+ // must be available.
+ int64_t scanner_thread_reservation =
+ DeductReservationForScannerThread(lock, first_thread);
COUNTER_ADD(&active_scanner_thread_counter_, 1);
string name = Substitute("scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)",
PrintId(runtime_state_->fragment_instance_id()), id(),
num_scanner_threads_started_counter_->value());
-
- auto fn = [this]() { this->ScannerThread(); };
+ auto fn = [this, scanner_thread_reservation]() {
+ this->ScannerThread(scanner_thread_reservation);
+ };
std::unique_ptr<Thread> t;
status =
Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);
if (!status.ok()) {
+ ReturnReservationFromScannerThread(scanner_thread_reservation);
COUNTER_ADD(&active_scanner_thread_counter_, -1);
// Release the token and skip running callbacks to find a replacement. Skipping
// serves two purposes. First, it prevents a mutual recursion between this function
@@ -373,7 +327,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
}
}
-void HdfsScanNode::ScannerThread() {
+void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) {
SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
DiskIoMgr* io_mgr = runtime_state_->io_mgr();
@@ -402,8 +356,7 @@ void HdfsScanNode::ScannerThread() {
// this thread.
unique_lock<mutex> l(lock_);
if (active_scanner_thread_counter_.value() > 1) {
- if (runtime_state_->resource_pool()->optional_exceeded() ||
- !EnoughMemoryForScannerThread(false)) {
+ if (runtime_state_->resource_pool()->optional_exceeded()) {
// We can't break here. We need to update the counter with the lock held or else
// all threads might see active_scanner_thread_counter_.value > 1
COUNTER_ADD(&active_scanner_thread_counter_, -1);
@@ -438,12 +391,13 @@ void HdfsScanNode::ScannerThread() {
if (status.ok() && scan_range != nullptr) {
if (needs_buffers) {
status = io_mgr->AllocateBuffersForRange(
- reader_context_.get(), scan_range, 3 * io_mgr->max_buffer_size());
+ reader_context_.get(), &buffer_pool_client_, scan_range,
+ scanner_thread_reservation);
}
if (status.ok()) {
// Got a scan range. Process the range end to end (in this thread).
status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
- &expr_results_pool, scan_range);
+ &expr_results_pool, scan_range, scanner_thread_reservation);
}
}
@@ -483,6 +437,7 @@ void HdfsScanNode::ScannerThread() {
COUNTER_ADD(&active_scanner_thread_counter_, -1);
exit:
+ ReturnReservationFromScannerThread(scanner_thread_reservation);
runtime_state_->resource_pool()->ReleaseThreadToken(false);
for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_);
filter_mem_pool.FreeAll();
@@ -490,7 +445,8 @@ exit:
}
Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
- MemPool* expr_results_pool, ScanRange* scan_range) {
+ MemPool* expr_results_pool, ScanRange* scan_range,
+ int64_t scanner_thread_reservation) {
DCHECK(scan_range != NULL);
ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());
@@ -515,8 +471,9 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
return Status::OK();
}
- ScannerContext context(
- runtime_state_, this, partition, scan_range, filter_ctxs, expr_results_pool);
+ ScannerContext context(runtime_state_, this, &buffer_pool_client_, partition,
+ filter_ctxs, expr_results_pool);
+ context.AddStream(scan_range, scanner_thread_reservation);
scoped_ptr<HdfsScanner> scanner;
Status status = CreateAndOpenScanner(partition, &context, &scanner);
if (!status.ok()) {
http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index a1c97cf..81e826e 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -27,6 +27,7 @@
#include <boost/scoped_ptr.hpp>
#include <boost/thread/mutex.hpp>
+#include "common/atomic.h"
#include "exec/filter-context.h"
#include "exec/hdfs-scan-node-base.h"
#include "runtime/io/disk-io-mgr.h"
@@ -58,8 +59,14 @@ class TPlanNode;
/// 5. The scanner finishes the scan range and informs the scan node so it can track
/// end of stream.
///
-/// TODO: This class allocates a bunch of small utility objects that should be
-/// recycled.
+/// Buffer management:
+/// ------------------
+/// The different scanner threads all allocate I/O buffers from the node's Buffer Pool
+/// client. The scan node ensures that enough reservation is available to start a
+/// scanner thread before launching each one (see
+/// EnoughReservationForExtraThread()), after which the scanner thread is responsible
+/// for staying within the reservation handed off to it.
+///
/// TODO: Remove this class once the fragment-based multi-threaded execution is
/// fully functional.
class HdfsScanNode : public HdfsScanNodeBase {
@@ -100,12 +107,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
private:
/// Released when initial ranges are issued in the first call to GetNext().
- CountingBarrier ranges_issued_barrier_;
-
- /// The estimated memory required to start up a new scanner thread. If the memory
- /// left (due to limits) is less than this value, we won't start up optional
- /// scanner threads.
- int64_t scanner_thread_bytes_required_;
+ CountingBarrier ranges_issued_barrier_{1};
/// Thread group for all scanner worker threads
ThreadGroup scanner_threads_;
@@ -130,16 +132,16 @@ class HdfsScanNode : public HdfsScanNodeBase {
/// are finished, an error/cancellation occurred, or the limit was reached.
/// Setting this to true triggers the scanner threads to clean up.
/// This should not be explicitly set. Instead, call SetDone().
- bool done_;
+ bool done_ = false;
/// Set to true if all ranges have started. Some of the ranges may still be in flight
/// being processed by scanner threads, but no new ScannerThreads should be started.
- bool all_ranges_started_;
+ bool all_ranges_started_ = false;
/// The id of the callback added to the thread resource manager when thread token
/// is available. Used to remove the callback before this scan node is destroyed.
/// -1 if no callback is registered.
- int thread_avail_cb_id_;
+ int thread_avail_cb_id_ = -1;
/// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that query
/// option is set. Otherwise, it's set to the number of cpu cores. Scanner threads
@@ -147,6 +149,14 @@ class HdfsScanNode : public HdfsScanNodeBase {
/// the number of cores.
int max_num_scanner_threads_;
+ /// Amount of the 'buffer_pool_client_' reservation that is not allocated to scanner
+ /// threads. Doled out to scanner threads when they are started and returned when
+ /// those threads no longer need it. Can be atomically incremented without holding
+ /// 'lock_' but 'lock_' is held when decrementing to ensure that the check for
+ /// reservation and the actual deduction are atomic with respect to other threads
+ /// trying to claim reservation.
+ AtomicInt64 spare_reservation_{0};
+
/// The wait time for fetching a row batch from the row batch queue.
RuntimeProfile::Counter* row_batches_get_timer_;
@@ -160,21 +170,35 @@ class HdfsScanNode : public HdfsScanNodeBase {
/// Main function for scanner thread. This thread pulls the next range to be
/// processed from the IoMgr and then processes the entire range end to end.
/// This thread terminates when all scan ranges are complete or an error occurred.
- void ScannerThread();
+ /// The caller must have reserved 'scanner_thread_reservation' bytes of memory for
+ /// this thread with DeductReservationForScannerThread().
+ void ScannerThread(int64_t scanner_thread_reservation);
/// Process the entire scan range with a new scanner object. Executed in scanner
/// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows
/// in this split.
Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
- MemPool* expr_results_pool, io::ScanRange* scan_range) WARN_UNUSED_RESULT;
-
- /// Returns true if there is enough memory (against the mem tracker limits) to
- /// have a scanner thread.
- /// If new_thread is true, the calculation is for starting a new scanner thread.
- /// If false, it determines whether there's adequate memory for the existing
- /// set of scanner threads.
- /// lock_ must be taken before calling this.
- bool EnoughMemoryForScannerThread(bool new_thread);
+ MemPool* expr_results_pool, io::ScanRange* scan_range,
+ int64_t scanner_thread_reservation) WARN_UNUSED_RESULT;
+
+ /// Return true if there is enough reservation to start an extra scanner thread.
+ /// Tries to increase reservation if enough is not already available in
+ /// 'spare_reservation_'. 'lock_' must be held via 'lock'.
+ bool EnoughReservationForExtraThread(const boost::unique_lock<boost::mutex>& lock);
+
+ /// Deduct reservation to start a new scanner thread from 'spare_reservation_'. If
+ /// 'first_thread' is true, this is the first thread to be started and only the
+ /// minimum reservation is required to be available. Otherwise
+ /// EnoughReservationForExtraThread() must have returned true in the current
+ /// critical section so that 'ideal_scan_range_reservation_' is available for the
+ /// extra thread. Returns the amount deducted. 'lock_' must be held via 'lock'.
+ int64_t DeductReservationForScannerThread(const boost::unique_lock<boost::mutex>& lock,
+ bool first_thread);
+
+ /// Called by scanner thread to return some or all of its reservation that is not needed.
+ void ReturnReservationFromScannerThread(int64_t bytes) {
+ spare_reservation_.Add(bytes);
+ }
/// Checks for eos conditions and returns batches from materialized_row_batches_.
Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos)
http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 7cf89ba..5afb4e5 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -30,6 +30,7 @@
#include "exec/scanner-context.inline.h"
#include "rpc/thrift-util.h"
#include "runtime/collection-value-builder.h"
+#include "runtime/exec-env.h"
#include "runtime/tuple-row.h"
#include "runtime/tuple.h"
#include "runtime/runtime-state.h"
@@ -47,6 +48,10 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
"When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
"be converted from UTC to local time. Writes are unaffected.");
+// Max dictionary page header size in bytes. This is an estimate and only needs to be an
+// upper bound.
+static const int MAX_DICT_HEADER_SIZE = 100;
+
// Max data page header size in bytes. This is an estimate and only needs to be an upper
// bound. It is theoretically possible to have a page header of any size due to string
// value statistics, but in practice we'll have trouble reading string values this large.
@@ -66,6 +71,8 @@ static int debug_count = 0;
#define SHOULD_TRIGGER_DEBUG_ACTION(x) (false)
#endif
+using namespace impala::io;
+
namespace impala {
const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
@@ -834,8 +841,99 @@ static bool RequiresSkippedDictionaryHeaderCheck(
return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
}
+Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
+ const parquet::ColumnChunk& col_chunk, int row_group_idx) {
+ num_buffered_values_ = 0;
+ data_ = nullptr;
+ data_end_ = nullptr;
+ stream_ = nullptr;
+ io_reservation_ = 0;
+ metadata_ = &col_chunk.meta_data;
+ num_values_read_ = 0;
+ def_level_ = -1;
+ // See ColumnReader constructor.
+ rep_level_ = max_rep_level() == 0 ? 0 : -1;
+ pos_current_value_ = -1;
+
+ if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
+ RETURN_IF_ERROR(Codec::CreateDecompressor(
+ nullptr, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_));
+ }
+ int64_t col_start = col_chunk.meta_data.data_page_offset;
+
+ RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(parent_->file_metadata_,
+ parent_->filename(), row_group_idx, col_idx(), schema_element(),
+ parent_->state_));
+
+ if (col_chunk.meta_data.__isset.dictionary_page_offset) {
+ // Already validated in ValidateColumnOffsets()
+ DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
+ col_start = col_chunk.meta_data.dictionary_page_offset;
+ }
+ int64_t col_len = col_chunk.meta_data.total_compressed_size;
+ if (col_len <= 0) {
+ return Status(Substitute("File '$0' contains invalid column chunk size: $1",
+ filename(), col_len));
+ }
+ int64_t col_end = col_start + col_len;
+
+ // Already validated in ValidateColumnOffsets()
+ DCHECK_GT(col_end, 0);
+ DCHECK_LT(col_end, file_desc.file_length);
+ const ParquetFileVersion& file_version = parent_->file_version_;
+ if (file_version.application == "parquet-mr" && file_version.VersionLt(1, 2, 9)) {
+ // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
+ // dictionary page header size in total_compressed_size and total_uncompressed_size
+ // (see IMPALA-694). We pad col_len to compensate.
+ int64_t bytes_remaining = file_desc.file_length - col_end;
+ int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
+ col_len += pad;
+ }
+
+ // TODO: this will need to change when we have co-located files and the columns
+ // are in different files.
+ if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
+ return Status(Substitute("Expected parquet column file path '$0' to match "
+ "filename '$1'", col_chunk.file_path, filename()));
+ }
+
+ const ScanRange* metadata_range = parent_->metadata_range_;
+ int64_t partition_id = parent_->context_->partition_descriptor()->id();
+ const ScanRange* split_range =
+ static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
+ // Determine if the column is completely contained within a local split.
+ bool col_range_local = split_range->expected_local()
+ && col_start >= split_range->offset()
+ && col_end <= split_range->offset() + split_range->len();
+ scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
+ filename(), col_len, col_start, partition_id, split_range->disk_id(),
+ col_range_local,
+ BufferOpts(split_range->try_cache(), file_desc.mtime));
+ ClearDictionaryDecoder();
+ return Status::OK();
+}
+
+Status BaseScalarColumnReader::StartScan() {
+ DCHECK(scan_range_ != nullptr) << "Must Reset() before starting scan.";
+ DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
+ ScannerContext* context = parent_->context_;
+ DCHECK_GT(io_reservation_, 0);
+ bool needs_buffers;
+ RETURN_IF_ERROR(io_mgr->StartScanRange(
+ parent_->scan_node_->reader_context(), scan_range_, &needs_buffers));
+ if (needs_buffers) {
+ RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
+ parent_->scan_node_->reader_context(), context->bp_client(),
+ scan_range_, io_reservation_));
+ }
+ stream_ = parent_->context_->AddStream(scan_range_, io_reservation_);
+ DCHECK(stream_ != nullptr);
+ return Status::OK();
+}
+
Status BaseScalarColumnReader::ReadPageHeader(bool peek,
parquet::PageHeader* next_page_header, uint32_t* next_header_size, bool* eos) {
+ DCHECK(stream_ != nullptr);
*eos = false;
uint8_t* buffer;
@@ -919,7 +1017,7 @@ Status BaseScalarColumnReader::InitDictionary() {
bool eos;
parquet::PageHeader next_page_header;
uint32_t next_header_size;
-
+ DCHECK(stream_ != nullptr);
DCHECK(!HasDictionaryDecoder());
RETURN_IF_ERROR(ReadPageHeader(true /* peek */, &next_page_header,
@@ -1031,6 +1129,14 @@ Status BaseScalarColumnReader::InitDictionary() {
return Status::OK();
}
+Status BaseScalarColumnReader::InitDictionaries(
+ const vector<BaseScalarColumnReader*> readers) {
+ for (BaseScalarColumnReader* reader : readers) {
+ RETURN_IF_ERROR(reader->InitDictionary());
+ }
+ return Status::OK();
+}
+
Status BaseScalarColumnReader::ReadDataPage() {
// We're about to move to the next data page. The previous data page is
// now complete, free up any memory allocated for it. If the data page contained
http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 86ca239..f19d40a 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -322,42 +322,29 @@ class BaseScalarColumnReader : public ParquetColumnReader {
BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
const SlotDescriptor* slot_desc)
: ParquetColumnReader(parent, node, slot_desc),
- data_(NULL),
- data_end_(NULL),
- def_levels_(true),
- rep_levels_(false),
- page_encoding_(parquet::Encoding::PLAIN_DICTIONARY),
- num_buffered_values_(0),
- num_values_read_(0),
- metadata_(NULL),
- stream_(NULL),
data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
}
virtual ~BaseScalarColumnReader() { }
- /// This is called once for each row group in the file.
- Status Reset(const parquet::ColumnMetaData* metadata, ScannerContext::Stream* stream) {
- DCHECK(stream != NULL);
- DCHECK(metadata != NULL);
-
- num_buffered_values_ = 0;
- data_ = NULL;
- data_end_ = NULL;
- stream_ = stream;
- metadata_ = metadata;
- num_values_read_ = 0;
- def_level_ = -1;
- // See ColumnReader constructor.
- rep_level_ = max_rep_level() == 0 ? 0 : -1;
- pos_current_value_ = -1;
-
- if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
- RETURN_IF_ERROR(Codec::CreateDecompressor(
- NULL, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_));
- }
- ClearDictionaryDecoder();
+ /// Resets the reader for each row group in the file and creates the scan
+ /// range for the column, but does not start it. To start scanning,
+ /// set_io_reservation() must be called to assign reservation to this
+ /// column, followed by StartScan().
+ Status Reset(const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk,
+ int row_group_idx);
+
+ /// Starts the column scan range. The reader must be Reset() and have a
+ /// reservation assigned via set_io_reservation(). This must be called
+ /// before any of the column data can be read (including dictionary and
+ /// data pages). Returns an error status if there was an error starting the
+ /// scan or allocating buffers for it.
+ Status StartScan();
+
+ /// Helper to start scans for multiple columns at once.
+ static Status StartScans(const std::vector<BaseScalarColumnReader*> readers) {
+ for (BaseScalarColumnReader* reader : readers) RETURN_IF_ERROR(reader->StartScan());
return Status::OK();
}
@@ -372,22 +359,27 @@ class BaseScalarColumnReader : public ParquetColumnReader {
if (dict_decoder != nullptr) dict_decoder->Close();
}
+ io::ScanRange* scan_range() const { return scan_range_; }
int64_t total_len() const { return metadata_->total_compressed_size; }
int col_idx() const { return node_.col_idx; }
THdfsCompression::type codec() const {
if (metadata_ == NULL) return THdfsCompression::NONE;
return PARQUET_TO_IMPALA_CODEC[metadata_->codec];
}
+ void set_io_reservation(int64_t bytes) { io_reservation_ = bytes; }
/// Reads the next definition and repetition levels for this column. Initializes the
/// next data page if necessary.
virtual bool NextLevels() { return NextLevels<true>(); }
- // Check the data stream to see if there is a dictionary page. If there is,
- // use that page to initialize dict_decoder_ and advance the data stream
- // past the dictionary page.
+ /// Check the data stream to see if there is a dictionary page. If there is,
+ /// use that page to initialize dict_decoder_ and advance the data stream
+ /// past the dictionary page.
Status InitDictionary();
+ /// Convenience function to initialize multiple dictionaries.
+ static Status InitDictionaries(const std::vector<BaseScalarColumnReader*> readers);
+
// Returns the dictionary or NULL if the dictionary doesn't exist
virtual DictDecoderBase* GetDictionaryDecoder() { return nullptr; }
@@ -413,33 +405,45 @@ class BaseScalarColumnReader : public ParquetColumnReader {
// fit in as few cache lines as possible.
/// Pointer to start of next value in data page
- uint8_t* data_;
+ uint8_t* data_ = nullptr;
/// End of the data page.
- const uint8_t* data_end_;
+ const uint8_t* data_end_ = nullptr;
/// Decoder for definition levels.
- ParquetLevelDecoder def_levels_;
+ ParquetLevelDecoder def_levels_{true};
/// Decoder for repetition levels.
- ParquetLevelDecoder rep_levels_;
+ ParquetLevelDecoder rep_levels_{false};
/// Page encoding for values of the current data page. Cached here for perf. Set in
/// InitDataPage().
- parquet::Encoding::type page_encoding_;
+ parquet::Encoding::type page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
/// Num values remaining in the current data page
- int num_buffered_values_;
+ int num_buffered_values_ = 0;
// Less frequently used members that are not accessed in inner loop should go below
// here so they do not occupy precious cache line space.
/// The number of values seen so far. Updated per data page.
- int64_t num_values_read_;
+ int64_t num_values_read_ = 0;
+
+ /// Metadata for the column for the current row group.
+ const parquet::ColumnMetaData* metadata_ = nullptr;
- const parquet::ColumnMetaData* metadata_;
boost::scoped_ptr<Codec> decompressor_;
- ScannerContext::Stream* stream_;
+
+ /// The scan range for the column's data. Initialized for each row group by Reset().
+ io::ScanRange* scan_range_ = nullptr;
+
+ /// Stream used to read data from 'scan_range_'. Initialized by StartScan().
+ ScannerContext::Stream* stream_ = nullptr;
+
+ /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. Must be set
+ /// with set_io_reservation() before 'stream_' is initialized. Reset for each row group
+ /// by Reset().
+ int64_t io_reservation_ = 0;
/// Pool to allocate storage for data pages from - either decompression buffers for
/// compressed data pages or copies of the data page with var-len data to attach to
http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 0abf82f..c669e65 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -41,14 +41,15 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024;
const int64_t ScannerContext::Stream::OUTPUT_BUFFER_BYTES_LEFT_INIT;
ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
- HdfsPartitionDescriptor* partition_desc, ScanRange* scan_range,
- const vector<FilterContext>& filter_ctxs, MemPool* expr_results_pool)
+ BufferPool::ClientHandle* bp_client, HdfsPartitionDescriptor* partition_desc,
+ const vector<FilterContext>& filter_ctxs,
+ MemPool* expr_results_pool)
: state_(state),
scan_node_(scan_node),
+ bp_client_(bp_client),
partition_desc_(partition_desc),
filter_ctxs_(filter_ctxs),
expr_results_pool_(expr_results_pool) {
- AddStream(scan_range);
}
ScannerContext::~ScannerContext() {
@@ -66,19 +67,20 @@ void ScannerContext::ClearStreams() {
}
ScannerContext::Stream::Stream(ScannerContext* parent, ScanRange* scan_range,
- const HdfsFileDesc* file_desc)
+ int64_t reservation, const HdfsFileDesc* file_desc)
: parent_(parent),
scan_range_(scan_range),
file_desc_(file_desc),
+ reservation_(reservation),
file_len_(file_desc->file_length),
next_read_past_size_bytes_(INIT_READ_PAST_SIZE_BYTES),
boundary_pool_(new MemPool(parent->scan_node_->mem_tracker())),
boundary_buffer_(new StringBuffer(boundary_pool_.get())) {
}
-ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range) {
- streams_.emplace_back(new Stream(
- this, range, scan_node_->GetFileDesc(partition_desc_->id(), range->file())));
+ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range, int64_t reservation) {
+ streams_.emplace_back(new Stream(this, range, reservation,
+ scan_node_->GetFileDesc(partition_desc_->id(), range->file())));
return streams_.back().get();
}
@@ -101,7 +103,7 @@ void ScannerContext::Stream::ReleaseCompletedResources(bool done) {
Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
DCHECK_EQ(0, io_buffer_bytes_left_);
- DiskIoMgr* io_mgr = parent_->state_->io_mgr();
+ DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
if (UNLIKELY(parent_->cancelled())) return Status::CANCELLED;
if (io_buffer_ != nullptr) ReturnIoBuffer();
@@ -134,6 +136,7 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
read_past_buffer_size = ::max(read_past_buffer_size, read_past_size);
read_past_buffer_size = ::min(read_past_buffer_size, file_bytes_remaining);
read_past_buffer_size = ::min(read_past_buffer_size, max_buffer_size);
+ read_past_buffer_size = ::min(read_past_buffer_size, reservation_);
// We're reading past the scan range. Be careful not to read past the end of file.
DCHECK_GE(read_past_buffer_size, 0);
if (read_past_buffer_size == 0) {
@@ -150,9 +153,14 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
if (needs_buffers) {
// Allocate fresh buffers. The buffers for 'scan_range_' should be released now
// since we hit EOS.
+ if (reservation_ < io_mgr->min_buffer_size()) {
+ return Status(Substitute("Could not read past end of scan range in file '$0'. "
+ "Reservation provided $1 was < the minimum I/O buffer size $2",
+ filename(), reservation_, io_mgr->min_buffer_size()));
+ }
RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
- parent_->scan_node_->reader_context(), range,
- 3 * io_mgr->max_buffer_size()));
+ parent_->scan_node_->reader_context(), parent_->bp_client_, range,
+ reservation_));
}
RETURN_IF_ERROR(range->GetNext(&io_buffer_));
DCHECK(io_buffer_->eosr());
http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index a131d3f..6292486 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -27,6 +27,7 @@
#include "common/compiler-util.h"
#include "common/status.h"
#include "exec/filter-context.h"
+#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/io/request-ranges.h"
namespace impala {
@@ -84,10 +85,12 @@ class TupleRow;
class ScannerContext {
public:
/// Create a scanner context with the parent scan_node (where materialized row batches
- /// get pushed to) and the scan range to process.
- /// This context starts with 1 stream.
- ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*,
- io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs,
+ /// get pushed to). Scan ranges are added via AddStream(). Buffers are allocated using
+ /// 'bp_client'.
+ ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
+ BufferPool::ClientHandle* bp_client,
+ HdfsPartitionDescriptor* partition_desc,
+ const std::vector<FilterContext>& filter_ctxs,
MemPool* expr_results_pool);
/// Destructor verifies that all stream objects have been released.
~ScannerContext();
@@ -150,6 +153,7 @@ class ScannerContext {
const char* filename() { return scan_range_->file(); }
const io::ScanRange* scan_range() { return scan_range_; }
const HdfsFileDesc* file_desc() { return file_desc_; }
+ int64_t reservation() const { return reservation_; }
/// Returns the buffer's current offset in the file.
int64_t file_offset() const { return scan_range_->offset() + total_bytes_returned_; }
@@ -211,9 +215,15 @@ class ScannerContext {
private:
friend class ScannerContext;
- ScannerContext* parent_;
- io::ScanRange* scan_range_;
- const HdfsFileDesc* file_desc_;
+ ScannerContext* const parent_;
+ io::ScanRange* const scan_range_;
+ const HdfsFileDesc* const file_desc_;
+
+ /// Reservation given to this stream for allocating I/O buffers. The reservation is
+ /// shared with 'scan_range_', so the context must be careful not to use this until
+ /// all of 'scan_range_'s buffers have been freed. Must be >= the minimum IoMgr
+ /// buffer size to allow reading past the end of 'scan_range_'.
+ const int64_t reservation_;
/// Total number of bytes returned from GetBytes()
int64_t total_bytes_returned_ = 0;
@@ -272,7 +282,8 @@ class ScannerContext {
/// output_buffer_bytes_left_ will be set to something else.
static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0;
- Stream(ScannerContext* parent, io::ScanRange* scan_range,
+ /// Private constructor. See AddStream() for public API.
+ Stream(ScannerContext* parent, io::ScanRange* scan_range, int64_t reservation,
const HdfsFileDesc* file_desc);
/// GetBytes helper to handle the slow path.
@@ -355,24 +366,37 @@ class ScannerContext {
/// size to 0.
void ClearStreams();
- /// Add a stream to this ScannerContext for 'range'. The stream is owned by this
- /// context.
- Stream* AddStream(io::ScanRange* range);
+ /// Add a stream to this ScannerContext for 'range'. 'range' must already have any
+ /// buffers that it needs allocated. 'reservation' is the amount of reservation that
+ /// is given to this stream for allocating I/O buffers. The reservation is shared with
+ /// 'range', so the context must be careful not to use this until all of 'range's
+ /// buffers have been freed. Must be >= the minimum IoMgr buffer size to allow reading
+ /// past the end of 'range'.
+ ///
+ /// Returns the added stream. The returned stream is owned by this context.
+ Stream* AddStream(io::ScanRange* range, int64_t reservation);
/// Returns true if RuntimeState::is_cancelled() is true, or if scan node is not
/// multi-threaded and is done (finished, cancelled or reached it's limit).
/// In all other cases returns false.
bool cancelled() const;
- HdfsPartitionDescriptor* partition_descriptor() { return partition_desc_; }
+ BufferPool::ClientHandle* bp_client() const { return bp_client_; }
+ HdfsPartitionDescriptor* partition_descriptor() const { return partition_desc_; }
const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; }
MemPool* expr_results_pool() const { return expr_results_pool_; }
private:
friend class Stream;
- RuntimeState* state_;
- HdfsScanNodeBase* scan_node_;
- HdfsPartitionDescriptor* partition_desc_;
+ RuntimeState* const state_;
+ HdfsScanNodeBase* const scan_node_;
+
+ /// Buffer pool client used to allocate I/O buffers. This is accessed by multiple
+ /// threads in the multi-threaded scan node, so those threads must take care to only
+ /// call thread-safe BufferPool methods with this client.
+ BufferPool::ClientHandle* const bp_client_;
+
+ HdfsPartitionDescriptor* const partition_desc_;
/// Vector of streams. Non-columnar formats will always have one stream per context.
std::vector<std::unique_ptr<Stream>> streams_;
http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index c46c5ea..e441402 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -522,15 +522,15 @@ TEST_F(ReservationTrackerTest, TransferReservation) {
TEST_F(ReservationTrackerTest, ReservationUtil) {
const int64_t MEG = 1024 * 1024;
const int64_t GIG = 1024 * 1024 * 1024;
- EXPECT_EQ(75 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING);
+ EXPECT_EQ(32 * MEG, ReservationUtil::RESERVATION_MEM_MIN_REMAINING);
EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(0));
EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(-1));
- EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(75 * MEG));
+ EXPECT_EQ(0, ReservationUtil::GetReservationLimitFromMemLimit(32 * MEG));
EXPECT_EQ(8 * GIG, ReservationUtil::GetReservationLimitFromMemLimit(10 * GIG));
- EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0));
- EXPECT_EQ(75 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1));
+ EXPECT_EQ(32 * MEG, ReservationUtil::GetMinMemLimitFromReservation(0));
+ EXPECT_EQ(32 * MEG, ReservationUtil::GetMinMemLimitFromReservation(-1));
EXPECT_EQ(500 * MEG, ReservationUtil::GetMinMemLimitFromReservation(400 * MEG));
EXPECT_EQ(5 * GIG, ReservationUtil::GetMinMemLimitFromReservation(4 * GIG));
http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/runtime/bufferpool/reservation-util.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-util.cc b/be/src/runtime/bufferpool/reservation-util.cc
index 85718ab..a27ab9d 100644
--- a/be/src/runtime/bufferpool/reservation-util.cc
+++ b/be/src/runtime/bufferpool/reservation-util.cc
@@ -24,7 +24,7 @@ namespace impala {
// Most operators that accumulate memory use reservations, so the majority of memory
// should be allocated to buffer reservations, as a heuristic.
const double ReservationUtil::RESERVATION_MEM_FRACTION = 0.8;
-const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 75 * 1024 * 1024;
+const int64_t ReservationUtil::RESERVATION_MEM_MIN_REMAINING = 32 * 1024 * 1024;
int64_t ReservationUtil::GetReservationLimitFromMemLimit(int64_t mem_limit) {
int64_t max_reservation = std::min<int64_t>(
http://git-wip-us.apache.org/repos/asf/impala/blob/24b4ed0b/be/src/runtime/io/disk-io-mgr-stress-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress-test.cc b/be/src/runtime/io/disk-io-mgr-stress-test.cc
index 0e41a6f..2ec1d09 100644
--- a/be/src/runtime/io/disk-io-mgr-stress-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress-test.cc
@@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-#include "runtime/io/disk-io-mgr-stress.h"
+#include <gflags/gflags.h>
#include "common/init.h"
+#include "runtime/io/disk-io-mgr-stress.h"
+#include "common/init.h"
#include "runtime/test-env.h"
#include "service/fe-support.h"
#include "util/string-parser.h"
@@ -31,34 +33,32 @@ using namespace impala::io;
// can be passed to control how long to run this test (0 for forever).
// TODO: make these configurable once we decide how to run BE tests with args
-const int DEFAULT_DURATION_SEC = 1;
+constexpr int DEFAULT_DURATION_SEC = 1;
const int NUM_DISKS = 5;
const int NUM_THREADS_PER_DISK = 5;
const int NUM_CLIENTS = 10;
const bool TEST_CANCELLATION = true;
+const int64_t BUFFER_POOL_CAPACITY = 1024L * 1024L * 1024L * 4L;
+
+DEFINE_int64(duration_sec, DEFAULT_DURATION_SEC,
+ "Disk I/O Manager stress test duration in seconds. 0 means run indefinitely.");
int main(int argc, char** argv) {
- InitCommonRuntime(argc, argv, true, TestInfo::BE_TEST);
- InitFeSupport();
- TestEnv test_env;
- ABORT_IF_ERROR(test_env.Init());
- int duration_sec = DEFAULT_DURATION_SEC;
+ impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+ impala::InitFeSupport();
- if (argc == 2) {
- StringParser::ParseResult status;
- duration_sec = StringParser::StringToInt<int>(argv[1], strlen(argv[1]), &status);
- if (status != StringParser::PARSE_SUCCESS) {
- printf("Invalid arg: %s\n", argv[1]);
- return 1;
- }
- }
- if (duration_sec != 0) {
- printf("Running stress test for %d seconds.\n", duration_sec);
+ if (FLAGS_duration_sec != 0) {
+ printf("Running stress test for %ld seconds.\n", FLAGS_duration_sec);
} else {
printf("Running stress test indefinitely.\n");
}
- DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION);
- test.Run(duration_sec);
+ TestEnv test_env;
+ // Tests try to allocate arbitrarily small buffers. Ensure Buffer Pool allows it.
+ test_env.SetBufferPoolArgs(DiskIoMgrStress::MIN_READ_BUFFER_SIZE, BUFFER_POOL_CAPACITY);
+ Status status = test_env.Init();
+ CHECK(status.ok()) << status.GetDetail();
+ DiskIoMgrStress test(NUM_DISKS, NUM_THREADS_PER_DISK, NUM_CLIENTS, TEST_CANCELLATION);
+ test.Run(FLAGS_duration_sec);
return 0;
}