You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2023/12/20 17:51:33 UTC

(impala) 01/02: IMPALA-12205: Add support to STRUCT type Iceberg Metadata table columns

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4417fbccc219b0166674b650a3a98d8af6d4001d
Author: Tamas Mate <tm...@apache.org>
AuthorDate: Wed Dec 6 14:24:16 2023 +0100

    IMPALA-12205: Add support to STRUCT type Iceberg Metadata table columns
    
    As the slots have already been created on the frontend, this change
    focuses on populating them on the backend side. There are two major
    parts of this commit: obtaining the right Accessors for the slots and
    recursively filling the tuples with data.
    
    The field ids are present in the struct slot's ColumnType field as a
    list of integers. This list can be indexed with the last element of
    the SchemaPath to obtain the field id for a struct member and, with
    that, the Accessor.
    
    Once the Accessors are available, the IcebergRowReader's MaterializeTuple
    method can be called recursively to write the primitive slots of a
    struct slot.
    
    Testing:
     - Added E2E tests
    
    Change-Id: I953ad7253b270f2855bfcaee4ad023d1c4469273
    Reviewed-on: http://gerrit.cloudera.org:8080/20759
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Gabor Kaszab <ga...@cloudera.com>
---
 .../iceberg-metadata/iceberg-metadata-scan-node.cc | 73 +++++++++++++---
 .../iceberg-metadata/iceberg-metadata-scan-node.h  | 26 ++++--
 be/src/exec/iceberg-metadata/iceberg-row-reader.cc | 37 +++++++--
 be/src/exec/iceberg-metadata/iceberg-row-reader.h  | 19 +++--
 .../org/apache/impala/analysis/FromClause.java     | 13 +++
 .../java/org/apache/impala/analysis/SlotRef.java   | 43 ++++++----
 .../catalog/iceberg/IcebergMetadataTable.java      | 20 ++++-
 .../apache/impala/util/IcebergMetadataScanner.java |  6 +-
 .../PlannerTest/iceberg-metadata-table-scan.test   | 36 --------
 .../queries/QueryTest/iceberg-metadata-tables.test | 97 ++++++++++++++++++++++
 tests/query_test/test_iceberg.py                   |  4 +-
 11 files changed, 280 insertions(+), 94 deletions(-)

diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc
index 1e1ba76a5..f7f5c9a6d 100644
--- a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc
+++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc
@@ -83,22 +83,73 @@ Status IcebergMetadataScanNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jmetadata_scanner, &jmetadata_scanner_));
   RETURN_ERROR_IF_EXC(env);
   RETURN_IF_ERROR(ScanMetadataTable());
-  // Create field accessors
+  RETURN_IF_ERROR(CreateFieldAccessors());
+  return Status::OK();
+}
+
+Status IcebergMetadataScanNode::CreateFieldAccessors() {
+  JNIEnv* env = JniUtil::GetJNIEnv();
+  if (env == nullptr) return Status("Failed to get/create JVM");
   for (SlotDescriptor* slot_desc: tuple_desc_->slots()) {
-    jobject accessor_for_field = env->CallObjectMethod(jmetadata_scanner_,
-        get_accessor_, slot_desc->col_pos());
-    RETURN_ERROR_IF_EXC(env);
-    jobject accessor_for_field_global_ref;
-    RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, accessor_for_field,
-        &accessor_for_field_global_ref));
-    jaccessors_[slot_desc->col_pos()] = accessor_for_field_global_ref;
+    if (slot_desc->type().IsStructType()) {
+      // Get the top level struct's field id from the ColumnDescriptor then recursively
+      // get the field ids for struct fields
+      int field_id = tuple_desc_->table_desc()->GetColumnDesc(slot_desc).field_id();
+      RETURN_IF_ERROR(AddAccessorForFieldId(env, field_id, slot_desc->id()));
+      RETURN_IF_ERROR(CreateFieldAccessors(env, slot_desc));
+    } else if (slot_desc->col_path().size() > 1) {
+      DCHECK(!slot_desc->type().IsComplexType());
+      // Slot that is a child of a struct without a tuple; this can occur when a
+      // struct member is in the select list. ColumnType has a tree structure, and this
+      // loop finds the STRUCT node that stores the primitive type, because that struct
+      // node has the field id list of its children.
+      int root_type_index = slot_desc->col_path()[0];
+      ColumnType current_type =
+          tuple_desc_->table_desc()->col_descs()[root_type_index].type();
+      for (int i = 1; i < slot_desc->col_path().size() - 1; ++i) {
+        current_type = current_type.children[slot_desc->col_path()[i]];
+      }
+      int field_id = current_type.field_ids[slot_desc->col_path().back()];
+      RETURN_IF_ERROR(AddAccessorForFieldId(env, field_id, slot_desc->id()));
+    } else {
+      // For primitives in the top level tuple, use the ColumnDescriptor
+      int field_id = tuple_desc_->table_desc()->GetColumnDesc(slot_desc).field_id();
+      RETURN_IF_ERROR(AddAccessorForFieldId(env, field_id, slot_desc->id()));
+    }
   }
   return Status::OK();
 }
 
