Posted to commits@impala.apache.org by jo...@apache.org on 2022/08/13 20:01:45 UTC

[impala] branch master updated (15ce822e7 -> 522ee1fcc)

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 15ce822e7 IMPALA-11468: Port "Block Bloom filter false positive correction" from Kudu
     new adadec7fd IMPALA-11480: Retain all YARN container logs in a Jenkins run
     new 522ee1fcc IMPALA-11350: Add virtual column FILE__POSITION for Parquet tables

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/base-sequence-scanner.h                |   3 -
 be/src/exec/file-metadata-utils.cc                 |   5 +-
 be/src/exec/hdfs-orc-scanner.h                     |   4 +
 be/src/exec/hdfs-scan-node-base.cc                 |   4 +
 be/src/exec/hdfs-scan-node-base.h                  |   3 +-
 be/src/exec/hdfs-scanner.cc                        |  15 ++
 be/src/exec/hdfs-scanner.h                         |   7 +
 be/src/exec/hdfs-text-scanner.h                    |  12 +-
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  38 +++-
 be/src/exec/parquet/hdfs-parquet-scanner.h         |   9 +-
 .../parquet/parquet-collection-column-reader.cc    |   9 +-
 be/src/exec/parquet/parquet-column-readers.cc      | 121 ++++++++---
 be/src/exec/parquet/parquet-column-readers.h       |  55 +++--
 common/thrift/CatalogObjects.thrift                |   3 +-
 common/thrift/CatalogService.thrift                |  16 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  |   1 +
 .../org/apache/impala/catalog/IcebergTable.java    |   1 +
 .../main/java/org/apache/impala/catalog/Table.java |   4 +
 .../org/apache/impala/catalog/VirtualColumn.java   |   5 +
 .../impala/catalog/local/CatalogdMetaProvider.java |  19 +-
 .../impala/catalog/local/DirectMetaProvider.java   |   7 +
 .../apache/impala/catalog/local/LocalFsTable.java  |   6 +-
 .../impala/catalog/local/LocalIcebergTable.java    |   6 +-
 .../apache/impala/catalog/local/LocalTable.java    |   4 +
 .../apache/impala/catalog/local/MetaProvider.java  |   2 +
 .../org/apache/impala/planner/HdfsScanNode.java    |   4 +
 testdata/LineItemMultiBlock/README.dox             |   6 +
 ... lineitem_multiblock_variable_num_rows.parquet} | Bin 2767242 -> 2431994 bytes
 testdata/bin/create-load-data.sh                   |   8 +-
 .../common/etc/hadoop/conf/yarn-site.xml.py        |   4 +
 .../functional/functional_schema_template.sql      |  23 +++
 .../datasets/functional/schema_constraints.csv     |   1 +
 .../queries/QueryTest/mixing-virtual-columns.test  | 112 +++++++++++
 .../virtual-column-file-position-generic.test      | 157 +++++++++++++++
 .../virtual-column-file-position-parquet.test      | 223 +++++++++++++++++++++
 tests/query_test/test_scanners.py                  |  46 +++++
 36 files changed, 860 insertions(+), 83 deletions(-)
 copy testdata/LineItemMultiBlock/{000000_0 => lineitem_multiblock_variable_num_rows.parquet} (60%)
 mode change 100755 => 100644
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/mixing-virtual-columns.test
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-generic.test
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-parquet.test


[impala] 02/02: IMPALA-11350: Add virtual column FILE__POSITION for Parquet tables

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 522ee1fcc09d47074c75440ba4fc3d258e1c95b3
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Tue Jun 21 16:14:32 2022 +0200

    IMPALA-11350: Add virtual column FILE__POSITION for Parquet tables
    
    Virtual column FILE__POSITION returns the ordinal position of the row
    in the data file. It will be useful when adding support for Iceberg's
    position-based delete files.

    This patch only adds FILE__POSITION for Parquet tables. It works
    similarly to the handling of collection position slots, i.e. the
    responsibility of filling the file position slot is given to an
    existing column reader. Because of page filtering and late
    materialization, we already track the file position in the member
    'current_row_' during scanning.

    Querying FILE__POSITION on other file formats raises an error.
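
    For illustration, a minimal query sketch of the new virtual column
    (assuming a Parquet table such as functional_parquet.alltypes):

        -- FILE__POSITION is a BIGINT giving the row's ordinal position
        -- within its data file.
        SELECT id, file__position
        FROM functional_parquet.alltypes
        ORDER BY file__position
        LIMIT 5;

        -- Against a non-Parquet table (e.g. text format) the same query
        -- is expected to fail with an error like:
        -- "Virtual column FILE__POSITION is not supported for TEXT files."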
    
    Testing:
     * added e2e tests
    
    Change-Id: I4ef72c683d0d5ae2898bca36fa87e74b663671f7
    Reviewed-on: http://gerrit.cloudera.org:8080/18704
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/base-sequence-scanner.h                |   3 -
 be/src/exec/file-metadata-utils.cc                 |   5 +-
 be/src/exec/hdfs-orc-scanner.h                     |   4 +
 be/src/exec/hdfs-scan-node-base.cc                 |   4 +
 be/src/exec/hdfs-scan-node-base.h                  |   3 +-
 be/src/exec/hdfs-scanner.cc                        |  15 ++
 be/src/exec/hdfs-scanner.h                         |   7 +
 be/src/exec/hdfs-text-scanner.h                    |  12 +-
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  38 +++-
 be/src/exec/parquet/hdfs-parquet-scanner.h         |   9 +-
 .../parquet/parquet-collection-column-reader.cc    |   9 +-
 be/src/exec/parquet/parquet-column-readers.cc      | 121 ++++++++---
 be/src/exec/parquet/parquet-column-readers.h       |  55 +++--
 common/thrift/CatalogObjects.thrift                |   3 +-
 common/thrift/CatalogService.thrift                |  16 +-
 .../java/org/apache/impala/catalog/HdfsTable.java  |   1 +
 .../org/apache/impala/catalog/IcebergTable.java    |   1 +
 .../main/java/org/apache/impala/catalog/Table.java |   4 +
 .../org/apache/impala/catalog/VirtualColumn.java   |   5 +
 .../impala/catalog/local/CatalogdMetaProvider.java |  19 +-
 .../impala/catalog/local/DirectMetaProvider.java   |   7 +
 .../apache/impala/catalog/local/LocalFsTable.java  |   6 +-
 .../impala/catalog/local/LocalIcebergTable.java    |   6 +-
 .../apache/impala/catalog/local/LocalTable.java    |   4 +
 .../apache/impala/catalog/local/MetaProvider.java  |   2 +
 .../org/apache/impala/planner/HdfsScanNode.java    |   4 +
 testdata/LineItemMultiBlock/README.dox             |   6 +
 .../lineitem_multiblock_variable_num_rows.parquet  | Bin 0 -> 2431994 bytes
 testdata/bin/create-load-data.sh                   |   8 +-
 .../functional/functional_schema_template.sql      |  23 +++
 .../datasets/functional/schema_constraints.csv     |   1 +
 .../queries/QueryTest/mixing-virtual-columns.test  | 112 +++++++++++
 .../virtual-column-file-position-generic.test      | 157 +++++++++++++++
 .../virtual-column-file-position-parquet.test      | 223 +++++++++++++++++++++
 tests/query_test/test_scanners.py                  |  46 +++++
 35 files changed, 856 insertions(+), 83 deletions(-)

diff --git a/be/src/exec/base-sequence-scanner.h b/be/src/exec/base-sequence-scanner.h
index 3c2326e81..b0948c0b1 100644
--- a/be/src/exec/base-sequence-scanner.h
+++ b/be/src/exec/base-sequence-scanner.h
@@ -111,9 +111,6 @@ class BaseSequenceScanner : public HdfsScanner {
   /// not at a sync marker or other metadata of the range. May set 'eos_'.
   virtual Status ProcessRange(RowBatch* row_batch) WARN_UNUSED_RESULT = 0;
 
-  /// Returns type of scanner: e.g. rcfile, seqfile
-  virtual THdfsFileFormat::type file_format() const = 0;
-
   BaseSequenceScanner(HdfsScanNodeBase*, RuntimeState*);
 
   /// Read sync marker from 'stream_' and validate against 'header_->sync'. Returns
diff --git a/be/src/exec/file-metadata-utils.cc b/be/src/exec/file-metadata-utils.cc
index 6a2b1eff0..52891ab77 100644
--- a/be/src/exec/file-metadata-utils.cc
+++ b/be/src/exec/file-metadata-utils.cc
@@ -43,8 +43,7 @@ Tuple* FileMetadataUtils::CreateTemplateTuple(int64_t partition_id, MemPool* mem
   // HdfsScanNodeBase.
   Tuple* template_tuple = scan_node_->GetTemplateTupleForPartitionId(partition_id);
   if (template_tuple != nullptr) {
-    template_tuple =
-        template_tuple->DeepCopy(*scan_node_->tuple_desc(), mem_pool);
+    template_tuple = template_tuple->DeepCopy(*scan_node_->tuple_desc(), mem_pool);
   }
   if (UNLIKELY(!scan_node_->virtual_column_slots().empty())) {
     AddFileLevelVirtualColumns(mem_pool, template_tuple);
@@ -57,7 +56,7 @@ Tuple* FileMetadataUtils::CreateTemplateTuple(int64_t partition_id, MemPool* mem
 
 void FileMetadataUtils::AddFileLevelVirtualColumns(MemPool* mem_pool,
     Tuple* template_tuple) {
-  DCHECK(template_tuple != nullptr);
+  if (template_tuple == nullptr) return;
   for (int i = 0; i < scan_node_->virtual_column_slots().size(); ++i) {
     const SlotDescriptor* slot_desc = scan_node_->virtual_column_slots()[i];
     if (slot_desc->virtual_column_type() != TVirtualColumnType::INPUT_FILE_NAME) {
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index a446c1998..5e2f72226 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -181,6 +181,10 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   virtual Status ProcessSplit() override WARN_UNUSED_RESULT;
   virtual void Close(RowBatch* row_batch) override;
 
+  THdfsFileFormat::type file_format() const override {
+    return THdfsFileFormat::ORC;
+  }
+
  protected:
   virtual int64_t GetNumberOfRowsInFile() const override {
     return static_cast<int64_t>(reader_->getNumberOfRows());
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index b72167832..8d36019b0 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -1039,6 +1039,10 @@ bool HdfsScanPlanNode::HasVirtualColumnInTemplateTuple() const {
     DCHECK(sd->IsVirtual());
     if (sd->virtual_column_type() == TVirtualColumnType::INPUT_FILE_NAME) {
       return true;
+    } else if (sd->virtual_column_type() == TVirtualColumnType::FILE_POSITION) {
+      // We return false at the end of the function if there are no virtual
+      // columns in the template tuple.
+      continue;
     } else {
       // Adding DCHECK here so we don't forget to update this when adding new virtual
       // column.
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 4fcc03644..13e4c99c4 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -584,7 +584,8 @@ class HdfsScanNodeBase : public ScanNode {
 
   /// Returns true if there are no materialized slots, such as a count(*) over the table.
   inline bool IsZeroSlotTableScan() const {
-    return materialized_slots().empty() && tuple_desc()->tuple_path().empty();
+    return materialized_slots().empty() && tuple_desc()->tuple_path().empty() &&
+        virtual_column_slots().empty();
   }
 
   /// Return true if scan over 'filename' require row validation.
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 8da49d4b1..501203909 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -75,8 +75,23 @@ HdfsScanner::HdfsScanner()
 HdfsScanner::~HdfsScanner() {
 }
 
+Status HdfsScanner::ValidateSlotDescriptors() const {
+  if (file_format() != THdfsFileFormat::PARQUET) {
+    // Virtual column FILE__POSITION is only supported for PARQUET files.
+    for (SlotDescriptor* sd : scan_node_->virtual_column_slots()) {
+      if (sd->virtual_column_type() == TVirtualColumnType::FILE_POSITION) {
+        return Status(Substitute(
+            "Virtual column FILE__POSITION is not supported for $0 files.",
+            _THdfsFileFormat_VALUES_TO_NAMES.find(file_format())->second));
+      }
+    }
+  }
+  return Status::OK();
+}
+
 Status HdfsScanner::Open(ScannerContext* context) {
   context_ = context;
+  RETURN_IF_ERROR(ValidateSlotDescriptors());
   file_metadata_utils_.SetFile(state_, scan_node_->GetFileDesc(
       context->partition_descriptor()->id(), context->GetStream()->filename()));
   stream_ = context->GetStream();
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index e68e4be45..ec9096975 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -179,6 +179,9 @@ class HdfsScanner {
   /// Return the plan id of the scan node that this scanner is associated with.
   int GetScanNodeId() const { return scan_node_->id(); }
 
+  /// Returns type of scanner: e.g. rcfile, seqfile
+  virtual THdfsFileFormat::type file_format() const = 0;
+
   /// Not inlined in IR so it can be replaced with a constant.
   int IR_NO_INLINE tuple_byte_size() const { return tuple_byte_size_; }
   int IR_NO_INLINE tuple_byte_size(const TupleDescriptor& desc) const {
@@ -681,6 +684,10 @@ class HdfsScanner {
   /// conjunct_evals_.
   ScalarExprEvaluator* GetConjunctEval(int idx) const;
 
+  /// Returns error if this scanner doesn't support a slot in the tuple, e.g. virtual
+  /// columns.
+  Status ValidateSlotDescriptors() const;
+
   /// Unit test constructor
   HdfsScanner();
 };
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index d74db9ff7..8bfbfe2e3 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -52,8 +52,12 @@ class HdfsTextScanner : public HdfsScanner {
   virtual ~HdfsTextScanner();
 
   /// Implementation of HdfsScanner interface.
-  virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT;
-  virtual void Close(RowBatch* row_batch);
+  virtual Status Open(ScannerContext* context) override WARN_UNUSED_RESULT;
+  virtual void Close(RowBatch* row_batch) override;
+
+  THdfsFileFormat::type file_format() const override {
+    return THdfsFileFormat::TEXT;
+  }
 
   /// Issue io manager byte ranges for 'files'.
   static Status IssueInitialRanges(HdfsScanNodeBase* scan_node,
@@ -87,7 +91,7 @@ class HdfsTextScanner : public HdfsScanner {
   static const char* LLVM_CLASS_NAME;
 
  protected:
-  virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT;
+  virtual Status GetNextInternal(RowBatch* row_batch) override WARN_UNUSED_RESULT;
 
   /// Reset the scanner.  This clears any partial state that needs to
   /// be cleared when starting or when restarting after an error.
@@ -130,7 +134,7 @@ class HdfsTextScanner : public HdfsScanner {
 
   /// Initializes this scanner for this context.  The context maps to a single
   /// scan range. Advances the scan state to SCAN_RANGE_INITIALIZED.
-  virtual Status InitNewRange() WARN_UNUSED_RESULT;
+  virtual Status InitNewRange() override WARN_UNUSED_RESULT;
 
   /// Finds the start of the first tuple in this scan range and initializes
   /// 'byte_buffer_ptr_' to point to the start of first tuple. Advances the scan state
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 84916e584..64e4f546e 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -849,6 +849,14 @@ Status HdfsParquetScanner::NextRowGroup() {
     // but some data in row groups.
     if (row_group.num_rows == 0 || file_metadata_.num_rows == 0) continue;
 
+    // Let's find the index of the first row in this row group. It's needed to track the
+    // file position of each row.
+    int64_t row_group_first_row = 0;
+    for (int i = 0; i < group_idx_; ++i) {
+      const parquet::RowGroup& row_group = file_metadata_.row_groups[i];
+      row_group_first_row += row_group.num_rows;
+    }
+
     RETURN_IF_ERROR(ParquetMetadataUtils::ValidateColumnOffsets(
         file_desc->filename, file_desc->file_length, row_group));
 
@@ -920,7 +928,7 @@ Status HdfsParquetScanner::NextRowGroup() {
     }
 
     InitComplexColumns();
-    RETURN_IF_ERROR(InitScalarColumns());
+    RETURN_IF_ERROR(InitScalarColumns(row_group_first_row));
 
     // Start scanning dictionary filtering column readers, so we can read the dictionary
     // pages in EvalDictionaryFilters().
@@ -2627,7 +2635,7 @@ inline bool HdfsParquetScanner::ReadCollectionItem(
       // Fill in position slot if applicable
       const SlotDescriptor* pos_slot_desc = col_reader->pos_slot_desc();
       if (pos_slot_desc != nullptr) {
-        col_reader->ReadPositionNonBatched(
+        col_reader->ReadItemPositionNonBatched(
             tuple->GetBigIntSlot(pos_slot_desc->tuple_offset()));
       }
       continue_execution = col_reader->ReadValue(pool, tuple);
@@ -2788,10 +2796,19 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
 
   // Each tuple can have at most one position slot. We'll process this slot desc last.
   SlotDescriptor* pos_slot_desc = nullptr;
+  SlotDescriptor* file_pos_slot_desc = nullptr;
 
   for (SlotDescriptor* slot_desc: tuple_desc.slots()) {
     // Skip partition columns
-    if (!file_metadata_utils_.NeedDataInFile(slot_desc)) continue;
+    if (!file_metadata_utils_.NeedDataInFile(slot_desc)) {
+      if (UNLIKELY(slot_desc->virtual_column_type() ==
+          TVirtualColumnType::FILE_POSITION)) {
+        DCHECK(file_pos_slot_desc == nullptr)
+            << "There should only be one position slot per tuple";
+        file_pos_slot_desc = slot_desc;
+      }
+      continue;
+    }
 
     SchemaNode* node = nullptr;
     bool pos_field;
@@ -2851,7 +2868,15 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc
     column_readers->push_back(reader);
   }
 
-  if (pos_slot_desc != nullptr) {
+  // We either have a file position slot or a position slot in a tuple, but not both,
+  // because the file position always belongs to the table-level tuple, while the
+  // position slot belongs to the collection item-level tuple.
+  DCHECK(file_pos_slot_desc == nullptr || pos_slot_desc == nullptr);
+  if (file_pos_slot_desc != nullptr) {
+    // 'tuple_desc' has a file position slot. Use an existing column reader to fill it.
+    DCHECK(!column_readers->empty());
+    (*column_readers)[0]->set_file_pos_slot_desc(file_pos_slot_desc);
+  } else if (pos_slot_desc != nullptr) {
     // 'tuple_desc' has a position slot. Use an existing column reader to populate it.
     DCHECK(!column_readers->empty());
     (*column_readers)[0]->set_pos_slot_desc(pos_slot_desc);
@@ -2926,7 +2951,7 @@ void HdfsParquetScanner::InitComplexColumns() {
   }
 }
 
-Status HdfsParquetScanner::InitScalarColumns() {
+Status HdfsParquetScanner::InitScalarColumns(int64_t row_group_first_row) {
   int64_t partition_id = context_->partition_descriptor()->id();
   const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename());
   DCHECK(file_desc != nullptr);
@@ -2950,7 +2975,8 @@ Status HdfsParquetScanner::InitScalarColumns() {
       return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(),
           col_chunk.meta_data.num_values, num_values, filename());
     }
-    RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, group_idx_));
+    RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, group_idx_,
+        row_group_first_row));
   }
 
   ColumnRangeLengths col_range_lengths(scalar_readers_.size());
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h
index c78609e66..5a0474b5a 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -351,6 +351,10 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   virtual Status ProcessSplit() override WARN_UNUSED_RESULT;
   virtual void Close(RowBatch* row_batch) override;
 
+  THdfsFileFormat::type file_format() const override {
+    return THdfsFileFormat::PARQUET;
+  }
+
   /// Helper function to create ColumnStatsReader object. 'col_order' might be NULL.
   ColumnStatsReader CreateColumnStatsReader(
       const parquet::ColumnChunk& col_chunk, const ColumnType& col_type,
@@ -812,8 +816,9 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
 
   /// Walks file_metadata_ and initiates reading the materialized columns.  This
   /// initializes 'scalar_readers_' and divides reservation between the columns but
-  /// does not start any scan ranges.
-  Status InitScalarColumns() WARN_UNUSED_RESULT;
+  /// does not start any scan ranges. 'row_group_first_row' is the index of the first
+  /// row in this row group; it is used to track the file position of the rows.
+  Status InitScalarColumns(int64_t row_group_first_row) WARN_UNUSED_RESULT;
 
   /// Initializes the column readers in complex_readers_.
   void InitComplexColumns();
diff --git a/be/src/exec/parquet/parquet-collection-column-reader.cc b/be/src/exec/parquet/parquet-collection-column-reader.cc
index 1499a1073..afe46dece 100644
--- a/be/src/exec/parquet/parquet-collection-column-reader.cc
+++ b/be/src/exec/parquet/parquet-collection-column-reader.cc
@@ -78,9 +78,12 @@ bool CollectionColumnReader::ReadValueBatch(MemPool* pool, int max_values,
       continue_execution = NextLevels();
       continue;
     }
-    // Fill in position slot if applicable
-    if (pos_slot_desc_ != nullptr) {
-      ReadPositionNonBatched(tuple->GetBigIntSlot(pos_slot_desc()->tuple_offset()));
+    // Fill in position slots if applicable
+    if (pos_slot_desc() != nullptr) {
+      ReadItemPositionNonBatched(tuple->GetBigIntSlot(pos_slot_desc()->tuple_offset()));
+    } else if (file_pos_slot_desc() != nullptr) {
+      ReadFilePositionNonBatched(
+          tuple->GetBigIntSlot(file_pos_slot_desc()->tuple_offset()));
     }
     continue_execution = ReadValue(pool, tuple);
     ++val_count;
diff --git a/be/src/exec/parquet/parquet-column-readers.cc b/be/src/exec/parquet/parquet-column-readers.cc
index c85671ebf..b2f82590f 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -149,6 +149,18 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   /// Read 'num_to_read' position values into a batch of tuples starting at 'tuple_mem'.
   void ReadPositions(
       int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
+  void ReadItemPositions(
+      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
+  void ReadFilePositions(
+      int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT;
+
+  /// Reads file position into 'file_pos' based on 'rep_level'.
+  /// It updates 'current_row_' when 'rep_level' is 0.
+  inline ALWAYS_INLINE void ReadFilePositionBatched(int16_t rep_level, int64_t* file_pos);
+
+  /// Reads position into 'pos' and updates 'pos_current_value_' based on 'rep_level'.
+  /// It updates 'current_row_' when 'rep_level' is 0.
+  inline ALWAYS_INLINE void ReadItemPositionBatched(int16_t rep_level, int64_t* pos);
 
   virtual Status CreateDictionaryDecoder(
       uint8_t* values, int size, DictDecoderBase** decoder) override {
@@ -185,11 +197,6 @@ class ScalarColumnReader : public BaseScalarColumnReader {
   template <Encoding::type ENCODING, bool NEEDS_CONVERSION>
   inline ALWAYS_INLINE bool ReadSlot(Tuple* tuple);
 
-  /// Reads position into 'pos' and updates 'pos_current_value_' based on 'rep_level'.
-  /// This helper is only used on the batched decoding path where we need to reset
-  /// 'pos_current_value_' to 0 based on 'rep_level'.
-  inline ALWAYS_INLINE void ReadPositionBatched(int16_t rep_level, int64_t* pos);
-
   /// Decode one value from *data into 'val' and advance *data. 'data_end' is one byte
   /// past the end of the buffer. Return false and set 'parse_error_' if there is an
   /// error decoding the value.
@@ -460,7 +467,7 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValueBatc
 
     // Not materializing anything - skip decoding any levels and rely on the value
     // count from page metadata to return the correct number of rows.
-    if (!MATERIALIZED && !IN_COLLECTION) {
+    if (!MATERIALIZED && !IN_COLLECTION && file_pos_slot_desc() == nullptr) {
       // We cannot filter pages in this context.
       DCHECK(!DoesPageFiltering());
       int vals_to_add = min(num_buffered_values_, max_values - val_count);
@@ -473,7 +480,7 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValueBatc
     // nested collection into the top-level tuple returned by the scan, so we don't
     // care about the nesting structure unless the position slot is being populated,
     // or we filter out rows.
-    if (IN_COLLECTION && (pos_slot_desc_ != nullptr || DoesPageFiltering()) &&
+    if (IN_COLLECTION && (AnyPosSlotToBeFilled() || DoesPageFiltering()) &&
         !rep_levels_.CacheHasNext()) {
       parent_->parse_status_.MergeStatus(
             rep_levels_.CacheNextBatch(num_buffered_values_));
@@ -531,10 +538,10 @@ template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
 bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeValueBatch(
     int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
     int* RESTRICT num_values) RESTRICT {
-  DCHECK(MATERIALIZED || IN_COLLECTION);
+  DCHECK(MATERIALIZED || IN_COLLECTION || file_pos_slot_desc() != nullptr);
   DCHECK_GT(num_buffered_values_, 0);
   DCHECK(def_levels_.CacheHasNext());
-  if (IN_COLLECTION && (pos_slot_desc_ != nullptr || DoesPageFiltering())) {
+  if (IN_COLLECTION && (AnyPosSlotToBeFilled() || DoesPageFiltering())) {
     DCHECK(rep_levels_.CacheHasNext());
   }
   int cache_start_idx = def_levels_.CacheCurrIdx();
@@ -559,10 +566,13 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::MaterializeVa
         // A containing repeated field is empty or NULL, skip the value.
         continue;
       }
-      if (pos_slot_desc_ != nullptr) {
-        ReadPositionBatched(rep_level,
+      if (pos_slot_desc()) {
+        ReadItemPositionBatched(rep_level,
             tuple->GetBigIntSlot(pos_slot_desc_->tuple_offset()));
       }
+    } else if (file_pos_slot_desc()) {
+      ReadFilePositionBatched(rep_level,
+          tuple->GetBigIntSlot(file_pos_slot_desc_->tuple_offset()));
     }
 
     if (MATERIALIZED) {
@@ -590,7 +600,7 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE,
     uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT {
   DCHECK_GT(num_buffered_values_, 0);
   if (max_rep_level_ > 0 &&
-      (pos_slot_desc_ != nullptr || DoesPageFiltering())) {
+      (AnyPosSlotToBeFilled() || DoesPageFiltering())) {
     DCHECK(rep_levels_.CacheHasNext());
   }
   int32_t def_level_repeats = def_levels_.NextRepeatedRunLength();
@@ -605,7 +615,7 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE,
     DCHECK_GT(max_rep_level_, 0) << "Only possible if in a collection.";
     // A containing repeated field is empty or NULL. We don't need to return any values
     // but need to advance any rep levels.
-    if (pos_slot_desc_ != nullptr) {
+    if (AnyPosSlotToBeFilled()) {
       num_def_levels_to_consume =
           min<uint32_t>(def_level_repeats, rep_levels_.CacheRemaining());
     } else {
@@ -615,7 +625,7 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE,
     // Cannot consume more levels than allowed by buffered input values and output space.
     num_def_levels_to_consume = min(min(
         num_buffered_values_, max_values), def_level_repeats);
-    if (pos_slot_desc_ != nullptr) {
+    if (max_rep_level_ > 0 && AnyPosSlotToBeFilled()) {
       num_def_levels_to_consume =
           min<uint32_t>(num_def_levels_to_consume, rep_levels_.CacheRemaining());
     }
@@ -625,7 +635,11 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE,
     int rows_remaining = RowsRemainingInCandidateRange();
     if (max_rep_level_ == 0) {
       num_def_levels_to_consume = min(num_def_levels_to_consume, rows_remaining);
-      current_row_ += num_def_levels_to_consume;
+      if (file_pos_slot_desc()) {
+        ReadFilePositions(num_def_levels_to_consume, tuple_size, tuple_mem);
+      } else {
+        current_row_ += num_def_levels_to_consume;
+      }
     } else {
       // We need to calculate how many 'primitive' values are there until the end
       // of the current candidate range. In the meantime we also fill the position
@@ -636,12 +650,12 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE,
   }
   // Now we have 'num_def_levels_to_consume' set, let's read the slots.
   if (def_level < def_level_of_immediate_repeated_ancestor()) {
-    if (pos_slot_desc_ != nullptr && !DoesPageFiltering()) {
+    if (AnyPosSlotToBeFilled() && !DoesPageFiltering()) {
       rep_levels_.CacheSkipLevels(num_def_levels_to_consume);
     }
     *num_values = 0;
   } else {
-    if (pos_slot_desc_ != nullptr && !DoesPageFiltering()) {
+    if (AnyPosSlotToBeFilled() && !DoesPageFiltering()) {
       ReadPositions(num_def_levels_to_consume, tuple_size, tuple_mem);
     }
     if (MATERIALIZED) {
@@ -887,8 +901,14 @@ bool ScalarColumnReader<bool, parquet::Type::BOOLEAN, true>::DecodeValues(
 }
 
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
-void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadPositionBatched(
-    int16_t rep_level, int64_t* pos) {
+void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::
+    ReadFilePositionBatched(int16_t rep_level, int64_t* file_pos) {
+  *file_pos = current_row_;
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::
+    ReadItemPositionBatched(int16_t rep_level, int64_t* pos) {
   // Reset position counter if we are at the start of a new parent collection.
   if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
   *pos = pos_current_value_++;
@@ -897,11 +917,44 @@ void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadPositionB
 template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
 void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadPositions(
     int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
-  const int pos_slot_offset = pos_slot_desc()->tuple_offset();
-  void* first_slot = reinterpret_cast<Tuple*>(tuple_mem)->GetSlot(pos_slot_offset);
-  StrideWriter<int64_t> out{reinterpret_cast<int64_t*>(first_slot), tuple_size};
+  DCHECK(file_pos_slot_desc() != nullptr || pos_slot_desc() != nullptr);
+  DCHECK(file_pos_slot_desc() == nullptr || pos_slot_desc() == nullptr);
+  if (file_pos_slot_desc()) {
+    ReadFilePositions(num_to_read, tuple_size, tuple_mem);
+  } else {
+    ReadItemPositions(num_to_read, tuple_size, tuple_mem);
+  }
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadFilePositions(
+    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
+  DCHECK(file_pos_slot_desc() != nullptr);
+  DCHECK(pos_slot_desc() == nullptr);
+  int64_t* file_pos_slot = reinterpret_cast<Tuple*>(tuple_mem)->GetBigIntSlot(
+      file_pos_slot_desc()->tuple_offset());
+  StrideWriter<int64_t> file_out{reinterpret_cast<int64_t*>(file_pos_slot), tuple_size};
+  for (int64_t i = 0; i < num_to_read; ++i) {
+    int64_t* file_pos = file_out.Advance();
+    uint8_t rep_level = max_rep_level() > 0 ? rep_levels_.CacheGetNext() : 0;
+    if (rep_level == 0) ++current_row_;
+    ReadFilePositionBatched(rep_level, file_pos);
+  }
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED>
+void ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadItemPositions(
+    int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT {
+  DCHECK(file_pos_slot_desc() == nullptr);
+  DCHECK(pos_slot_desc() != nullptr);
+  int64_t* pos_slot = reinterpret_cast<Tuple*>(tuple_mem)->GetBigIntSlot(
+      pos_slot_desc()->tuple_offset());
+  StrideWriter<int64_t> out{reinterpret_cast<int64_t*>(pos_slot), tuple_size};
   for (int64_t i = 0; i < num_to_read; ++i) {
-    ReadPositionBatched(rep_levels_.CacheGetNext(), out.Advance());
+    int64_t* pos = out.Advance();
+    uint8_t rep_level = max_rep_level() > 0 ? rep_levels_.CacheGetNext() : 0;
+    if (rep_level == 0) ++current_row_;
+    ReadItemPositionBatched(rep_level, pos);
   }
 }
 
@@ -999,7 +1052,8 @@ void BaseScalarColumnReader::CreateSubRanges(vector<ScanRange::SubRange>* sub_ra
 }
 
 Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
-    const parquet::ColumnChunk& col_chunk, int row_group_idx) {
+    const parquet::ColumnChunk& col_chunk, int row_group_idx,
+    int64_t row_group_first_row) {
   // Ensure metadata is valid before using it to initialize the reader.
   RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(parent_->file_metadata_,
       parent_->filename(), row_group_idx, col_idx(), schema_element(),
@@ -1014,6 +1068,7 @@ Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
   // See ColumnReader constructor.
   rep_level_ = max_rep_level() == 0 ? 0 : ParquetLevel::INVALID_LEVEL;
   pos_current_value_ = ParquetLevel::INVALID_POS;
+  current_row_ = row_group_first_row - 1;
 
   vector<ScanRange::SubRange> sub_ranges;
   CreateSubRanges(&sub_ranges);
@@ -1393,18 +1448,30 @@ int BaseScalarColumnReader::FillPositionsInCandidateRange(int rows_remaining,
     pos_slot = reinterpret_cast<Tuple*>(tuple_mem)->GetBigIntSlot(pos_slot_offset);
   }
   StrideWriter<int64_t> pos_writer{pos_slot, tuple_size};
+
+  int64_t *file_pos_slot = nullptr;
+  if (file_pos_slot_desc_ != nullptr) {
+    const int file_pos_slot_offset = file_pos_slot_desc()->tuple_offset();
+    file_pos_slot = reinterpret_cast<Tuple*>(tuple_mem)->GetBigIntSlot(
+        file_pos_slot_offset);
+  }
+  StrideWriter<int64_t> file_pos_writer{file_pos_slot, tuple_size};
   while (rep_levels_.CacheRemaining() && row_count <= rows_remaining &&
          val_count < max_values) {
     if (row_count == rows_remaining && rep_levels_.CachePeekNext() == 0) break;
     int rep_level = rep_levels_.CacheGetNext();
-    if (rep_level == 0) ++row_count;
+    if (rep_level == 0) {
+      ++row_count;
+      ++current_row_;
+    }
     ++val_count;
-    if (pos_writer.IsValid()) {
+    if (file_pos_writer.IsValid()) {
+      *file_pos_writer.Advance() = current_row_;
+    } else if (pos_writer.IsValid()) {
       if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
       *pos_writer.Advance() = pos_current_value_++;
     }
   }
-  current_row_ += row_count;
   return val_count;
 }
 
diff --git a/be/src/exec/parquet/parquet-column-readers.h b/be/src/exec/parquet/parquet-column-readers.h
index 706b975c5..c5aa058bd 100644
--- a/be/src/exec/parquet/parquet-column-readers.h
+++ b/be/src/exec/parquet/parquet-column-readers.h
@@ -80,10 +80,16 @@ class ParquetColumnReader {
   }
   const SlotDescriptor* pos_slot_desc() const { return pos_slot_desc_; }
   void set_pos_slot_desc(const SlotDescriptor* pos_slot_desc) {
-    DCHECK(pos_slot_desc_ == NULL);
+    DCHECK(pos_slot_desc_ == nullptr);
     pos_slot_desc_ = pos_slot_desc;
   }
 
+  const SlotDescriptor* file_pos_slot_desc() const { return file_pos_slot_desc_; }
+  void set_file_pos_slot_desc(const SlotDescriptor* file_pos_slot_desc) {
+    DCHECK(file_pos_slot_desc_ == nullptr);
+    file_pos_slot_desc_ = file_pos_slot_desc;
+  }
+
   /// Returns true if this reader materializes collections (i.e. CollectionValues).
   virtual bool IsCollectionReader() const = 0;
 
@@ -162,7 +168,12 @@ class ParquetColumnReader {
   /// call when doing non-batched reading, i.e. NextLevels() must have been called
   /// before each call to this function to advance to the next element in the
   /// collection.
-  inline void ReadPositionNonBatched(int64_t* pos);
+  inline void ReadItemPositionNonBatched(int64_t* pos);
+
+  /// Writes file position based on the current row of the child scanners.
+  /// Only valid to call when doing non-batched reading, i.e. NextLevels() must have been
+  /// called before each call.
+  inline void ReadFilePositionNonBatched(int64_t* file_pos);
 
   /// Returns true if this column reader has reached the end of the row group.
   inline bool RowGroupAtEnd() {
@@ -198,14 +209,21 @@ class ParquetColumnReader {
     tuple->SetNull(DCHECK_NOTNULL(slot_desc_)->null_indicator_offset());
   }
 
+  /// Returns 'true' if there is a file position slot or position slot to be filled.
+  bool AnyPosSlotToBeFilled() const {
+    return pos_slot_desc_ != nullptr || file_pos_slot_desc_ != nullptr;
+  }
+
  protected:
   HdfsParquetScanner* parent_;
   const SchemaNode& node_;
   const SlotDescriptor* const slot_desc_;
 
-  /// The slot descriptor for the position field of the tuple, if there is one. NULL if
-  /// there's not. Only one column reader for a given tuple desc will have this set.
-  const SlotDescriptor* pos_slot_desc_;
+  /// The slot descriptors for the collection item position field and the file position
+  /// field of the tuple, if present; nullptr otherwise. If one is set, the other must
+  /// be nullptr. Only one column reader for a given tuple desc will have these set.
+  const SlotDescriptor* pos_slot_desc_ = nullptr;
+  const SlotDescriptor* file_pos_slot_desc_ = nullptr;
 
   /// The next value to write into the position slot, if there is one. 64-bit int because
   /// the pos slot is always a BIGINT Set to ParquetLevel::INVALID_POS when this column
@@ -240,7 +258,8 @@ class ParquetColumnReader {
     : parent_(parent),
       node_(node),
       slot_desc_(slot_desc),
-      pos_slot_desc_(NULL),
+      pos_slot_desc_(nullptr),
+      file_pos_slot_desc_(nullptr),
       pos_current_value_(ParquetLevel::INVALID_POS),
       rep_level_(ParquetLevel::INVALID_LEVEL),
       max_rep_level_(node_.max_rep_level),
@@ -298,7 +317,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// set_io_reservation() must be called to assign reservation to this
   /// column, followed by StartScan().
   Status Reset(const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk,
-    int row_group_idx);
+    int row_group_idx, int64_t row_group_first_row);
 
   /// Starts the column scan range. The reader must be Reset() and have a
   /// reservation assigned via set_io_reservation(). This must be called
@@ -395,6 +414,11 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// Metadata for the column for the current row group.
   const parquet::ColumnMetaData* metadata_ = nullptr;
 
+  /// Index of the current top-level row. It is updated together with the rep/def levels.
+  /// When its value is N, it means that we have already completely processed the Nth
+  /// row. Hence the initial value is '-1', because '0' would mean that we had already
+  /// processed the first (zeroth) row.
+  int64_t current_row_ = -1;
 
   /////////////////////////////////////////
   /// BEGIN: Members used for page filtering
@@ -419,11 +443,6 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// rows and increment this field.
   int current_row_range_ = 0;
 
-  /// Index of the current top-level row. It is updated together with the rep/def levels.
-  /// When updated, and its value is N, it means that we already processed the Nth row
-  /// completely.
-  int64_t current_row_ = -1;
-
   /// This flag is needed for the proper tracking of the last processed row.
   /// The batched and non-batched interfaces behave differently. E.g. when using the
   /// batched interface you don't need to invoke NextLevels() in advance, while you need
@@ -649,7 +668,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
 };
 
 // Inline to allow inlining into collection and scalar column reader.
-inline void ParquetColumnReader::ReadPositionNonBatched(int64_t* pos) {
+inline void ParquetColumnReader::ReadItemPositionNonBatched(int64_t* pos) {
   // NextLevels() should have already been called
   DCHECK_GE(rep_level_, 0);
   DCHECK_GE(def_level_, 0);
@@ -659,6 +678,16 @@ inline void ParquetColumnReader::ReadPositionNonBatched(int64_t* pos) {
   *pos = pos_current_value_++;
 }
 
+// Inline to allow inlining into collection and scalar column reader.
+inline void ParquetColumnReader::ReadFilePositionNonBatched(int64_t* file_pos) {
+  // NextLevels() should have already been called
+  DCHECK_GE(rep_level_, 0);
+  DCHECK_GE(def_level_, 0);
+  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+      "Caller should have called NextLevels() until we are ready to read a value";
+  *file_pos = LastProcessedRow() + 1;
+}
+
 // Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error handling
 // path doesn't falsely report that the file is corrupted.
 // Inlined to avoid overhead in release builds.
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index dfe806f9c..84acf257b 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -76,7 +76,8 @@ enum THdfsFileFormat {
 
 enum TVirtualColumnType {
   NONE,
-  INPUT_FILE_NAME
+  INPUT_FILE_NAME,
+  FILE_POSITION
 }
 
 // TODO: Since compression is also enabled for Kudu columns, we should
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 53e15d38e..b8ff63c88 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -468,33 +468,35 @@ struct TPartialTableInfo {
 
   3: optional list<hive_metastore.ColumnStatisticsObj> column_stats
 
+  4: optional list<CatalogObjects.TColumn> virtual_columns
+
   // Set if this table needs storage access during metadata load.
   // Time used for storage loading in nanoseconds.
-  4: optional i64 storage_metadata_load_time_ns
+  5: optional i64 storage_metadata_load_time_ns
 
   // Each TNetworkAddress is a datanode which contains blocks of a file in the table.
   // Used so that each THdfsFileBlock can just reference an index in this list rather
   // than duplicate the list of network address, which helps reduce memory usage.
   // Only used when partition files are fetched.
-  7: optional list<Types.TNetworkAddress> network_addresses
+  8: optional list<Types.TNetworkAddress> network_addresses
 
   // SqlConstraints for the table, small enough that we can
   // return them wholesale.
-  8: optional SqlConstraints.TSqlConstraints sql_constraints
+  9: optional SqlConstraints.TSqlConstraints sql_constraints
 
   // Valid write id list of ACID table.
-  9: optional CatalogObjects.TValidWriteIdList valid_write_ids;
+  10: optional CatalogObjects.TValidWriteIdList valid_write_ids;
 
   // Set if this table is marked as cached by hdfs caching. Does not necessarily mean the
   // data is cached or that all/any partitions are cached. Only used in analyzing DDLs.
-  10: optional bool is_marked_cached
+  11: optional bool is_marked_cached
 
   // The prefixes of locations of partitions in this table. See THdfsPartitionLocation for
   // the description of how a prefix is computed.
-  11: optional list<string> partition_prefixes
+  12: optional list<string> partition_prefixes
 
   // Iceberg table information
-  12: optional CatalogObjects.TIcebergTable iceberg_table
+  13: optional CatalogObjects.TIcebergTable iceberg_table
 }
 
 // Table types in the user's perspective. Though we treat materialized view as table
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index e880ba935..9ca396294 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -631,6 +631,7 @@ public class HdfsTable extends Table implements FeFsTable {
 
   private void addVirtualColumns() {
     addVirtualColumn(VirtualColumn.INPUT_FILE_NAME);
+    addVirtualColumn(VirtualColumn.FILE_POSITION);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index b5da61a19..2cb36930a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -416,6 +416,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
 
   private void addVirtualColumns() {
     addVirtualColumn(VirtualColumn.INPUT_FILE_NAME);
+    addVirtualColumn(VirtualColumn.FILE_POSITION);
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 1c43b7add..3260e31e8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -739,6 +739,10 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
       // ensure that serialization of the GetPartialCatalogObjectResponse object
       // is done while we continue to hold the table lock.
       resp.table_info.setHms_table(getMetaStoreTable().deepCopy());
+      resp.table_info.setVirtual_columns(new ArrayList<>());
+      for (VirtualColumn vCol : getVirtualColumns()) {
+        resp.table_info.addToVirtual_columns(vCol.toThrift());
+      }
     }
     if (selector.want_stats_for_column_names != null ||
         selector.want_stats_for_all_columns) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java b/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java
index 7a3a8314a..4b9a138fe 100644
--- a/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java
+++ b/fe/src/main/java/org/apache/impala/catalog/VirtualColumn.java
@@ -35,11 +35,16 @@ public class VirtualColumn extends Column {
 
   public static VirtualColumn INPUT_FILE_NAME = new VirtualColumn("INPUT__FILE__NAME",
       Type.STRING, TVirtualColumnType.INPUT_FILE_NAME);
+  public static VirtualColumn FILE_POSITION = new VirtualColumn("FILE__POSITION",
+      Type.BIGINT, TVirtualColumnType.FILE_POSITION);
 
   public static VirtualColumn getVirtualColumn(TVirtualColumnType virtColType) {
     if (virtColType == TVirtualColumnType.INPUT_FILE_NAME) {
       return INPUT_FILE_NAME;
     }
+    if (virtColType == TVirtualColumnType.FILE_POSITION) {
+      return FILE_POSITION;
+    }
     return null;
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index f43686b81..fee9311a0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -63,6 +63,7 @@ import org.apache.impala.catalog.ImpaladCatalog.ObjectUpdateSequencer;
 import org.apache.impala.catalog.Principal;
 import org.apache.impala.catalog.PrincipalPrivilege;
 import org.apache.impala.catalog.SqlConstraints;
+import org.apache.impala.catalog.VirtualColumn;
 import org.apache.impala.catalog.local.LocalIcebergTable.TableParams;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
@@ -76,6 +77,7 @@ import org.apache.impala.thrift.TBriefTableMeta;
 import org.apache.impala.thrift.TCatalogInfoSelector;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TDbInfoSelector;
 import org.apache.impala.thrift.TErrorCode;
@@ -737,7 +739,7 @@ public class CatalogdMetaProvider implements MetaProvider {
                 dbName, tableName, resp.table_info.hms_table, resp.object_version_number,
                 new SqlConstraints(primaryKeys, foreignKeys),
                 resp.table_info.valid_write_ids, resp.table_info.is_marked_cached,
-                resp.table_info.partition_prefixes);
+                resp.table_info.partition_prefixes, resp.table_info.virtual_columns);
            }
       });
     // The table list is populated based on tables in a given Db in catalogd. If a table
@@ -1710,6 +1712,11 @@ public class CatalogdMetaProvider implements MetaProvider {
      */
     private final Table msTable_;
 
+    /**
+     * List of virtual columns of this table.
+     */
+    List<VirtualColumn> virtualColumns_ = new ArrayList<>();
+
     /**
      * The version of the table when we first loaded it. Subsequent requests about
      * the table are verified against this version.
@@ -1732,7 +1739,7 @@ public class CatalogdMetaProvider implements MetaProvider {
     public TableMetaRefImpl(String dbName, String tableName,
         Table msTable, long catalogVersion, SqlConstraints sqlConstraints,
         TValidWriteIdList validWriteIds, boolean isMarkedCached,
-        List<String> locationPrefixes) {
+        List<String> locationPrefixes, List<TColumn> tvirtCols) {
       this.dbName_ = dbName;
       this.tableName_ = tableName;
       this.msTable_ = msTable;
@@ -1744,6 +1751,9 @@ public class CatalogdMetaProvider implements MetaProvider {
       this.partitionLocationCompressor_ = (locationPrefixes == null) ? null :
           new HdfsPartitionLocationCompressor(
               msTable.getPartitionKeysSize(), locationPrefixes);
+      for (TColumn tvCol : tvirtCols) {
+        virtualColumns_.add(VirtualColumn.fromThrift(tvCol));
+      }
     }
 
     @Override
@@ -1771,6 +1781,11 @@ public class CatalogdMetaProvider implements MetaProvider {
     public boolean isTransactional() {
       return AcidUtils.isTransactionalTable(msTable_.getParameters());
     }
+
+    @Override
+    public List<VirtualColumn> getVirtualColumns() {
+      return virtualColumns_;
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index f2479dced..20ce56ada 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -46,6 +46,7 @@ import org.apache.impala.catalog.HdfsStorageDescriptor;
 import org.apache.impala.catalog.MetaStoreClientPool;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.SqlConstraints;
+import org.apache.impala.catalog.VirtualColumn;
 import org.apache.impala.catalog.local.LocalIcebergTable.TableParams;
 import org.apache.impala.common.Pair;
 import org.apache.impala.compat.MetastoreShim;
@@ -511,6 +512,12 @@ class DirectMetaProvider implements MetaProvider {
     public boolean isTransactional() {
       return AcidUtils.isTransactionalTable(msTable_.getParameters());
     }
+
+    @Override
+    public List<VirtualColumn> getVirtualColumns() {
+      throw new UnsupportedOperationException("Virtual columns are not supported with " +
+          "DirectMetaProvider implementation");
+    }
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 9a90b32fb..eca95765d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -183,11 +183,7 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
 
     avroSchema_ = explicitAvroSchema;
     isMarkedCached_ = (ref != null && ref.isMarkedCached());
-    addVirtualColumns();
-  }
-
-  private void addVirtualColumns() {
-    addVirtualColumn(VirtualColumn.INPUT_FILE_NAME);
+    if (ref != null) addVirtualColumns(ref.getVirtualColumns());
   }
 
   private static String loadAvroSchema(Table msTbl) throws AnalysisException {
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index c128f4251..84f1bf13a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -127,11 +127,7 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
     icebergParquetPlainPageSize_ = Utils.getIcebergParquetPlainPageSize(msTable);
     icebergParquetDictPageSize_ = Utils.getIcebergParquetDictPageSize(msTable);
     partitionStats_ = tableInfo.getIceberg_table().getPartition_stats();
-    addVirtualColumns();
-  }
-
-  private void addVirtualColumns() {
-    addVirtualColumn(VirtualColumn.INPUT_FILE_NAME);
+    addVirtualColumns(ref.getVirtualColumns());
   }
 
   static void validateColumns(List<Column> impalaCols, List<FieldSchema> hmsCols) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
index 9be997b36..3b131217a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
@@ -181,6 +181,10 @@ abstract class LocalTable implements FeTable {
     this.tableStats_ = null;
   }
 
+  protected void addVirtualColumns(List<VirtualColumn> virtualColumns) {
+    for (VirtualColumn virtCol : virtualColumns) addVirtualColumn(virtCol);
+  }
+
   protected void addVirtualColumn(VirtualColumn col) {
     virtualCols_.add(col);
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index 3e3840df6..444ff0556 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -34,6 +34,7 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.HdfsPartitionLocationCompressor;
 import org.apache.impala.catalog.HdfsStorageDescriptor;
 import org.apache.impala.catalog.SqlConstraints;
+import org.apache.impala.catalog.VirtualColumn;
 import org.apache.impala.catalog.local.LocalIcebergTable.TableParams;
 import org.apache.impala.common.Pair;
 import org.apache.impala.thrift.TBriefTableMeta;
@@ -148,6 +149,7 @@ public interface MetaProvider {
     List<String> getPartitionPrefixes();
     boolean isPartitioned();
     boolean isTransactional();
+    List<VirtualColumn> getVirtualColumns();
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 295925e97..68994ffad 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -611,6 +611,7 @@ public class HdfsScanNode extends ScanNode {
     // LiteralExpr, but can also be an expr like "1 + 2".
     if (!constExpr.isConstant()) return;
     if (Expr.IS_NULL_VALUE.apply(constExpr)) return;
+    if (slotDesc.isVirtualColumn()) return;
     if (isUnsupportedStatsType(slotDesc.getType())) return;
     if (isUnsupportedStatsType(constExpr.getType())) return;
 
@@ -1010,6 +1011,9 @@ public class HdfsScanNode extends ScanNode {
     // to scalar values nested in collection types, not enclosing collection types.
     if (firstSlotDesc.getType().isCollectionType()) return;
 
+    // Dictionary filtering is not applicable on virtual columns.
+    if (firstSlotDesc.isVirtualColumn()) return;
+
     // If any of the slot descriptors affected by 'conjunct' happens to be a scalar member
     // of a struct, where the struct is also given in the select list then skip dictionary
     // filtering as the slot/tuple IDs in the conjunct would mismatch with the ones in the
diff --git a/testdata/LineItemMultiBlock/README.dox b/testdata/LineItemMultiBlock/README.dox
index 1d6db468f..cc157941e 100755
--- a/testdata/LineItemMultiBlock/README.dox
+++ b/testdata/LineItemMultiBlock/README.dox
@@ -33,6 +33,12 @@ blocks.
 'lineitem_multiblock_one_row_group' was created similarly but with a much higher
 'parquet.block.size' so that everything fit in one row group.
 
+'lineitem_multiblock_variable_num_rows' was created similarly, but with
+'parquet.block.size'=80000, so it has slightly fewer row groups; the important property
+is that the row groups do not all contain the same number of rows. Also, the source
+table was lineitem_multiblock, so the resulting table has the same rows in the same
+order in the data file.
+
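+A Hive statement along the following lines (an illustrative sketch, not necessarily the
+exact command that was used) can produce such a file:
+
+  set parquet.block.size=80000;
+  create table lineitem_multiblock_variable_num_rows stored as parquet
+    as select * from lineitem_multiblock;
+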
 ----
 
 The orc files are created by the following hive queries:
diff --git a/testdata/LineItemMultiBlock/lineitem_multiblock_variable_num_rows.parquet b/testdata/LineItemMultiBlock/lineitem_multiblock_variable_num_rows.parquet
new file mode 100644
index 000000000..a9d5dbb6f
Binary files /dev/null and b/testdata/LineItemMultiBlock/lineitem_multiblock_variable_num_rows.parquet differ
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index 12af04b7d..78090596d 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -513,7 +513,8 @@ function custom-post-load-steps {
 
   hadoop fs -mkdir -p ${FILESYSTEM_PREFIX}/test-warehouse/lineitem_multiblock_parquet \
     ${FILESYSTEM_PREFIX}/test-warehouse/lineitem_sixblocks_parquet \
-    ${FILESYSTEM_PREFIX}/test-warehouse/lineitem_multiblock_one_row_group_parquet
+    ${FILESYSTEM_PREFIX}/test-warehouse/lineitem_multiblock_one_row_group_parquet \
+    ${FILESYSTEM_PREFIX}/test-warehouse/lineitem_multiblock_variable_num_rows_parquet
 
   #IMPALA-1881: data file produced by hive with multiple blocks.
   hadoop fs -Ddfs.block.size=1048576 -put -f \
@@ -530,6 +531,11 @@ function custom-post-load-steps {
     ${IMPALA_HOME}/testdata/LineItemMultiBlock/lineitem_one_row_group.parquet \
     ${FILESYSTEM_PREFIX}/test-warehouse/lineitem_multiblock_one_row_group_parquet
 
+  # IMPALA-11350: Load test data with row groups containing a variable number of rows.
+  hadoop fs -Ddfs.block.size=1048576 -put -f \
+    ${IMPALA_HOME}/testdata/LineItemMultiBlock/lineitem_multiblock_variable_num_rows.parquet \
+    ${FILESYSTEM_PREFIX}/test-warehouse/lineitem_multiblock_variable_num_rows_parquet
+
   # IMPALA-3307: Upload test time-zone database
   hadoop fs -Ddfs.block.size=1048576 -put -f ${IMPALA_HOME}/testdata/tzdb \
     ${FILESYSTEM_PREFIX}/test-warehouse/
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 715a41d0a..919a6e58e 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -2741,6 +2741,29 @@ L_SHIPMODE STRING
 L_COMMENT STRING
 ====
 ---- DATASET
+-- IMPALA-11350: Implementing virtual column FILE__POSITION
+functional
+---- BASE_TABLE_NAME
+lineitem_multiblock_variable_num_rows
+---- COLUMNS
+L_ORDERKEY BIGINT
+L_PARTKEY BIGINT
+L_SUPPKEY BIGINT
+L_LINENUMBER INT
+L_QUANTITY DECIMAL(12,2)
+L_EXTENDEDPRICE DECIMAL(12,2)
+L_DISCOUNT DECIMAL(12,2)
+L_TAX DECIMAL(12,2)
+L_RETURNFLAG STRING
+L_LINESTATUS STRING
+L_SHIPDATE STRING
+L_COMMITDATE STRING
+L_RECEIPTDATE STRING
+L_SHIPINSTRUCT STRING
+L_SHIPMODE STRING
+L_COMMENT STRING
+====
+---- DATASET
 -- IMPALA-4933: tests nested collections stored in multiple row-groups.
 ---- BASE_TABLE_NAME
 customer_multiblock
diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv
index 54caaeb5e..68ebace5c 100644
--- a/testdata/datasets/functional/schema_constraints.csv
+++ b/testdata/datasets/functional/schema_constraints.csv
@@ -58,6 +58,7 @@ table_name:bad_column_metadata, constraint:restrict_to, table_format:parquet/non
 table_name:lineitem_multiblock, constraint:restrict_to, table_format:parquet/none/none
 table_name:lineitem_sixblocks, constraint:restrict_to, table_format:parquet/none/none
 table_name:lineitem_multiblock_one_row_group, constraint:restrict_to, table_format:parquet/none/none
+table_name:lineitem_multiblock_variable_num_rows, constraint:restrict_to, table_format:parquet/none/none
 table_name:customer_multiblock, constraint:restrict_to, table_format:parquet/none/none
 table_name:hudi_partitioned, constraint:restrict_to, table_format:parquet/none/none
 table_name:hudi_non_partitioned, constraint:restrict_to, table_format:parquet/none/none
diff --git a/testdata/workloads/functional-query/queries/QueryTest/mixing-virtual-columns.test b/testdata/workloads/functional-query/queries/QueryTest/mixing-virtual-columns.test
new file mode 100644
index 000000000..e670c32b8
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/mixing-virtual-columns.test
@@ -0,0 +1,112 @@
+====
+---- QUERY
+select input__file__name, file__position, id from alltypes_tiny_pages
+where file__position = 7000;
+---- RESULTS
+'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_tiny_pages/alltypes_tiny_pages.parquet',7000,6285
+---- TYPES
+STRING, BIGINT, INT
+====
+---- QUERY
+select file__position, input__file__name, id from alltypes_tiny_pages
+where file__position = 7000;
+---- RESULTS
+7000,'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_tiny_pages/alltypes_tiny_pages.parquet',6285
+---- TYPES
+BIGINT, STRING, INT
+====
+---- QUERY
+select input__file__name, file__position, id from alltypes_tiny_pages
+where id = 6285;
+---- RESULTS
+'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_tiny_pages/alltypes_tiny_pages.parquet',7000,6285
+---- TYPES
+STRING, BIGINT, INT
+====
+---- QUERY
+select input__file__name, file__position, id from alltypes_tiny_pages
+where file__position = 1000;
+---- RESULTS
+'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_tiny_pages/alltypes_tiny_pages.parquet',1000,3623
+---- TYPES
+STRING, BIGINT, INT
+====
+---- QUERY
+select input__file__name, file__position from alltypes_tiny_pages
+where id = 2950;
+---- RESULTS
+'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_tiny_pages/alltypes_tiny_pages.parquet',587
+---- TYPES
+STRING, BIGINT
+====
+---- QUERY
+select input__file__name, max(file__position) from functional_parquet.lineitem_multiblock
+group by input__file__name;
+---- RESULTS
+'$NAMENODE/test-warehouse/lineitem_multiblock_parquet/000000_0',19999
+---- TYPES
+STRING, BIGINT
+====
+---- QUERY
+select input__file__name, file__position, l_orderkey from functional_parquet.lineitem_multiblock
+where file__position = 7000;
+---- RESULTS
+'$NAMENODE/test-warehouse/lineitem_multiblock_parquet/000000_0',7000,12996
+---- TYPES
+STRING, BIGINT, BIGINT
+====
+---- QUERY
+select input__file__name, file__position, l_orderkey from functional_parquet.lineitem_multiblock
+where l_orderkey = 12996;
+---- RESULTS
+'$NAMENODE/test-warehouse/lineitem_multiblock_parquet/000000_0',6998,12996
+'$NAMENODE/test-warehouse/lineitem_multiblock_parquet/000000_0',6999,12996
+'$NAMENODE/test-warehouse/lineitem_multiblock_parquet/000000_0',7000,12996
+'$NAMENODE/test-warehouse/lineitem_multiblock_parquet/000000_0',7001,12996
+---- TYPES
+STRING, BIGINT, BIGINT
+====
+---- QUERY
+select input__file__name, file__position, * from functional_parquet.lineitem_multiblock
+where file__position = 19993;
+---- RESULTS
+'$NAMENODE/test-warehouse/lineitem_multiblock_parquet/000000_0',19993,2,106170,1191,1,38.00,44694.46,0.00,0.05,'N','O','1997-01-28','1997-01-14','1997-02-02','TAKE BACK RETURN','RAIL','ven requests. deposits breach a'
+---- TYPES
+STRING, BIGINT, BIGINT, BIGINT, BIGINT, INT, DECIMAL, DECIMAL, DECIMAL, DECIMAL, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+select a.file__position, a.input__file__name, a.id
+from functional_parquet.alltypestiny a, functional_parquet.alltypestiny b
+where a.file__position = 0 and a.id > 3
+order by id;
+---- RESULTS
+0,regex:'$NAMENODE/test-warehouse/alltypestiny_parquet/year=2009/month=3/.*.parq',4
+0,regex:'$NAMENODE/test-warehouse/alltypestiny_parquet/year=2009/month=3/.*.parq',4
+0,regex:'$NAMENODE/test-warehouse/alltypestiny_parquet/year=2009/month=3/.*.parq',4
+0,regex:'$NAMENODE/test-warehouse/alltypestiny_parquet/year=2009/month=3/.*.parq',4
+0,regex:'$NAMENODE/test-warehouse/alltypestiny_parquet/year=2009/month=3/.*.parq',4
+0,regex:'$NAMENODE/test-warehouse/alltypestiny_parquet/year=2009/month=3/.*.parq',4
+0,regex:'$NAMENODE/test-warehouse/alltypestiny_parquet/year=2009/month=3/.*.parq',4
+0,regex:'$NAMENODE/test-warehouse/alltypestiny_parquet/year=2009/month=3/.*.parq',4
+0,regex:'$NAMENODE/test-warehouse/alltypestiny_parquet/year=2009/month=4/.*.parq',6
+0,regex:'$NAMENODE/test-warehouse/alltypestiny_parquet/year=2009/month=4/.*.parq',6
+0,regex:'$NAMENODE/test-warehouse/alltypestiny_parquet/year=2009/month=4/.*.parq',6
+0,regex:'$NAMENODE/test-warehouse/alltypestiny_parquet/year=2009/month=4/.*.parq',6
+0,regex:'$NAMENODE/test-warehouse/alltypestiny_parquet/year=2009/month=4/.*.parq',6
+0,regex:'$NAMENODE/test-warehouse/alltypestiny_parquet/year=2009/month=4/.*.parq',6
+0,regex:'$NAMENODE/test-warehouse/alltypestiny_parquet/year=2009/month=4/.*.parq',6
+0,regex:'$NAMENODE/test-warehouse/alltypestiny_parquet/year=2009/month=4/.*.parq',6
+---- TYPES
+BIGINT, STRING, INT
+====
+---- QUERY
+select input__file__name, input__file__name, file__position, file__position, l_orderkey from functional_parquet.lineitem_multiblock_variable_num_rows
+where l_orderkey = 12996;
+---- RESULTS
+'$NAMENODE/test-warehouse/lineitem_multiblock_variable_num_rows_parquet/lineitem_multiblock_variable_num_rows.parquet','$NAMENODE/test-warehouse/lineitem_multiblock_variable_num_rows_parquet/lineitem_multiblock_variable_num_rows.parquet',6998,6998,12996
+'$NAMENODE/test-warehouse/lineitem_multiblock_variable_num_rows_parquet/lineitem_multiblock_variable_num_rows.parquet','$NAMENODE/test-warehouse/lineitem_multiblock_variable_num_rows_parquet/lineitem_multiblock_variable_num_rows.parquet',6999,6999,12996
+'$NAMENODE/test-warehouse/lineitem_multiblock_variable_num_rows_parquet/lineitem_multiblock_variable_num_rows.parquet','$NAMENODE/test-warehouse/lineitem_multiblock_variable_num_rows_parquet/lineitem_multiblock_variable_num_rows.parquet',7000,7000,12996
+'$NAMENODE/test-warehouse/lineitem_multiblock_variable_num_rows_parquet/lineitem_multiblock_variable_num_rows.parquet','$NAMENODE/test-warehouse/lineitem_multiblock_variable_num_rows_parquet/lineitem_multiblock_variable_num_rows.parquet',7001,7001,12996
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-generic.test b/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-generic.test
new file mode 100644
index 000000000..e0b13c8c7
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-generic.test
@@ -0,0 +1,157 @@
+====
+---- QUERY
+select file__position, * from functional_parquet.alltypestiny order by id;
+---- RESULTS
+0,0,true,0,0,0,0,0,0,'01/01/09','0',2009-01-01 00:00:00,2009,1
+1,1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+0,2,true,0,0,0,0,0,0,'02/01/09','0',2009-02-01 00:00:00,2009,2
+1,3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
+0,4,true,0,0,0,0,0,0,'03/01/09','0',2009-03-01 00:00:00,2009,3
+1,5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
+0,6,true,0,0,0,0,0,0,'04/01/09','0',2009-04-01 00:00:00,2009,4
+1,7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
+---- TYPES
+BIGINT, INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
+====
+---- QUERY
+select file__position, * from functional_parquet.alltypestiny
+where file__position = 1
+order by id;
+---- RESULTS
+1,1,false,1,1,1,10,1.100000023841858,10.1,'01/01/09','1',2009-01-01 00:01:00,2009,1
+1,3,false,1,1,1,10,1.100000023841858,10.1,'02/01/09','1',2009-02-01 00:01:00,2009,2
+1,5,false,1,1,1,10,1.100000023841858,10.1,'03/01/09','1',2009-03-01 00:01:00,2009,3
+1,7,false,1,1,1,10,1.100000023841858,10.1,'04/01/09','1',2009-04-01 00:01:00,2009,4
+---- TYPES
+BIGINT, INT, BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, STRING, STRING, TIMESTAMP, INT, INT
+====
+---- QUERY
+select file__position, count(*) from functional_parquet.alltypestiny
+group by file__position;
+---- RESULTS
+1,4
+0,4
+---- TYPES
+BIGINT, BIGINT
+====
+---- QUERY
+select file__position, id from functional_parquet.alltypestiny
+order by id;
+---- RESULTS
+0,0
+1,1
+0,2
+1,3
+0,4
+1,5
+0,6
+1,7
+---- TYPES
+BIGINT, INT
+====
+---- QUERY
+select file__position from functional_parquet.alltypestiny;
+---- RESULTS
+0
+1
+0
+1
+0
+1
+0
+1
+---- TYPES
+BIGINT
+====
+---- QUERY
+select max(file__position) from functional_parquet.alltypestiny;
+---- RESULTS
+1
+---- TYPES
+BIGINT
+====
+---- QUERY
+select file__position, id from functional_parquet.complextypestbl
+order by id;
+---- RESULTS
+0,1
+1,2
+2,3
+3,4
+4,5
+5,6
+6,7
+0,8
+---- TYPES
+BIGINT, BIGINT
+====
+---- QUERY
+select file__position, id, int_array from functional_parquet.complextypestbl;
+---- RESULTS
+0,1,'[1,2,3]'
+1,2,'[NULL,1,2,NULL,3,NULL]'
+2,3,'[]'
+3,4,'NULL'
+4,5,'NULL'
+5,6,'NULL'
+6,7,'NULL'
+0,8,'[-1]'
+---- TYPES
+BIGINT, BIGINT, STRING
+====
+---- QUERY
+select file__position, id, item from functional_parquet.complextypestbl c, c.int_array
+order by id;
+---- RESULTS
+0,1,1
+0,1,2
+0,1,3
+1,2,NULL
+1,2,1
+1,2,2
+1,2,NULL
+1,2,3
+1,2,NULL
+0,8,-1
+---- TYPES
+BIGINT, BIGINT, INT
+====
+---- QUERY
+select file__position, id, item from functional_parquet.complextypestbl c, c.int_array_array;
+---- RESULTS
+0,1,'[1,2]'
+0,1,'[3,4]'
+1,2,'[NULL,1,2,NULL]'
+1,2,'[3,NULL,4]'
+1,2,'[]'
+1,2,'NULL'
+2,3,'NULL'
+6,7,'NULL'
+6,7,'[5,6]'
+0,8,'[-1,-2]'
+0,8,'[]'
+---- TYPES
+BIGINT, BIGINT, STRING
+====
+---- QUERY
+select file__position, id, i.item from functional_parquet.complextypestbl c, c.int_array_array a, a.item i
+order by id;
+---- RESULTS
+0,1,1
+0,1,2
+0,1,3
+0,1,4
+1,2,NULL
+1,2,1
+1,2,2
+1,2,NULL
+1,2,3
+1,2,NULL
+1,2,4
+6,7,5
+6,7,6
+0,8,-1
+0,8,-2
+---- TYPES
+BIGINT, BIGINT, INT
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-parquet.test b/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-parquet.test
new file mode 100644
index 000000000..7ff2f9d19
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/virtual-column-file-position-parquet.test
@@ -0,0 +1,223 @@
+====
+---- QUERY
+select file__position, id from alltypes_tiny_pages
+where file__position = 7000;
+---- RESULTS
+7000,6285
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+select file__position, id from alltypes_tiny_pages
+where id = 6285;
+---- RESULTS
+7000,6285
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+select file__position, id from alltypes_tiny_pages
+where file__position = 1000;
+---- RESULTS
+1000,3623
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+select file__position, id from alltypes_tiny_pages
+where id = 2950;
+---- RESULTS
+587,2950
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+# The data files of the tpch_parquet.lineitem table are not deterministic,
+# so we cannot check the results here. We only check that the queries succeed.
+select file__position, l_orderkey from tpch_parquet.lineitem
+where file__position = 0
+order by l_orderkey;
+---- TYPES
+BIGINT, BIGINT
+====
+---- QUERY
+select file__position, l_orderkey from tpch_parquet.lineitem
+where file__position = 700000
+order by l_orderkey;
+---- TYPES
+BIGINT, BIGINT
+====
+---- QUERY
+select file__position, l_orderkey from tpch_parquet.lineitem
+where file__position = 2142542
+order by l_orderkey;
+---- TYPES
+BIGINT, BIGINT
+====
+---- QUERY
+select file__position, l_orderkey from tpch_parquet.lineitem
+where l_orderkey in (3555011, 4625378, 5692993)
+order by l_orderkey;
+---- TYPES
+BIGINT, BIGINT
+====
+---- QUERY
+select max(file__position) from functional_parquet.lineitem_multiblock;
+---- RESULTS
+19999
+---- TYPES
+BIGINT
+====
+---- QUERY
+select file__position, l_orderkey from functional_parquet.lineitem_multiblock
+where file__position = 7000;
+---- RESULTS
+7000,12996
+---- TYPES
+BIGINT, BIGINT
+====
+---- QUERY
+select file__position, l_orderkey from functional_parquet.lineitem_multiblock
+where l_orderkey = 12996;
+---- RESULTS
+6998,12996
+6999,12996
+7000,12996
+7001,12996
+---- TYPES
+BIGINT, BIGINT
+====
+---- QUERY
+select file__position, * from functional_parquet.lineitem_multiblock
+where file__position = 19993;
+---- RESULTS
+19993,2,106170,1191,1,38.00,44694.46,0.00,0.05,'N','O','1997-01-28','1997-01-14','1997-02-02','TAKE BACK RETURN','RAIL','ven requests. deposits breach a'
+---- TYPES
+BIGINT, BIGINT, BIGINT, BIGINT, INT, DECIMAL, DECIMAL, DECIMAL, DECIMAL, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+select file__position, * from functional_parquet.lineitem_multiblock
+where l_orderkey = 2;
+---- RESULTS
+19993,2,106170,1191,1,38.00,44694.46,0.00,0.05,'N','O','1997-01-28','1997-01-14','1997-02-02','TAKE BACK RETURN','RAIL','ven requests. deposits breach a'
+---- TYPES
+BIGINT, BIGINT, BIGINT, BIGINT, INT, DECIMAL, DECIMAL, DECIMAL, DECIMAL, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+select max(file__position) from functional_parquet.lineitem_multiblock_variable_num_rows;
+---- RESULTS
+19999
+---- TYPES
+BIGINT
+====
+---- QUERY
+select file__position, l_orderkey from functional_parquet.lineitem_multiblock_variable_num_rows
+where file__position = 7000;
+---- RESULTS
+7000,12996
+---- TYPES
+BIGINT, BIGINT
+====
+---- QUERY
+select file__position, l_orderkey from functional_parquet.lineitem_multiblock_variable_num_rows
+where l_orderkey = 12996;
+---- RESULTS
+6998,12996
+6999,12996
+7000,12996
+7001,12996
+---- TYPES
+BIGINT, BIGINT
+====
+---- QUERY
+select file__position, * from functional_parquet.lineitem_multiblock_variable_num_rows
+where file__position = 19993;
+---- RESULTS
+19993,2,106170,1191,1,38.00,44694.46,0.00,0.05,'N','O','1997-01-28','1997-01-14','1997-02-02','TAKE BACK RETURN','RAIL','ven requests. deposits breach a'
+---- TYPES
+BIGINT, BIGINT, BIGINT, BIGINT, INT, DECIMAL, DECIMAL, DECIMAL, DECIMAL, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+select file__position, * from functional_parquet.lineitem_multiblock_variable_num_rows
+where l_orderkey = 2;
+---- RESULTS
+19993,2,106170,1191,1,38.00,44694.46,0.00,0.05,'N','O','1997-01-28','1997-01-14','1997-02-02','TAKE BACK RETURN','RAIL','ven requests. deposits breach a'
+---- TYPES
+BIGINT, BIGINT, BIGINT, BIGINT, INT, DECIMAL, DECIMAL, DECIMAL, DECIMAL, STRING, STRING, STRING, STRING, STRING, STRING, STRING, STRING
+====
+---- QUERY
+select max(file__position) from tpcds_parquet.store_sales
+---- TYPES
+BIGINT
+====
+---- QUERY
+select file__position, id, float_col from functional_parquet.alltypes
+where float_col < 1 and year=2009 and month=6;
+---- TYPES
+BIGINT, INT, FLOAT
+====
+---- QUERY
+select a.file__position, b.file__position, id, l_orderkey
+from alltypes_tiny_pages a, functional_parquet.lineitem_multiblock_variable_num_rows b
+where a.file__position = b.file__position and
+      a.file__position > 6710 and a.file__position < 6720;
+---- RESULTS
+6713,6713,6021,13253
+6715,6715,6069,13253
+6718,6718,6066,13252
+6711,6711,6023,13253
+6712,6712,6022,13253
+6714,6714,6020,13253
+6716,6716,6068,13253
+6717,6717,6067,13253
+6719,6719,6065,13252
+---- TYPES
+BIGINT, BIGINT, INT, BIGINT
+====
+---- QUERY
+select a.file__position, b.file__position, id, l_orderkey
+from alltypes_tiny_pages a, functional_parquet.lineitem_multiblock_variable_num_rows b
+where id = l_orderkey and
+      id > 6000 and id < 6030;
+---- RESULTS
+6765,13971,6019,6019
+6767,13974,6017,6017
+6767,13975,6017,6017
+6767,13976,6017,6017
+6767,13977,6017,6017
+6767,13978,6017,6017
+6713,13958,6021,6021
+6713,13959,6021,6021
+6713,13960,6021,6021
+6713,13961,6021,6021
+6713,13962,6021,6021
+6713,13963,6021,6021
+6766,13972,6018,6018
+6766,13973,6018,6018
+6768,13979,6016,6016
+6768,13980,6016,6016
+6768,13981,6016,6016
+6711,13944,6023,6023
+6711,13945,6023,6023
+6711,13946,6023,6023
+6711,13947,6023,6023
+6711,13948,6023,6023
+6711,13949,6023,6023
+6711,13950,6023,6023
+6712,13951,6022,6022
+6712,13952,6022,6022
+6712,13953,6022,6022
+6712,13954,6022,6022
+6712,13955,6022,6022
+6712,13956,6022,6022
+6712,13957,6022,6022
+6714,13964,6020,6020
+6714,13965,6020,6020
+6714,13966,6020,6020
+6714,13967,6020,6020
+6714,13968,6020,6020
+6714,13969,6020,6020
+6714,13970,6020,6020
+---- TYPES
+BIGINT, BIGINT, INT, BIGINT
+====
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 389467be8..bce5a9373 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -117,6 +117,24 @@ class TestScannersAllTableFormats(ImpalaTestSuite):
     else:
       self.run_test_case('QueryTest/string-escaping', vector)
 
+
+class TestScannersVirtualColumns(ImpalaTestSuite):
+  BATCH_SIZES = [0, 1, 16]
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestScannersVirtualColumns, cls).add_test_dimensions()
+    if cls.exploration_strategy() == 'core':
+      # The purpose of this test is to get some base coverage of all the file formats.
+      # Even in 'core', we'll test each format by using the pairwise strategy.
+      cls.ImpalaTestMatrix.add_dimension(cls.create_table_info_dimension('pairwise'))
+    cls.ImpalaTestMatrix.add_dimension(
+        ImpalaTestDimension('batch_size', *TestScannersAllTableFormats.BATCH_SIZES))
+
   def test_virtual_column_input_file_name(self, vector, unique_database):
     file_format = vector.get_value('table_format').file_format
     if file_format in ['hbase', 'kudu']:
@@ -129,6 +147,24 @@ class TestScannersAllTableFormats(ImpalaTestSuite):
       self.run_test_case('QueryTest/virtual-column-input-file-name-in-table', vector,
           use_db=unique_database)
 
+  def test_virtual_column_file_position_generic(self, vector):
+    # Generic tests about virtual column file position.
+    file_format = vector.get_value('table_format').file_format
+    # TODO: add support for other file formats, especially ORC
+    if file_format not in ['parquet']:
+      pytest.skip()
+    self.run_test_case('QueryTest/virtual-column-file-position-generic', vector)
+
+  def test_mixing_virtual_columns(self, vector, unique_database):
+    # Test queries with multiple virtual columns.
+    file_format = vector.get_value('table_format').file_format
+    # TODO: add support for other file formats, especially ORC
+    if file_format not in ['parquet']:
+      pytest.skip()
+    create_table_from_parquet(self.client, unique_database, 'alltypes_tiny_pages')
+    self.run_test_case('QueryTest/mixing-virtual-columns', vector, unique_database)
+
+
 # Test all the scanners with a simple limit clause. The limit clause triggers
 # cancellation in the scanner code paths.
 class TestScannersAllTableFormatsWithLimit(ImpalaTestSuite):
@@ -386,6 +422,16 @@ class TestParquet(ImpalaTestSuite):
   def test_parquet(self, vector):
     self.run_test_case('QueryTest/parquet', vector)
 
+  def test_virtual_column_file_position_parquet(self, vector, unique_database):
+    # Parquet-specific tests for the virtual column FILE__POSITION.
+    create_table_from_parquet(self.client, unique_database, 'alltypes_tiny_pages')
+    new_vector = deepcopy(vector)
+    for late_mat in [-1, 1, 17]:
+      new_vector.get_value('exec_option')['parquet_late_materialization_threshold'] = \
+          late_mat
+      self.run_test_case('QueryTest/virtual-column-file-position-parquet', new_vector,
+          unique_database)
+
   def test_corrupt_files(self, vector):
     new_vector = deepcopy(vector)
     del new_vector.get_value('exec_option')['num_nodes']  # .test file sets num_nodes


[impala] 01/02: IMPALA-11480: Retain all YARN container logs in a Jenkins run

Posted by jo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit adadec7fd9f93acb1347947f88f31f37c6aa2d59
Author: Fang-Yu Rao <fa...@cloudera.com>
AuthorDate: Sun Aug 7 20:09:19 2022 -0700

    IMPALA-11480: Retain all YARN container logs in a Jenkins run
    
    This patch explicitly sets 'yarn.nodemanager.log.retain-seconds' to a
    large enough value so that all the YARN container log files created in a
    Jenkins run will be archived.
    
    Testing:
     - Verified in a Jenkins run that all the YARN container logs are indeed
       archived.
    
    Change-Id: I10953f5de98d4312dfc563fcace17908c86be2d3
    Reviewed-on: http://gerrit.cloudera.org:8080/18821
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../cluster/node_templates/common/etc/hadoop/conf/yarn-site.xml.py    | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/yarn-site.xml.py b/testdata/cluster/node_templates/common/etc/hadoop/conf/yarn-site.xml.py
index 262caffa2..eb6359365 100644
--- a/testdata/cluster/node_templates/common/etc/hadoop/conf/yarn-site.xml.py
+++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/yarn-site.xml.py
@@ -54,6 +54,10 @@ CONFIG = {
   'yarn.nodemanager.local-dirs': '${NODE_DIR}/var/lib/hadoop-yarn/cache/${USER}/nm-local-dir',
   'yarn.nodemanager.log-dirs': '${NODE_DIR}/var/log/hadoop-yarn/containers',
 
+  # Set it to a large enough value so that the logs of all the containers ever created in
+  # a Jenkins run will be retained.
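+  # (86400 seconds corresponds to 24 hours.)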
+  'yarn.nodemanager.log.retain-seconds': 86400,
+
   # Enable the MR shuffle service, which is also used by Tez.
   'yarn.nodemanager.aux-services': 'mapreduce_shuffle',
   'yarn.nodemanager.aux-services.mapreduce_shuffle.class': 'org.apache.hadoop.mapred.ShuffleHandler',