You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2021/02/01 17:03:30 UTC

[impala] 01/02: IMPALA-10456: Implement TRUNCATE for Iceberg tables

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 646b0e011c1af93901667de987c350f60f34e62d
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Wed Jan 27 19:06:37 2021 +0100

    IMPALA-10456: Implement TRUNCATE for Iceberg tables
    
    This patch adds support for the TRUNCATE statement for
    Iceberg tables.
    
    The TRUNCATE operation creates a new snapshot for the target
    table that doesn't have any data files. Table and column stats
    are also cleared. This patch also fixes a bug that prevented
    table/column stats from being propagated.
    
    Testing
     * added e2e tests for both partitioned and unpartitioned tables
    
    Change-Id: I6116c7c36aba871c0be79f499e0ac618072ca7b8
    Reviewed-on: http://gerrit.cloudera.org:8080/16987
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: wangsheng <sk...@163.com>
---
 .../org/apache/impala/analysis/TruncateStmt.java   |  7 +-
 .../org/apache/impala/catalog/IcebergTable.java    |  1 +
 .../impala/catalog/local/LocalIcebergTable.java    |  5 --
 .../apache/impala/service/CatalogOpExecutor.java   | 18 ++++-
 .../impala/service/IcebergCatalogOpExecutor.java   | 13 ++++
 .../queries/QueryTest/iceberg-negative.test        |  5 --
 .../queries/QueryTest/iceberg-truncate.test        | 82 ++++++++++++++++++++++
 tests/query_test/test_iceberg.py                   |  3 +
 8 files changed, 117 insertions(+), 17 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
index ad0068b..52e7bb9 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
@@ -21,7 +21,6 @@ import java.util.List;
 
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.FeFsTable;
-import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TTruncateParams;
@@ -63,14 +62,10 @@ public class TruncateStmt extends StatementBase {
       if (ifExists_) return;
       throw e;
     }
-    // We only support truncating hdfs tables now, we also cannot truncate Iceberg
-    // tables.
+    // We only support truncating HDFS tables now.
     if (!(table_ instanceof FeFsTable)) {
       throw new AnalysisException(String.format(
           "TRUNCATE TABLE not supported on non-HDFS table: %s", table_.getFullName()));
-    } else if (table_ instanceof FeIcebergTable) {
-      throw new AnalysisException(String.format(
-          "TRUNCATE TABLE not supported on iceberg table: %s", table_.getFullName()));
     }
     Analyzer.ensureTableNotFullAcid(table_, "TRUNCATE");
     Analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index b035e19..0cfce43 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -250,6 +250,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
     try {
       // Copy the table to check later if anything has changed.
       msTable_ = msTbl.deepCopy();
+      setTableStats(msTable_);
       // Load metadata from Iceberg
       final Timer.Context ctxStorageLdTime =
           getMetrics().getTimer(Table.LOAD_DURATION_STORAGE_METADATA).time();
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index 4701057..ff3b372 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -171,11 +171,6 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
   }
 
   @Override
-  protected void loadColumnStats() {
-    localFsTable_.loadColumnStats();
-  }
-
-  @Override
   public long snapshotId() {
     return snapshotId_;
   }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 3841dfd..0de5c5d 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2119,7 +2119,7 @@ public class CatalogOpExecutor {
       throw e;
     }
     Preconditions.checkNotNull(table);