+Status IcebergMetadataScanNode::CreateFieldAccessors(JNIEnv* env,
+    const SlotDescriptor* struct_slot_desc) {
+  if (!struct_slot_desc->type().IsStructType()) return Status::OK();
+  const std::vector<int>& struct_field_ids = struct_slot_desc->type().field_ids;
+  for (SlotDescriptor* child_slot_desc:
+      struct_slot_desc->children_tuple_descriptor()->slots()) {
+    int field_id = struct_field_ids[child_slot_desc->col_path().back()];
+    RETURN_IF_ERROR(AddAccessorForFieldId(env, field_id, child_slot_desc->id()));
+    if (child_slot_desc->type().IsStructType()) {
+      RETURN_IF_ERROR(CreateFieldAccessors(env, child_slot_desc));
+    }
+  }
+  return Status::OK();
+}
+
+Status IcebergMetadataScanNode::AddAccessorForFieldId(JNIEnv* env, int field_id,
+    SlotId slot_id) {
+  jobject accessor_for_field = env->CallObjectMethod(jmetadata_scanner_,
+      get_accessor_, field_id);
+  RETURN_ERROR_IF_EXC(env);
+  jobject accessor_for_field_global_ref;
+  RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, accessor_for_field,
+      &accessor_for_field_global_ref));
+  jaccessors_[slot_id] = accessor_for_field_global_ref;
+  return Status::OK();
+}
+
 Status IcebergMetadataScanNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(ScanNode::Open(state));
-  iceberg_row_reader_.reset(new IcebergRowReader(tuple_desc_, jaccessors_));
+  iceberg_row_reader_.reset(new IcebergRowReader(jaccessors_));
   return Status::OK();
 }
 
@@ -128,8 +179,8 @@ Status IcebergMetadataScanNode::GetNext(RuntimeState* state, RowBatch* row_batch
       return Status::OK();
     }
     // Translate a StructLikeRow from Iceberg to Tuple
-    RETURN_IF_ERROR(iceberg_row_reader_->MaterializeRow(env, struct_like_row, tuple,
-        row_batch->tuple_data_pool()));
+    RETURN_IF_ERROR(iceberg_row_reader_->MaterializeTuple(env, struct_like_row,
+        tuple_desc_, tuple, row_batch->tuple_data_pool()));
     env->DeleteLocalRef(struct_like_row);
     RETURN_ERROR_IF_EXC(env);
     COUNTER_ADD(rows_read_counter(), 1);
diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h
index c994c7682..64f1d3aa5 100644
--- a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h
+++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h
@@ -100,12 +100,10 @@ class IcebergMetadataScanNode : public ScanNode {
 
   /// Accessor map for the scan result, pairs the slot ids with the java Accessor
   /// objects.
-  std::unordered_map<int, jobject> jaccessors_;
+  std::unordered_map<SlotId, jobject> jaccessors_;
 
-  /// Tuple id resolved in Prepare() to set 'tuple_desc_'.
-  TupleId tuple_id_;
-
-  /// Descriptor of tuples read from Iceberg metadata table.
+  // The TupleId and TupleDescriptor of the tuple that this scan node will populate.
+  const TupleId tuple_id_;
   const TupleDescriptor* tuple_desc_ = nullptr;
 
   /// Table and metadata table names.
@@ -121,6 +119,24 @@ class IcebergMetadataScanNode : public ScanNode {
 
   /// Gets the FeIceberg table from the Frontend.
   Status GetCatalogTable(jobject* jtable);
+
+  /// Populates the jaccessors_ map by creating the accessors for the columns in the JVM.
+  /// To create a field accessor for a column the Iceberg field id is needed. For
+  /// primitive type columns that are not a field of a struct, this can be found in the
+  /// ColumnDescriptor. However, ColumnDescriptors are not available for struct fields,
+  /// in this case the SlotDescriptor can be used.
+  Status CreateFieldAccessors();
+
+  /// Recursive part of the Accessor collection, when there is a struct in the tuple.
+  /// Collects the field ids of the struct members. The type_ field inside the struct slot
+  /// stores an ordered list of Iceberg Struct member field ids. This list can be indexed
+  /// with the last element of SchemaPath col_path to obtain the correct field id of the
+  /// struct member.
+  Status CreateFieldAccessors(JNIEnv* env, const SlotDescriptor* struct_slot_desc);
+
+  /// Helper method to simplify adding new accessors to the jaccessors_ map. It obtains
+  /// the Accessor through JNI and persists it into the jaccessors_ map.
+  Status AddAccessorForFieldId(JNIEnv* env, int field_id, SlotId slot_id);
 };
 
 }
