You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/03/30 08:37:59 UTC

[impala] branch master updated (2c779939d -> 3b3acd5c0)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 2c779939d IMPALA-11509: Prevent queries hanging when Iceberg metadata is missing.
     new 409ea25a2 IMPALA-12028: test_execute_rollback might fail due to used timezone
     new 3b3acd5c0 IMPALA-11908: Parser change for Iceberg metadata querying

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/impala/analysis/Analyzer.java  |  61 ++++++++--
 .../org/apache/impala/analysis/FromClause.java     |   4 +
 .../impala/analysis/IcebergMetadataTableRef.java   |  56 +++++++++
 .../main/java/org/apache/impala/analysis/Path.java |  10 +-
 .../java/org/apache/impala/analysis/TableName.java |  46 +++++--
 .../org/apache/impala/catalog/IcebergTable.java    |  14 ++-
 .../impala/catalog/IcebergTimeTravelTable.java     |   3 +-
 .../impala/catalog/iceberg/IcebergCtasTarget.java  |   6 +-
 .../catalog/iceberg/IcebergMetadataTable.java      | 103 ++++++++++++++++
 .../apache/impala/planner/SingleNodePlanner.java   |   5 +
 .../impala/service/IcebergCatalogOpExecutor.java   |   6 +-
 .../apache/impala/util/IcebergSchemaConverter.java |   9 +-
 .../queries/QueryTest/iceberg-metadata-tables.test | 132 +++++++++++++++++++++
 tests/query_test/test_iceberg.py                   |   9 +-
 14 files changed, 427 insertions(+), 37 deletions(-)
 create mode 100644 fe/src/main/java/org/apache/impala/analysis/IcebergMetadataTableRef.java
 create mode 100644 fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test


[impala] 02/02: IMPALA-11908: Parser change for Iceberg metadata querying

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3b3acd5c08c55258a9f5bc72fff2ac991ef977cd
Author: Tamas Mate <tm...@apache.org>
AuthorDate: Wed Feb 8 15:41:48 2023 +0100

    IMPALA-11908: Parser change for Iceberg metadata querying
    
    This change extends parsing table references with Iceberg metadata
    tables. The TableName class has been extended with an extra vTbl field
    which is filled when a virtual table reference is suspected. This
    additional field helps to keep the real table in the statement table
    cache next to the virtual table, which should be loaded so Iceberg
    metadata tables can be created.
    
    Iceberg provides a rich API to query metadata; these Iceberg API tables
    are accessible through the MetadataTableUtils class. Using these table
    schemas it is possible to create an Impala table that can be queried
    later on.
    
    Querying a metadata table at this point is expected to throw a
    NotImplementedException.
    
    Testing:
     - Added E2E test to test it for some tables.
    
    Change-Id: I0b5db884b5f3fecbd132fcb2c2cbd6c622ff965b
    Reviewed-on: http://gerrit.cloudera.org:8080/19483
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../java/org/apache/impala/analysis/Analyzer.java  |  61 ++++++++--
 .../org/apache/impala/analysis/FromClause.java     |   4 +
 .../impala/analysis/IcebergMetadataTableRef.java   |  56 +++++++++
 .../main/java/org/apache/impala/analysis/Path.java |  10 +-
 .../java/org/apache/impala/analysis/TableName.java |  46 +++++--
 .../org/apache/impala/catalog/IcebergTable.java    |  14 ++-
 .../impala/catalog/IcebergTimeTravelTable.java     |   3 +-
 .../impala/catalog/iceberg/IcebergCtasTarget.java  |   6 +-
 .../catalog/iceberg/IcebergMetadataTable.java      | 103 ++++++++++++++++
 .../apache/impala/planner/SingleNodePlanner.java   |   5 +
 .../impala/service/IcebergCatalogOpExecutor.java   |   6 +-
 .../apache/impala/util/IcebergSchemaConverter.java |   9 +-
 .../queries/QueryTest/iceberg-metadata-tables.test | 132 +++++++++++++++++++++
 tests/query_test/test_iceberg.py                   |   4 +
 14 files changed, 423 insertions(+), 36 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index b5945eb76..8073fedfc 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -35,6 +35,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.impala.analysis.Path.PathType;
 import org.apache.impala.analysis.StmtMetadataLoader.StmtTableCache;
 import org.apache.impala.authorization.AuthorizationChecker;
