Posted to commits@impala.apache.org by bo...@apache.org on 2021/02/01 17:03:29 UTC

[impala] branch master updated (f086d5c -> a81c6a7)

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from f086d5c  IMPALA-10462: Include org/apache/hive/hadoop/common/type/* in impala-minimal-hive-exec
     new 646b0e0  IMPALA-10456: Implement TRUNCATE for Iceberg tables
     new a81c6a7  IMPALA-10460: Impala should write normalized paths in Iceberg manifests

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/hdfs-table-sink.cc                     | 11 ++-
 .../org/apache/impala/analysis/TruncateStmt.java   |  7 +-
 .../org/apache/impala/catalog/IcebergTable.java    |  1 +
 .../impala/catalog/local/LocalIcebergTable.java    |  5 --
 .../apache/impala/service/CatalogOpExecutor.java   | 18 ++++-
 .../impala/service/IcebergCatalogOpExecutor.java   | 13 ++++
 .../queries/QueryTest/iceberg-negative.test        |  5 --
 .../queries/QueryTest/iceberg-truncate.test        | 82 ++++++++++++++++++++++
 tests/query_test/test_iceberg.py                   |  3 +
 9 files changed, 125 insertions(+), 20 deletions(-)
 create mode 100644 testdata/workloads/functional-query/queries/QueryTest/iceberg-truncate.test


[impala] 02/02: IMPALA-10460: Impala should write normalized paths in Iceberg manifests

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a81c6a78294d1da72b57ed90ec4e365de8c4e54b
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Thu Jan 28 11:51:44 2021 +0100

    IMPALA-10460: Impala should write normalized paths in Iceberg manifests
    
    Currently Impala writes double slashes into the data file paths of
    non-partitioned Iceberg tables. Such unnormalized paths can cause
    problems later.
    
    This patch removes the redundant slashes.
    
    Testing:
     * Tested manually by inspecting the manifest files of the
       Iceberg tables. Used both non-partitioned and partitioned tables.
    
    Change-Id: If5ecac78102ed35710dd70a18edc71f6e891e748
    Reviewed-on: http://gerrit.cloudera.org:8080/16993
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/hdfs-table-sink.cc | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index ae4ae6e..ffc79af 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -258,9 +258,14 @@ void HdfsTableSink::BuildHdfsFileNames(
   }
   if (IsIceberg()) {
     //TODO: implement LocationProviders.
-    output_partition->final_hdfs_file_name_prefix =
-        Substitute("$0/data/$1/", table_desc_->IcebergTableLocation(),
-            output_partition->partition_name);
+    if (output_partition->partition_name.empty()) {
+      output_partition->final_hdfs_file_name_prefix =
+          Substitute("$0/data/", table_desc_->IcebergTableLocation());
+    } else {
+      output_partition->final_hdfs_file_name_prefix =
+          Substitute("$0/data/$1/", table_desc_->IcebergTableLocation(),
+              output_partition->partition_name);
+    }
   }
   output_partition->final_hdfs_file_name_prefix += query_suffix;
 

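To make the effect of the fix concrete: for a non-partitioned Iceberg table the partition name is empty, so the old single Substitute() call produced a prefix ending in "data//". Below is a minimal sketch of the corrected prefix construction, written in Java purely for illustration (the real logic is the C++ change above, and the table location used here is hypothetical):

    // Hypothetical sketch, not Impala code: builds the data file prefix the
    // way the patched C++ does, skipping the partition component when empty.
    public class IcebergPathPrefixSketch {
      static String finalHdfsFileNamePrefix(String tableLocation, String partitionName) {
        if (partitionName.isEmpty()) {
          return tableLocation + "/data/";  // was ".../data//" before the fix
        }
        return tableLocation + "/data/" + partitionName + "/";
      }

      public static void main(String[] args) {
        // Non-partitioned table: prints "hdfs://nn/warehouse/ice_tbl/data/"
        System.out.println(finalHdfsFileNamePrefix("hdfs://nn/warehouse/ice_tbl", ""));
        // Partitioned table: prints "hdfs://nn/warehouse/ice_tbl/data/year=2021/"
        System.out.println(finalHdfsFileNamePrefix("hdfs://nn/warehouse/ice_tbl", "year=2021"));
      }
    }
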

[impala] 01/02: IMPALA-10456: Implement TRUNCATE for Iceberg tables

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 646b0e011c1af93901667de987c350f60f34e62d
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Wed Jan 27 19:06:37 2021 +0100

    IMPALA-10456: Implement TRUNCATE for Iceberg tables
    
    This patch adds support for the TRUNCATE statement for
    Iceberg tables.
    
    The TRUNCATE operation creates a new snapshot for the target
    table that doesn't have any data files. Table and column stats
    are also cleared. This patch also fixes a bug that prevented
    table/column stats from being propagated.
    
    Testing:
     * added e2e tests for both partitioned and unpartitioned tables
    
    Change-Id: I6116c7c36aba871c0be79f499e0ac618072ca7b8
    Reviewed-on: http://gerrit.cloudera.org:8080/16987
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: wangsheng <sk...@163.com>
---
 .../org/apache/impala/analysis/TruncateStmt.java   |  7 +-
 .../org/apache/impala/catalog/IcebergTable.java    |  1 +
 .../impala/catalog/local/LocalIcebergTable.java    |  5 --
 .../apache/impala/service/CatalogOpExecutor.java   | 18 ++++-
 .../impala/service/IcebergCatalogOpExecutor.java   | 13 ++++
 .../queries/QueryTest/iceberg-negative.test        |  5 --
 .../queries/QueryTest/iceberg-truncate.test        | 82 ++++++++++++++++++++++
 tests/query_test/test_iceberg.py                   |  3 +
 8 files changed, 117 insertions(+), 17 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
index ad0068b..52e7bb9 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
@@ -21,7 +21,6 @@ import java.util.List;
 
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.FeFsTable;
-import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TTruncateParams;
@@ -63,14 +62,10 @@ public class TruncateStmt extends StatementBase {
       if (ifExists_) return;
       throw e;
     }
-    // We only support truncating hdfs tables now, we also cannot truncate Iceberg
-    // tables.
+    // We only support truncating hdfs tables now
     if (!(table_ instanceof FeFsTable)) {
       throw new AnalysisException(String.format(
           "TRUNCATE TABLE not supported on non-HDFS table: %s", table_.getFullName()));
-    } else if (table_ instanceof FeIcebergTable) {
-      throw new AnalysisException(String.format(
-          "TRUNCATE TABLE not supported on iceberg table: %s", table_.getFullName()));
     }
     Analyzer.ensureTableNotFullAcid(table_, "TRUNCATE");
     Analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index b035e19..0cfce43 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -250,6 +250,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
     try {
       // Copy the table to check later if anything has changed.
       msTable_ = msTbl.deepCopy();
+      setTableStats(msTable_);
       // Load metadata from Iceberg
       final Timer.Context ctxStorageLdTime =
           getMetrics().getTimer(Table.LOAD_DURATION_STORAGE_METADATA).time();
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index 4701057..ff3b372 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -171,11 +171,6 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
   }
 
   @Override
-  protected void loadColumnStats() {
-    localFsTable_.loadColumnStats();
-  }
-
-  @Override
   public long snapshotId() {
     return snapshotId_;
   }
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 3841dfd..0de5c5d 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2119,7 +2119,7 @@ public class CatalogOpExecutor {
       throw e;
     }
     Preconditions.checkNotNull(table);
-    if (!(table instanceof HdfsTable)) {
+    if (!(table instanceof FeFsTable)) {
       throw new CatalogException(
           String.format("TRUNCATE TABLE not supported on non-HDFS table: %s",
           table.getFullName()));
@@ -2135,6 +2135,8 @@ public class CatalogOpExecutor {
       try {
         if (AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())) {
           newCatalogVersion = truncateTransactionalTable(params, table);
+        } else if (table instanceof FeIcebergTable) {
+          newCatalogVersion = truncateIcebergTable(params, table);
         } else {
           newCatalogVersion = truncateNonTransactionalTable(params, table);
         }
@@ -2257,6 +2259,20 @@ public class CatalogOpExecutor {
     }
   }
 
+  private long truncateIcebergTable(TTruncateParams params, Table table)
+      throws Exception {
+    Preconditions.checkState(table.isWriteLockedByCurrentThread());
+    Preconditions.checkState(catalog_.getLock().isWriteLockedByCurrentThread());
+    Preconditions.checkState(table instanceof FeIcebergTable);
+    long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+    catalog_.getLock().writeLock().unlock();
+    FeIcebergTable iceTable = (FeIcebergTable)table;
+    dropColumnStats(table);
+    dropTableStats(table);
+    IcebergCatalogOpExecutor.truncateTable(iceTable);
+    return newCatalogVersion;
+  }
+
   private long truncateNonTransactionalTable(TTruncateParams params, Table table)
       throws Exception {
     Preconditions.checkState(table.isWriteLockedByCurrentThread());
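
The new truncateIcebergTable() mirrors the locking pattern of the existing truncate paths: the new catalog version is assigned while the global catalog write lock is still held, that lock is released, and the slower per-table work (dropping stats, committing the Iceberg delete) then runs under the table's own write lock. A generic, hypothetical sketch of that hand-off, not Impala's actual classes:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Hypothetical sketch of the version-then-unlock hand-off: hold the coarse
    // catalog lock only long enough to assign the new version, then release it
    // before the slow table-level work.
    class CatalogVersionHandOffSketch {
      private final ReentrantReadWriteLock catalogLock = new ReentrantReadWriteLock();
      private long catalogVersion = 0;

      long truncate(Runnable slowTableWork) {
        catalogLock.writeLock().lock();
        long newVersion;
        try {
          newVersion = ++catalogVersion;     // assign the version under the global lock
        } finally {
          catalogLock.writeLock().unlock();  // release it before the slow work
        }
        slowTableWork.run();                 // e.g. drop stats, commit the Iceberg delete
        return newVersion;
      }
    }
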
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index ca72cbe..db44aef 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -25,11 +25,13 @@ import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
@@ -190,4 +192,15 @@ public class IcebergCatalogOpExecutor {
     }
     append.commit();
   }
+
+  /**
+   * Creates a new snapshot for the Iceberg table by deleting all data files.
+   */
+  public static void truncateTable(FeIcebergTable feIceTable)
+      throws ImpalaRuntimeException, TableLoadingException {
+    Table iceTable = IcebergUtil.loadTable(feIceTable);
+    DeleteFiles delete = iceTable.newDelete();
+    delete.deleteFromRowFilter(Expressions.alwaysTrue());
+    delete.commit();
+  }
 }
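
For reference, the same delete-everything snapshot can be produced with the Iceberg Java API directly. A minimal standalone sketch follows; the HadoopTables catalog and table location are hypothetical, and Impala itself resolves the table through IcebergUtil.loadTable() instead:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.hadoop.HadoopTables;

    public class IcebergTruncateSketch {
      public static void main(String[] args) {
        // Load an Iceberg table from a (hypothetical) HadoopTables location.
        Table table = new HadoopTables(new Configuration())
            .load("hdfs://nn/warehouse/ice_tbl");
        // Commit a new snapshot that removes every data file; this is the whole
        // effect of TRUNCATE on an Iceberg table's data.
        table.newDelete()
            .deleteFromRowFilter(Expressions.alwaysTrue())
            .commit();
      }
    }

Because this only commits new metadata, the old data files remain on storage until snapshots are expired or orphan files are cleaned up; TRUNCATE itself does not physically delete them.
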
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index a567532..6e5e563 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -32,11 +32,6 @@ STORED AS ICEBERG
 TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
 ====
 ---- QUERY
-TRUNCATE iceberg_table_hadoop_tables
----- CATCH
-AnalysisException: TRUNCATE TABLE not supported on iceberg table: $DATABASE.iceberg_table_hadoop_tables
-====
----- QUERY
 # iceberg_non_partitioned is not partitioned
 SHOW PARTITIONS functional_parquet.iceberg_non_partitioned
 ---- CATCH
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-truncate.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-truncate.test
new file mode 100644
index 0000000..b64f67f
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-truncate.test
@@ -0,0 +1,82 @@
+====
+---- QUERY
+# Create unpartitioned Iceberg table
+create table ice_nopart (i int)
+stored as iceberg;
+insert into ice_nopart values (1), (2), (3);
+select * from ice_nopart;
+---- RESULTS
+1
+2
+3
+====
+---- QUERY
+# Column stats should be cleared by TRUNCATE.
+compute stats ice_nopart;
+show column stats ice_nopart;
+---- RESULTS
+'i','INT',3,0,4,4,-1,-1
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
+====
+---- QUERY
+# TRUNCATE iceberg table
+truncate table ice_nopart;
+====
+---- QUERY
+# SELECT from truncated table
+select * from ice_nopart
+---- RESULTS
+====
+---- QUERY
+show column stats ice_nopart;
+---- RESULTS
+'i','INT',-1,-1,4,4,-1,-1
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
+====
+---- QUERY
+# Create partitioned Iceberg table
+create table ice_part (i int, s string, t timestamp)
+partition by spec (t year, i bucket 10)
+stored as iceberg;
+insert into ice_part
+values (1, 'ice',  '2021-01-27 18:57:25.155746000'),
+       (2, 'berg', '2020-01-27 18:57:25.155746000');
+select * from ice_part;
+---- RESULTS
+1,'ice',2021-01-27 18:57:25.155746000
+2,'berg',2020-01-27 18:57:25.155746000
+---- TYPES
+INT,STRING,TIMESTAMP
+====
+---- QUERY
+# Column stats should be cleared by TRUNCATE.
+compute stats ice_part;
+show column stats ice_part;
+---- RESULTS
+'i','INT',2,0,4,4,-1,-1
+'s','STRING',2,0,4,3.5,-1,-1
+'t','TIMESTAMP',2,0,16,16,-1,-1
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
+====
+---- QUERY
+# TRUNCATE iceberg table
+truncate table ice_part;
+====
+---- QUERY
+# SELECT from truncated table
+select * from ice_part
+---- RESULTS
+====
+---- QUERY
+# Column stats should be cleared by TRUNCATE.
+show column stats ice_part;
+---- RESULTS
+'i','INT',-1,-1,4,4,-1,-1
+'s','STRING',-1,-1,-1,-1,-1,-1
+'t','TIMESTAMP',-1,-1,16,16,-1,-1
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 57659c6..514a9bd 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -49,6 +49,9 @@ class TestIcebergTable(ImpalaTestSuite):
   def test_alter_iceberg_tables(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-alter', vector, use_db=unique_database)
 
+  def test_truncate_iceberg_tables(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-truncate', vector, use_db=unique_database)
+
   @SkipIf.not_hdfs
   def test_drop_incomplete_table(self, vector, unique_database):
     """Test DROP TABLE when the underlying directory is deleted. In that case table