You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/03 06:25:34 UTC
[11/11] impala git commit: Revert IMPALA-4835 and dependent changes
Revert IMPALA-4835 and dependent changes
Revert "IMPALA-6585: increase test_low_mem_limit_q21 limit"
This reverts commit 25bcb258dfd712f1514cf188206667a5e6be0e26.
Revert "IMPALA-6588: don't add empty list of ranges in text scan"
This reverts commit d57fbec6f67b83227b4c6117476da8f7d75fc4f6.
Revert "IMPALA-4835: Part 3: switch I/O buffers to buffer pool"
This reverts commit 24b4ed0b29a44090350e630d625291c01b753a36.
Revert "IMPALA-4835: Part 2: Allocate scan range buffers upfront"
This reverts commit 5699b59d0c5cbe37e888a367adb42fa12dfb0916.
Revert "IMPALA-4835: Part 1: simplify I/O mgr mem mgmt and cancellation"
This reverts commit 65680dc42107db4ff2273c635cedf83d20f0ea94.
Change-Id: Ie5ca451cd96602886b0a8ecaa846957df0269cbb
Reviewed-on: http://gerrit.cloudera.org:8080/9480
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/161cbe30
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/161cbe30
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/161cbe30
Branch: refs/heads/master
Commit: 161cbe30ff0cba70e48fe48d41e0d76ec5273264
Parents: a2c9a7c
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Mar 2 16:09:25 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Mar 3 04:22:12 2018 +0000
----------------------------------------------------------------------
be/src/exec/CMakeLists.txt | 1 -
be/src/exec/base-sequence-scanner.cc | 1 -
be/src/exec/base-sequence-scanner.h | 3 +-
be/src/exec/hdfs-lzo-text-scanner.cc | 1 -
be/src/exec/hdfs-parquet-scanner-test.cc | 96 -
be/src/exec/hdfs-parquet-scanner.cc | 234 +--
be/src/exec/hdfs-parquet-scanner.h | 44 +-
be/src/exec/hdfs-scan-node-base.cc | 95 +-
be/src/exec/hdfs-scan-node-base.h | 3 -
be/src/exec/hdfs-scan-node-mt.cc | 20 +-
be/src/exec/hdfs-scan-node.cc | 172 +-
be/src/exec/hdfs-scan-node.h | 66 +-
be/src/exec/hdfs-text-scanner.cc | 6 +-
be/src/exec/parquet-column-readers.cc | 108 +-
be/src/exec/parquet-column-readers.h | 88 +-
be/src/exec/scanner-context.cc | 42 +-
be/src/exec/scanner-context.h | 54 +-
be/src/runtime/bufferpool/buffer-pool.h | 1 -
.../bufferpool/reservation-tracker-test.cc | 8 +-
be/src/runtime/bufferpool/reservation-util.cc | 2 +-
be/src/runtime/exec-env.cc | 7 +-
be/src/runtime/io/disk-io-mgr-internal.h | 16 -
be/src/runtime/io/disk-io-mgr-stress-test.cc | 43 +-
be/src/runtime/io/disk-io-mgr-stress.cc | 89 +-
be/src/runtime/io/disk-io-mgr-stress.h | 26 +-
be/src/runtime/io/disk-io-mgr-test.cc | 849 ++++----
be/src/runtime/io/disk-io-mgr.cc | 716 +++++--
be/src/runtime/io/disk-io-mgr.h | 383 ++--
be/src/runtime/io/request-context.cc | 239 +--
be/src/runtime/io/request-context.h | 318 ++-
be/src/runtime/io/request-ranges.h | 196 +-
be/src/runtime/io/scan-range.cc | 309 ++-
be/src/runtime/mem-tracker.h | 1 +
be/src/runtime/test-env.cc | 2 +-
be/src/runtime/tmp-file-mgr-test.cc | 3 +-
be/src/runtime/tmp-file-mgr.cc | 18 +-
be/src/runtime/tmp-file-mgr.h | 17 +-
be/src/util/bit-util-test.cc | 11 -
be/src/util/bit-util.h | 8 +-
be/src/util/impalad-metrics.cc | 13 +-
be/src/util/impalad-metrics.h | 9 +
common/thrift/PlanNodes.thrift | 3 -
.../apache/impala/analysis/SlotDescriptor.java | 19 -
.../org/apache/impala/analysis/SlotRef.java | 20 +
.../org/apache/impala/planner/HdfsScanNode.java | 167 +-
.../java/org/apache/impala/util/BitUtil.java | 6 -
.../org/apache/impala/util/BitUtilTest.java | 6 -
.../queries/PlannerTest/constant-folding.test | 42 +-
.../queries/PlannerTest/disable-codegen.test | 24 +-
.../PlannerTest/fk-pk-join-detection.test | 78 +-
.../queries/PlannerTest/max-row-size.test | 80 +-
.../PlannerTest/min-max-runtime-filters.test | 6 +-
.../queries/PlannerTest/mt-dop-validation.test | 40 +-
.../queries/PlannerTest/parquet-filtering.test | 42 +-
.../queries/PlannerTest/partition-pruning.test | 4 +-
.../PlannerTest/resource-requirements.test | 1814 ++++--------------
.../PlannerTest/sort-expr-materialization.test | 32 +-
.../PlannerTest/spillable-buffer-sizing.test | 192 +-
.../queries/PlannerTest/tablesample.test | 44 +-
.../queries/PlannerTest/union.test | 8 +-
.../admission-reject-min-reservation.test | 12 +-
.../queries/QueryTest/analytic-fns.test | 5 +-
.../queries/QueryTest/codegen-mem-limit.test | 5 +-
.../QueryTest/disk-spill-encryption.test | 2 +-
.../queries/QueryTest/explain-level0.test | 4 +-
.../queries/QueryTest/explain-level1.test | 4 +-
.../queries/QueryTest/explain-level2.test | 14 +-
.../queries/QueryTest/explain-level3.test | 14 +-
.../queries/QueryTest/nested-types-tpch.test | 6 +-
.../queries/QueryTest/runtime_row_filters.test | 11 +-
.../queries/QueryTest/scanners.test | 7 -
.../functional-query/queries/QueryTest/set.test | 2 +-
.../queries/QueryTest/spilling-aggs.test | 19 +-
.../spilling-naaj-no-deny-reservation.test | 7 +-
.../queries/QueryTest/spilling-naaj.test | 8 +-
.../QueryTest/spilling-no-debug-action.test | 66 -
.../QueryTest/spilling-sorts-exhaustive.test | 10 +-
.../queries/QueryTest/spilling.test | 76 +-
.../queries/QueryTest/stats-extrapolation.test | 52 +-
.../tpch/queries/sort-reservation-usage.test | 9 +-
tests/common/test_dimensions.py | 16 +-
tests/custom_cluster/test_scratch_disk.py | 2 +-
tests/query_test/test_mem_usage_scaling.py | 11 +-
tests/query_test/test_query_mem_limit.py | 4 +-
tests/query_test/test_scanners.py | 25 +-
tests/query_test/test_scanners_fuzz.py | 9 +-
tests/query_test/test_sort.py | 16 +-
tests/query_test/test_spilling.py | 5 -
88 files changed, 2801 insertions(+), 4565 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index c1f91d6..aab1383 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -108,4 +108,3 @@ ADD_BE_TEST(parquet-version-test)
ADD_BE_TEST(row-batch-list-test)
ADD_BE_TEST(incr-stats-util-test)
ADD_BE_TEST(hdfs-avro-scanner-test)
-ADD_BE_TEST(hdfs-parquet-scanner-test)
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 9d95b0b..9cb6330 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -46,7 +46,6 @@ static const int MIN_SYNC_READ_SIZE = 64 * 1024; // bytes
Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
const vector<HdfsFileDesc*>& files) {
- DCHECK(!files.empty());
// Issue just the header range for each file. When the header is complete,
// we'll issue the splits for that file. Splits cannot be processed until the
// header is parsed (the header object is then shared across splits for that file).
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/base-sequence-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.h b/be/src/exec/base-sequence-scanner.h
index 3c2326e..887ff6f 100644
--- a/be/src/exec/base-sequence-scanner.h
+++ b/be/src/exec/base-sequence-scanner.h
@@ -47,8 +47,7 @@ class ScannerContext;
/// situation, causing the block to be incorrectly skipped.
class BaseSequenceScanner : public HdfsScanner {
public:
- /// Issue the initial ranges for all sequence container files. 'files' must not be
- /// empty.
+ /// Issue the initial ranges for all sequence container files.
static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
const std::vector<HdfsFileDesc*>& files)
WARN_UNUSED_RESULT;
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-lzo-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-lzo-text-scanner.cc b/be/src/exec/hdfs-lzo-text-scanner.cc
index 8af89f2..88ae295 100644
--- a/be/src/exec/hdfs-lzo-text-scanner.cc
+++ b/be/src/exec/hdfs-lzo-text-scanner.cc
@@ -62,7 +62,6 @@ HdfsScanner* HdfsLzoTextScanner::GetHdfsLzoTextScanner(
Status HdfsLzoTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
const vector<HdfsFileDesc*>& files) {
- DCHECK(!files.empty());
if (LzoIssueInitialRanges == NULL) {
lock_guard<SpinLock> l(lzo_load_lock_);
if (library_load_status_.ok()) {
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-parquet-scanner-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner-test.cc b/be/src/exec/hdfs-parquet-scanner-test.cc
deleted file mode 100644
index cbc6e76..0000000
--- a/be/src/exec/hdfs-parquet-scanner-test.cc
+++ /dev/null
@@ -1,96 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/hdfs-parquet-scanner.h"
-#include "testutil/gtest-util.h"
-
-#include "common/names.h"
-
-static const int64_t MIN_BUFFER_SIZE = 64 * 1024;
-static const int64_t MAX_BUFFER_SIZE = 8 * 1024 * 1024;
-
-namespace impala {
-
-class HdfsParquetScannerTest : public testing::Test {
- protected:
- void TestDivideReservation(const vector<int64_t>& col_range_lengths,
- int64_t total_col_reservation, const vector<int64_t>& expected_reservations);
-};
-
-/// Test that DivideReservationBetweenColumns() returns 'expected_reservations' for
-/// inputs 'col_range_lengths' and 'total_col_reservation'.
-void HdfsParquetScannerTest::TestDivideReservation(const vector<int64_t>& col_range_lengths,
- int64_t total_col_reservation, const vector<int64_t>& expected_reservations) {
- vector<pair<int, int64_t>> reservations =
- HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
- MIN_BUFFER_SIZE, MAX_BUFFER_SIZE, col_range_lengths, total_col_reservation);
- for (int i = 0; i < reservations.size(); ++i) {
- LOG(INFO) << i << " " << reservations[i].first << " " << reservations[i].second;
- }
- EXPECT_EQ(reservations.size(), expected_reservations.size());
- vector<bool> present(expected_reservations.size(), false);
- for (auto& reservation: reservations) {
- // Ensure that each appears exactly once.
- EXPECT_FALSE(present[reservation.first]);
- present[reservation.first] = true;
- EXPECT_EQ(expected_reservations[reservation.first], reservation.second)
- << reservation.first;
- }
-}
-
-TEST_F(HdfsParquetScannerTest, DivideReservation) {
- // Test a long scan ranges with lots of memory - should allocate 3 max-size
- // buffers per range.
- TestDivideReservation({100 * 1024 * 1024}, 50 * 1024 * 1024, {3 * MAX_BUFFER_SIZE});
- TestDivideReservation({100 * 1024 * 1024, 50 * 1024 * 1024}, 100 * 1024 * 1024,
- {3 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
-
- // Long scan ranges, not enough memory for 3 buffers each. Should only allocate
- // max-sized buffers, preferring the longer scan range.
- TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, 5 * MAX_BUFFER_SIZE,
- {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
- TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024},
- 5 * MAX_BUFFER_SIZE + MIN_BUFFER_SIZE,
- {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
- TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, 6 * MAX_BUFFER_SIZE - 1,
- {2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
-
- // Test a short range with lots of memory - should round up buffer size.
- TestDivideReservation({100 * 1024}, 50 * 1024 * 1024, {128 * 1024});
-
- // Test a range << MIN_BUFFER_SIZE - should round up to buffer size.
- TestDivideReservation({13}, 50 * 1024 * 1024, {MIN_BUFFER_SIZE});
-
- // Test long ranges with limited memory.
- TestDivideReservation({100 * 1024 * 1024}, 100 * 1024, {MIN_BUFFER_SIZE});
- TestDivideReservation({100 * 1024 * 1024}, MIN_BUFFER_SIZE, {MIN_BUFFER_SIZE});
- TestDivideReservation({100 * 1024 * 1024}, 2 * MIN_BUFFER_SIZE, {2 * MIN_BUFFER_SIZE});
- TestDivideReservation({100 * 1024 * 1024}, MAX_BUFFER_SIZE - 1, {MAX_BUFFER_SIZE / 2});
- TestDivideReservation({100 * 1024 * 1024, 1024 * 1024, MIN_BUFFER_SIZE},
- 3 * MIN_BUFFER_SIZE, {MIN_BUFFER_SIZE, MIN_BUFFER_SIZE, MIN_BUFFER_SIZE});
-
- // Test a mix of scan range lengths larger than and smaller than the max I/O buffer
- // size. Long ranges get allocated most memory.
- TestDivideReservation(
- {15145047, 5019635, 5019263, 15145047, 15145047, 5019635, 5019263, 317304},
- 25165824,
- {8388608, 2097152, 524288, 8388608, 4194304, 1048576, 262144, 262144});
-}
-
-}
-
-IMPALA_TEST_MAIN();
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 574ddb0..e279369 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -17,7 +17,6 @@
#include "exec/hdfs-parquet-scanner.h"
-#include <algorithm>
#include <queue>
#include <gutil/strings/substitute.h>
@@ -28,7 +27,6 @@
#include "exec/parquet-column-stats.h"
#include "exec/scanner-context.inline.h"
#include "runtime/collection-value-builder.h"
-#include "runtime/exec-env.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/runtime-state.h"
#include "runtime/runtime-filter.inline.h"
@@ -37,7 +35,6 @@
#include "common/names.h"
using std::move;
-using std::sort;
using namespace impala;
using namespace impala::io;
@@ -50,6 +47,10 @@ constexpr int BATCHES_PER_FILTER_SELECTIVITY_CHECK = 16;
static_assert(BitUtil::IsPowerOf2(BATCHES_PER_FILTER_SELECTIVITY_CHECK),
"BATCHES_PER_FILTER_SELECTIVITY_CHECK must be a power of two");
+// Max dictionary page header size in bytes. This is an estimate and only needs to be an
+// upper bound.
+const int MAX_DICT_HEADER_SIZE = 100;
+
// Max entries in the dictionary before switching to PLAIN encoding. If a dictionary
// has fewer entries, then the entire column is dictionary encoded. This threshold
// is guaranteed to be true for Impala versions 2.9 or below.
@@ -68,7 +69,6 @@ const string PARQUET_MEM_LIMIT_EXCEEDED =
Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
const vector<HdfsFileDesc*>& files) {
- DCHECK(!files.empty());
vector<ScanRange*> footer_ranges;
for (int i = 0; i < files.size(); ++i) {
// If the file size is less than 12 bytes, it is an invalid Parquet file.
@@ -98,7 +98,7 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
static_cast<ScanRangeMetadata*>(split->meta_data());
// Each split is processed by first issuing a scan range for the file footer, which
// is done here, followed by scan ranges for the columns of each row group within
- // the actual split (see InitScalarColumns()). The original split is stored in the
+ // the actual split (in InitColumns()). The original split is stored in the
// metadata associated with the footer range.
ScanRange* footer_range;
if (footer_split != nullptr) {
@@ -121,13 +121,9 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
}
}
}
- // We may not have any scan ranges if this scan node does not have the footer split for
- // any Parquet file.
- if (footer_ranges.size() > 0) {
- // The threads that process the footer will also do the scan, so we mark all the files
- // as complete here.
- RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size()));
- }
+ // The threads that process the footer will also do the scan, so we mark all the files
+ // as complete here.
+ RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, files.size()));
return Status::OK();
}
@@ -232,14 +228,9 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
// Release I/O buffers immediately to make sure they are cleaned up
// in case we return a non-OK status anywhere below.
- int64_t stream_reservation = stream_->reservation();
- stream_ = nullptr;
context_->ReleaseCompletedResources(true);
context_->ClearStreams();
RETURN_IF_ERROR(footer_status);
- // The scanner-wide stream was used only to read the file footer. Each column has added
- // its own stream. We can use the reservation from 'stream_' for the columns now.
- total_col_reservation_ = stream_reservation;
// Parse the file schema into an internal representation for schema resolution.
schema_resolver_.reset(new ParquetSchemaResolver(*scan_node_->hdfs_table(),
@@ -256,6 +247,10 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
RETURN_IF_ERROR(InitDictFilterStructures());
+
+ // The scanner-wide stream was used only to read the file footer. Each column has added
+ // its own stream.
+ stream_ = nullptr;
return Status::OK();
}
@@ -679,13 +674,15 @@ Status HdfsParquetScanner::NextRowGroup() {
}
InitCollectionColumns();
- RETURN_IF_ERROR(InitScalarColumns());
- // Start scanning dictionary filtering column readers, so we can read the dictionary
- // pages in EvalDictionaryFilters().
- RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(dict_filterable_readers_));
+ // Prepare dictionary filtering columns for first read
+ // This must be done before dictionary filtering, because this code initializes
+ // the column offsets and streams needed to read the dictionaries.
+ // TODO: Restructure the code so that the dictionary can be read without the rest
+ // of the column.
+ RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, dict_filterable_readers_));
- // StartScans() may have allocated resources to scan columns. If we skip this row
+ // InitColumns() may have allocated resources to scan columns. If we skip this row
// group below, we must call ReleaseSkippedRowGroupResources() before continuing.
// If there is a dictionary-encoded column where every value is eliminated
@@ -706,10 +703,10 @@ Status HdfsParquetScanner::NextRowGroup() {
}
// At this point, the row group has passed any filtering criteria
- // Start scanning non-dictionary filtering column readers and initialize their
- // dictionaries.
- RETURN_IF_ERROR(BaseScalarColumnReader::StartScans(non_dict_filterable_readers_));
- status = BaseScalarColumnReader::InitDictionaries(non_dict_filterable_readers_);
+ // Prepare non-dictionary filtering column readers for first read and
+ // initialize their dictionaries.
+ RETURN_IF_ERROR(InitScalarColumns(row_group_idx_, non_dict_filterable_readers_));
+ status = InitDictionaries(non_dict_filterable_readers_);
if (!status.ok()) {
// Either return an error or skip this row group if it is ok to ignore errors
RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
@@ -744,6 +741,7 @@ Status HdfsParquetScanner::NextRowGroup() {
break;
}
}
+
DCHECK(parse_status_.ok());
return Status::OK();
}
@@ -801,7 +799,6 @@ void HdfsParquetScanner::PartitionReaders(
} else {
BaseScalarColumnReader* scalar_reader =
static_cast<BaseScalarColumnReader*>(reader);
- scalar_readers_.push_back(scalar_reader);
if (can_eval_dict_filters && IsDictFilterable(scalar_reader)) {
dict_filterable_readers_.push_back(scalar_reader);
} else {
@@ -993,7 +990,7 @@ Status HdfsParquetScanner::EvalDictionaryFilters(const parquet::RowGroup& row_gr
// Any columns that were not 100% dictionary encoded need to initialize
// their dictionaries here.
- RETURN_IF_ERROR(BaseScalarColumnReader::InitDictionaries(deferred_dict_init_list));
+ RETURN_IF_ERROR(InitDictionaries(deferred_dict_init_list));
return Status::OK();
}
@@ -1442,15 +1439,12 @@ Status HdfsParquetScanner::ProcessFooter() {
BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size));
unique_ptr<BufferDescriptor> io_buffer;
- bool needs_buffers;
- RETURN_IF_ERROR(io_mgr->StartScanRange(
- scan_node_->reader_context(), metadata_range, &needs_buffers));
- DCHECK(!needs_buffers) << "Already provided a buffer";
- RETURN_IF_ERROR(metadata_range->GetNext(&io_buffer));
+ RETURN_IF_ERROR(
+ io_mgr->Read(scan_node_->reader_context(), metadata_range, &io_buffer));
DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer());
DCHECK_EQ(io_buffer->len(), metadata_size);
DCHECK(io_buffer->eosr());
- metadata_range->ReturnBuffer(move(io_buffer));
+ io_mgr->ReturnBuffer(move(io_buffer));
}
// Deserialize file header
@@ -1650,16 +1644,23 @@ void HdfsParquetScanner::InitCollectionColumns() {
}
}
-Status HdfsParquetScanner::InitScalarColumns() {
+Status HdfsParquetScanner::InitScalarColumns(
+ int row_group_idx, const vector<BaseScalarColumnReader*>& column_readers) {
int64_t partition_id = context_->partition_descriptor()->id();
const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
DCHECK(file_desc != nullptr);
- parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
+ parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx];
+ // All the scan ranges (one for each column).
+ vector<ScanRange*> col_ranges;
// Used to validate that the number of values in each reader in column_readers_ at the
// same SchemaElement is the same.
unordered_map<const parquet::SchemaElement*, int> num_values_map;
- for (BaseScalarColumnReader* scalar_reader : scalar_readers_) {
+ // Used to validate we issued the right number of scan ranges
+ int num_scalar_readers = 0;
+
+ for (BaseScalarColumnReader* scalar_reader: column_readers) {
+ ++num_scalar_readers;
const parquet::ColumnChunk& col_chunk = row_group.columns[scalar_reader->col_idx()];
auto num_values_it = num_values_map.find(&scalar_reader->schema_element());
int num_values = -1;
@@ -1668,115 +1669,78 @@ Status HdfsParquetScanner::InitScalarColumns() {
} else {
num_values_map[&scalar_reader->schema_element()] = col_chunk.meta_data.num_values;
}
+ int64_t col_start = col_chunk.meta_data.data_page_offset;
+
if (num_values != -1 && col_chunk.meta_data.num_values != num_values) {
// TODO: improve this error message by saying which columns are different,
// and also specify column in other error messages as appropriate
return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(),
col_chunk.meta_data.num_values, num_values, filename());
}
- RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, row_group_idx_));
- }
- RETURN_IF_ERROR(
- DivideReservationBetweenColumns(scalar_readers_, total_col_reservation_));
- return Status::OK();
-}
-Status HdfsParquetScanner::DivideReservationBetweenColumns(
- const vector<BaseScalarColumnReader*>& column_readers,
- int64_t reservation_to_distribute) {
- DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
- const int64_t min_buffer_size = io_mgr->min_buffer_size();
- const int64_t max_buffer_size = io_mgr->max_buffer_size();
- // The HdfsScanNode reservation calculation in the planner ensures that we have
- // reservation for at least one buffer per column.
- if (reservation_to_distribute < min_buffer_size * column_readers.size()) {
- return Status(TErrorCode::INTERNAL_ERROR,
- Substitute("Not enough reservation in Parquet scanner for file '$0'. Need at "
- "least $1 bytes per column for $2 columns but had $3 bytes",
- filename(), min_buffer_size, column_readers.size(),
- reservation_to_distribute));
- }
+ RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(file_metadata_,
+ filename(), row_group_idx, scalar_reader->col_idx(),
+ scalar_reader->schema_element(), state_));
- vector<int64_t> col_range_lengths(column_readers.size());
- for (int i = 0; i < column_readers.size(); ++i) {
- col_range_lengths[i] = column_readers[i]->scan_range()->len();
- }
- vector<pair<int, int64_t>> tmp_reservations = DivideReservationBetweenColumnsHelper(
- min_buffer_size, max_buffer_size, col_range_lengths, reservation_to_distribute);
- for (auto& tmp_reservation : tmp_reservations) {
- column_readers[tmp_reservation.first]->set_io_reservation(tmp_reservation.second);
- }
- return Status::OK();
-}
+ if (col_chunk.meta_data.__isset.dictionary_page_offset) {
+ // Already validated in ValidateColumnOffsets()
+ DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
+ col_start = col_chunk.meta_data.dictionary_page_offset;
+ }
+ int64_t col_len = col_chunk.meta_data.total_compressed_size;
+ if (col_len <= 0) {
+ return Status(Substitute("File '$0' contains invalid column chunk size: $1",
+ filename(), col_len));
+ }
+ int64_t col_end = col_start + col_len;
+
+ // Already validated in ValidateColumnOffsets()
+ DCHECK_GT(col_end, 0);
+ DCHECK_LT(col_end, file_desc->file_length);
+ if (file_version_.application == "parquet-mr" && file_version_.VersionLt(1, 2, 9)) {
+ // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
+ // dictionary page header size in total_compressed_size and total_uncompressed_size
+ // (see IMPALA-694). We pad col_len to compensate.
+ int64_t bytes_remaining = file_desc->file_length - col_end;
+ int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
+ col_len += pad;
+ }
-vector<pair<int, int64_t>> HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
- int64_t min_buffer_size, int64_t max_buffer_size,
- const vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute) {
- // Pair of (column index, reservation allocated).
- vector<pair<int, int64_t>> tmp_reservations;
- for (int i = 0; i < col_range_lengths.size(); ++i) tmp_reservations.emplace_back(i, 0);
-
- // Sort in descending order of length, breaking ties by index so that larger columns
- // get allocated reservation first. It is common to have dramatically different column
- // sizes in a single file because of different value sizes and compressibility. E.g.
- // consider a large STRING "comment" field versus a highly compressible
- // dictionary-encoded column with only a few distinct values. We want to give max-sized
- // buffers to large columns first to maximize the size of I/Os that we do while reading
- // this row group.
- sort(tmp_reservations.begin(), tmp_reservations.end(),
- [&col_range_lengths](const pair<int, int64_t>& left, const pair<int, int64_t>& right) {
- int64_t left_len = col_range_lengths[left.first];
- int64_t right_len = col_range_lengths[right.first];
- return left_len != right_len ? left_len > right_len : left.first < right.first;
- });
-
- // Set aside the minimum reservation per column.
- reservation_to_distribute -= min_buffer_size * col_range_lengths.size();
-
- // Allocate reservations to columns by repeatedly allocating either a max-sized buffer
- // or a large enough buffer to fit the remaining data for each column. Do this
- // round-robin up to the ideal number of I/O buffers.
- for (int i = 0; i < DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; ++i) {
- for (auto& tmp_reservation : tmp_reservations) {
- // Add back the reservation we set aside above.
- if (i == 0) reservation_to_distribute += min_buffer_size;
-
- int64_t bytes_left_in_range =
- col_range_lengths[tmp_reservation.first] - tmp_reservation.second;
- int64_t bytes_to_add;
- if (bytes_left_in_range >= max_buffer_size) {
- if (reservation_to_distribute >= max_buffer_size) {
- bytes_to_add = max_buffer_size;
- } else if (i == 0) {
- DCHECK_EQ(0, tmp_reservation.second);
- // Ensure this range gets at least one buffer on the first iteration.
- bytes_to_add = BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute);
- } else {
- DCHECK_GT(tmp_reservation.second, 0);
- // We need to read more than the max buffer size, but can't allocate a
- // max-sized buffer. Stop adding buffers to this column: we prefer to use
- // the existing max-sized buffers without small buffers mixed in so that
- // we will alway do max-sized I/Os, which make efficient use of I/O devices.
- bytes_to_add = 0;
- }
- } else if (bytes_left_in_range > 0 &&
- reservation_to_distribute >= min_buffer_size) {
- // Choose a buffer size that will fit the rest of the bytes left in the range.
- bytes_to_add = max(min_buffer_size, BitUtil::RoundUpToPowerOfTwo(bytes_left_in_range));
- // But don't add more reservation than is available.
- bytes_to_add =
- min(bytes_to_add, BitUtil::RoundDownToPowerOfTwo(reservation_to_distribute));
- } else {
- bytes_to_add = 0;
- }
- DCHECK(bytes_to_add == 0 || bytes_to_add >= min_buffer_size) << bytes_to_add;
- reservation_to_distribute -= bytes_to_add;
- tmp_reservation.second += bytes_to_add;
- DCHECK_GE(reservation_to_distribute, 0);
- DCHECK_GT(tmp_reservation.second, 0);
+ // TODO: this will need to change when we have co-located files and the columns
+ // are different files.
+ if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
+ return Status(Substitute("Expected parquet column file path '$0' to match "
+ "filename '$1'", col_chunk.file_path, filename()));
}
+
+ const ScanRange* split_range =
+ static_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split;
+
+ // Determine if the column is completely contained within a local split.
+ bool col_range_local = split_range->expected_local()
+ && col_start >= split_range->offset()
+ && col_end <= split_range->offset() + split_range->len();
+ ScanRange* col_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
+ filename(), col_len, col_start, partition_id, split_range->disk_id(),
+ col_range_local,
+ BufferOpts(split_range->try_cache(), file_desc->mtime));
+ col_ranges.push_back(col_range);
+
+ // Get the stream that will be used for this column
+ ScannerContext::Stream* stream = context_->AddStream(col_range);
+ DCHECK(stream != nullptr);
+
+ RETURN_IF_ERROR(scalar_reader->Reset(&col_chunk.meta_data, stream));
}
- return tmp_reservations;
+ DCHECK_EQ(col_ranges.size(), num_scalar_readers);
+
+ // Issue all the column chunks to the io mgr and have them scheduled immediately.
+ // This means these ranges aren't returned via DiskIoMgr::GetNextRange and
+ // instead are scheduled to be read immediately.
+ RETURN_IF_ERROR(scan_node_->runtime_state()->io_mgr()->AddScanRanges(
+ scan_node_->reader_context(), col_ranges, true));
+
+ return Status::OK();
}
Status HdfsParquetScanner::InitDictionaries(
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index 92f2550..f0043b5 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -34,7 +34,7 @@ class CollectionValueBuilder;
struct HdfsFileDesc;
/// Internal schema representation and resolution.
-struct SchemaNode;
+class SchemaNode;
/// Class that implements Parquet definition and repetition level decoding.
class ParquetLevelDecoder;
@@ -69,8 +69,8 @@ class BoolColumnReader;
/// the split size, the mid point guarantees that we have at least 50% of the row group in
/// the current split. ProcessSplit() then computes the column ranges for these row groups
/// and submits them to the IoMgr for immediate scheduling (so they don't surface in
-/// DiskIoMgr::GetNextUnstartedRange()). Scheduling them immediately also guarantees they
-/// are all read at once.
+/// DiskIoMgr::GetNextRange()). Scheduling them immediately also guarantees they are all
+/// read at once.
///
/// Like the other scanners, each parquet scanner object is one to one with a
/// ScannerContext. Unlike the other scanners though, the context will have multiple
@@ -328,7 +328,7 @@ class HdfsParquetScanner : public HdfsScanner {
virtual ~HdfsParquetScanner() {}
/// Issue just the footer range for each file. We'll then parse the footer and pick
- /// out the columns we want. 'files' must not be empty.
+ /// out the columns we want.
static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
const std::vector<HdfsFileDesc*>& files)
WARN_UNUSED_RESULT;
@@ -361,7 +361,6 @@ class HdfsParquetScanner : public HdfsScanner {
template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
friend class ScalarColumnReader;
friend class BoolColumnReader;
- friend class HdfsParquetScannerTest;
/// Size of the file footer. This is a guess. If this value is too little, we will
/// need to issue another read.
@@ -430,7 +429,7 @@ class HdfsParquetScanner : public HdfsScanner {
/// Number of scratch batches processed so far.
int64_t row_batches_produced_;
- /// Column reader for each top-level materialized slot in the output tuple.
+ /// Column reader for each materialized columns for this file.
std::vector<ParquetColumnReader*> column_readers_;
/// Column readers will write slot values into this scratch batch for
@@ -446,9 +445,6 @@ class HdfsParquetScanner : public HdfsScanner {
/// Scan range for the metadata.
const io::ScanRange* metadata_range_;
- /// Reservation available for scanning columns, in bytes.
- int64_t total_col_reservation_ = 0;
-
/// Pool to copy dictionary page buffer into. This pool is shared across all the
/// pages in a column chunk.
boost::scoped_ptr<MemPool> dictionary_pool_;
@@ -465,9 +461,6 @@ class HdfsParquetScanner : public HdfsScanner {
/// or nested within a collection.
std::vector<BaseScalarColumnReader*> non_dict_filterable_readers_;
- /// Flattened list of all scalar column readers in column_readers_.
- std::vector<BaseScalarColumnReader*> scalar_readers_;
-
/// Flattened collection column readers that point to readers in column_readers_.
std::vector<CollectionColumnReader*> collection_readers_;
@@ -634,24 +627,12 @@ class HdfsParquetScanner : public HdfsScanner {
WARN_UNUSED_RESULT;
/// Walks file_metadata_ and initiates reading the materialized columns. This
- /// initializes 'scalar_readers_' and divides reservation between the columns but
- /// does not start any scan ranges.
- Status InitScalarColumns() WARN_UNUSED_RESULT;
-
- /// Decides how to divide 'reservation_to_distribute' bytes of reservation between the
- /// columns. Sets the reservation on each corresponding reader in 'column_readers'.
- Status DivideReservationBetweenColumns(
- const std::vector<BaseScalarColumnReader*>& column_readers,
- int64_t reservation_to_distribute);
-
- /// Helper for DivideReservationBetweenColumns. Implements the core algorithm for
- /// dividing a reservation of 'reservation_to_distribute' bytes between columns with
- /// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns
- /// a vector with an entry per column with the index into 'col_range_lengths' and the
- /// amount of reservation in bytes to give to that column.
- static std::vector<std::pair<int, int64_t>> DivideReservationBetweenColumnsHelper(
- int64_t min_buffer_size, int64_t max_buffer_size,
- const std::vector<int64_t>& col_range_lengths, int64_t reservation_to_distribute);
+ /// initializes 'column_readers' and issues the reads for the columns. 'column_readers'
+ /// includes a mix of scalar readers from multiple schema nodes (i.e., readers of
+ /// top-level scalar columns and readers of scalar columns within a collection node).
+ Status InitScalarColumns(
+ int row_group_idx, const std::vector<BaseScalarColumnReader*>& column_readers)
+ WARN_UNUSED_RESULT;
/// Initializes the column readers in collection_readers_.
void InitCollectionColumns();
@@ -687,8 +668,7 @@ class HdfsParquetScanner : public HdfsScanner {
/// Partitions the readers into scalar and collection readers. The collection readers
/// are flattened into collection_readers_. The scalar readers are partitioned into
/// dict_filterable_readers_ and non_dict_filterable_readers_ depending on whether
- /// dictionary filtering is enabled and the reader can be dictionary filtered. All
- /// scalar readers are also flattened into scalar_readers_.
+ /// dictionary filtering is enabled and the reader can be dictionary filtered.
void PartitionReaders(const vector<ParquetColumnReader*>& readers,
bool can_eval_dict_filters);
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 5625389..98c6e14 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -35,7 +35,6 @@
#include "exprs/scalar-expr-evaluator.h"
#include "exprs/scalar-expr.h"
#include "runtime/descriptors.h"
-#include "runtime/exec-env.h"
#include "runtime/hdfs-fs-cache.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/io/request-context.h"
@@ -65,7 +64,6 @@ const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024;
HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: ScanNode(pool, tnode, descs),
- ideal_scan_range_reservation_(tnode.hdfs_scan_node.ideal_scan_range_reservation),
min_max_tuple_id_(tnode.hdfs_scan_node.__isset.min_max_tuple_id ?
tnode.hdfs_scan_node.min_max_tuple_id : -1),
skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ?
@@ -82,7 +80,6 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode,
&tnode.hdfs_scan_node.dictionary_filter_conjuncts : nullptr),
disks_accessed_bitmap_(TUnit::UNIT, 0),
active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {
- DCHECK_GE(ideal_scan_range_reservation_, resource_profile_.min_reservation);
}
HdfsScanNodeBase::~HdfsScanNodeBase() {
@@ -112,6 +109,7 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ScalarExpr::Create(tnode.hdfs_scan_node.min_max_conjuncts,
*min_max_row_desc, state, &min_max_conjuncts_));
}
+
return Status::OK();
}
@@ -242,16 +240,6 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) {
ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size());
ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id);
- // Check if reservation was enough to allocate at least one buffer. The
- // reservation calculation in HdfsScanNode.java should guarantee this.
- // Hitting this error indicates a misconfiguration or bug.
- int64_t min_buffer_size = ExecEnv::GetInstance()->disk_io_mgr()->min_buffer_size();
- if (scan_range_params_->size() > 0
- && resource_profile_.min_reservation < min_buffer_size) {
- return Status(TErrorCode::INTERNAL_ERROR,
- Substitute("HDFS scan min reservation $0 must be >= min buffer size $1",
- resource_profile_.min_reservation, min_buffer_size));
- }
// Add per volume stats to the runtime profile
PerVolumeStats per_volume_stats;
stringstream str;
@@ -338,18 +326,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
partition_desc->partition_key_value_evals(), scan_node_pool_.get(), state);
}
- RETURN_IF_ERROR(ClaimBufferReservation(state));
- // We got the minimum reservation. Now try to get ideal reservation.
- if (resource_profile_.min_reservation != ideal_scan_range_reservation_) {
- bool increased = buffer_pool_client_.IncreaseReservation(
- ideal_scan_range_reservation_ - resource_profile_.min_reservation);
- VLOG_FILE << "Increasing reservation from minimum "
- << resource_profile_.min_reservation << "B to ideal "
- << ideal_scan_range_reservation_ << "B "
- << (increased ? "succeeded" : "failed");
- }
-
- reader_context_ = runtime_state_->io_mgr()->RegisterContext();
+ reader_context_ = runtime_state_->io_mgr()->RegisterContext(mem_tracker());
// Initialize HdfsScanNode specific counters
// TODO: Revisit counters and move the counters specific to multi-threaded scans
@@ -370,11 +347,14 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
num_scanner_threads_started_counter_ =
ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
- reader_context_->set_bytes_read_counter(bytes_read_counter());
- reader_context_->set_read_timer(read_timer());
- reader_context_->set_open_file_timer(open_file_timer());
- reader_context_->set_active_read_thread_counter(&active_hdfs_read_thread_counter_);
- reader_context_->set_disks_accessed_bitmap(&disks_accessed_bitmap_);
+ runtime_state_->io_mgr()->set_bytes_read_counter(
+ reader_context_.get(), bytes_read_counter());
+ runtime_state_->io_mgr()->set_read_timer(reader_context_.get(), read_timer());
+ runtime_state_->io_mgr()->set_open_file_timer(reader_context_.get(), open_file_timer());
+ runtime_state_->io_mgr()->set_active_read_thread_counter(
+ reader_context_.get(), &active_hdfs_read_thread_counter_);
+ runtime_state_->io_mgr()->set_disks_access_bitmap(
+ reader_context_.get(), &disks_accessed_bitmap_);
average_scanner_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
AVERAGE_SCANNER_THREAD_CONCURRENCY, &active_scanner_thread_counter_);
@@ -468,27 +448,18 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
}
}
- // Issue initial ranges for all file types. Only call functions for file types that
- // actually exist - trying to add empty lists of ranges can result in spurious
- // CANCELLED errors - see IMPALA-6564.
- for (const auto& entry : matching_per_type_files) {
- if (entry.second.empty()) continue;
- switch (entry.first) {
- case THdfsFileFormat::PARQUET:
- RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this, entry.second));
- break;
- case THdfsFileFormat::TEXT:
- RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this, entry.second));
- break;
- case THdfsFileFormat::SEQUENCE_FILE:
- case THdfsFileFormat::RC_FILE:
- case THdfsFileFormat::AVRO:
- RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this, entry.second));
- break;
- default:
- DCHECK(false) << "Unexpected file type " << entry.first;
- }
- }
+ // Issue initial ranges for all file types.
+ RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this,
+ matching_per_type_files[THdfsFileFormat::PARQUET]));
+ RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this,
+ matching_per_type_files[THdfsFileFormat::TEXT]));
+ RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
+ matching_per_type_files[THdfsFileFormat::SEQUENCE_FILE]));
+ RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
+ matching_per_type_files[THdfsFileFormat::RC_FILE]));
+ RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this,
+ matching_per_type_files[THdfsFileFormat::AVRO]));
+
return Status::OK();
}
@@ -547,8 +518,6 @@ ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
Status HdfsScanNodeBase::AddDiskIoRanges(
const vector<ScanRange*>& ranges, int num_files_queued) {
- DCHECK(!progress_.done()) << "Don't call AddScanRanges() after all ranges finished.";
- DCHECK_GT(ranges.size(), 0);
RETURN_IF_ERROR(runtime_state_->io_mgr()->AddScanRanges(reader_context_.get(), ranges));
num_unqueued_files_.Add(-num_files_queued);
DCHECK_GE(num_unqueued_files_.Load(), 0);
@@ -851,14 +820,20 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
Substitute("Codegen enabled: $0 out of $1", num_enabled, total));
if (reader_context_ != nullptr) {
- bytes_read_local_->Set(reader_context_->bytes_read_local());
- bytes_read_short_circuit_->Set(reader_context_->bytes_read_short_circuit());
- bytes_read_dn_cache_->Set(reader_context_->bytes_read_dn_cache());
- num_remote_ranges_->Set(reader_context_->num_remote_ranges());
- unexpected_remote_bytes_->Set(reader_context_->unexpected_remote_bytes());
- cached_file_handles_hit_count_->Set(reader_context_->cached_file_handles_hit_count());
+ bytes_read_local_->Set(
+ runtime_state_->io_mgr()->bytes_read_local(reader_context_.get()));
+ bytes_read_short_circuit_->Set(
+ runtime_state_->io_mgr()->bytes_read_short_circuit(reader_context_.get()));
+ bytes_read_dn_cache_->Set(
+ runtime_state_->io_mgr()->bytes_read_dn_cache(reader_context_.get()));
+ num_remote_ranges_->Set(static_cast<int64_t>(
+ runtime_state_->io_mgr()->num_remote_ranges(reader_context_.get())));
+ unexpected_remote_bytes_->Set(
+ runtime_state_->io_mgr()->unexpected_remote_bytes(reader_context_.get()));
+ cached_file_handles_hit_count_->Set(
+ runtime_state_->io_mgr()->cached_file_handles_hit_count(reader_context_.get()));
cached_file_handles_miss_count_->Set(
- reader_context_->cached_file_handles_miss_count());
+ runtime_state_->io_mgr()->cached_file_handles_miss_count(reader_context_.get()));
if (unexpected_remote_bytes_->value() >= UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD) {
runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, Substitute(
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 3a9c37f..70fbac2 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -322,9 +322,6 @@ class HdfsScanNodeBase : public ScanNode {
friend class ScannerContext;
friend class HdfsScanner;
- /// Ideal reservation to process each input split, computed by the planner.
- const int64_t ideal_scan_range_reservation_;
-
/// Tuple id of the tuple used to evaluate conjuncts on parquet::Statistics.
const int min_max_tuple_id_;
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index f4948d9..7ea4d80 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -26,7 +26,6 @@
#include "gen-cpp/PlanNodes_types.h"
-using namespace impala::io;
using std::stringstream;
namespace impala {
@@ -77,27 +76,20 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
scanner_->Close(row_batch);
scanner_.reset();
}
- DiskIoMgr* io_mgr = runtime_state_->io_mgr();
- bool needs_buffers;
- RETURN_IF_ERROR(io_mgr->GetNextUnstartedRange(
- reader_context_.get(), &scan_range_, &needs_buffers));
- if (scan_range_ == nullptr) {
+ RETURN_IF_ERROR(
+ runtime_state_->io_mgr()->GetNextRange(reader_context_.get(), &scan_range_));
+ if (scan_range_ == NULL) {
*eos = true;
StopAndFinalizeCounters();
return Status::OK();
}
- int64_t scanner_reservation = buffer_pool_client_.GetReservation();
- if (needs_buffers) {
- RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(reader_context_.get(),
- &buffer_pool_client_, scan_range_, scanner_reservation));
- }
ScanRangeMetadata* metadata =
static_cast<ScanRangeMetadata*>(scan_range_->meta_data());
int64_t partition_id = metadata->partition_id;
HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
- scanner_ctx_.reset(new ScannerContext(runtime_state_, this, &buffer_pool_client_,
- partition, filter_ctxs(), expr_results_pool()));
- scanner_ctx_->AddStream(scan_range_, scanner_reservation);
+ scanner_ctx_.reset(new ScannerContext(
+ runtime_state_, this, partition, scan_range_, filter_ctxs(),
+ expr_results_pool()));
Status status = CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_);
if (!status.ok()) {
DCHECK(scanner_ == NULL);
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index a95e47a..710a8af 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -26,7 +26,6 @@
#include "exec/scanner-context.h"
#include "runtime/descriptors.h"
#include "runtime/fragment-instance-state.h"
-#include "runtime/io/request-context.h"
#include "runtime/runtime-filter.inline.h"
#include "runtime/runtime-state.h"
#include "runtime/mem-tracker.h"
@@ -46,6 +45,16 @@ DECLARE_bool(skip_file_runtime_filtering);
using namespace impala;
using namespace impala::io;
+// Amount of memory that we approximate a scanner thread will use not including IoBuffers.
+// The memory used does not vary considerably between file formats (just a couple of MBs).
+// This value is conservative and taken from running against the tpch lineitem table.
+// TODO: revisit how we do this.
+const int SCANNER_THREAD_MEM_USAGE = 32 * 1024 * 1024;
+
+// Estimated upper bound on the compression ratio of compressed text files. Used to
+// estimate scanner thread memory usage.
+const int COMPRESSED_TEXT_COMPRESSION_RATIO = 11;
+
// Amount of time to block waiting for GetNext() to release scanner threads between
// checking if a scanner thread should yield itself back to the global thread pool.
const int SCANNER_THREAD_WAIT_TIME_MS = 20;
@@ -53,6 +62,11 @@ const int SCANNER_THREAD_WAIT_TIME_MS = 20;
HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: HdfsScanNodeBase(pool, tnode, descs),
+ ranges_issued_barrier_(1),
+ scanner_thread_bytes_required_(0),
+ done_(false),
+ all_ranges_started_(false),
+ thread_avail_cb_id_(-1),
max_num_scanner_threads_(CpuInfo::num_cores()) {
}
@@ -153,6 +167,36 @@ Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) {
Status HdfsScanNode::Prepare(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
+
+ // Compute the minimum bytes required to start a new thread. This is based on the
+ // file format.
+ // The higher the estimate, the less likely it is the query will fail but more likely
+ // the query will be throttled when it does not need to be.
+ // TODO: how many buffers should we estimate per range. The IoMgr will throttle down to
+ // one but if there are already buffers queued before memory pressure was hit, we can't
+ // reclaim that memory.
+ if (per_type_files_[THdfsFileFormat::PARQUET].size() > 0) {
+ // Parquet files require buffers per column
+ scanner_thread_bytes_required_ =
+ materialized_slots_.size() * 3 * runtime_state_->io_mgr()->max_read_buffer_size();
+ } else {
+ scanner_thread_bytes_required_ =
+ 3 * runtime_state_->io_mgr()->max_read_buffer_size();
+ }
+ // scanner_thread_bytes_required_ now contains the IoBuffer requirement.
+ // Next we add in the other memory the scanner thread will use.
+ // e.g. decompression buffers, tuple buffers, etc.
+ // For compressed text, we estimate this based on the file size (since the whole file
+ // will need to be decompressed at once). For all other formats, we use a constant.
+ // TODO: can we do something better?
+ int64_t scanner_thread_mem_usage = SCANNER_THREAD_MEM_USAGE;
+ for (HdfsFileDesc* file: per_type_files_[THdfsFileFormat::TEXT]) {
+ if (file->file_compression != THdfsCompression::NONE) {
+ int64_t bytes_required = file->file_length * COMPRESSED_TEXT_COMPRESSION_RATIO;
+ scanner_thread_mem_usage = ::max(bytes_required, scanner_thread_mem_usage);
+ }
+ }
+ scanner_thread_bytes_required_ += scanner_thread_mem_usage;
row_batches_get_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueueGetWaitTime");
row_batches_put_timer_ = ADD_TIMER(runtime_profile(), "RowBatchQueuePutWaitTime");
return Status::OK();
@@ -174,9 +218,10 @@ Status HdfsScanNode::Open(RuntimeState* state) {
max_num_scanner_threads_ = runtime_state_->query_options().num_scanner_threads;
}
DCHECK_GT(max_num_scanner_threads_, 0);
- spare_reservation_.Store(buffer_pool_client_.GetReservation());
+
thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb(
bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1));
+
return Status::OK();
}
@@ -217,28 +262,37 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges,
return Status::OK();
}
-bool HdfsScanNode::EnoughReservationForExtraThread(const unique_lock<mutex>& lock) {
- DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
- if (spare_reservation_.Load() >= ideal_scan_range_reservation_) return true;
- int64_t increase = ideal_scan_range_reservation_ - spare_reservation_.Load();
- if (!buffer_pool_client_.IncreaseReservation(increase)) return false;
- spare_reservation_.Add(increase);
- return true;
-}
-
-int64_t HdfsScanNode::DeductReservationForScannerThread(const unique_lock<mutex>& lock,
- bool first_thread) {
- DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
- int64_t amount;
- if (first_thread) {
- amount = spare_reservation_.Load() >= ideal_scan_range_reservation_ ?
- ideal_scan_range_reservation_ : resource_profile_.min_reservation;
- } else {
- amount = ideal_scan_range_reservation_;
+// For controlling the amount of memory used for scanners, we approximate the
+// scanner mem usage based on scanner_thread_bytes_required_, rather than the
+// consumption in the scan node's mem tracker. The problem with the scan node
+// trackers is that it does not account for the memory the scanner will use.
+// For example, if there is 110 MB of memory left (based on the mem tracker)
+// and we estimate that a scanner will use 100MB, we want to make sure to only
+// start up one additional thread. However, after starting the first thread, the
+// mem tracker value will not change immediately (it takes some time before the
+// scanner is running and starts using memory). Therefore we just use the estimate
+// based on the number of running scanner threads.
+bool HdfsScanNode::EnoughMemoryForScannerThread(bool new_thread) {
+ int64_t committed_scanner_mem =
+ active_scanner_thread_counter_.value() * scanner_thread_bytes_required_;
+ int64_t tracker_consumption = mem_tracker()->consumption();
+ int64_t est_additional_scanner_mem = committed_scanner_mem - tracker_consumption;
+ if (est_additional_scanner_mem < 0) {
+ // This is the case where our estimate was too low. Expand the estimate based
+ // on the usage.
+ int64_t avg_consumption =
+ tracker_consumption / active_scanner_thread_counter_.value();
+ // Take the average and expand it by 50%. Some scanners will not have hit their
+ // steady state mem usage yet.
+ // TODO: how can we scale down if we've overestimated.
+ // TODO: better heuristic?
+ scanner_thread_bytes_required_ = static_cast<int64_t>(avg_consumption * 1.5);
+ est_additional_scanner_mem = 0;
}
- int64_t remainder = spare_reservation_.Add(-amount);
- DCHECK_GE(remainder, 0);
- return amount;
+
+ // If we are starting a new thread, take that into account now.
+ if (new_thread) est_additional_scanner_mem += scanner_thread_bytes_required_;
+ return est_additional_scanner_mem < mem_tracker()->SpareCapacity();
}
void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
@@ -268,45 +322,36 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
// TODO: This still leans heavily on starvation-free locks, come up with a more
// correct way to communicate between this method and ScannerThreadHelper
unique_lock<mutex> lock(lock_);
-
- const int64_t num_active_scanner_threads = active_scanner_thread_counter_.value();
- const bool first_thread = num_active_scanner_threads == 0;
// Cases 1, 2, 3.
if (done_ || all_ranges_started_ ||
- num_active_scanner_threads >= progress_.remaining()) {
+ active_scanner_thread_counter_.value() >= progress_.remaining()) {
break;
}
// Cases 5 and 6.
- if (!first_thread &&
+ if (active_scanner_thread_counter_.value() > 0 &&
(materialized_row_batches_->Size() >= max_materialized_row_batches_ ||
- !EnoughReservationForExtraThread(lock))) {
+ !EnoughMemoryForScannerThread(true))) {
break;
}
// Case 7 and 8.
- if (num_active_scanner_threads >= max_num_scanner_threads_ ||
- !pool->TryAcquireThreadToken()) {
+ bool is_reserved = false;
+ if (active_scanner_thread_counter_.value() >= max_num_scanner_threads_ ||
+ !pool->TryAcquireThreadToken(&is_reserved)) {
break;
}
- // Deduct the reservation. We haven't dropped the lock since the
- // first_thread/EnoughReservationForExtraThread() checks so spare reservation
- // must be available.
- int64_t scanner_thread_reservation =
- DeductReservationForScannerThread(lock, first_thread);
COUNTER_ADD(&active_scanner_thread_counter_, 1);
string name = Substitute("scanner-thread (finst:$0, plan-node-id:$1, thread-idx:$2)",
PrintId(runtime_state_->fragment_instance_id()), id(),
num_scanner_threads_started_counter_->value());
- auto fn = [this, scanner_thread_reservation]() {
- this->ScannerThread(scanner_thread_reservation);
- };
+
+ auto fn = [this]() { this->ScannerThread(); };
std::unique_ptr<Thread> t;
status =
Thread::Create(FragmentInstanceState::FINST_THREAD_GROUP_NAME, name, fn, &t, true);
if (!status.ok()) {
- ReturnReservationFromScannerThread(scanner_thread_reservation);
COUNTER_ADD(&active_scanner_thread_counter_, -1);
// Release the token and skip running callbacks to find a replacement. Skipping
// serves two purposes. First, it prevents a mutual recursion between this function
@@ -327,10 +372,9 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
}
}
-void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) {
+void HdfsScanNode::ScannerThread() {
SCOPED_THREAD_COUNTER_MEASUREMENT(scanner_thread_counters());
SCOPED_THREAD_COUNTER_MEASUREMENT(runtime_state_->total_thread_statistics());
- DiskIoMgr* io_mgr = runtime_state_->io_mgr();
// Make thread-local copy of filter contexts to prune scan ranges, and to pass to the
// scanner for finer-grained filtering. Use a thread-local MemPool for the filter
@@ -356,7 +400,8 @@ void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) {
// this thread.
unique_lock<mutex> l(lock_);
if (active_scanner_thread_counter_.value() > 1) {
- if (runtime_state_->resource_pool()->optional_exceeded()) {
+ if (runtime_state_->resource_pool()->optional_exceeded() ||
+ !EnoughMemoryForScannerThread(false)) {
// We can't break here. We need to update the counter with the lock held or else
// all threads might see active_scanner_thread_counter_.value > 1
COUNTER_ADD(&active_scanner_thread_counter_, -1);
@@ -376,29 +421,21 @@ void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) {
// to return if there's an error.
ranges_issued_barrier_.Wait(SCANNER_THREAD_WAIT_TIME_MS, &unused);
- // Take a snapshot of num_unqueued_files_ before calling GetNextUnstartedRange().
+ ScanRange* scan_range;
+ // Take a snapshot of num_unqueued_files_ before calling GetNextRange().
// We don't want num_unqueued_files_ to go to zero between the return from
- // GetNextUnstartedRange() and the check for when all ranges are complete.
+ // GetNextRange() and the check for when all ranges are complete.
int num_unqueued_files = num_unqueued_files_.Load();
// TODO: the Load() acts as an acquire barrier. Is this needed? (i.e. any earlier
// stores that need to complete?)
AtomicUtil::MemoryBarrier();
- ScanRange* scan_range;
- bool needs_buffers;
Status status =
- io_mgr->GetNextUnstartedRange(reader_context_.get(), &scan_range, &needs_buffers);
+ runtime_state_->io_mgr()->GetNextRange(reader_context_.get(), &scan_range);
- if (status.ok() && scan_range != nullptr) {
- if (needs_buffers) {
- status = io_mgr->AllocateBuffersForRange(
- reader_context_.get(), &buffer_pool_client_, scan_range,
- scanner_thread_reservation);
- }
- if (status.ok()) {
- // Got a scan range. Process the range end to end (in this thread).
- status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
- &expr_results_pool, scan_range, scanner_thread_reservation);
- }
+ if (status.ok() && scan_range != NULL) {
+ // Got a scan range. Process the range end to end (in this thread).
+ status = ProcessSplit(filter_status.ok() ? filter_ctxs : vector<FilterContext>(),
+ &expr_results_pool, scan_range);
}
if (!status.ok()) {
@@ -428,8 +465,8 @@ void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) {
// TODO: Based on the usage pattern of all_ranges_started_, it looks like it is not
// needed to acquire the lock in x86.
unique_lock<mutex> l(lock_);
- // All ranges have been queued and GetNextUnstartedRange() returned NULL. This means
- // that every range is either done or being processed by another thread.
+ // All ranges have been queued and GetNextRange() returned NULL. This means that
+ // every range is either done or being processed by another thread.
all_ranges_started_ = true;
break;
}
@@ -437,7 +474,6 @@ void HdfsScanNode::ScannerThread(int64_t scanner_thread_reservation) {
COUNTER_ADD(&active_scanner_thread_counter_, -1);
exit:
- ReturnReservationFromScannerThread(scanner_thread_reservation);
runtime_state_->resource_pool()->ReleaseThreadToken(false);
for (auto& ctx: filter_ctxs) ctx.expr_eval->Close(runtime_state_);
filter_mem_pool.FreeAll();
@@ -445,8 +481,7 @@ exit:
}
Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
- MemPool* expr_results_pool, ScanRange* scan_range,
- int64_t scanner_thread_reservation) {
+ MemPool* expr_results_pool, ScanRange* scan_range) {
DCHECK(scan_range != NULL);
ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());
@@ -471,9 +506,8 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
return Status::OK();
}
- ScannerContext context(runtime_state_, this, &buffer_pool_client_, partition,
- filter_ctxs, expr_results_pool);
- context.AddStream(scan_range, scanner_thread_reservation);
+ ScannerContext context(
+ runtime_state_, this, partition, scan_range, filter_ctxs, expr_results_pool);
scoped_ptr<HdfsScanner> scanner;
Status status = CreateAndOpenScanner(partition, &context, &scanner);
if (!status.ok()) {
@@ -515,7 +549,9 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
void HdfsScanNode::SetDoneInternal() {
if (done_) return;
done_ = true;
- if (reader_context_ != nullptr) reader_context_->Cancel();
+ if (reader_context_ != nullptr) {
+ runtime_state_->io_mgr()->CancelContext(reader_context_.get());
+ }
materialized_row_batches_->Shutdown();
}
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 81e826e..a1c97cf 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -27,7 +27,6 @@
#include <boost/scoped_ptr.hpp>
#include <boost/thread/mutex.hpp>
-#include "common/atomic.h"
#include "exec/filter-context.h"
#include "exec/hdfs-scan-node-base.h"
#include "runtime/io/disk-io-mgr.h"
@@ -59,14 +58,8 @@ class TPlanNode;
/// 5. The scanner finishes the scan range and informs the scan node so it can track
/// end of stream.
///
-/// Buffer management:
-/// ------------------
-/// The different scanner threads all allocate I/O buffers from the node's Buffer Pool
-/// client. The scan node ensures that enough reservation is available to start a
-/// scanner thread before launching each one with (see
-/// EnoughReservationForExtraThread()), after which the scanner thread is responsible
-/// for staying within the reservation handed off to it.
-///
+/// TODO: This class allocates a bunch of small utility objects that should be
+/// recycled.
/// TODO: Remove this class once the fragment-based multi-threaded execution is
/// fully functional.
class HdfsScanNode : public HdfsScanNodeBase {
@@ -107,7 +100,12 @@ class HdfsScanNode : public HdfsScanNodeBase {
private:
/// Released when initial ranges are issued in the first call to GetNext().
- CountingBarrier ranges_issued_barrier_{1};
+ CountingBarrier ranges_issued_barrier_;
+
+ /// The estimated memory required to start up a new scanner thread. If the memory
+ /// left (due to limits) is less than this value, we won't start up optional
+ /// scanner threads.
+ int64_t scanner_thread_bytes_required_;
/// Thread group for all scanner worker threads
ThreadGroup scanner_threads_;
@@ -132,16 +130,16 @@ class HdfsScanNode : public HdfsScanNodeBase {
/// are finished, an error/cancellation occurred, or the limit was reached.
/// Setting this to true triggers the scanner threads to clean up.
/// This should not be explicitly set. Instead, call SetDone().
- bool done_ = false;
+ bool done_;
/// Set to true if all ranges have started. Some of the ranges may still be in flight
/// being processed by scanner threads, but no new ScannerThreads should be started.
- bool all_ranges_started_ = false;
+ bool all_ranges_started_;
/// The id of the callback added to the thread resource manager when thread token
/// is available. Used to remove the callback before this scan node is destroyed.
/// -1 if no callback is registered.
- int thread_avail_cb_id_ = -1;
+ int thread_avail_cb_id_;
/// Maximum number of scanner threads. Set to 'NUM_SCANNER_THREADS' if that query
/// option is set. Otherwise, it's set to the number of cpu cores. Scanner threads
@@ -149,14 +147,6 @@ class HdfsScanNode : public HdfsScanNodeBase {
/// the number of cores.
int max_num_scanner_threads_;
- /// Amount of the 'buffer_pool_client_' reservation that is not allocated to scanner
- /// threads. Doled out to scanner threads when they are started and returned when
- /// those threads no longer need it. Can be atomically incremented without holding
- /// 'lock_' but 'lock_' is held when decrementing to ensure that the check for
- /// reservation and the actual deduction are atomic with respect to other threads
- /// trying to claim reservation.
- AtomicInt64 spare_reservation_{0};
-
/// The wait time for fetching a row batch from the row batch queue.
RuntimeProfile::Counter* row_batches_get_timer_;
@@ -170,35 +160,21 @@ class HdfsScanNode : public HdfsScanNodeBase {
/// Main function for scanner thread. This thread pulls the next range to be
/// processed from the IoMgr and then processes the entire range end to end.
/// This thread terminates when all scan ranges are complete or an error occurred.
- /// The caller must have reserved 'scanner_thread_reservation' bytes of memory for
- /// this thread with DeductReservationForScannerThread().
- void ScannerThread(int64_t scanner_thread_reservation);
+ void ScannerThread();
/// Process the entire scan range with a new scanner object. Executed in scanner
/// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows
/// in this split.
Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs,
- MemPool* expr_results_pool, io::ScanRange* scan_range,
- int64_t scanner_thread_reservation) WARN_UNUSED_RESULT;
-
- /// Return true if there is enough reservation to start an extra scanner thread.
- /// Tries to increase reservation if enough is not already available in
- /// 'spare_reservation_'. 'lock_' must be held via 'lock'
- bool EnoughReservationForExtraThread(const boost::unique_lock<boost::mutex>& lock);
-
- /// Deduct reservation to start a new scanner thread from 'spare_reservation_'. If
- /// 'first_thread' is true, this is the first thread to be started and only the
- /// minimum reservation is required to be available. Otherwise
- /// EnoughReservationForExtra() thread must have returned true in the current
- /// critical section so that 'ideal_scan_range_bytes_' is available for the extra
- /// thread. Returns the amount deducted. 'lock_' must be held via 'lock'.
- int64_t DeductReservationForScannerThread(const boost::unique_lock<boost::mutex>& lock,
- bool first_thread);
-
- /// Called by scanner thread to return or all of its reservation that is not needed.
- void ReturnReservationFromScannerThread(int64_t bytes) {
- spare_reservation_.Add(bytes);
- }
+ MemPool* expr_results_pool, io::ScanRange* scan_range) WARN_UNUSED_RESULT;
+
+ /// Returns true if there is enough memory (against the mem tracker limits) to
+ /// have a scanner thread.
+ /// If new_thread is true, the calculation is for starting a new scanner thread.
+ /// If false, it determines whether there's adequate memory for the existing
+ /// set of scanner threads.
+ /// lock_ must be taken before calling this.
+ bool EnoughMemoryForScannerThread(bool new_thread);
/// Checks for eos conditions and returns batches from materialized_row_batches_.
Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos)
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 2a6a912..253bcc8 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -144,10 +144,8 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
DCHECK(false);
}
}
- if (compressed_text_scan_ranges.size() > 0) {
- RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges,
- compressed_text_files));
- }
+ RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges,
+ compressed_text_files));
if (lzo_text_files.size() > 0) {
// This will dlopen the lzo binary and can fail if the lzo binary is not present.
RETURN_IF_ERROR(HdfsLzoTextScanner::IssueInitialRanges(scan_node, lzo_text_files));
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 5afb4e5..7cf89ba 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -30,7 +30,6 @@
#include "exec/scanner-context.inline.h"
#include "rpc/thrift-util.h"
#include "runtime/collection-value-builder.h"
-#include "runtime/exec-env.h"
#include "runtime/tuple-row.h"
#include "runtime/tuple.h"
#include "runtime/runtime-state.h"
@@ -48,10 +47,6 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
"When true, TIMESTAMPs read from files written by Parquet-MR (used by Hive) will "
"be converted from UTC to local time. Writes are unaffected.");
-// Max dictionary page header size in bytes. This is an estimate and only needs to be an
-// upper bound.
-static const int MAX_DICT_HEADER_SIZE = 100;
-
// Max data page header size in bytes. This is an estimate and only needs to be an upper
// bound. It is theoretically possible to have a page header of any size due to string
// value statistics, but in practice we'll have trouble reading string values this large.
@@ -71,8 +66,6 @@ static int debug_count = 0;
#define SHOULD_TRIGGER_DEBUG_ACTION(x) (false)
#endif
-using namespace impala::io;
-
namespace impala {
const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
@@ -841,99 +834,8 @@ static bool RequiresSkippedDictionaryHeaderCheck(
return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
}
-Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
- const parquet::ColumnChunk& col_chunk, int row_group_idx) {
- num_buffered_values_ = 0;
- data_ = nullptr;
- data_end_ = nullptr;
- stream_ = nullptr;
- io_reservation_ = 0;
- metadata_ = &col_chunk.meta_data;
- num_values_read_ = 0;
- def_level_ = -1;
- // See ColumnReader constructor.
- rep_level_ = max_rep_level() == 0 ? 0 : -1;
- pos_current_value_ = -1;
-
- if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
- RETURN_IF_ERROR(Codec::CreateDecompressor(
- nullptr, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_));
- }
- int64_t col_start = col_chunk.meta_data.data_page_offset;
-
- RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(parent_->file_metadata_,
- parent_->filename(), row_group_idx, col_idx(), schema_element(),
- parent_->state_));
-
- if (col_chunk.meta_data.__isset.dictionary_page_offset) {
- // Already validated in ValidateColumnOffsets()
- DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
- col_start = col_chunk.meta_data.dictionary_page_offset;
- }
- int64_t col_len = col_chunk.meta_data.total_compressed_size;
- if (col_len <= 0) {
- return Status(Substitute("File '$0' contains invalid column chunk size: $1",
- filename(), col_len));
- }
- int64_t col_end = col_start + col_len;
-
- // Already validated in ValidateColumnOffsets()
- DCHECK_GT(col_end, 0);
- DCHECK_LT(col_end, file_desc.file_length);
- const ParquetFileVersion& file_version = parent_->file_version_;
- if (file_version.application == "parquet-mr" && file_version.VersionLt(1, 2, 9)) {
- // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
- // dictionary page header size in total_compressed_size and total_uncompressed_size
- // (see IMPALA-694). We pad col_len to compensate.
- int64_t bytes_remaining = file_desc.file_length - col_end;
- int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
- col_len += pad;
- }
-
- // TODO: this will need to change when we have co-located files and the columns
- // are different files.
- if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
- return Status(Substitute("Expected parquet column file path '$0' to match "
- "filename '$1'", col_chunk.file_path, filename()));
- }
-
- const ScanRange* metadata_range = parent_->metadata_range_;
- int64_t partition_id = parent_->context_->partition_descriptor()->id();
- const ScanRange* split_range =
- static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
- // Determine if the column is completely contained within a local split.
- bool col_range_local = split_range->expected_local()
- && col_start >= split_range->offset()
- && col_end <= split_range->offset() + split_range->len();
- scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
- filename(), col_len, col_start, partition_id, split_range->disk_id(),
- col_range_local,
- BufferOpts(split_range->try_cache(), file_desc.mtime));
- ClearDictionaryDecoder();
- return Status::OK();
-}
-
-Status BaseScalarColumnReader::StartScan() {
- DCHECK(scan_range_ != nullptr) << "Must Reset() before starting scan.";
- DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
- ScannerContext* context = parent_->context_;
- DCHECK_GT(io_reservation_, 0);
- bool needs_buffers;
- RETURN_IF_ERROR(io_mgr->StartScanRange(
- parent_->scan_node_->reader_context(), scan_range_, &needs_buffers));
- if (needs_buffers) {
- RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
- parent_->scan_node_->reader_context(), context->bp_client(),
- scan_range_, io_reservation_));
- }
- stream_ = parent_->context_->AddStream(scan_range_, io_reservation_);
- DCHECK(stream_ != nullptr);
- return Status::OK();
-}
-
Status BaseScalarColumnReader::ReadPageHeader(bool peek,
parquet::PageHeader* next_page_header, uint32_t* next_header_size, bool* eos) {
- DCHECK(stream_ != nullptr);
*eos = false;
uint8_t* buffer;
@@ -1017,7 +919,7 @@ Status BaseScalarColumnReader::InitDictionary() {
bool eos;
parquet::PageHeader next_page_header;
uint32_t next_header_size;
- DCHECK(stream_ != nullptr);
+
DCHECK(!HasDictionaryDecoder());
RETURN_IF_ERROR(ReadPageHeader(true /* peek */, &next_page_header,
@@ -1129,14 +1031,6 @@ Status BaseScalarColumnReader::InitDictionary() {
return Status::OK();
}
-Status BaseScalarColumnReader::InitDictionaries(
- const vector<BaseScalarColumnReader*> readers) {
- for (BaseScalarColumnReader* reader : readers) {
- RETURN_IF_ERROR(reader->InitDictionary());
- }
- return Status::OK();
-}
-
Status BaseScalarColumnReader::ReadDataPage() {
// We're about to move to the next data page. The previous data page is
// now complete, free up any memory allocated for it. If the data page contained
http://git-wip-us.apache.org/repos/asf/impala/blob/161cbe30/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index f19d40a..86ca239 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -322,29 +322,42 @@ class BaseScalarColumnReader : public ParquetColumnReader {
BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
const SlotDescriptor* slot_desc)
: ParquetColumnReader(parent, node, slot_desc),
+ data_(NULL),
+ data_end_(NULL),
+ def_levels_(true),
+ rep_levels_(false),
+ page_encoding_(parquet::Encoding::PLAIN_DICTIONARY),
+ num_buffered_values_(0),
+ num_values_read_(0),
+ metadata_(NULL),
+ stream_(NULL),
data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
}
virtual ~BaseScalarColumnReader() { }
- /// Resets the reader for each row group in the file and creates the scan
- /// range for the column, but does not start it. To start scanning,
- /// set_io_reservation() must be called to assign reservation to this
- /// column, followed by StartScan().
- Status Reset(const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk,
- int row_group_idx);
-
- /// Starts the column scan range. The reader must be Reset() and have a
- /// reservation assigned via set_io_reservation(). This must be called
- /// before any of the column data can be read (including dictionary and
- /// data pages). Returns an error status if there was an error starting the
- /// scan or allocating buffers for it.
- Status StartScan();
-
- /// Helper to start scans for multiple columns at once.
- static Status StartScans(const std::vector<BaseScalarColumnReader*> readers) {
- for (BaseScalarColumnReader* reader : readers) RETURN_IF_ERROR(reader->StartScan());
+ /// This is called once for each row group in the file.
+ Status Reset(const parquet::ColumnMetaData* metadata, ScannerContext::Stream* stream) {
+ DCHECK(stream != NULL);
+ DCHECK(metadata != NULL);
+
+ num_buffered_values_ = 0;
+ data_ = NULL;
+ data_end_ = NULL;
+ stream_ = stream;
+ metadata_ = metadata;
+ num_values_read_ = 0;
+ def_level_ = -1;
+ // See ColumnReader constructor.
+ rep_level_ = max_rep_level() == 0 ? 0 : -1;
+ pos_current_value_ = -1;
+
+ if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
+ RETURN_IF_ERROR(Codec::CreateDecompressor(
+ NULL, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_));
+ }
+ ClearDictionaryDecoder();
return Status::OK();
}
@@ -359,27 +372,22 @@ class BaseScalarColumnReader : public ParquetColumnReader {
if (dict_decoder != nullptr) dict_decoder->Close();
}
- io::ScanRange* scan_range() const { return scan_range_; }
int64_t total_len() const { return metadata_->total_compressed_size; }
int col_idx() const { return node_.col_idx; }
THdfsCompression::type codec() const {
if (metadata_ == NULL) return THdfsCompression::NONE;
return PARQUET_TO_IMPALA_CODEC[metadata_->codec];
}
- void set_io_reservation(int bytes) { io_reservation_ = bytes; }
/// Reads the next definition and repetition levels for this column. Initializes the
/// next data page if necessary.
virtual bool NextLevels() { return NextLevels<true>(); }
- /// Check the data stream to see if there is a dictionary page. If there is,
- /// use that page to initialize dict_decoder_ and advance the data stream
- /// past the dictionary page.
+ // Check the data stream to see if there is a dictionary page. If there is,
+ // use that page to initialize dict_decoder_ and advance the data stream
+ // past the dictionary page.
Status InitDictionary();
- /// Convenience function to initialize multiple dictionaries.
- static Status InitDictionaries(const std::vector<BaseScalarColumnReader*> readers);
-
// Returns the dictionary or NULL if the dictionary doesn't exist
virtual DictDecoderBase* GetDictionaryDecoder() { return nullptr; }
@@ -405,45 +413,33 @@ class BaseScalarColumnReader : public ParquetColumnReader {
// fit in as few cache lines as possible.
/// Pointer to start of next value in data page
- uint8_t* data_ = nullptr;
+ uint8_t* data_;
/// End of the data page.
- const uint8_t* data_end_ = nullptr;
+ const uint8_t* data_end_;
/// Decoder for definition levels.
- ParquetLevelDecoder def_levels_{true};
+ ParquetLevelDecoder def_levels_;
/// Decoder for repetition levels.
- ParquetLevelDecoder rep_levels_{false};
+ ParquetLevelDecoder rep_levels_;
/// Page encoding for values of the current data page. Cached here for perf. Set in
/// InitDataPage().
- parquet::Encoding::type page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
+ parquet::Encoding::type page_encoding_;
/// Num values remaining in the current data page
- int num_buffered_values_ = 0;
+ int num_buffered_values_;
// Less frequently used members that are not accessed in inner loop should go below
// here so they do not occupy precious cache line space.
/// The number of values seen so far. Updated per data page.
- int64_t num_values_read_ = 0;
-
- /// Metadata for the column for the current row group.
- const parquet::ColumnMetaData* metadata_ = nullptr;
+ int64_t num_values_read_;
+ const parquet::ColumnMetaData* metadata_;
boost::scoped_ptr<Codec> decompressor_;
-
- /// The scan range for the column's data. Initialized for each row group by Reset().
- io::ScanRange* scan_range_ = nullptr;
-
- // Stream used to read data from 'scan_range_'. Initialized by StartScan().
- ScannerContext::Stream* stream_ = nullptr;
-
- /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. Must be set
- /// with set_io_reservation() before 'stream_' is initialized. Reset for each row group
- /// by Reset().
- int64_t io_reservation_ = 0;
+ ScannerContext::Stream* stream_;
/// Pool to allocate storage for data pages from - either decompression buffers for
/// compressed data pages or copies of the data page with var-len data to attach to