diff --git a/be/src/exec/iceberg-metadata/iceberg-row-reader.cc b/be/src/exec/iceberg-metadata/iceberg-row-reader.cc
index 219569ad5..d2f680222 100644
--- a/be/src/exec/iceberg-metadata/iceberg-row-reader.cc
+++ b/be/src/exec/iceberg-metadata/iceberg-row-reader.cc
@@ -24,10 +24,8 @@
 
 namespace impala {
 
-IcebergRowReader::IcebergRowReader(
-    const TupleDescriptor* tuple_desc, const std::unordered_map<int, jobject>& jaccessor)
-  : tuple_desc_(tuple_desc),
-    jaccessors_(jaccessor) {}
+IcebergRowReader::IcebergRowReader(const std::unordered_map<SlotId, jobject>& jaccessors)
+  : jaccessors_(jaccessors) {}
 
 Status IcebergRowReader::InitJNI() {
   DCHECK(iceberg_accessor_cl_ == nullptr) << "InitJNI() already called!";
@@ -65,15 +63,19 @@ Status IcebergRowReader::InitJNI() {
   return Status::OK();
 }
 
-Status IcebergRowReader::MaterializeRow(JNIEnv* env,
-    jobject struct_like_row, Tuple* tuple, MemPool* tuple_data_pool) {
+Status IcebergRowReader::MaterializeTuple(JNIEnv* env,
+    jobject struct_like_row, const TupleDescriptor* tuple_desc, Tuple* tuple,
+    MemPool* tuple_data_pool) {
   DCHECK(env != nullptr);
   DCHECK(struct_like_row != nullptr);
   DCHECK(tuple != nullptr);
   DCHECK(tuple_data_pool != nullptr);
-  for (SlotDescriptor* slot_desc: tuple_desc_->slots()) {
-    jobject accessed_value = env->CallObjectMethod(jaccessors_.at(slot_desc->col_pos()),
-        iceberg_accessor_get_, struct_like_row);
+  DCHECK(tuple_desc != nullptr);
+
+  for (SlotDescriptor* slot_desc: tuple_desc->slots()) {
+    jobject accessor = jaccessors_.at(slot_desc->id());
+    jobject accessed_value = env->CallObjectMethod(accessor, iceberg_accessor_get_,
+        struct_like_row);
     RETURN_ERROR_IF_EXC(env);
     if (accessed_value == nullptr) {
       tuple->SetNull(slot_desc->null_indicator_offset());
@@ -96,6 +98,10 @@ Status IcebergRowReader::MaterializeRow(JNIEnv* env,
       } case TYPE_STRING: { // java.lang.String
         RETURN_IF_ERROR(WriteStringSlot(env, accessed_value, slot, tuple_data_pool));
         break;
+      } case TYPE_STRUCT: {
+        RETURN_IF_ERROR(WriteStructSlot(env, struct_like_row, slot_desc, tuple,
+            tuple_data_pool));
+        break;
       }
       default:
         // Skip the unsupported type and set it to NULL
@@ -108,6 +114,7 @@ Status IcebergRowReader::MaterializeRow(JNIEnv* env,
 
 Status IcebergRowReader::WriteBooleanSlot(JNIEnv* env, jobject accessed_value,
     void* slot) {
+  DCHECK(accessed_value != nullptr);
   DCHECK(env->IsInstanceOf(accessed_value, java_boolean_cl_) == JNI_TRUE);
   jboolean result = env->CallBooleanMethod(accessed_value, boolean_value_);
   RETURN_ERROR_IF_EXC(env);
@@ -116,6 +123,7 @@ Status IcebergRowReader::WriteBooleanSlot(JNIEnv* env, jobject accessed_value,
 }
 
 Status IcebergRowReader::WriteIntSlot(JNIEnv* env, jobject accessed_value, void* slot) {
+  DCHECK(accessed_value != nullptr);
   DCHECK(env->IsInstanceOf(accessed_value, java_int_cl_) == JNI_TRUE);
   jint result = env->CallIntMethod(accessed_value, int_value_);
   RETURN_ERROR_IF_EXC(env);
@@ -124,6 +132,7 @@ Status IcebergRowReader::WriteIntSlot(JNIEnv* env, jobject accessed_value, void*
 }
 
 Status IcebergRowReader::WriteLongSlot(JNIEnv* env, jobject accessed_value, void* slot) {
+  DCHECK(accessed_value != nullptr);
   DCHECK(env->IsInstanceOf(accessed_value, java_long_cl_) == JNI_TRUE);
   jlong result = env->CallLongMethod(accessed_value, long_value_);
   RETURN_ERROR_IF_EXC(env);
@@ -133,6 +142,7 @@ Status IcebergRowReader::WriteLongSlot(JNIEnv* env, jobject accessed_value, void
 
 Status IcebergRowReader::WriteTimeStampSlot(JNIEnv* env, jobject accessed_value,
     void* slot) {
+  DCHECK(accessed_value != nullptr);
   DCHECK(env->IsInstanceOf(accessed_value, java_long_cl_) == JNI_TRUE);
   jlong result = env->CallLongMethod(accessed_value, long_value_);
   RETURN_ERROR_IF_EXC(env);
@@ -143,6 +153,7 @@ Status IcebergRowReader::WriteTimeStampSlot(JNIEnv* env, jobject accessed_value,
 
 Status IcebergRowReader::WriteStringSlot(JNIEnv* env, jobject accessed_value, void* slot,
       MemPool* tuple_data_pool) {
+  DCHECK(accessed_value != nullptr);
   DCHECK(env->IsInstanceOf(accessed_value, java_char_sequence_cl_) == JNI_TRUE);
   jstring result = static_cast<jstring>(env->CallObjectMethod(accessed_value,
       char_sequence_to_string_));
@@ -162,4 +173,12 @@ Status IcebergRowReader::WriteStringSlot(JNIEnv* env, jobject accessed_value, vo
   return Status::OK();
 }
 
+Status IcebergRowReader::WriteStructSlot(JNIEnv* env, jobject struct_like_row,
+    SlotDescriptor* slot_desc, Tuple* tuple, MemPool* tuple_data_pool) {
+  DCHECK(slot_desc != nullptr);
+  RETURN_IF_ERROR(MaterializeTuple(env, struct_like_row,
+      slot_desc->children_tuple_descriptor(), tuple, tuple_data_pool));
+  return Status::OK();
+}
+
 }
\ No newline at end of file
diff --git a/be/src/exec/iceberg-metadata/iceberg-row-reader.h b/be/src/exec/iceberg-metadata/iceberg-row-reader.h
index 15c71a038..51395df20 100644
--- a/be/src/exec/iceberg-metadata/iceberg-row-reader.h
+++ b/be/src/exec/iceberg-metadata/iceberg-row-reader.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include "common/global-types.h"
+
 #include <jni.h>
 #include <unordered_map>
 
@@ -33,16 +35,15 @@ class TupleDescriptor;
 class IcebergRowReader {
  public:
   /// Initialize the tuple descriptor and accessors
-  IcebergRowReader(const TupleDescriptor* tuple_desc,
-      const std::unordered_map<int, jobject>& jaccessors);
+  IcebergRowReader(const std::unordered_map<SlotId, jobject>& jaccessors);
 
   /// JNI setup. Create global references for Java classes and find method ids.
   /// Initializes static members, should be called once per process lifecycle.
   static Status InitJNI();
 
-  /// Materlilize the StructLike Java objects into Impala rows.
-  Status MaterializeRow(JNIEnv* env, jobject struct_like_row, Tuple* tuple,
-      MemPool* tuple_data_pool);
+  /// Materialize the StructLike Java objects into Impala rows.
+  Status MaterializeTuple(JNIEnv* env, jobject struct_like_row,
+      const TupleDescriptor* tuple_desc, Tuple* tuple,  MemPool* tuple_data_pool);
 
  private:
   /// Global class references created with JniUtil.
@@ -62,12 +63,9 @@ class IcebergRowReader {
   inline static jmethodID long_value_ = nullptr;
   inline static jmethodID char_sequence_to_string_ = nullptr;
 
-  /// TupleDescriptor received from the ScanNode.
-  const TupleDescriptor* tuple_desc_;
-
   /// Accessor map for the scan result, pairs the slot ids with the java Accessor
   /// objects.
-  const std::unordered_map<int, jobject> jaccessors_;
+  const std::unordered_map<SlotId, jobject> jaccessors_;
 
   /// Reads the value of a primitive from the StructLike, translates it to a matching
   /// Impala type and writes it into the target tuple. The related Accessor objects are
@@ -82,6 +80,9 @@ class IcebergRowReader {
   /// and reclaims the memory area.
   Status WriteStringSlot(JNIEnv* env, jobject accessed_value, void* slot,
       MemPool* tuple_data_pool);
+  /// Recursively calls MaterializeTuple method with the child tuple of the struct slot.
+  Status WriteStructSlot(JNIEnv* env, jobject struct_like_row, SlotDescriptor* slot_desc,
+      Tuple* tuple, MemPool* tuple_data_pool);
 };
 
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/FromClause.java b/fe/src/main/java/org/apache/impala/analysis/FromClause.java
index b61951592..656c609c4 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FromClause.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FromClause.java
@@ -93,6 +93,7 @@ public class FromClause extends StmtNode implements Iterable<TableRef> {
       tblRef.analyze(analyzer);
       leftTblRef = tblRef;
       if (tblRef instanceof CollectionTableRef) {
+        checkIcebergCollectionSupport((CollectionTableRef)tblRef);
         checkTopLevelComplexAcidScan(analyzer, (CollectionTableRef)tblRef);
         if (firstZippingUnnestRef != null && tblRef.isZippingUnnest() &&
             firstZippingUnnestRef.getResolvedPath().getRootTable() !=
@@ -161,6 +162,18 @@ public class FromClause extends StmtNode implements Iterable<TableRef> {
     analyzer.setHasTopLevelAcidCollectionTableRef();
   }
 
+  private void checkIcebergCollectionSupport(CollectionTableRef tblRef)
+      throws AnalysisException {
+    Preconditions.checkNotNull(tblRef);
+    Preconditions.checkNotNull(tblRef.getDesc());
+    Preconditions.checkNotNull(tblRef.getDesc().getPath());
+    Preconditions.checkNotNull(tblRef.getDesc().getPath().getRootTable());
+    if (tblRef.getDesc().getPath().getRootTable() instanceof IcebergMetadataTable) {
+      throw new AnalysisException("Querying collection types (ARRAY/MAP) is not " +
+          "supported for Iceberg Metadata tables. (IMPALA-12610, IMPALA-12611)");
+    }
+  }
+
   @Override
   public FromClause clone() {
     List<TableRef> clone = new ArrayList<>();
diff --git a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
index 65e74d098..33aef0458 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SlotRef.java
@@ -29,6 +29,7 @@ import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.TypeCompatibility;
+import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.UnsupportedFeatureException;
 import org.apache.impala.thrift.TExprNode;
@@ -226,29 +227,37 @@ public class SlotRef extends Expr {
     if (resolvedPath_ != null) {
       FeTable rootTable = resolvedPath_.getRootTable();
       if (rootTable != null) {
-        if (!(rootTable instanceof FeFsTable)) {
-          throw new AnalysisException(
-              String.format("%s is not supported when querying STRUCT type %s", rootTable,
-                  type_.toSql()));
-        }
-        FeFsTable feTable = (FeFsTable) rootTable;
-        for (HdfsFileFormat format : feTable.getFileFormats()) {
-          if (!formatSupportsQueryingStruct(format)) {
-            throw new AnalysisException("Querying STRUCT is only supported for ORC and "
-                + "Parquet file formats.");
-          }
+        checkTableTypeSupportsStruct(rootTable);
+        if (rootTable instanceof FeFsTable) {
+          checkFileFormatSupportsStruct((FeFsTable)rootTable);
         }
       }
     }
   }
 
-  // Returns true if the given HdfsFileFormat supports querying STRUCT types. Iceberg
-  // tables also have ICEBERG as HdfsFileFormat. We can return TRUE in case of Iceberg
-  // because the data file formats in the Iceberg table will be also tested separately.
-  private static boolean formatSupportsQueryingStruct(HdfsFileFormat format) {
-    return format == HdfsFileFormat.PARQUET ||
+  private void checkTableTypeSupportsStruct(FeTable feTable) throws AnalysisException {
+    if (!(feTable instanceof FeFsTable) &&
+        !(feTable instanceof IcebergMetadataTable)) {
+      throw new AnalysisException(
+          String.format("%s is not supported when querying STRUCT type %s",
+              feTable, type_.toSql()));
+    }
+  }
+
+  // Throws exception if the given HdfsFileFormat does not support querying STRUCT types.
+  // Iceberg tables also have ICEBERG as HdfsFileFormat. In case of Iceberg there is no
+  // need to throw exception because the data file formats in the Iceberg table will be
+  // also tested separately.
+  private void checkFileFormatSupportsStruct(FeFsTable feFsTable)
+      throws AnalysisException {
+    for (HdfsFileFormat format : feFsTable.getFileFormats()) {
+      if (! (format == HdfsFileFormat.PARQUET ||
            format == HdfsFileFormat.ORC ||
-           format == HdfsFileFormat.ICEBERG;
+           format == HdfsFileFormat.ICEBERG)) {
+        throw new AnalysisException("Querying STRUCT is only supported for ORC and "
+            + "Parquet file formats.");
+      }
+    }
   }
 
   // Assumes this 'SlotRef' is a struct and that desc_.itemTupleDesc_ has already been
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java
index a92b07471..877b31a46 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java
@@ -26,14 +26,20 @@ import org.apache.iceberg.MetadataTableUtils;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.impala.analysis.TableName;
+import org.apache.impala.catalog.CatalogObject.ThriftObjectType;
 import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.FeCatalogUtils;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.VirtualTable;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.thrift.TColumnDescriptor;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableStats;
+import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.IcebergSchemaConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
@@ -44,6 +50,7 @@ import com.google.common.base.Preconditions;
  * table object based on the Iceberg API.
  */
 public class IcebergMetadataTable extends VirtualTable {
+  private final static Logger LOG = LoggerFactory.getLogger(IcebergMetadataTable.class);
 
   // The Iceberg table that is the base of the metadata table.
   private FeIcebergTable baseTable_;
@@ -64,6 +71,8 @@ public class IcebergMetadataTable extends VirtualTable {
     Schema metadataTableSchema = metadataTable.schema();
     for (Column col : IcebergSchemaConverter.convertToImpalaSchema(
         metadataTableSchema)) {
+      LOG.trace("Adding column: \"{}\" with type: \"{}\" to metadata table.",
+          col.getName(), col.getType());
       addColumn(col);
     }
   }
@@ -111,16 +120,23 @@ public class IcebergMetadataTable extends VirtualTable {
   @Override
   public TTableDescriptor toThriftDescriptor(int tableId,
       Set<Long> referencedPartitions) {
-    TTableDescriptor desc = baseTable_.toThriftDescriptor(tableId, referencedPartitions);
+    TTableDescriptor desc = new TTableDescriptor(tableId, TTableType.ICEBERG_TABLE,
+        getTColumnDescriptors(), numClusteringCols_, name_, db_.getName());
+    desc.setIcebergTable(FeIcebergTable.Utils.getTIcebergTable(baseTable_,
+        ThriftObjectType.DESCRIPTOR_ONLY));
     return desc;
   }
 
+  private List<TColumnDescriptor> getTColumnDescriptors() {
+    return FeCatalogUtils.getTColumnDescriptors(this);
+  }
+
   /**
    * Returns true if the table ref is referring to a valid metadata table.
    */
   public static boolean isIcebergMetadataTable(List<String> tblRefPath) {
     if (tblRefPath == null) return false;
-    if (tblRefPath.size() != 3) return false;
+    if (tblRefPath.size() < 3) return false;
     String vTableName = tblRefPath.get(2).toUpperCase();
     return EnumUtils.isValidEnum(MetadataTableType.class, vTableName);
   }
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java b/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java
index a2f7c5b23..18f3d8981 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergMetadataScanner.java
@@ -89,11 +89,10 @@ public class IcebergMetadataScanner {
   }
 
   /**
-   * Returns the field {Accessor} for the specified column position. This {Accessor} is
+   * Returns the field {Accessor} for the specified field id. This {Accessor} is then
    * used to access a field in the {StructLike} object.
    */
-  public Accessor GetAccessor(int slotColPos) {
-    int fieldId = metadataTable_.schema().columns().get(slotColPos).fieldId();
+  public Accessor GetAccessor(int fieldId) {
     return metadataTable_.schema().accessorForField(fieldId);
   }
 
@@ -112,4 +111,5 @@ public class IcebergMetadataScanner {
     }
     return null;
   }
+
 }
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test
index a605c94b8..2294b034b 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-metadata-table-scan.test
@@ -106,40 +106,4 @@ PLAN-ROOT SINK
 |
 00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.HISTORY q]
    row-size=8B cardinality=unavailable
-====
-select * from functional_parquet.iceberg_alltypes_part_orc.manifests a, a.partition_summaries
----- PLAN
-PLAN-ROOT SINK
-|
-01:SUBPLAN
-|  row-size=98B cardinality=unavailable
-|
-|--04:NESTED LOOP JOIN [CROSS JOIN]
-|  |  row-size=98B cardinality=10
-|  |
-|  |--02:SINGULAR ROW SRC
-|  |     row-size=72B cardinality=1
-|  |
-|  03:UNNEST [a.partition_summaries]
-|     row-size=0B cardinality=10
-|
-00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.MANIFESTS a]
-   row-size=72B cardinality=unavailable
----- DISTRIBUTEDPLAN
-PLAN-ROOT SINK
-|
-01:SUBPLAN
-|  row-size=98B cardinality=unavailable
-|
-|--04:NESTED LOOP JOIN [CROSS JOIN]
-|  |  row-size=98B cardinality=10
-|  |
-|  |--02:SINGULAR ROW SRC
-|  |     row-size=72B cardinality=1
-|  |
-|  03:UNNEST [a.partition_summaries]
-|     row-size=0B cardinality=10
-|
-00:SCAN ICEBERG METADATA [functional_parquet.iceberg_alltypes_part_orc.MANIFESTS a]
-   row-size=72B cardinality=unavailable
 ====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
index 5a9928451..c34f3aacc 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
@@ -453,4 +453,101 @@ ParseException: Syntax error in line 1
 alter table functional_parquet.iceberg_query_metadata.snapshots add columns (col int);
 ---- CATCH
 ParseException: Syntax error in line 1
+====
+
+####
+# Test 9 : Query STRUCT type columns
+####
+====
+---- QUERY
+select readable_metrics from functional_parquet.iceberg_query_metadata.entries;
+---- RESULTS
+'{"i":{"column_size":47,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":3,"upper_bound":3}}'
+'{"i":{"column_size":47,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":2,"upper_bound":2}}'
+'{"i":{"column_size":47,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":1,"upper_bound":1}}'
+'{"i":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null}}'
+---- TYPES
+STRING
+====
+---- QUERY
+select snapshot_id, readable_metrics from functional_parquet.iceberg_query_metadata.entries;
+---- RESULTS
+row_regex:[1-9]\d*|0,'{"i":{"column_size":47,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":3,"upper_bound":3}}'
+row_regex:[1-9]\d*|0,'{"i":{"column_size":47,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":2,"upper_bound":2}}'
+row_regex:[1-9]\d*|0,'{"i":{"column_size":47,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":1,"upper_bound":1}}'
+row_regex:[1-9]\d*|0,'{"i":{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null}}'
+---- TYPES
+BIGINT,STRING
+====
+---- QUERY
+select snapshot_id, readable_metrics.i.lower_bound as lower_bound from functional_parquet.iceberg_query_metadata.entries;
+---- RESULTS
+row_regex:[1-9]\d*|0,3
+row_regex:[1-9]\d*|0,2
+row_regex:[1-9]\d*|0,1
+row_regex:[1-9]\d*|0,'NULL'
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+select snapshot_id, readable_metrics.i.lower_bound as lower_bound from functional_parquet.iceberg_query_metadata.entries
+order by lower_bound;
+---- RESULTS
+row_regex:[1-9]\d*|0,1
+row_regex:[1-9]\d*|0,2
+row_regex:[1-9]\d*|0,3
+row_regex:[1-9]\d*|0,'NULL'
+---- TYPES
+BIGINT,INT
+====
+---- QUERY
+select SUM(readable_metrics.i.lower_bound) from functional_parquet.iceberg_query_metadata.entries;
+---- RESULTS
+6
+---- TYPES
+BIGINT
+====
+---- QUERY
+select all_ent.data_file.file_path, ent.readable_metrics.i.lower_bound
+from functional_parquet.iceberg_query_metadata.entries ent
+join functional_parquet.iceberg_query_metadata.all_entries all_ent
+on ent.snapshot_id = all_ent.snapshot_id
+order by ent.readable_metrics.i.lower_bound;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq',1
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq',2
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq',3
+row_regex:'$NAMENODE/test-warehouse/iceberg_test/hadoop_catalog/ice/iceberg_query_metadata/data/.*.parq',NULL
+---- TYPES
+STRING,INT
+====
+---- QUERY
+select i from functional_parquet.iceberg_query_metadata.entries.readable_metrics;
+---- CATCH
+AnalysisException: Illegal table reference to non-collection type: 'functional_parquet.iceberg_query_metadata.entries.readable_metrics'
+====
+---- QUERY
+select delete_ids.item
+from functional_parquet.iceberg_query_metadata.all_files, functional_parquet.iceberg_query_metadata.all_files.equality_ids delete_ids;
+---- CATCH
+AnalysisException: Querying collection types (ARRAY/MAP) is not supported for Iceberg Metadata tables. (IMPALA-12610, IMPALA-12611)
+====
+---- QUERY
+select null_value_counts.key, null_value_counts.value
+from functional_parquet.iceberg_query_metadata.all_files, functional_parquet.iceberg_query_metadata.all_files.null_value_counts null_value_counts;
+---- CATCH
+AnalysisException: Querying collection types (ARRAY/MAP) is not supported for Iceberg Metadata tables. (IMPALA-12610, IMPALA-12611)
+====
+---- QUERY
+select item
+from functional_parquet.iceberg_query_metadata.all_files a, a.equality_ids e, e.delete_ids;
+---- CATCH
+AnalysisException: Querying collection types (ARRAY/MAP) is not supported for Iceberg Metadata tables. (IMPALA-12610, IMPALA-12611)
+====
+---- QUERY
+create view iceberg_query_metadata_all_files
+as select equality_ids from functional_parquet.iceberg_query_metadata.all_files;
+select item from iceberg_query_metadata_all_files a, a.equality_ids e, e.delete_ids;
+---- CATCH
+AnalysisException: Querying collection types (ARRAY/MAP) is not supported for Iceberg Metadata tables. (IMPALA-12610, IMPALA-12611)
 ====
\ No newline at end of file
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 557334544..3524e68ea 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1291,7 +1291,7 @@ class TestIcebergV2Table(IcebergTestSuite):
         use_db="functional_parquet")
 
   @SkipIf.hardcoded_uris
-  def test_metadata_tables(self, vector):
+  def test_metadata_tables(self, vector, unique_database):
     with self.create_impala_client() as impalad_client:
       overwrite_snapshot_id = impalad_client.execute("select snapshot_id from "
                              "functional_parquet.iceberg_query_metadata.snapshots "
@@ -1300,7 +1300,7 @@ class TestIcebergV2Table(IcebergTestSuite):
                              "functional_parquet.iceberg_query_metadata.snapshots "
                              "where operation = 'overwrite';")
       self.run_test_case('QueryTest/iceberg-metadata-tables', vector,
-          use_db="functional_parquet",
+          unique_database,
           test_file_vars={'$OVERWRITE_SNAPSHOT_ID': str(overwrite_snapshot_id.data[0]),
                           '$OVERWRITE_SNAPSHOT_TS': str(overwrite_snapshot_ts.data[0])})