You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2021/02/01 17:03:30 UTC
[impala] 01/02: IMPALA-10456: Implement TRUNCATE for Iceberg tables
This is an automated email from the ASF dual-hosted git repository.
boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 646b0e011c1af93901667de987c350f60f34e62d
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Wed Jan 27 19:06:37 2021 +0100
IMPALA-10456: Implement TRUNCATE for Iceberg tables
This patch adds support for the TRUNCATE statement for
Iceberg tables.
The TRUNCATE operation creates a new snapshot for the target
table that doesn't have any data files. Table and column stats
are also cleared. This patch also fixes a bug that prevented
table/column stats from being propagated.
Testing
* added e2e tests for both partitioned and unpartitioned tables
Change-Id: I6116c7c36aba871c0be79f499e0ac618072ca7b8
Reviewed-on: http://gerrit.cloudera.org:8080/16987
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: wangsheng <sk...@163.com>
---
.../org/apache/impala/analysis/TruncateStmt.java | 7 +-
.../org/apache/impala/catalog/IcebergTable.java | 1 +
.../impala/catalog/local/LocalIcebergTable.java | 5 --
.../apache/impala/service/CatalogOpExecutor.java | 18 ++++-
.../impala/service/IcebergCatalogOpExecutor.java | 13 ++++
.../queries/QueryTest/iceberg-negative.test | 5 --
.../queries/QueryTest/iceberg-truncate.test | 82 ++++++++++++++++++++++
tests/query_test/test_iceberg.py | 3 +
8 files changed, 117 insertions(+), 17 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
index ad0068b..52e7bb9 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TruncateStmt.java
@@ -21,7 +21,6 @@ import java.util.List;
import org.apache.impala.authorization.Privilege;
import org.apache.impala.catalog.FeFsTable;
-import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.FeTable;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.thrift.TTruncateParams;
@@ -63,14 +62,10 @@ public class TruncateStmt extends StatementBase {
if (ifExists_) return;
throw e;
}
- // We only support truncating hdfs tables now, we also cannot truncate Iceberg
- // tables.
+ // We only support truncating hdfs tables now
if (!(table_ instanceof FeFsTable)) {
throw new AnalysisException(String.format(
"TRUNCATE TABLE not supported on non-HDFS table: %s", table_.getFullName()));
- } else if (table_ instanceof FeIcebergTable) {
- throw new AnalysisException(String.format(
- "TRUNCATE TABLE not supported on iceberg table: %s", table_.getFullName()));
}
Analyzer.ensureTableNotFullAcid(table_, "TRUNCATE");
Analyzer.checkTableCapability(table_, Analyzer.OperationType.WRITE);
diff --git a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
index b035e19..0cfce43 100644
--- a/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/IcebergTable.java
@@ -250,6 +250,7 @@ public class IcebergTable extends Table implements FeIcebergTable {
try {
// Copy the table to check later if anything has changed.
msTable_ = msTbl.deepCopy();
+ setTableStats(msTable_);
// Load metadata from Iceberg
final Timer.Context ctxStorageLdTime =
getMetrics().getTimer(Table.LOAD_DURATION_STORAGE_METADATA).time();
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
index 4701057..ff3b372 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalIcebergTable.java
@@ -171,11 +171,6 @@ public class LocalIcebergTable extends LocalTable implements FeIcebergTable {
}
@Override
- protected void loadColumnStats() {
- localFsTable_.loadColumnStats();
- }
-
- @Override
public long snapshotId() {
return snapshotId_;
}
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 3841dfd..0de5c5d 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -2119,7 +2119,7 @@ public class CatalogOpExecutor {
throw e;
}
Preconditions.checkNotNull(table);
- if (!(table instanceof HdfsTable)) {
+ if (!(table instanceof FeFsTable)) {
throw new CatalogException(
String.format("TRUNCATE TABLE not supported on non-HDFS table: %s",
table.getFullName()));
@@ -2135,6 +2135,8 @@ public class CatalogOpExecutor {
try {
if (AcidUtils.isTransactionalTable(table.getMetaStoreTable().getParameters())) {
newCatalogVersion = truncateTransactionalTable(params, table);
+ } else if (table instanceof FeIcebergTable) {
+ newCatalogVersion = truncateIcebergTable(params, table);
} else {
newCatalogVersion = truncateNonTransactionalTable(params, table);
}
@@ -2257,6 +2259,20 @@ public class CatalogOpExecutor {
}
}
+ private long truncateIcebergTable(TTruncateParams params, Table table)
+ throws Exception {
+ Preconditions.checkState(table.isWriteLockedByCurrentThread());
+ Preconditions.checkState(catalog_.getLock().isWriteLockedByCurrentThread());
+ Preconditions.checkState(table instanceof FeIcebergTable);
+ long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
+ catalog_.getLock().writeLock().unlock();
+ FeIcebergTable iceTable = (FeIcebergTable)table;
+ dropColumnStats(table);
+ dropTableStats(table);
+ IcebergCatalogOpExecutor.truncateTable(iceTable);
+ return newCatalogVersion;
+ }
+
private long truncateNonTransactionalTable(TTruncateParams params, Table table)
throws Exception {
Preconditions.checkState(table.isWriteLockedByCurrentThread());
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index ca72cbe..db44aef 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -25,11 +25,13 @@ import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
@@ -190,4 +192,15 @@ public class IcebergCatalogOpExecutor {
}
append.commit();
}
+
+ /**
+ * Creates new snapshot for the iceberg table by deleting all data files.
+ */
+ public static void truncateTable(FeIcebergTable feIceTable)
+ throws ImpalaRuntimeException, TableLoadingException {
+ Table iceTable = IcebergUtil.loadTable(feIceTable);
+ DeleteFiles delete = iceTable.newDelete();
+ delete.deleteFromRowFilter(Expressions.alwaysTrue());
+ delete.commit();
+ }
}
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index a567532..6e5e563 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -32,11 +32,6 @@ STORED AS ICEBERG
TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
====
---- QUERY
-TRUNCATE iceberg_table_hadoop_tables
----- CATCH
-AnalysisException: TRUNCATE TABLE not supported on iceberg table: $DATABASE.iceberg_table_hadoop_tables
-====
----- QUERY
# iceberg_non_partitioned is not partitioned
SHOW PARTITIONS functional_parquet.iceberg_non_partitioned
---- CATCH
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-truncate.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-truncate.test
new file mode 100644
index 0000000..b64f67f
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-truncate.test
@@ -0,0 +1,82 @@
+====
+---- QUERY
+# Create unpartitioned Iceberg table
+create table ice_nopart (i int)
+stored as iceberg;
+insert into ice_nopart values (1), (2), (3);
+select * from ice_nopart;
+---- RESULTS
+1
+2
+3
+====
+---- QUERY
+# Column stats should be cleared by TRUNCATE.
+compute stats ice_nopart;
+show column stats ice_nopart;
+---- RESULTS
+'i','INT',3,0,4,4,-1,-1
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
+====
+---- QUERY
+# TRUNCATE iceberg table
+truncate table ice_nopart;
+====
+---- QUERY
+# SELECT from truncated table
+select * from ice_nopart
+---- RESULTS
+====
+---- QUERY
+show column stats ice_nopart;
+---- RESULTS
+'i','INT',-1,-1,4,4,-1,-1
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
+====
+---- QUERY
+# Create partitioned Iceberg table
+create table ice_part (i int, s string, t timestamp)
+partition by spec (t year, i bucket 10)
+stored as iceberg;
+insert into ice_part
+values (1, 'ice', '2021-01-27 18:57:25.155746000'),
+ (2, 'berg', '2020-01-27 18:57:25.155746000');
+select * from ice_part;
+---- RESULTS
+1,'ice',2021-01-27 18:57:25.155746000
+2,'berg',2020-01-27 18:57:25.155746000
+---- TYPES
+INT,STRING,TIMESTAMP
+====
+---- QUERY
+# Column stats should be cleared by TRUNCATE.
+compute stats ice_part;
+show column stats ice_part;
+---- RESULTS
+'i','INT',2,0,4,4,-1,-1
+'s','STRING',2,0,4,3.5,-1,-1
+'t','TIMESTAMP',2,0,16,16,-1,-1
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
+====
+---- QUERY
+# TRUNCATE iceberg table
+truncate table ice_part;
+====
+---- QUERY
+# SELECT from truncated table
+select * from ice_part
+---- RESULTS
+====
+---- QUERY
+# Column stats should be cleared by TRUNCATE.
+show column stats ice_part;
+---- RESULTS
+'i','INT',-1,-1,4,4,-1,-1
+'s','STRING',-1,-1,-1,-1,-1,-1
+'t','TIMESTAMP',-1,-1,16,16,-1,-1
+---- TYPES
+STRING, STRING, BIGINT, BIGINT, BIGINT, DOUBLE, BIGINT, BIGINT
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 57659c6..514a9bd 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -49,6 +49,9 @@ class TestIcebergTable(ImpalaTestSuite):
def test_alter_iceberg_tables(self, vector, unique_database):
self.run_test_case('QueryTest/iceberg-alter', vector, use_db=unique_database)
+ def test_truncate_iceberg_tables(self, vector, unique_database):
+ self.run_test_case('QueryTest/iceberg-truncate', vector, use_db=unique_database)
+
@SkipIf.not_hdfs
def test_drop_incomplete_table(self, vector, unique_database):
"""Test DROP TABLE when the underlying directory is deleted. In that case table