-    if (!(table instanceof HdfsTable)) {
+    if (!(table instanceof FeFsTable)) {
       throw new CatalogException(
           String.format("TRUNCATE TABLE not supported on non-HDFS table: %s",
           table.getFullName()));
@@ -2135,6 +2135,8 @@ public class CatalogOpExecutor {
       try {
         if (AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())) {
           newCatalogVersion = truncateTransactionalTable(params, table);
+        } else if (table instanceof FeIcebergTable) {
+          newCatalogVersion = truncateIcebergTable(params, table);
         } else {
           newCatalogVersion = truncateNonTransactionalTable(params, table);
         }
@@ -2257,6 +2259,20 @@ public class CatalogOpExecutor {
     }
   }
 
+  private long truncateIcebergTable(TTruncateParams params, Table table)
+      throws Exception {
+    Preconditions.checkState(table.isWriteLockedByCurrentThread());
+    Preconditions.checkState(catalog_.getLock().isWriteLockedByCurrentThread());
+    Preconditions.checkState(table instanceof FeIcebergTable);
+    long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+    catalog_.getLock().writeLock().unlock();
+    FeIcebergTable iceTable = (FeIcebergTable)table;
+    dropColumnStats(table);
+    dropTableStats(table);
+    IcebergCatalogOpExecutor.truncateTable(iceTable);
+    return newCatalogVersion;
+  }
+
   private long truncateNonTransactionalTable(TTruncateParams params, Table table)
       throws Exception {
     Preconditions.checkState(table.isWriteLockedByCurrentThread());
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index ca72cbe..db44aef 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -25,11 +25,13 @@ import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
@@ -190,4 +192,15 @@ public class IcebergCatalogOpExecutor {
     }
     append.commit();
   }
+
+  /**
+   * Creates a new snapshot for the Iceberg table by deleting all data files.
+   */
+  public static void truncateTable(FeIcebergTable feIceTable)
+      throws ImpalaRuntimeException, TableLoadingException {
+    Table iceTable = IcebergUtil.loadTable(feIceTable);
+    DeleteFiles delete = iceTable.newDelete();
+    delete.deleteFromRowFilter(Expressions.alwaysTrue());
+    delete.commit();
+  }
 }
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index a567532..6e5e563 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -32,11 +32,6 @@ STORED AS ICEBERG
 TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
 ====
 ---- QUERY
-TRUNCATE iceberg_table_hadoop_tables
----- CATCH
-AnalysisException: TRUNCATE TABLE not supported on iceberg table: $DATABASE.iceberg_table_hadoop_tables
-====
----- QUERY
 # iceberg_non_partitioned is not partitioned
 SHOW PARTITIONS functional_parquet.iceberg_non_partitioned
 ---- CATCH
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-truncate.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-truncate.test
new file mode 100644
index 0000000..b64f67f
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-truncate.test
@@ -0,0 +1,82 @@
+====
+---- QUERY
+# Create unpartitioned Iceberg table
+create table ice_nopart (i int)
+stored as iceberg;
+insert into ice_nopart values (1), (2), (3);
+select * from ice_nopart;
+---- RESULTS
+1
+2
+3
+====
+---- QUERY
+# Column stats should be cleared by TRUNCATE.
+compute stats ice_nopart;
+show column stats ice_nopart;
+---- RESULTS
+'i','INT',3,0,4,4,-1,-1
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
+====
+---- QUERY
+# TRUNCATE iceberg table
+truncate table ice_nopart;
+====
+---- QUERY
+# SELECT from truncated table
+select * from ice_nopart
+---- RESULTS
+====
+---- QUERY
+show column stats ice_nopart;
+---- RESULTS
+'i','INT',-1,-1,4,4,-1,-1
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
+====
+---- QUERY
+# Create partitioned Iceberg table
+create table ice_part (i int, s string, t timestamp)
+partition by spec (t year, i bucket 10)
+stored as iceberg;
+insert into ice_part
+values (1, 'ice',  '2021-01-27 18:57:25.155746000'),
+       (2, 'berg', '2020-01-27 18:57:25.155746000');
+select * from ice_part;
+---- RESULTS
+1,'ice',2021-01-27 18:57:25.155746000
+2,'berg',2020-01-27 18:57:25.155746000
+---- TYPES
+INT,STRING,TIMESTAMP
+====
+---- QUERY
+# Column stats should be cleared by TRUNCATE.
+compute stats ice_part;
+show column stats ice_part;
+---- RESULTS
+'i','INT',2,0,4,4,-1,-1
+'s','STRING',2,0,4,3.5,-1,-1
+'t','TIMESTAMP',2,0,16,16,-1,-1
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
+====
+---- QUERY
+# TRUNCATE iceberg table
+truncate table ice_part;
+====
+---- QUERY
+# SELECT from truncated table
+select * from ice_part
+---- RESULTS
+====
+---- QUERY
+# Column stats should be cleared by TRUNCATE.
+show column stats ice_part;
+---- RESULTS
+'i','INT',-1,-1,4,4,-1,-1
+'s','STRING',-1,-1,-1,-1,-1,-1
+'t','TIMESTAMP',-1,-1,16,16,-1,-1
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 57659c6..514a9bd 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -49,6 +49,9 @@ class TestIcebergTable(ImpalaTestSuite):
   def test_alter_iceberg_tables(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-alter', vector, use_db=unique_database)
 
+  def test_truncate_iceberg_tables(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-truncate', vector, use_db=unique_database)
+
   @SkipIf.not_hdfs
   def test_drop_incomplete_table(self, vector, unique_database):
     """Test DROP TABLE when the underlying directory is deleted. In that case table