You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2022/08/30 16:47:45 UTC

[impala] branch master updated: IMPALA-11529: FILE__POSITION virtual column for ORC tables

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new fec7a79c5 IMPALA-11529: FILE__POSITION virtual column for ORC tables
fec7a79c5 is described below

commit fec7a79c50de9ef5db2c652ece111aaa303d57c7
Author: Gabor Kaszab <ga...@cloudera.com>
AuthorDate: Mon Aug 22 13:50:46 2022 +0200

    IMPALA-11529: FILE__POSITION virtual column for ORC tables
    
    IMPALA-11350 implemented the FILE__POSITION virtual column for Parquet
    files. This ticket does the same but for ORC files. Note that for full
    ACID ORC tables there was already an implementation of row__id that
    could simply be re-used for this ticket.
    
    Testing:
     - TestScannersVirtualColumns.test_virtual_column_file_position_generic
       now runs on ORC as well. I don't think further testing is required,
       as this functionality already existed for row__id; we just re-used
       it for FILE__POSITION.
    
    Change-Id: Ie8e951f73ceb910d64cd149192853a4a2131f79b
    Reviewed-on: http://gerrit.cloudera.org:8080/18909
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-scanner.cc                        |  3 ++-
 be/src/exec/orc/hdfs-orc-scanner.cc                | 13 ++++++++++---
 be/src/exec/orc/hdfs-orc-scanner.h                 |  6 ++++--
 be/src/exec/orc/orc-column-readers.cc              | 10 +++++-----
 be/src/exec/orc/orc-column-readers.h               |  2 +-
 .../virtual-column-file-position-generic.test      | 22 +++++++++++-----------
 tests/query_test/test_scanners.py                  |  4 ++--
 7 files changed, 35 insertions(+), 25 deletions(-)

diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 1b7e8f47c..7795a8c09 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -76,7 +76,8 @@ HdfsScanner::~HdfsScanner() {
 }
 
 Status HdfsScanner::ValidateSlotDescriptors() const {
-  if (file_format() != THdfsFileFormat::PARQUET) {
+  if (file_format() != THdfsFileFormat::PARQUET &&
+      file_format() != THdfsFileFormat::ORC) {
     // Virtual column FILE__POSITION is only supported for PARQUET files.
     for (SlotDescriptor* sd : scan_node_->virtual_column_slots()) {
       if (sd->virtual_column_type() == TVirtualColumnType::FILE_POSITION) {
diff --git a/be/src/exec/orc/hdfs-orc-scanner.cc b/be/src/exec/orc/hdfs-orc-scanner.cc
index ca62de143..ce04bf178 100644
--- a/be/src/exec/orc/hdfs-orc-scanner.cc
+++ b/be/src/exec/orc/hdfs-orc-scanner.cc
@@ -599,7 +599,14 @@ Status HdfsOrcScanner::ResolveColumns(const TupleDescriptor& tuple_desc,
   SlotDescriptor* pos_slot_desc = nullptr;
   for (SlotDescriptor* slot_desc : tuple_desc.slots()) {
     // Skip columns not (necessarily) stored in the data files.
-    if (!file_metadata_utils_.NeedDataInFile(slot_desc)) continue;
+    if (!file_metadata_utils_.NeedDataInFile(slot_desc)) {
+      if (slot_desc->virtual_column_type() == TVirtualColumnType::FILE_POSITION) {
+        DCHECK(pos_slot_desc == nullptr)
+            << "There should only be one position slot per tuple";
+        file_position_ = slot_desc;
+      }
+      continue;
+    }
 
     node = nullptr;
     pos_field = false;
@@ -676,7 +683,7 @@ void HdfsOrcScanner::SetSyntheticAcidFieldForOriginalFile(const SlotDescriptor*
           ValidWriteIdList::GetBucketProperty(filename());
       break;
     case ACID_FIELD_ROWID_INDEX:
-      acid_synthetic_rowid_ = slot_desc;
+      file_position_ = slot_desc;
     default:
       break;
   }
@@ -934,7 +941,7 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
       try {
         end_of_stripe_ |= !row_reader_->next(*orc_root_batch_);
         RETURN_IF_ERROR(orc_root_reader_->UpdateInputBatch(orc_root_batch_.get()));
-        if (acid_synthetic_rowid_ != nullptr) {
+        if (file_position_ != nullptr) {
           // Set the first row index of the batch. The ORC reader guarantees that rows
           // are consecutive in the returned batch.
           orc_root_reader_->SetFileRowIndex(row_reader_->getRowNumber());
diff --git a/be/src/exec/orc/hdfs-orc-scanner.h b/be/src/exec/orc/hdfs-orc-scanner.h
index 33ce0f875..7383f4942 100644
--- a/be/src/exec/orc/hdfs-orc-scanner.h
+++ b/be/src/exec/orc/hdfs-orc-scanner.h
@@ -271,8 +271,10 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   /// Non-ACID file in full ACID table.
   bool acid_original_file_ = false;
 
-  /// Slot descriptor of synthetic rowid of original files.
-  const SlotDescriptor* acid_synthetic_rowid_ = nullptr;
+  /// Slot descriptor for file position (a.k.a. row ID) for two different purposes: It can
+  /// be used for the synthetic rowid column in original files of a full ACID table, or
+  /// for the FILE__POSITION virtual column.
+  const SlotDescriptor* file_position_ = nullptr;
 
   /// True if we need to validate the row batches against the valid write id list. This
   /// only needs to be done for Hive Streaming Ingestion. The 'write id' will be the same
diff --git a/be/src/exec/orc/orc-column-readers.cc b/be/src/exec/orc/orc-column-readers.cc
index 5dfbb4f7c..aeb1cbd0c 100644
--- a/be/src/exec/orc/orc-column-readers.cc
+++ b/be/src/exec/orc/orc-column-readers.cc
@@ -537,20 +537,20 @@ Status OrcStructReader::TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch,
                              NumElements() - row_idx_);
     scratch_batch->num_tuples += num_rows_read;
   }
-  if (scanner_->acid_synthetic_rowid_ != nullptr) {
-    FillSyntheticRowId(scratch_batch, scratch_batch_idx, num_rows_read);
+  if (scanner_->file_position_ != nullptr) {
+    FillVirtualRowIdColumn(scratch_batch, scratch_batch_idx, num_rows_read);
   }
   row_idx_ += num_rows_read;
   return Status::OK();
 }
 
-void OrcStructReader::FillSyntheticRowId(ScratchTupleBatch* scratch_batch,
+void OrcStructReader::FillVirtualRowIdColumn(ScratchTupleBatch* scratch_batch,
     int scratch_batch_idx, int num_rows) {
-    DCHECK(scanner_->acid_synthetic_rowid_ != nullptr);
+    DCHECK(scanner_->file_position_ != nullptr);
     int tuple_size = OrcColumnReader::scanner_->tuple_byte_size();
     uint8_t* first_tuple = scratch_batch->tuple_mem + scratch_batch_idx * tuple_size;
     int64_t* first_slot = reinterpret_cast<Tuple*>(first_tuple)->GetBigIntSlot(
-        scanner_->acid_synthetic_rowid_->tuple_offset());
+        scanner_->file_position_->tuple_offset());
     StrideWriter<int64_t> out{first_slot, tuple_size};
     for (int i = 0; i < num_rows; ++i) {
       *out.Advance() = file_row_idx_++;
diff --git a/be/src/exec/orc/orc-column-readers.h b/be/src/exec/orc/orc-column-readers.h
index 7dd967a5c..f67e487de 100644
--- a/be/src/exec/orc/orc-column-readers.h
+++ b/be/src/exec/orc/orc-column-readers.h
@@ -630,7 +630,7 @@ class OrcStructReader : public OrcComplexColumnReader {
   void SetFileRowIndex(int64_t file_row_idx) { file_row_idx_ = file_row_idx; }
 
  private:
-  void FillSyntheticRowId(ScratchTupleBatch* scratch_batch, int scratch_batch_idx,
+  void FillVirtualRowIdColumn(ScratchTupleBatch* scratch_batch, int scratch_batch_idx,
       int num_rows);
 
   orc::StructVectorBatch* batch_ = nullptr;
diff --git a/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-generic.test b/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-generic.test
index e0b13c8c7..bb06a353b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-generic.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-generic.test
@@ -1,6 +1,6 @@
 ====
 ---- QUERY
-select file__position, * from functional_parquet.alltypestiny order by id;
+select file__position, * from alltypestiny order by id;
 ---- RESULTS
 0,0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
 1,1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
@@ -14,7 +14,7 @@ select file__position, * from functional_parquet.alltypestiny order by id;
 BIGINT, INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
 ====
 ---- QUERY
-select file__position, * from functional_parquet.alltypestiny
+select file__position, * from alltypestiny
 where file__position = 1
 order by id;
 ---- RESULTS
@@ -26,7 +26,7 @@ order by id;
 BIGINT, INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
 ====
 ---- QUERY
-select file__position, count(*) from functional_parquet.alltypestiny
+select file__position, count(*) from alltypestiny
 group by file__position;
 ---- RESULTS
 1,4
@@ -35,7 +35,7 @@ group by file__position;
 BIGINT, BIGINT
 ====
 ---- QUERY
-select file__position, id from functional_parquet.alltypestiny
+select file__position, id from alltypestiny
 order by id;
 ---- RESULTS
 0,0
@@ -50,7 +50,7 @@ order by id;
 BIGINT, INT
 ====
 ---- QUERY
-select file__position from functional_parquet.alltypestiny;
+select file__position from alltypestiny;
 ---- RESULTS
 0
 1
@@ -64,14 +64,14 @@ select file__position from functional_parquet.alltypestiny;
 BIGINT
 ====
 ---- QUERY
-select max(file__position) from functional_parquet.alltypestiny;
+select max(file__position) from alltypestiny;
 ---- RESULTS
 1
 ---- TYPES
 BIGINT
 ====
 ---- QUERY
-select file__position, id from functional_parquet.complextypestbl
+select file__position, id from complextypestbl
 order by id;
 ---- RESULTS
 0,1
@@ -86,7 +86,7 @@ order by id;
 BIGINT, BIGINT
 ====
 ---- QUERY
-select file__position, id, int_array from functional_parquet.complextypestbl;
+select file__position, id, int_array from complextypestbl;
 ---- RESULTS
 0,1,'[1,2,3]'
 1,2,'[NULL,1,2,NULL,3,NULL]'
@@ -100,7 +100,7 @@ select file__position, id, int_array from functional_parquet.complextypestbl;
 BIGINT, BIGINT, STRING
 ====
 ---- QUERY
-select file__position, id, item from functional_parquet.complextypestbl c, c.int_array
+select file__position, id, item from complextypestbl c, c.int_array
 order by id;
 ---- RESULTS
 0,1,1
@@ -117,7 +117,7 @@ order by id;
 BIGINT, BIGINT, INT
 ====
 ---- QUERY
-select file__position, id, item from functional_parquet.complextypestbl c, c.int_array_array;
+select file__position, id, item from complextypestbl c, c.int_array_array;
 ---- RESULTS
 0,1,'[1,2]'
 0,1,'[3,4]'
@@ -134,7 +134,7 @@ select file__position, id, item from functional_parquet.complextypestbl c, c.int
 BIGINT, BIGINT, STRING
 ====
 ---- QUERY
-select file__position, id, i.item from functional_parquet.complextypestbl c, c.int_array_array a, a.item i
+select file__position, id, i.item from complextypestbl c, c.int_array_array a, a.item i
 order by id;
 ---- RESULTS
 0,1,1
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 31c927e29..62c0ab3b8 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -150,8 +150,8 @@ class TestScannersVirtualColumns(ImpalaTestSuite):
   def test_virtual_column_file_position_generic(self, vector):
     # Generic tests about virtual column file position.
     file_format = vector.get_value('table_format').file_format
-    # TODO: add support for other file formats, especially ORC
-    if file_format not in ['parquet']:
+    # TODO: add support for other file formats
+    if file_format not in ['parquet', 'orc']:
       pytest.skip()
     self.run_test_case('QueryTest/virtual-column-file-position-generic', vector)