You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/22 16:14:04 UTC
[4/7] impala git commit: IMPALA-6669: Remove Parquet
NeedsSeedingForBatchedReading()
IMPALA-6669: Remove Parquet NeedsSeedingForBatchedReading()
I noticed that we could remove this part of the interface and instead
do the "seeding" in ParquetColumnReader::Read*ValueBatch(). It should
be easier to understand with level reading and consumption driven by
the same function instead of split between files.
Testing:
Ran core tests. This code path should be thoroughly exercised by
the regular scanner tests.
Change-Id: I98f65d0d72e86b1e3db1f3543a03873afb9da062
Reviewed-on: http://gerrit.cloudera.org:8080/9636
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/24ae7dfe
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/24ae7dfe
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/24ae7dfe
Branch: refs/heads/2.x
Commit: 24ae7dfe8c3593b098805b6aaba5a2be5774ad59
Parents: 28eb128
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Mar 14 11:01:05 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 22 00:49:57 2018 +0000
----------------------------------------------------------------------
be/src/exec/hdfs-parquet-scanner.cc | 29 ++-----------------
be/src/exec/hdfs-parquet-scanner.h | 2 +-
be/src/exec/parquet-column-readers.cc | 14 ++++++++--
be/src/exec/parquet-column-readers.h | 45 +++++++++++++++---------------
4 files changed, 37 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/24ae7dfe/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index e279369..0d79f53 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -713,33 +713,8 @@ Status HdfsParquetScanner::NextRowGroup() {
ReleaseSkippedRowGroupResources();
continue;
}
-
- bool seeding_failed = false;
- for (ParquetColumnReader* col_reader: column_readers_) {
- // Seed collection and boolean column readers with NextLevel().
- // The ScalarColumnReaders use an optimized ReadValueBatch() that
- // should not be seeded.
- // TODO: Refactor the column readers to look more like the optimized
- // ScalarColumnReader::ReadValueBatch() which does not need seeding. This
- // will allow better sharing of code between the row-wise and column-wise
- // materialization strategies.
- if (col_reader->NeedsSeedingForBatchedReading()
- && !col_reader->NextLevels()) {
- seeding_failed = true;
- break;
- }
- }
- if (seeding_failed) {
- if (!parse_status_.ok()) {
- RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
- }
- ReleaseSkippedRowGroupResources();
- continue;
- } else {
- // Seeding succeeded - we're ready to read the row group.
- DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
- break;
- }
+ DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
+ break;
}
DCHECK(parse_status_.ok());
http://git-wip-us.apache.org/repos/asf/impala/blob/24ae7dfe/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index f0043b5..ccb109c 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -344,7 +344,7 @@ class HdfsParquetScanner : public HdfsScanner {
llvm::Function** process_scratch_batch_fn)
WARN_UNUSED_RESULT;
- /// The repetition level is set to this value to indicate the end of a row group.
+ /// The rep and def levels are set to this value to indicate the end of a row group.
static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min();
/// Indicates an invalid definition or repetition level.
static const int16_t INVALID_LEVEL = -1;
http://git-wip-us.apache.org/repos/asf/impala/blob/24ae7dfe/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 09842c5..1c9f57f 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -274,8 +274,6 @@ class ScalarColumnReader : public BaseScalarColumnReader {
return ReadValue<false>(pool, tuple);
}
- virtual bool NeedsSeedingForBatchedReading() const { return false; }
-
virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
uint8_t* tuple_mem, int* num_values) {
return ReadValueBatch<true>(pool, max_values, tuple_size, tuple_mem, num_values);
@@ -790,6 +788,11 @@ bool ParquetColumnReader::ColReaderDebugAction(int* val_count) {
bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
int tuple_size, uint8_t* tuple_mem, int* num_values) {
+ // The below loop requires that NextLevels() was called previously to populate
+ // 'def_level_' and 'rep_level_'. Ensure it is called at the start of each
+ // row group.
+ if (def_level_ == HdfsParquetScanner::INVALID_LEVEL && !NextLevels()) return false;
+
int val_count = 0;
bool continue_execution = true;
while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
@@ -813,6 +816,11 @@ bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) {
+ // The below loop requires that NextLevels() was called previously to populate
+ // 'def_level_' and 'rep_level_'. Ensure it is called at the start of each
+ // row group.
+ if (def_level_ == HdfsParquetScanner::INVALID_LEVEL && !NextLevels()) return false;
+
int val_count = 0;
bool continue_execution = true;
while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
@@ -1227,7 +1235,7 @@ bool BaseScalarColumnReader::NextPage() {
if (UNLIKELY(!parent_->parse_status_.ok())) return false;
if (num_buffered_values_ == 0) {
rep_level_ = HdfsParquetScanner::ROW_GROUP_END;
- def_level_ = HdfsParquetScanner::INVALID_LEVEL;
+ def_level_ = HdfsParquetScanner::ROW_GROUP_END;
pos_current_value_ = HdfsParquetScanner::INVALID_POS;
return false;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/24ae7dfe/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 6b92ccb..8e3b1ae 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -205,18 +205,14 @@ class ParquetColumnReader {
/// not in collections.
virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) = 0;
- /// Returns true if this reader needs to be seeded with NextLevels() before
- /// calling ReadValueBatch() or ReadNonRepeatedValueBatch().
- /// Note that all readers need to be seeded before calling the non-batched ReadValue().
- virtual bool NeedsSeedingForBatchedReading() const { return true; }
-
/// Batched version of ReadValue() that reads up to max_values at once and materializes
/// them into tuples in tuple_mem. Returns the number of values actually materialized
/// in *num_values. The return value, error behavior and state changes are generally
/// the same as in ReadValue(). For example, if an error occurs in the middle of
/// materializing a batch then false is returned, and num_values, tuple_mem, as well as
/// this column reader are left in an undefined state, assuming that the caller will
- /// immediately abort execution.
+ /// immediately abort execution. NextLevels() does *not* need to be called before
+ /// ReadValueBatch(), unlike ReadValue().
virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
uint8_t* tuple_mem, int* num_values);
@@ -245,7 +241,11 @@ class ParquetColumnReader {
void ReadPosition(Tuple* tuple);
/// Returns true if this column reader has reached the end of the row group.
- inline bool RowGroupAtEnd() { return rep_level_ == HdfsParquetScanner::ROW_GROUP_END; }
+ inline bool RowGroupAtEnd() {
+ DCHECK_EQ(rep_level_ == HdfsParquetScanner::ROW_GROUP_END,
+ def_level_ == HdfsParquetScanner::ROW_GROUP_END);
+ return rep_level_ == HdfsParquetScanner::ROW_GROUP_END;
+ }
/// If 'row_batch' is non-NULL, transfers the remaining resources backing tuples to it,
/// and frees up other resources. If 'row_batch' is NULL frees all resources instead.
@@ -261,18 +261,19 @@ class ParquetColumnReader {
const SlotDescriptor* pos_slot_desc_;
/// The next value to write into the position slot, if there is one. 64-bit int because
- /// the pos slot is always a BIGINT Set to -1 when this column reader does not have a
- /// current rep and def level (i.e. before the first NextLevels() call or after the last
- /// value in the column has been read).
+ /// the pos slot is always a BIGINT Set to INVALID_POS when this column reader does not
+ /// have a current rep and def level (i.e. before the first NextLevels() call or after
+ /// the last value in the column has been read).
int64_t pos_current_value_;
/// The current repetition and definition levels of this reader. Advanced via
- /// ReadValue() and NextLevels(). Set to -1 when this column reader does not have a
- /// current rep and def level (i.e. before the first NextLevels() call or after the last
- /// value in the column has been read). If this is not inside a collection, rep_level_ is
- /// always 0.
- /// int16_t is large enough to hold the valid levels 0-255 and sentinel value -1.
- /// The maximum values are cached here because they are accessed in inner loops.
+ /// ReadValue() and NextLevels(). Set to INVALID_LEVEL before the first NextLevels()
+ /// call for a row group or if an error is encountered decoding a level. Set to
+ /// ROW_GROUP_END after the last value in the column has been read). If this is not
+ /// inside a collection, rep_level_ is always 0, INVALID_LEVEL or ROW_GROUP_END.
+ /// int16_t is large enough to hold the valid levels 0-255 and negative sentinel values
+ /// INVALID_LEVEL and ROW_GROUP_END. The maximum values are cached here because they
+ /// are accessed in inner loops.
int16_t rep_level_;
int16_t max_rep_level_;
int16_t def_level_;
@@ -352,10 +353,10 @@ class BaseScalarColumnReader : public ParquetColumnReader {
stream_ = stream;
metadata_ = metadata;
num_values_read_ = 0;
- def_level_ = -1;
+ def_level_ = HdfsParquetScanner::INVALID_LEVEL;
// See ColumnReader constructor.
- rep_level_ = max_rep_level() == 0 ? 0 : -1;
- pos_current_value_ = -1;
+ rep_level_ = max_rep_level() == 0 ? 0 : HdfsParquetScanner::INVALID_LEVEL;
+ pos_current_value_ = HdfsParquetScanner::INVALID_POS;
if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
RETURN_IF_ERROR(Codec::CreateDecompressor(
@@ -551,9 +552,9 @@ class CollectionColumnReader : public ParquetColumnReader {
/// This is called once for each row group in the file.
void Reset() {
- def_level_ = -1;
- rep_level_ = -1;
- pos_current_value_ = -1;
+ def_level_ = HdfsParquetScanner::INVALID_LEVEL;
+ rep_level_ = HdfsParquetScanner::INVALID_LEVEL;
+ pos_current_value_ = HdfsParquetScanner::INVALID_POS;
}
virtual void Close(RowBatch* row_batch) {