Posted to commits@impala.apache.org by mi...@apache.org on 2023/09/28 17:36:11 UTC

[impala] 03/03: IMPALA-12406: OPTIMIZE statement as an alias for INSERT OVERWRITE

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2d3289027c2ffdd245d13b60e6fa3f9b3e7bf833
Author: Noemi Pap-Takacs <np...@cloudera.com>
AuthorDate: Mon Jul 10 11:04:27 2023 +0200

    IMPALA-12406: OPTIMIZE statement as an alias for INSERT OVERWRITE
    
    If an Iceberg table is frequently updated or written to in small
    batches, many small files accumulate, which degrades read performance.
    Similarly, frequent row-level deletes contribute to the problem by
    creating delete files that have to be merged on read.
    
    So far, INSERT OVERWRITE (rewriting the table with itself) has been
    used to compact Iceberg tables. However, it comes with some
    restrictions:
    - The table must not have multiple partition specs (i.e. no partition
      evolution).
    - The table must not contain complex types.
    
    The OPTIMIZE statement offers a new syntax, limited to Iceberg tables,
    to compact a table and enhance read performance for subsequent
    operations.
    See IMPALA-12293 for details.
    
    Syntax: OPTIMIZE TABLE <table_name>;
    
    This first patch introduces the new syntax, temporarily as an alias
    for INSERT OVERWRITE.
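    
    For example, with a hypothetical table name, the alias expands as:
    
        OPTIMIZE TABLE ice_tbl;
        -- is executed as:
        INSERT OVERWRITE TABLE ice_tbl SELECT * FROM ice_tbl;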
    
    Note that executing OPTIMIZE TABLE requires ALL privileges.
    
    Testing:
     - negative tests
     - FE planner test
     - Ranger test
     - E2E tests
    
    Change-Id: Ief42537499ffe64fafdefe25c8d175539234c4e7
    Reviewed-on: http://gerrit.cloudera.org:8080/20405
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 fe/src/main/cup/sql-parser.cup                     |  17 ++-
 .../apache/impala/analysis/AnalysisContext.java    |  14 ++-
 .../org/apache/impala/analysis/InsertStmt.java     |   3 +-
 .../org/apache/impala/analysis/OptimizeStmt.java   | 109 ++++++++++++++++++
 .../org/apache/impala/planner/PlannerContext.java  |   4 +-
 .../java/org/apache/impala/service/Frontend.java   |   5 +-
 fe/src/main/jflex/sql-scanner.flex                 |   1 +
 .../org/apache/impala/analysis/ParserTest.java     |   4 +-
 .../queries/PlannerTest/insert-sort-by-zorder.test |  27 +++++
 .../queries/QueryTest/iceberg-negative.test        |  18 ++-
 .../queries/QueryTest/iceberg-optimize.test        | 123 +++++++++++++++++++++
 .../queries/QueryTest/ranger_column_masking.test   |   6 +
 tests/authorization/test_ranger.py                 |   5 +
 tests/query_test/test_iceberg.py                   |  31 ++++++
 14 files changed, 356 insertions(+), 11 deletions(-)

diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index eca1ee3b8..e46a57307 100755
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -318,7 +318,7 @@ terminal
   KW_JOIN, KW_JSONFILE, KW_KUDU, KW_LAST, KW_LEFT, KW_LEXICAL, KW_LIKE, KW_LIMIT, KW_LINES,
   KW_LOAD, KW_LOCATION, KW_LOGICAL_OR, KW_MANAGED_LOCATION, KW_MAP, KW_MERGE_FN,
   KW_METADATA, KW_MINUS, KW_NON, KW_NORELY, KW_NOT,
-  KW_NOVALIDATE, KW_NULL, KW_NULLS, KW_OF, KW_OFFSET, KW_ON, KW_OR,
+  KW_NOVALIDATE, KW_NULL, KW_NULLS, KW_OF, KW_OFFSET, KW_ON, KW_OPTIMIZE, KW_OR,
   KW_ORC, KW_ORDER, KW_OUTER,
   KW_OVER, KW_OVERWRITE, KW_PARQUET, KW_PARQUETFILE, KW_PARTITION, KW_PARTITIONED,
   KW_PARTITIONS, KW_PRECEDING, KW_PREPARE_FN, KW_PRIMARY, KW_PRODUCED, KW_PURGE,
@@ -462,6 +462,7 @@ nonterminal Expr sign_chain_expr;
 nonterminal InsertStmt insert_stmt, upsert_stmt;
 nonterminal UpdateStmt update_stmt;
 nonterminal DeleteStmt delete_stmt;
+nonterminal OptimizeStmt optimize_stmt;
 nonterminal List<Pair<SlotRef, Expr>> update_set_expr_list;
 nonterminal StatementBase explain_stmt;
 // Optional partition spec
@@ -673,6 +674,8 @@ stmt ::=
   {: RESULT = upsert; :}
   | delete_stmt:delete
   {: RESULT = delete; :}
+  | optimize_stmt:optimize
+  {: RESULT = optimize; :}
   | use_stmt:use
   {: RESULT = use; :}
   | show_tables_stmt:show_tables
@@ -837,6 +840,11 @@ explain_stmt ::=
      delete.setIsExplain();
      RESULT = delete;
   :}
+  | KW_EXPLAIN optimize_stmt:optimize
+  {:
+     optimize.setIsExplain();
+     RESULT = optimize;
+  :}
   ;
 
 copy_testcase_stmt ::=
@@ -986,6 +994,11 @@ delete_stmt ::=
   {: RESULT = new DeleteStmt(target_table, from, where); :}
   ;
 
+optimize_stmt ::=
+  KW_OPTIMIZE KW_TABLE table_name:table
+  {: RESULT = new OptimizeStmt(table); :}
+  ;
+
 opt_query_stmt ::=
   query_stmt:query
   {: RESULT = query; :}
@@ -4422,6 +4435,8 @@ word ::=
   {: RESULT = r.toString(); :}
   | KW_ON:r
   {: RESULT = r.toString(); :}
+  | KW_OPTIMIZE:r
+  {: RESULT = r.toString(); :}
   | KW_OR:r
   {: RESULT = r.toString(); :}
   | KW_ORC:r
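
With KW_OPTIMIZE and the productions above, the statement is accepted both
directly and under EXPLAIN. A minimal sketch of inputs the grammar now
parses (hypothetical table name):

  OPTIMIZE TABLE db1.ice_tbl;
  EXPLAIN OPTIMIZE TABLE db1.ice_tbl;
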
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index a712cbaa8..c2776d04b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -95,6 +95,7 @@ public class AnalysisContext {
     public boolean isQueryStmt() { return stmt_ instanceof QueryStmt; }
     public boolean isSetOperationStmt() { return stmt_ instanceof SetOperationStmt; }
     public boolean isInsertStmt() { return stmt_ instanceof InsertStmt; }
+    public boolean isOptimizeStmt() { return stmt_ instanceof OptimizeStmt; }
     public boolean isDropDbStmt() { return stmt_ instanceof DropDbStmt; }
     public boolean isDropTableOrViewStmt() {
       return stmt_ instanceof DropTableOrViewStmt;
@@ -187,7 +188,7 @@ public class AnalysisContext {
     }
 
     public boolean isDmlStmt() {
-      return isInsertStmt() || isUpdateStmt() || isDeleteStmt();
+      return isInsertStmt() || isUpdateStmt() || isDeleteStmt() || isOptimizeStmt();
     }
 
     /**
@@ -197,7 +198,7 @@ public class AnalysisContext {
     public boolean isHierarchicalAuthStmt() {
       return isQueryStmt() || isInsertStmt() || isUpdateStmt() || isDeleteStmt()
           || isCreateTableAsSelectStmt() || isCreateViewStmt() || isAlterViewStmt()
-          || isTestCaseStmt();
+          || isOptimizeStmt() || isTestCaseStmt();
     }
 
     /**
@@ -296,9 +297,16 @@ public class AnalysisContext {
       return (QueryStmt) stmt_;
     }
 
+    public OptimizeStmt getOptimizeStmt() {
+      Preconditions.checkState(isOptimizeStmt());
+      return (OptimizeStmt) stmt_;
+    }
+
     public InsertStmt getInsertStmt() {
       if (isCreateTableAsSelectStmt()) {
         return getCreateTableAsSelectStmt().getInsertStmt();
+      } else if (isOptimizeStmt()) {
+        return getOptimizeStmt().getInsertStmt();
       } else {
         Preconditions.checkState(isInsertStmt());
         return (InsertStmt) stmt_;
@@ -407,7 +415,7 @@ public class AnalysisContext {
     }
     public boolean requiresExprRewrite() {
       return isQueryStmt() || isInsertStmt() || isCreateTableAsSelectStmt()
-          || isUpdateStmt() || isDeleteStmt();
+          || isUpdateStmt() || isDeleteStmt() || isOptimizeStmt();
     }
     public boolean requiresSetOperationRewrite() {
       return analyzer_.containsSetOperation() && !isCreateViewStmt() && !isAlterViewStmt()
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index ec9dc8946..433164ddf 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -602,7 +602,8 @@ public class InsertStmt extends DmlStatementBase {
         if (iceTable.getPartitionSpecs().size() > 1) {
           throw new AnalysisException("The Iceberg table has multiple partition specs. " +
               "This means the outcome of dynamic partition overwrite is unforeseeable. " +
-              "Consider using TRUNCATE and INSERT INTO to overwrite your table.");
+              "Consider using TRUNCATE then INSERT INTO from the previous snapshot " +
+              "to overwrite your table.");
         }
         validateBucketTransformForOverwrite(iceTable);
       }
diff --git a/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
new file mode 100644
index 000000000..519823e5a
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/analysis/OptimizeStmt.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.analysis;
+
+import com.google.common.base.Preconditions;
+import org.apache.impala.authorization.Privilege;
+import org.apache.impala.catalog.FeIcebergTable;
+import org.apache.impala.catalog.FeTable;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.rewrite.ExprRewriter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Representation of an OPTIMIZE statement used to execute table maintenance tasks in
+ * Iceberg tables, such as:
+ * 1. compacting small files,
+ * 2. merging delete deltas.
+ * Currently, it executes these tasks as an alias for INSERT OVERWRITE:
+ * OPTIMIZE TABLE tbl; -->> INSERT OVERWRITE TABLE tbl SELECT * FROM tbl;
+ */
+public class OptimizeStmt extends DmlStatementBase {
+
+  // INSERT OVERWRITE statement that this OPTIMIZE statement is translated to.
+  private InsertStmt insertStmt_;
+  // Target table name as seen by the parser.
+  private final TableName originalTableName_;
+  // Target table that should be compacted. May be qualified by analyze().
+  private TableName tableName_;
+
+  public OptimizeStmt(TableName tableName) {
+    tableName_ = tableName;
+    originalTableName_ = tableName_;
+    List<SelectListItem> selectListItems = new ArrayList<>();
+    selectListItems.add(SelectListItem.createStarItem(null));
+    SelectList selectList = new SelectList(selectListItems);
+    List<TableRef> tableRefs = new ArrayList<>();
+    tableRefs.add(new TableRef(tableName.toPath(), null));
+    QueryStmt queryStmt = new SelectStmt(selectList, new FromClause(tableRefs), null,
+        null, null, null, null);
+    insertStmt_ = new InsertStmt(null, tableName, true, null, null, null, queryStmt,
+        null, false);
+  }
+
+  @Override
+  public void analyze(Analyzer analyzer) throws AnalysisException {
+    if (isAnalyzed()) return;
+    super.analyze(analyzer);
+    insertStmt_.analyze(analyzer);
+
+    Preconditions.checkState(table_ == null);
+    if (!tableName_.isFullyQualified()) {
+      tableName_ = new TableName(analyzer.getDefaultDb(), tableName_.getTbl());
+    }
+    table_ = analyzer.getTable(tableName_, Privilege.ALL);
+    Preconditions.checkState(table_ == insertStmt_.getTargetTable());
+    if (!(table_ instanceof FeIcebergTable)) {
+      throw new AnalysisException("OPTIMIZE is only supported for Iceberg tables.");
+    }
+  }
+
+  @Override
+  public void reset() {
+    super.reset();
+    tableName_ = originalTableName_;
+    insertStmt_.reset();
+  }
+
+
+  public InsertStmt getInsertStmt() { return insertStmt_; }
+
+  @Override
+  public List<Expr> getPartitionKeyExprs() {
+    return insertStmt_.getPartitionKeyExprs();
+  }
+
+  @Override
+  public List<Expr> getSortExprs() {
+    return insertStmt_.getSortExprs();
+  }
+
+  @Override
+  public void collectTableRefs(List<TableRef> tblRefs) {
+    insertStmt_.collectTableRefs(tblRefs);
+  }
+
+  @Override
+  public void rewriteExprs(ExprRewriter rewriter) throws AnalysisException {
+    Preconditions.checkState(isAnalyzed());
+    insertStmt_.rewriteExprs(rewriter);
+  }
+
+}
\ No newline at end of file
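
analyze() resolves the target table with Privilege.ALL and requires it to
be an FeIcebergTable, so non-Iceberg targets are rejected at analysis time.
A sketch of the expected behavior (hypothetical table name):

  OPTIMIZE TABLE plain_parquet_tbl;
  -- AnalysisException: OPTIMIZE is only supported for Iceberg tables.
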
diff --git a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
index 802b73893..03418e578 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlannerContext.java
@@ -103,7 +103,9 @@ public class PlannerContext {
   public PlanNodeId getNextNodeId() { return nodeIdGenerator_.getNextId(); }
   public PlanFragmentId getNextFragmentId() { return fragmentIdGenerator_.getNextId(); }
   public boolean isInsertOrCtas() {
-    return analysisResult_.isInsertStmt() || analysisResult_.isCreateTableAsSelectStmt();
+    //TODO: IMPALA-12412: remove isOptimizeStmt().
+    return analysisResult_.isInsertStmt() || analysisResult_.isCreateTableAsSelectStmt()
+        || analysisResult_.isOptimizeStmt();
   }
   public boolean isInsert() { return analysisResult_.isInsertStmt(); }
   public boolean isCtas() { return analysisResult_.isCreateTableAsSelectStmt(); }
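
Because OPTIMIZE goes through the insert planning path, EXPLAIN shows an
INSERT OVERWRITE-style plan for it. A sketch (hypothetical table name, plan
root abbreviated):

  EXPLAIN OPTIMIZE TABLE ice_tbl;
  -- WRITE TO HDFS [default.ice_tbl, OVERWRITE=true, ...]
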
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 398a14854..2821dfc5e 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -2381,7 +2381,8 @@ public class Frontend {
         }
       }
       if (!analysisResult.isExplainStmt() &&
-          (analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())) {
+          (analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt()
+          || analysisResult.isOptimizeStmt())) {
         InsertStmt insertStmt = analysisResult.getInsertStmt();
         FeTable targetTable = insertStmt.getTargetTable();
         if (AcidUtils.isTransactionalTable(
@@ -2508,7 +2509,7 @@ public class Frontend {
         result.query_exec_request.stmt_type = result.stmt_type;
         // fill in the metadata
         result.setResult_set_metadata(createQueryResultSetMetadata(analysisResult));
-      } else if (analysisResult.isInsertStmt() ||
+      } else if (analysisResult.isInsertStmt() || analysisResult.isOptimizeStmt() ||
           analysisResult.isCreateTableAsSelectStmt()) {
         // For CTAS the overall TExecRequest statement type is DDL, but the
         // query_exec_request should be DML
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex
index 45cf79da9..481a7bd2f 100644
--- a/fe/src/main/jflex/sql-scanner.flex
+++ b/fe/src/main/jflex/sql-scanner.flex
@@ -198,6 +198,7 @@ import org.apache.impala.thrift.TReservedWordsVersion;
     keywordMap.put("of", SqlParserSymbols.KW_OF);
     keywordMap.put("offset", SqlParserSymbols.KW_OFFSET);
     keywordMap.put("on", SqlParserSymbols.KW_ON);
+    keywordMap.put("optimize", SqlParserSymbols.KW_OPTIMIZE);
     keywordMap.put("or", SqlParserSymbols.KW_OR);
     keywordMap.put("||", SqlParserSymbols.KW_LOGICAL_OR);
     keywordMap.put("orc", SqlParserSymbols.KW_ORC);
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index e7c5214ff..d8972bc7a 100755
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -3579,8 +3579,8 @@ public class ParserTest extends FrontendTestBase {
         "^\n" +
         "Encountered: IDENTIFIER\n" +
         "Expected: ALTER, COMMENT, COMPUTE, COPY, CREATE, DELETE, DESCRIBE, DROP, " +
-            "EXPLAIN, GRANT, INSERT, INVALIDATE, LOAD, REFRESH, REVOKE, SELECT, SET, " +
-            "SHOW, TRUNCATE, UNSET, UPDATE, UPSERT, USE, VALUES, WITH\n");
+            "EXPLAIN, GRANT, INSERT, INVALIDATE, LOAD, OPTIMIZE, REFRESH, REVOKE, " +
+            "SELECT, SET, SHOW, TRUNCATE, UNSET, UPDATE, UPSERT, USE, VALUES, WITH\n");
 
     // missing select list
     ParserError("select from t",
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test b/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
index 76c8a81c4..a6dbe3f93 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/insert-sort-by-zorder.test
@@ -389,3 +389,30 @@ WRITE TO HDFS [test_sort_by_zorder.t, OVERWRITE=false, PARTITION-KEYS=(b.`year`,
    HDFS partitions=24/24 files=24 size=478.45KB
    runtime filters: RF000 -> b.id
    row-size=12B cardinality=7.30K
+====
+# IMPALA-12293: Verify that OPTIMIZE respects the sorting properties.
+optimize table functional_parquet.iceberg_partition_transforms_zorder
+---- PLAN
+WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=true, PARTITION-KEYS=(year(functional_parquet.iceberg_partition_transforms_zorder.ts),iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5))]
+|  partitions=unavailable
+|
+01:SORT
+|  order by: LEXICAL: year(functional_parquet.iceberg_partition_transforms_zorder.ts) ASC NULLS LAST, iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5) ASC NULLS LAST, ZORDER: i, j
+|  row-size=44B cardinality=0
+|
+00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder]
+   HDFS partitions=1/1 files=0 size=0B
+   row-size=36B cardinality=0
+---- DISTRIBUTEDPLAN
+WRITE TO HDFS [functional_parquet.iceberg_partition_transforms_zorder, OVERWRITE=true, PARTITION-KEYS=(year(functional_parquet.iceberg_partition_transforms_zorder.ts),iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5))]
+|  partitions=unavailable
+|
+02:SORT
+|  order by: LEXICAL: year(functional_parquet.iceberg_partition_transforms_zorder.ts) ASC NULLS LAST, iceberg_bucket_transform(functional_parquet.iceberg_partition_transforms_zorder.s, 5) ASC NULLS LAST, ZORDER: i, j
+|  row-size=44B cardinality=0
+|
+01:EXCHANGE [UNPARTITIONED]
+|
+00:SCAN HDFS [functional_parquet.iceberg_partition_transforms_zorder]
+   HDFS partitions=1/1 files=0 size=0B
+   row-size=36B cardinality=0
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index 9d9cc820e..7dfc2c287 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -779,9 +779,25 @@ delete from ice_delete where i = 1;
 AnalysisException: Impala can only write delete files in PARQUET, but the given table uses a different file format: $DATABASE.ice_delete
 ====
 ---- QUERY
-# Cannot delete from Iceberg table is write mode is not 'merge-on-read'
+# Cannot delete from Iceberg table if write mode is not 'merge-on-read'
 alter table ice_delete set tblproperties ('write.delete.format.default'='PARQUET', 'write.delete.mode'='copy-on-write');
 delete from ice_delete where i = 1;
 ---- CATCH
 AnalysisException: Unsupported delete mode: 'copy-on-write' for Iceberg table: $DATABASE.ice_delete
 ====
+---- QUERY
+optimize table non_iceberg_table;
+---- CATCH
+AnalysisException: OPTIMIZE is only supported for Iceberg tables.
+====
+---- QUERY
+optimize table iceberg_overwrite_bucket;
+---- CATCH
+AnalysisException: The Iceberg table has multiple partition specs. This means the outcome of dynamic partition overwrite is unforeseeable. Consider using TRUNCATE then INSERT INTO from the previous snapshot to overwrite your table.
+====
+---- QUERY
+CREATE TABLE ice_complex (id BIGINT NULL, int_array ARRAY<INT> NULL) STORED AS ICEBERG;
+optimize table ice_complex;
+---- CATCH
+AnalysisException: Unable to INSERT into target table ($DATABASE.ice_complex) because the column 'int_array' has a complex type 'ARRAY<INT>' and Impala doesn't support inserting into tables containing complex type columns
+====
\ No newline at end of file
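
For tables with multiple partition specs, the reworded error message points
to a manual workaround; a sketch using time travel (hypothetical table name
and snapshot id):

  TRUNCATE TABLE ice_tbl;
  INSERT INTO ice_tbl
  SELECT * FROM ice_tbl FOR SYSTEM_VERSION AS OF 1234567890;
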
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test
new file mode 100644
index 000000000..3912ce67b
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-optimize.test
@@ -0,0 +1,123 @@
+====
+---- QUERY
+CREATE TABLE ice_optimize (i int, s string)
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='2');
+====
+---- QUERY
+# Insert rows one by one to write multiple small files.
+INSERT INTO ice_optimize VALUES(1, 'one');
+INSERT INTO ice_optimize VALUES(2, 'two');
+INSERT INTO ice_optimize VALUES(3, 'three');
+SHOW FILES IN ice_optimize;
+---- LABELS
+Path,Size,Partition,EC Policy
+---- RESULTS: VERIFY_IS_EQUAL
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+# OPTIMIZE TABLE should create 1 data file.
+OPTIMIZE TABLE ice_optimize;
+SHOW FILES IN ice_optimize;
+---- LABELS
+Path,Size,Partition,EC Policy
+---- RESULTS: VERIFY_IS_EQUAL
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+SELECT * FROM ice_optimize;
+---- RESULTS
+1,'one'
+2,'two'
+3,'three'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+DELETE FROM ice_optimize WHERE i = 2;
+SHOW FILES IN ice_optimize;
+---- LABELS
+Path,Size,Partition,EC Policy
+---- RESULTS: VERIFY_IS_SUBSET
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/delete-.*parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+# Checking that the delete file was merged and there is no delete file in the table.
+OPTIMIZE TABLE ice_optimize;
+SHOW FILES IN ice_optimize;
+---- LABELS
+Path,Size,Partition,EC Policy
+---- RESULTS: VERIFY_IS_EQUAL
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize/data/.*.0.parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+SELECT * FROM ice_optimize;
+---- RESULTS
+1,'one'
+3,'three'
+---- TYPES
+INT,STRING
+====
+---- QUERY
+# Schema evolution should work and return correct results according to the latest schema.
+ALTER TABLE ice_optimize DROP COLUMN s;
+ALTER TABLE ice_optimize ADD COLUMN b BOOLEAN;
+INSERT INTO ice_optimize VALUES(4, true);
+OPTIMIZE TABLE ice_optimize;
+SELECT * FROM ice_optimize;
+---- RESULTS
+1,NULL
+3,NULL
+4,true
+---- TYPES
+INT,BOOLEAN
+====
+---- QUERY
+CREATE TABLE ice_optimize_part
+PARTITIONED BY(i int)
+STORED BY ICEBERG
+TBLPROPERTIES ('format-version'='1');
+====
+---- QUERY
+# Insert values into each partition to write multiple small files in each.
+INSERT INTO ice_optimize_part VALUES(1), (2), (3);
+INSERT INTO ice_optimize_part VALUES(2), (3);
+INSERT INTO ice_optimize_part VALUES(1), (3);
+SHOW FILES IN ice_optimize_part;
+---- LABELS
+Path,Size,Partition,EC Policy
+---- RESULTS: VERIFY_IS_EQUAL
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=1/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=1/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=2/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=2/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=3/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=3/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=3/.*.0.parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
+---- QUERY
+# OPTIMIZE TABLE should create 1 data file per partition.
+OPTIMIZE TABLE ice_optimize_part;
+SHOW FILES IN ice_optimize_part;
+---- LABELS
+Path,Size,Partition,EC Policy
+---- RESULTS: VERIFY_IS_EQUAL
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=1/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=2/.*.0.parq','.*','','$ERASURECODE_POLICY'
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_optimize_part/data/i=3/.*.0.parq','.*','','$ERASURECODE_POLICY'
+---- TYPES
+STRING, STRING, STRING, STRING
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
index 50297ae76..f7e5d9dfc 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/ranger_column_masking.test
@@ -733,6 +733,12 @@ compute stats functional.alltypestiny
 AuthorizationException: User '$USER' does not have privileges to execute 'ALTER' on: functional.alltypestiny
 ====
 ---- QUERY
+# Optimize statement on masked tables should be blocked, because reading and inserting masked data would result in data loss.
+optimize table functional_parquet.iceberg_partitioned
+---- CATCH
+AuthorizationException: User '$USER' does not have privileges to execute 'INSERT' on: functional_parquet.iceberg_partitioned
+====
+---- QUERY
 # Select masked INPUT_FILE__NAME plus all cols
 select input__file__name, * from alltypestiny order by id;
 ---- RESULTS
diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index 5b089bf88..936e9627c 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -1481,6 +1481,11 @@ class TestRanger(CustomClusterTestSuite):
         "input__file__name",
         "CUSTOM", "mask_show_last_n({col}, 10, 'x', 'x', 'x', -1, '1')")
       policy_cnt += 1
+      # Add column masking policy to an Iceberg table.
+      TestRanger._add_column_masking_policy(
+        unique_name + str(policy_cnt), user, "functional_parquet", "iceberg_partitioned",
+        "id", "MASK_NULL")
+      policy_cnt += 1
       self.execute_query_expect_success(admin_client, "refresh authorization",
                                         user=ADMIN)
       self.run_test_case("QueryTest/ranger_column_masking", vector,
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 02cf985d9..d6317d268 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -1301,3 +1301,34 @@ class TestIcebergV2Table(IcebergTestSuite):
     hive_output = self.run_stmt_in_hive("SELECT id FROM {} ORDER BY id".format(ice_t))
     # Test that Hive sees the same rows deleted.
     assert hive_output == "id\n4\n5\n6\n7\n8\n"
+
+  def test_optimize(self, vector, unique_database):
+    tbl_name = unique_database + ".optimize_iceberg"
+    self.execute_query("""create table {0} (i int)
+        stored as iceberg""".format(tbl_name))
+    self.execute_query("insert into {0} values (1);".format(tbl_name))
+    self.execute_query("insert into {0} values (2);".format(tbl_name))
+    self.execute_query("insert into {0} values (8);".format(tbl_name))
+    result_before_opt = self.execute_query("SELECT * FROM {}".format(tbl_name))
+    snapshots = get_snapshots(self.client, tbl_name, expected_result_size=3)
+    snapshot_before = snapshots[2]
+
+    # Check that a new snapshot is created after Iceberg table optimization.
+    self.execute_query("optimize table {0};".format(tbl_name))
+    snapshots = get_snapshots(self.client, tbl_name, expected_result_size=4)
+    snapshot_after = snapshots[3]
+    assert(snapshot_before.get_creation_time() < snapshot_after.get_creation_time())
+    # Check that the last snapshot's parent ID is the snapshot ID before 'OPTIMIZE TABLE'.
+    assert(snapshot_before.get_snapshot_id() == snapshot_after.get_parent_id())
+
+    result_after_opt = self.execute_query("SELECT * FROM {0}".format(tbl_name))
+    # Check that we get the same result from the table before and after 'OPTIMIZE TABLE'.
+    assert result_after_opt.data.sort() == result_before_opt.data.sort()
+
+    result_time_travel = self.execute_query(
+        "select * from {0} for system_version as of {1};".format(
+            tbl_name, snapshot_before.get_snapshot_id()))
+    # Check that time travel to the previous snapshot returns all results correctly.
+    assert result_after_opt.data.sort() == result_time_travel.data.sort()
+
+    self.run_test_case('QueryTest/iceberg-optimize', vector, unique_database)
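
The snapshot lineage asserted above can also be inspected from SQL; a
sketch using DESCRIBE HISTORY, which Impala supports for Iceberg tables
(hypothetical table name):

  DESCRIBE HISTORY ice_tbl;
  -- OPTIMIZE TABLE should add exactly one snapshot, whose parent_id
  -- equals the previous snapshot's snapshot_id.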