You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/22 16:14:04 UTC

[4/7] impala git commit: IMPALA-6669: Remove Parquet NeedsSeedingForBatchedReading()

IMPALA-6669: Remove Parquet NeedsSeedingForBatchedReading()

I noticed that we could remove this part of the interface and instead
do the "seeding" in ParquetColumnReader::Read*ValueBatch(). It should
be easier to understand with level reading and consumption driven by
the same function instead of split between files.

Testing:
Ran core tests. This code path should be thoroughly exercised by
the regular scanner tests.

Change-Id: I98f65d0d72e86b1e3db1f3543a03873afb9da062
Reviewed-on: http://gerrit.cloudera.org:8080/9636
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/24ae7dfe
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/24ae7dfe
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/24ae7dfe

Branch: refs/heads/2.x
Commit: 24ae7dfe8c3593b098805b6aaba5a2be5774ad59
Parents: 28eb128
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Mar 14 11:01:05 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 22 00:49:57 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc   | 29 ++-----------------
 be/src/exec/hdfs-parquet-scanner.h    |  2 +-
 be/src/exec/parquet-column-readers.cc | 14 ++++++++--
 be/src/exec/parquet-column-readers.h  | 45 +++++++++++++++---------------
 4 files changed, 37 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/24ae7dfe/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index e279369..0d79f53 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -713,33 +713,8 @@ Status HdfsParquetScanner::NextRowGroup() {
       ReleaseSkippedRowGroupResources();
       continue;
     }
-
-    bool seeding_failed = false;
-    for (ParquetColumnReader* col_reader: column_readers_) {
-      // Seed collection and boolean column readers with NextLevel().
-      // The ScalarColumnReaders use an optimized ReadValueBatch() that
-      // should not be seeded.
-      // TODO: Refactor the column readers to look more like the optimized
-      // ScalarColumnReader::ReadValueBatch() which does not need seeding. This
-      // will allow better sharing of code between the row-wise and column-wise
-      // materialization strategies.
-      if (col_reader->NeedsSeedingForBatchedReading()
-          && !col_reader->NextLevels()) {
-        seeding_failed = true;
-        break;
-      }
-    }
-    if (seeding_failed) {
-      if (!parse_status_.ok()) {
-        RETURN_IF_ERROR(state_->LogOrReturnError(parse_status_.msg()));
-      }
-      ReleaseSkippedRowGroupResources();
-      continue;
-    } else {
-      // Seeding succeeded - we're ready to read the row group.
-      DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
-      break;
-    }
+    DCHECK(parse_status_.ok()) << "Invalid parse_status_" << parse_status_.GetDetail();
+    break;
   }
 
   DCHECK(parse_status_.ok());

http://git-wip-us.apache.org/repos/asf/impala/blob/24ae7dfe/be/src/exec/hdfs-parquet-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h
index f0043b5..ccb109c 100644
--- a/be/src/exec/hdfs-parquet-scanner.h
+++ b/be/src/exec/hdfs-parquet-scanner.h
@@ -344,7 +344,7 @@ class HdfsParquetScanner : public HdfsScanner {
       llvm::Function** process_scratch_batch_fn)
       WARN_UNUSED_RESULT;
 
-  /// The repetition level is set to this value to indicate the end of a row group.
+  /// The rep and def levels are set to this value to indicate the end of a row group.
   static const int16_t ROW_GROUP_END = numeric_limits<int16_t>::min();
   /// Indicates an invalid definition or repetition level.
   static const int16_t INVALID_LEVEL = -1;

http://git-wip-us.apache.org/repos/asf/impala/blob/24ae7dfe/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 09842c5..1c9f57f 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -274,8 +274,6 @@ class ScalarColumnReader : public BaseScalarColumnReader {
     return ReadValue<false>(pool, tuple);
   }
 