@@ -67,10 +68,12 @@ import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.VirtualColumn;
 import org.apache.impala.catalog.VirtualTable;
+import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
 import org.apache.impala.catalog.local.LocalKuduTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.IdGenerator;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.RuntimeEnv;
@@ -918,6 +921,9 @@ public class Analyzer {
     if (resolvedPath.destTable() != null) {
       FeTable table = resolvedPath.destTable();
       if (table instanceof FeView) return new InlineViewRef((FeView) table, tableRef);
+      if (table instanceof IcebergMetadataTable) {
+        return new IcebergMetadataTableRef(tableRef, resolvedPath);
+      }
       // The table must be a base table.
       Preconditions.checkState(table instanceof FeFsTable ||
           table instanceof FeKuduTable ||
@@ -1356,7 +1362,7 @@ public class Analyzer {
         TableName tblName = candidateTbls.get(tblNameIdx);
         FeTable tbl = null;
         try {
-          tbl = getTable(tblName.getDb(), tblName.getTbl(), /* must_exist */ false);
+          tbl = getTable(tblName, /* must_exist */ false);
         } catch (AnalysisException e) {
           // Ignore to allow path resolution to continue.
         }
@@ -1374,11 +1380,10 @@ public class Analyzer {
             timeTravelSpec.analyze(this);
 
             FeIcebergTable rootTable = (FeIcebergTable) tbl;
-            IcebergTimeTravelTable timeTravelTable =
-                new IcebergTimeTravelTable(rootTable, timeTravelSpec);
-            tbl = timeTravelTable;
+            tbl = new IcebergTimeTravelTable(rootTable, timeTravelSpec);
           }
-          candidates.add(new Path(tbl, rawPath.subList(tblNameIdx + 1, rawPath.size())));
+          int offset = tblNameIdx + (tbl instanceof IcebergMetadataTable ? 2 : 1);
+          candidates.add(new Path(tbl, rawPath.subList(offset, rawPath.size())));
         }
       }
       LOG.trace("Replace candidates with {}", candidates);
@@ -3377,15 +3382,14 @@ public class Analyzer {
   }
 
   /**
-   * Returns the Table for the given database and table name from the 'stmtTableCache'
-   * in the global analysis state.
+   * Returns the Table for the given TableName from the 'stmtTableCache' in the global
+   * analysis state.
    * Throws an AnalysisException if the database or table does not exist.
    * Throws a TableLoadingException if the registered table failed to load.
    * Does not register authorization requests or access events.
    */
-  public FeTable getTable(String dbName, String tableName, boolean mustExist)
+  public FeTable getTable(TableName tblName, boolean mustExist)
       throws AnalysisException, TableLoadingException {
-    TableName tblName = new TableName(dbName, tableName);
     FeTable table = globalState_.stmtTableCache.tables.get(tblName);
     if (table == null) {
       if (!mustExist) {
@@ -3403,11 +3407,20 @@ public class Analyzer {
       // when it is accessed.
       ImpalaException cause = ((FeIncompleteTable) table).getCause();
       if (cause instanceof TableLoadingException) throw (TableLoadingException) cause;
-      throw new TableLoadingException("Missing metadata for table: " + tableName, cause);
+      throw new TableLoadingException("Missing metadata for table: " + tblName, cause);
     }
     return table;
   }
 
+  /**
+   * Wrapper around {@link #getTable(TableName tblName, boolean mustExist)}.
+   */
+  public FeTable getTable(String dbName, String tableName, boolean mustExist)
+      throws AnalysisException, TableLoadingException {
+    TableName tblName = new TableName(dbName, tableName);
+    return getTable(tblName, mustExist);
+  }
+
   /**
    * Adds auxiliary virtual table for a query.
    */
@@ -3416,6 +3429,34 @@ public class Analyzer {
     globalState_.stmtTableCache.tables.put(tblName, virtTable);
   }
 
+  /**
+   * Adds a new Iceberg metadata table to the stmt table cache. At this point it is
+   * unknown if the base table is loaded for scanning as well, therefore the original
+   * table is kept. The metadata table will have its vTbl field filled, while the original
+   * table gets a new key without the vTbl field.
+   * 'tblRefPath' parameter has to be an IcebergMetadataTable reference path.
+   */
+  public void addMetadataVirtualTable(List<String> tblRefPath) throws AnalysisException {
+    Preconditions.checkArgument(IcebergMetadataTable.isIcebergMetadataTable(tblRefPath));
+    try {
+      TableName catalogTableName = new TableName(tblRefPath.get(0),
+          tblRefPath.get(1));
+      TableName virtualTableName = new TableName(tblRefPath.get(0),
+          tblRefPath.get(1), tblRefPath.get(2));
+      // The catalog table (the base of the virtual table) has been loaded and cached
+      // under the name of the virtual table.
+      FeTable catalogTable = getStmtTableCache().tables.get(virtualTableName);
+      if (catalogTable instanceof IcebergMetadataTable) return;
+      IcebergMetadataTable virtualTable =
+          new IcebergMetadataTable(catalogTable, tblRefPath.get(2));
+      getStmtTableCache().tables.put(catalogTableName, catalogTable);
+      getStmtTableCache().tables.put(virtualTableName, virtualTable);
+    } catch (ImpalaRuntimeException e) {
+      throw new AnalysisException("Could not create metadata table for table "
+          + "reference: " + StringUtils.join(tblRefPath, "."), e);
+    }
+  }
+
   public org.apache.kudu.client.KuduTable getKuduTable(FeKuduTable feKuduTable)
       throws AnalysisException {
     String tableName = feKuduTable.getFullName();
diff --git a/fe/src/main/java/org/apache/impala/analysis/FromClause.java b/fe/src/main/java/org/apache/impala/analysis/FromClause.java
index 765980b90..b61951592 100644
--- a/fe/src/main/java/org/apache/impala/analysis/FromClause.java
+++ b/fe/src/main/java/org/apache/impala/analysis/FromClause.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.impala.analysis.TableRef.ZippingUnnestType;
 import org.apache.impala.catalog.Type;
+import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.util.AcidUtils;
 
@@ -83,6 +84,9 @@ public class FromClause extends StmtNode implements Iterable<TableRef> {
     boolean hasJoiningUnnest = false;
     for (int i = 0; i < tableRefs_.size(); ++i) {
       TableRef tblRef = tableRefs_.get(i);
+      if (IcebergMetadataTable.isIcebergMetadataTable(tblRef.getPath())) {
+        analyzer.addMetadataVirtualTable(tblRef.getPath());
+      }
       tblRef = analyzer.resolveTableRef(tblRef);
       tableRefs_.set(i, Preconditions.checkNotNull(tblRef));
       tblRef.setLeftTblRef(leftTblRef);
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergMetadataTableRef.java b/fe/src/main/java/org/apache/impala/analysis/IcebergMetadataTableRef.java
new file mode 100644
index 000000000..78823d002
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergMetadataTableRef.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
+import org.apache.impala.common.AnalysisException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * TableRef class for Iceberg metadata tables.
+ */
+public class IcebergMetadataTableRef extends TableRef {
+
+  public IcebergMetadataTableRef(TableRef tableRef, Path resolvedPath) {
+    super(tableRef);
+    Preconditions.checkState(resolvedPath.isResolved());
+    Preconditions.checkState(resolvedPath.isRootedAtTable());
+    Preconditions.checkState(resolvedPath.getRootTable() instanceof IcebergMetadataTable);
+    resolvedPath_ = resolvedPath;
+    IcebergMetadataTable iceMTbl = (IcebergMetadataTable)resolvedPath.getRootTable();
+    FeIcebergTable iceTbl = iceMTbl.getBaseTable();
+    if (hasExplicitAlias()) return;
+    aliases_ = new String[] {
+      iceTbl.getTableName().toString().toLowerCase(),
+      iceTbl.getName().toLowerCase()};
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    if (isAnalyzed_) return;
+    IcebergMetadataTable rootTable = (IcebergMetadataTable)resolvedPath_.getRootTable();
+    FeTable iceRootTable = rootTable.getBaseTable();
+    analyzer.registerAuthAndAuditEvent(iceRootTable, priv_, requireGrantOption_);
+    desc_ = analyzer.registerTableRef(this);
+    isAnalyzed_ = true;
+  }
+
+}
diff --git a/fe/src/main/java/org/apache/impala/analysis/Path.java b/fe/src/main/java/org/apache/impala/analysis/Path.java
index 044a42be4..2f37489e4 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Path.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Path.java
@@ -29,6 +29,7 @@ import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.VirtualColumn;
+import org.apache.impala.catalog.iceberg.IcebergMetadataTable;
 import org.apache.impala.thrift.TVirtualColumnType;
 import org.apache.impala.util.AcidUtils;
 
@@ -336,6 +337,9 @@ public class Path {
    * a.b -> [<sessionDb>.a, a.b]
    * a.b.c -> [<sessionDb>.a, a.b]
    * a.b.c... -> [<sessionDb>.a, a.b]
+   *
+   * Notes on Iceberg tables:
+   * a.b.c -> translates to metadata table querying
    */
   public static List<TableName> getCandidateTables(List<String> path, String sessionDb) {
     Preconditions.checkArgument(path != null && !path.isEmpty());
@@ -344,7 +348,11 @@ public class Path {
     for (int tblNameIdx = 0; tblNameIdx < end; ++tblNameIdx) {
       String dbName = (tblNameIdx == 0) ? sessionDb : path.get(0);
       String tblName = path.get(tblNameIdx);
-      result.add(new TableName(dbName, tblName));
+      String vTblName = null;
+      if (IcebergMetadataTable.isIcebergMetadataTable(path)) {
+        vTblName = path.get(2);
+      }
+      result.add(new TableName(dbName, tblName, vTblName));
     }
     return result;
   }
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableName.java b/fe/src/main/java/org/apache/impala/analysis/TableName.java
index e413cca2b..d7cdcc6c6 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableName.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableName.java
@@ -31,27 +31,36 @@ import org.apache.impala.util.CatalogBlacklistUtils;
 
 /**
  * Represents a table/view name that optionally includes its database (a fully qualified
- * table name). Analysis of this table name checks for validity of the database and
- * table name according to the Metastore's policy (see @MetaStoreUtils).
- * According to that definition, we can still use "invalid" table names for tables/views
- * that are not stored in the Metastore, e.g., for Inline Views or WITH-clause views.
+ * table name) or a virtual table which requires a fully qualified name. Analysis of this
+ * table name checks for validity of the database and table name according to the
+ * Metastore's policy (see @MetaStoreUtils). According to that definition, we can still
+ * use "invalid" table names for tables/views that are not stored in the Metastore, e.g.,
+ * for Inline Views or WITH-clause views. Virtual tables are metadata tables of the "real"
+ * tables, currently only supported for Iceberg tables.
  */
 public class TableName {
   private final String db_;
   private final String tbl_;
+  private final String vTbl_;
 
   public TableName(String db, String tbl) {
+    this(db, tbl, null);
+  }
+
+  public TableName(String db, String tbl, String vTbl) {
     super();
     Preconditions.checkArgument(db == null || !db.isEmpty());
     this.db_ = db;
     Preconditions.checkNotNull(tbl);
     this.tbl_ = tbl;
+    Preconditions.checkArgument(vTbl == null || !vTbl.isEmpty());
+    this.vTbl_ = vTbl;
   }
 
   /**
-   * Parse the given full name (in format <db>.<tbl>) and return a TableName object.
-   * Return null for any failures. Note that we keep table names in lower case so the
-   * string will be converted to lower case first.
+   * Parse the given full name (in format <db>.<tbl>.<vTbl>) and return a TableName
+   * object. Return null for any failures. Note that we keep table names in lower case so
+   * the string will be converted to lower case first.
    */
   public static TableName parse(String fullName) {
     // Avoid "db1." and ".tbl1" being treated as the same. We resolve ".tbl1" as
@@ -66,11 +75,16 @@ public class TableName {
     if (parts.size() == 2) {
       return new TableName(parts.get(0), parts.get(1));
     }
+    if (parts.size() == 3) {
+      // Only fully qualified names are supported for vtables
+      return new TableName(parts.get(0), parts.get(1), parts.get(2));
+    }
     return null;
   }
 
   public String getDb() { return db_; }
   public String getTbl() { return tbl_; }
+  public String getVTbl() { return vTbl_; }
   public boolean isEmpty() { return tbl_.isEmpty(); }
 
   /**
@@ -99,20 +113,30 @@ public class TableName {
   public String toSql() {
     // Enclose the database and/or table name in quotes if Hive cannot parse them
     // without quotes. This is needed for view compatibility between Impala and Hive.
+    StringBuilder result = new StringBuilder();
     if (db_ == null) {
-      return ToSqlUtils.getIdentSql(tbl_);
+      result.append(ToSqlUtils.getIdentSql(tbl_));
     } else {
-      return ToSqlUtils.getIdentSql(db_) + "." + ToSqlUtils.getIdentSql(tbl_);
+      result.append(ToSqlUtils.getIdentSql(db_) + "." + ToSqlUtils.getIdentSql(tbl_));
+      if (vTbl_ != null && !vTbl_.isEmpty()) {
+        result.append("." + ToSqlUtils.getIdentSql(vTbl_));
+      }
     }
+    return result.toString();
   }
 
   @Override
   public String toString() {
+    StringBuilder result = new StringBuilder();
     if (db_ == null) {
-      return tbl_;
+      result.append(tbl_);
     } else {
-      return db_ + "." + tbl_;
+      result.append(db_ + "." + tbl_);
+      if (vTbl_ != null && !vTbl_.isEmpty()) {
+        result.append( "." + vTbl_);
+      }
     }
+    return result.toString();
   }
 
   public List<String> toPath() {
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index 5b329f121..2dead16eb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -33,6 +33,7 @@ import org.apache.impala.analysis.IcebergPartitionField;
 import org.apache.impala.analysis.IcebergPartitionSpec;
 import org.apache.impala.analysis.IcebergPartitionTransform;
 import org.apache.impala.catalog.iceberg.GroupedContentFiles;
+import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TCompressionCodec;
 import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
@@ -405,10 +406,15 @@ public class IcebergTable extends Table implements FeIcebergTable {
    */
   private void loadSchema() throws TableLoadingException {
     clearColumns();
-    msTable_.getSd().setCols(IcebergSchemaConverter.convertToHiveSchema(
-        getIcebergSchema()));
-    for (Column col : IcebergSchemaConverter.convertToImpalaSchema(getIcebergSchema())) {
-      addColumn(col);
+    try {
+      msTable_.getSd().setCols(IcebergSchemaConverter.convertToHiveSchema(
+          getIcebergSchema()));
+      for (Column col : IcebergSchemaConverter.convertToImpalaSchema(
+          getIcebergSchema())) {
+        addColumn(col);
+      }
+    } catch (ImpalaRuntimeException e) {
+      throw new TableLoadingException(e.getMessage(), e);
     }
   }
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTimeTravelTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTimeTravelTable.java
index 6b7967858..381c929b6 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTimeTravelTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTimeTravelTable.java
@@ -34,6 +34,7 @@ import org.apache.impala.analysis.TableName;
 import org.apache.impala.analysis.TimeTravelSpec;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
+import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumnDescriptor;
 import org.apache.impala.thrift.TCompressionCodec;
@@ -119,7 +120,7 @@ public class IcebergTimeTravelTable
       for (Column col : IcebergSchemaConverter.convertToImpalaSchema(icebergSchema)) {
         addColumn(col);
       }
-    } catch (TableLoadingException e) {
+    } catch (ImpalaRuntimeException e) {
       throw new AnalysisException("Could not create iceberg schema.", e);
     }
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
index 678e95667..67eb50525 100644
--- a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergCtasTarget.java
@@ -116,13 +116,13 @@ public class IcebergCtasTarget extends CtasTargetTable implements FeIcebergTable
       // In genIcebergSchema() we did our best to assign correct field ids to columns,
       // but to be sure, let's use Iceberg's API function to assign field ids.
       iceSchema_ = TypeUtil.assignIncreasingFreshIds(iceSchema_);
+      for (Column col : IcebergSchemaConverter.convertToImpalaSchema(iceSchema_)) {
+        addColumn((IcebergColumn)col);
+      }
     } catch (ImpalaRuntimeException ex) {
       throw new CatalogException(
         "Exception caught during generating Iceberg schema:", ex);
     }
-    for (Column col : IcebergSchemaConverter.convertToImpalaSchema(iceSchema_)) {
-      addColumn((IcebergColumn)col);
-    }
   }
 
   private void createPartitionSpec(IcebergPartitionSpec partSpec)
diff --git a/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java
new file mode 100644
index 000000000..42756c62c
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/iceberg/IcebergMetadataTable.java
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.iceberg;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.EnumUtils;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.impala.catalog.Column;
+import org.apache.impala.catalog.FeCatalogUtils;
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.catalog.VirtualTable;
+import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.thrift.TTableDescriptor;
+import org.apache.impala.thrift.TTableStats;
+import org.apache.impala.util.IcebergSchemaConverter;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Iceberg metadata tables are predefined tables by Iceberg library. IcebergMetadataTable
+ * is the Impala representation of these tables so this data can be queried. The schema of
+ * the Iceberg metadata table is available through the Iceberg API. This class creates a
+ * table object based on the Iceberg API.
+ */
+public class IcebergMetadataTable extends VirtualTable {
+  private FeIcebergTable baseTable_;
+
+  public IcebergMetadataTable(FeTable baseTable, String metadataTableTypeStr)
+      throws ImpalaRuntimeException {
+    super(null, baseTable.getDb(), baseTable.getName(), baseTable.getOwnerUser());
+    Preconditions.checkArgument(baseTable instanceof FeIcebergTable);
+    baseTable_ = (FeIcebergTable) baseTable;
+    MetadataTableType type = MetadataTableType.from(metadataTableTypeStr.toUpperCase());
+    Preconditions.checkNotNull(type);
+    Table metadataTable = MetadataTableUtils.createMetadataTableInstance(
+        baseTable_.getIcebergApiTable(), type);
+    Schema metadataTableSchema = metadataTable.schema();
+    for (Column col : IcebergSchemaConverter.convertToImpalaSchema(
+        metadataTableSchema)) {
+      addColumn(col);
+    }
+  }
+
+  @Override
+  public long getNumRows() {
+    return -1;
+  }
+
+  public FeIcebergTable getBaseTable() {
+    return baseTable_;
+  }
+
+  @Override
+  public TTableStats getTTableStats() {
+    long totalBytes = 0;
+    TTableStats ret = new TTableStats(getNumRows());
+    ret.setTotal_file_bytes(totalBytes);
+    return ret;
+  }
+
+  /**
+   * Return same descriptor as the base table, but with a schema that corresponds to
+   * the metadata table schema.
+   */
+  @Override
+  public TTableDescriptor toThriftDescriptor(int tableId,
+      Set<Long> referencedPartitions) {
+    TTableDescriptor desc = baseTable_.toThriftDescriptor(tableId, referencedPartitions);
+    desc.setColumnDescriptors(FeCatalogUtils.getTColumnDescriptors(this));
+    return desc;
+  }
+
+  /**
+   * Returns true if the table ref is referring to a valid metadata table.
+   */
+  public static boolean isIcebergMetadataTable(List<String> tblRefPath) {
+    if (tblRefPath == null) return false;
+    if (tblRefPath.size() != 3) return false;
+    String vTableName = tblRefPath.get(2).toUpperCase();
+    return EnumUtils.isValidEnum(MetadataTableType.class, vTableName);
+  }
+}
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index ec2a61617..ebfbff32b 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -38,6 +38,7 @@ import org.apache.impala.analysis.CollectionTableRef;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.analysis.ExprId;
 import org.apache.impala.analysis.ExprSubstitutionMap;
+import org.apache.impala.analysis.IcebergMetadataTableRef;
 import org.apache.impala.analysis.InlineViewRef;
 import org.apache.impala.analysis.JoinOperator;
 import org.apache.impala.analysis.MultiAggregateInfo;
@@ -2211,6 +2212,10 @@ public class SingleNodePlanner {
       Preconditions.checkState(ctx_.hasSubplan());
       result = new SingularRowSrcNode(ctx_.getNextNodeId(), ctx_.getSubplan());
       result.init(analyzer);
+    } else if (tblRef instanceof IcebergMetadataTableRef) {
+      throw new NotImplementedException(String.format("'%s' refers to a metadata table "
+          + "which is currently not supported.", String.join(".",
+          tblRef.getPath())));
     } else {
       throw new NotImplementedException(
           "Planning not implemented for table ref class: " + tblRef.getClass());
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 10d19f7c5..a49a5b3bf 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -97,7 +97,11 @@ public class IcebergCatalogOpExecutor {
   public static void populateExternalTableCols(
       org.apache.hadoop.hive.metastore.api.Table msTbl, Table iceTbl)
       throws TableLoadingException {
-    msTbl.getSd().setCols(IcebergSchemaConverter.convertToHiveSchema(iceTbl.schema()));
+    try {
+      msTbl.getSd().setCols(IcebergSchemaConverter.convertToHiveSchema(iceTbl.schema()));
+    } catch (ImpalaRuntimeException e) {
+      throw new TableLoadingException(e.getMessage());
+    }
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java b/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java
index 9757210f5..ddb18f00f 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergSchemaConverter.java
@@ -31,7 +31,6 @@ import org.apache.impala.catalog.MapType;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
-import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TColumn;
@@ -56,7 +55,7 @@ public class IcebergSchemaConverter {
    * Transform iceberg type to impala type
    */
   public static Type toImpalaType(org.apache.iceberg.types.Type t)
-      throws TableLoadingException {
+      throws ImpalaRuntimeException {
     switch (t.typeId()) {
       case BOOLEAN:
         return Type.BOOLEAN;
@@ -101,7 +100,7 @@ public class IcebergSchemaConverter {
         return new StructType(structFields);
       }
       default:
-        throw new TableLoadingException(String.format(
+        throw new ImpalaRuntimeException(String.format(
             "Iceberg type '%s' is not supported in Impala", t.typeId()));
     }
   }
@@ -110,7 +109,7 @@ public class IcebergSchemaConverter {
    * Converts Iceberg schema to a Hive schema.
    */
   public static List<FieldSchema> convertToHiveSchema(Schema schema)
-      throws TableLoadingException {
+      throws ImpalaRuntimeException {
     List<FieldSchema> ret = new ArrayList<>();
     for (Types.NestedField column : schema.columns()) {
       Type colType = toImpalaType(column.type());
@@ -125,7 +124,7 @@ public class IcebergSchemaConverter {
    * Converts Iceberg schema to an Impala schema.
    */
   public static List<Column> convertToImpalaSchema(Schema schema)
-      throws TableLoadingException {
+      throws ImpalaRuntimeException {
     List<Column> ret = new ArrayList<>();
     int pos = 0;
     for (Types.NestedField column : schema.columns()) {
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
new file mode 100644
index 000000000..ee1059e41
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
@@ -0,0 +1,132 @@
+====
+---- QUERY
+# List of all metadata tables in current version
+select * from functional_parquet.iceberg_alltypes_part_orc.entries
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.entries' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
+---- QUERY
+# 'Files' is a keyword and needs to be escaped
+select * from functional_parquet.iceberg_alltypes_part_orc.`files`
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.files' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc.data_files
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.data_files' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc.delete_files
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.delete_files' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc.history
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.history' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc.snapshots
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.snapshots' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc.manifests
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.manifests' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
+---- QUERY
+# 'Partitions' is a keyword and needs to be escaped
+select * from functional_parquet.iceberg_alltypes_part_orc.`partitions`
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.partitions' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc.all_data_files
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.all_data_files' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc.all_files
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.all_files' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc.all_manifests
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.all_manifests' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
+---- QUERY
+select * from functional_parquet.iceberg_alltypes_part_orc.all_entries
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.all_entries' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
+---- QUERY
+# Select list with column name
+select snapshot_id from functional_parquet.iceberg_alltypes_part_orc.history
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.history' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
+---- QUERY
+# Joining tables
+select *
+from functional_parquet.iceberg_alltypes_part_orc.history q
+  join functional_parquet.iceberg_alltypes_part_orc.snapshots z
+  on z.snapshot_id = q.snapshot_id
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.history' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
+---- QUERY
+# Inline query
+select x.snapshot_id
+from (select * from functional_parquet.iceberg_alltypes_part_orc.history) x;
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.history' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
+---- QUERY
+# Complex type
+select *, a.partition_summaries.pos from functional_parquet.iceberg_alltypes_part_orc.manifests a, a.partition_summaries
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.manifests' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
+---- QUERY
+# Using complex type 'map' column without a join
+select summary from functional_parquet.iceberg_alltypes_part_orc.snapshots;
+---- CATCH
+NotImplementedException: 'functional_parquet.iceberg_alltypes_part_orc.snapshots' refers to a metadata table which is currently not supported.
+---- TYPES
+STRING
+====
\ No newline at end of file
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 328c03247..9154ca6be 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1136,3 +1136,7 @@ class TestIcebergV2Table(IcebergTestSuite):
   def test_table_sampling_v2(self, vector):
     self.run_test_case('QueryTest/iceberg-tablesample-v2', vector,
         use_db="functional_parquet")
+
+  def test_metadata_tables(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-metadata-tables', vector,
+        use_db="functional_parquet")


[impala] 01/02: IMPALA-12028: test_execute_rollback might fail due to used timezone

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 409ea25a299b8d760659c4e091e19857d7c66909
Author: Gabor Kaszab <ga...@cloudera.com>
AuthorDate: Tue Mar 28 12:41:06 2023 +0200

    IMPALA-12028: test_execute_rollback might fail due to used timezone
    
    There are some steps in the test that set the timezone and execute
    rollback to given time. However, the snapshot creation, and querying
    the current time uses local timezone. As a result, if the test is run
    e.g. in CET timezone it fails when expecting an error in the Icelandic
    timezone.
    
    Tests:
      - Re-ran TestIcebergTable.test_execute_rollback.
    
    Change-Id: Iba9724f9b86cc508e6497eb33844a6480498b6e4
    Reviewed-on: http://gerrit.cloudera.org:8080/19655
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/query_test/test_iceberg.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 5895a78f8..328c03247 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -21,6 +21,7 @@ import datetime
 import logging
 import os
 import pytest
+import pytz
 import random
 
 import re
@@ -292,6 +293,8 @@ class TestIcebergTable(IcebergTestSuite):
       # We are setting the TIMEZONE query option in this test, so let's create a local
       # impala client.
       with self.create_impala_client() as impalad_client:
+        orig_timezone = 'America/Los_Angeles'
+        impalad_client.execute("SET TIMEZONE='" + orig_timezone + "'")
         impalad_client.execute("""
           create table {0} (i int) stored as iceberg
           TBLPROPERTIES ({1})""".format(tbl_name, catalog_properties))
@@ -321,7 +324,7 @@ class TestIcebergTable(IcebergTestSuite):
                           "state")
 
         # Create another snapshot.
-        before_insert = datetime.datetime.now()
+        before_insert = datetime.datetime.now(pytz.timezone(orig_timezone))
         impalad_client.execute("INSERT INTO {0} VALUES ({1})".format(tbl_name, 4))
         snapshots = get_snapshots(impalad_client, tbl_name, expected_result_size=5)