-  virtual bool NeedsSeedingForBatchedReading() const { return false; }
-
   virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
       uint8_t* tuple_mem, int* num_values) {
     return ReadValueBatch<true>(pool, max_values, tuple_size, tuple_mem, num_values);
@@ -790,6 +788,11 @@ bool ParquetColumnReader::ColReaderDebugAction(int* val_count) {
 
 bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
     int tuple_size, uint8_t* tuple_mem, int* num_values) {
+  // The below loop requires that NextLevels() was called previously to populate
+  // 'def_level_' and 'rep_level_'. Ensure it is called at the start of each
+  // row group.
+  if (def_level_ == HdfsParquetScanner::INVALID_LEVEL && !NextLevels()) return false;
+
   int val_count = 0;
   bool continue_execution = true;
   while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
@@ -813,6 +816,11 @@ bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
 
 bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
     int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) {
+  // The below loop requires that NextLevels() was called previously to populate
+  // 'def_level_' and 'rep_level_'. Ensure it is called at the start of each
+  // row group.
+  if (def_level_ == HdfsParquetScanner::INVALID_LEVEL && !NextLevels()) return false;
+
   int val_count = 0;
   bool continue_execution = true;
   while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
@@ -1227,7 +1235,7 @@ bool BaseScalarColumnReader::NextPage() {
   if (UNLIKELY(!parent_->parse_status_.ok())) return false;
   if (num_buffered_values_ == 0) {
     rep_level_ = HdfsParquetScanner::ROW_GROUP_END;
-    def_level_ = HdfsParquetScanner::INVALID_LEVEL;
+    def_level_ = HdfsParquetScanner::ROW_GROUP_END;
     pos_current_value_ = HdfsParquetScanner::INVALID_POS;
     return false;
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/24ae7dfe/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 6b92ccb..8e3b1ae 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -205,18 +205,14 @@ class ParquetColumnReader {
   /// not in collections.
   virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) = 0;
 
-  /// Returns true if this reader needs to be seeded with NextLevels() before
-  /// calling ReadValueBatch() or ReadNonRepeatedValueBatch().
-  /// Note that all readers need to be seeded before calling the non-batched ReadValue().
-  virtual bool NeedsSeedingForBatchedReading() const { return true; }
-
   /// Batched version of ReadValue() that reads up to max_values at once and materializes
   /// them into tuples in tuple_mem. Returns the number of values actually materialized
   /// in *num_values. The return value, error behavior and state changes are generally
   /// the same as in ReadValue(). For example, if an error occurs in the middle of
   /// materializing a batch then false is returned, and num_values, tuple_mem, as well as
   /// this column reader are left in an undefined state, assuming that the caller will
-  /// immediately abort execution.
+  /// immediately abort execution. NextLevels() does *not* need to be called before
+  /// ReadValueBatch(), unlike ReadValue().
   virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
       uint8_t* tuple_mem, int* num_values);
 
@@ -245,7 +241,11 @@ class ParquetColumnReader {
   void ReadPosition(Tuple* tuple);
 
   /// Returns true if this column reader has reached the end of the row group.
-  inline bool RowGroupAtEnd() { return rep_level_ == HdfsParquetScanner::ROW_GROUP_END; }
+  inline bool RowGroupAtEnd() {
+    DCHECK_EQ(rep_level_ == HdfsParquetScanner::ROW_GROUP_END,
+              def_level_ == HdfsParquetScanner::ROW_GROUP_END);
+    return rep_level_ == HdfsParquetScanner::ROW_GROUP_END;
+  }
 
   /// If 'row_batch' is non-NULL, transfers the remaining resources backing tuples to it,
   /// and frees up other resources. If 'row_batch' is NULL frees all resources instead.
@@ -261,18 +261,19 @@ class ParquetColumnReader {
   const SlotDescriptor* pos_slot_desc_;
 
   /// The next value to write into the position slot, if there is one. 64-bit int because
-  /// the pos slot is always a BIGINT Set to -1 when this column reader does not have a
-  /// current rep and def level (i.e. before the first NextLevels() call or after the last
-  /// value in the column has been read).
+  /// the pos slot is always a BIGINT Set to INVALID_POS when this column reader does not
+  /// have a current rep and def level (i.e. before the first NextLevels() call or after
+  /// the last value in the column has been read).
   int64_t pos_current_value_;
 
   /// The current repetition and definition levels of this reader. Advanced via
-  /// ReadValue() and NextLevels(). Set to -1 when this column reader does not have a
-  /// current rep and def level (i.e. before the first NextLevels() call or after the last
-  /// value in the column has been read). If this is not inside a collection, rep_level_ is
-  /// always 0.
-  /// int16_t is large enough to hold the valid levels 0-255 and sentinel value -1.
-  /// The maximum values are cached here because they are accessed in inner loops.
+  /// ReadValue() and NextLevels(). Set to INVALID_LEVEL before the first NextLevels()
+  /// call for a row group or if an error is encountered decoding a level. Set to
+  /// ROW_GROUP_END after the last value in the column has been read). If this is not
+  /// inside a collection, rep_level_ is always 0, INVALID_LEVEL or ROW_GROUP_END.
+  /// int16_t is large enough to hold the valid levels 0-255 and negative sentinel values
+  /// INVALID_LEVEL and ROW_GROUP_END. The maximum values are cached here because they
+  /// are accessed in inner loops.
   int16_t rep_level_;
   int16_t max_rep_level_;
   int16_t def_level_;
@@ -352,10 +353,10 @@ class BaseScalarColumnReader : public ParquetColumnReader {
     stream_ = stream;
     metadata_ = metadata;
     num_values_read_ = 0;
-    def_level_ = -1;
+    def_level_ = HdfsParquetScanner::INVALID_LEVEL;
     // See ColumnReader constructor.
-    rep_level_ = max_rep_level() == 0 ? 0 : -1;
-    pos_current_value_ = -1;
+    rep_level_ = max_rep_level() == 0 ? 0 : HdfsParquetScanner::INVALID_LEVEL;
+    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
 
     if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
       RETURN_IF_ERROR(Codec::CreateDecompressor(
@@ -551,9 +552,9 @@ class CollectionColumnReader : public ParquetColumnReader {
 
   /// This is called once for each row group in the file.
   void Reset() {
-    def_level_ = -1;
-    rep_level_ = -1;
-    pos_current_value_ = -1;
+    def_level_ = HdfsParquetScanner::INVALID_LEVEL;
+    rep_level_ = HdfsParquetScanner::INVALID_LEVEL;
+    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
   }
 
   virtual void Close(RowBatch* row_batch) {