You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/03/03 05:45:29 UTC
[iceberg] branch master updated: Spark 3.2: Support PURGE flag in DROP TABLE (#3056)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3dc245b Spark 3.2: Support PURGE flag in DROP TABLE (#3056)
3dc245b is described below
commit 3dc245b33b0e3bf4edf93f4e6a3262eff22ac7ce
Author: Xianyang Liu <li...@hotmail.com>
AuthorDate: Thu Mar 3 13:45:15 2022 +0800
Spark 3.2: Support PURGE flag in DROP TABLE (#3056)
---
.../extensions/TestRemoveOrphanFilesProcedure.java | 7 +-
.../org/apache/iceberg/spark/SparkCatalog.java | 44 ++++++-
.../apache/iceberg/spark/SparkSessionCatalog.java | 7 ++
.../apache/iceberg/spark/sql/TestDropTable.java | 140 +++++++++++++++++++++
4 files changed, 193 insertions(+), 5 deletions(-)
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index d73f69c..9bc82bb 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -60,8 +60,8 @@ public class TestRemoveOrphanFilesProcedure extends SparkExtensionsTestBase {
@After
public void removeTable() {
- sql("DROP TABLE IF EXISTS %s", tableName);
- sql("DROP TABLE IF EXISTS p", tableName);
+ sql("DROP TABLE IF EXISTS %s PURGE", tableName);
+ sql("DROP TABLE IF EXISTS p PURGE");
}
@Test
@@ -200,6 +200,9 @@ public class TestRemoveOrphanFilesProcedure extends SparkExtensionsTestBase {
AssertHelpers.assertThrows("Should reject call",
ValidationException.class, "Cannot remove orphan files: GC is disabled",
() -> sql("CALL %s.system.remove_orphan_files('%s')", catalogName, tableIdent));
+
+ // re-enable GC on the table so that removeTable can purge it.
+ sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, GC_ENABLED);
}
@Test
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index d80b9bf..b8e9817 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -39,6 +40,7 @@ import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -49,6 +51,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.iceberg.spark.source.StagedSparkTable;
import org.apache.iceberg.util.Pair;
@@ -71,6 +74,9 @@ import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import static org.apache.iceberg.TableProperties.GC_ENABLED;
+import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
+
/**
* A Spark TableCatalog implementation that wraps an Iceberg {@link Catalog}.
* <p>
@@ -244,15 +250,47 @@ public class SparkCatalog extends BaseCatalog {
@Override
public boolean dropTable(Identifier ident) {
+ return dropTableWithoutPurging(ident);
+ }
+
+ @Override
+ public boolean purgeTable(Identifier ident) {
try {
- return isPathIdentifier(ident) ?
- tables.dropTable(((PathIdentifier) ident).location()) :
- icebergCatalog.dropTable(buildIdentifier(ident));
+ Table table = load(ident).first();
+ ValidationException.check(
+ PropertyUtil.propertyAsBoolean(table.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
+ "Cannot purge table: GC is disabled (deleting files may corrupt other tables)");
+ String metadataFileLocation = ((HasTableOperations) table).operations().current().metadataFileLocation();
+
+ boolean dropped = dropTableWithoutPurging(ident);
+
+ if (dropped) {
+ // Check whether the metadata file still exists, because HadoopCatalog/HadoopTables drop the
+ // warehouse directly and ignore the `purge` argument.
+ boolean metadataFileExists = table.io().newInputFile(metadataFileLocation).exists();
+
+ if (metadataFileExists) {
+ SparkActions.get()
+ .deleteReachableFiles(metadataFileLocation)
+ .io(table.io())
+ .execute();
+ }
+ }
+
+ return dropped;
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
return false;
}
}
+ private boolean dropTableWithoutPurging(Identifier ident) {
+ if (isPathIdentifier(ident)) {
+ return tables.dropTable(((PathIdentifier) ident).location(), false /* don't purge data */);
+ } else {
+ return icebergCatalog.dropTable(buildIdentifier(ident), false /* don't purge data */);
+ }
+ }
+
@Override
public void renameTable(Identifier from, Identifier to) throws NoSuchTableException, TableAlreadyExistsException {
try {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
index c6310fc..8245ef1 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -238,6 +238,13 @@ public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
}
@Override
+ public boolean purgeTable(Identifier ident) {
+ // no need to check table existence to determine which catalog to use. if a table doesn't exist then both are
+ // required to return false.
+ return icebergCatalog.purgeTable(ident) || getSessionCatalog().purgeTable(ident);
+ }
+
+ @Override
public void renameTable(Identifier from, Identifier to) throws NoSuchTableException, TableAlreadyExistsException {
// rename is not supported by HadoopCatalog. to avoid UnsupportedOperationException for session catalog tables,
// check table existence first to ensure that the table belongs to the Iceberg catalog.
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestDropTable.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestDropTable.java
new file mode 100644
index 0000000..535cd39
--- /dev/null
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestDropTable.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDropTable extends SparkCatalogTestBase {
+
+ public TestDropTable(String catalogName, String implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @Before
+ public void createTable() {
+ sql("CREATE TABLE %s (id INT, name STRING) USING iceberg", tableName);
+ sql("INSERT INTO %s VALUES (1, 'test')", tableName);
+ }
+
+ @After
+ public void removeTable() throws IOException {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testDropTable() throws IOException {
+ dropTableInternal();
+ }
+
+ @Test
+ public void testDropTableGCDisabled() throws IOException {
+ sql("ALTER TABLE %s SET TBLPROPERTIES (gc.enabled = false)", tableName);
+ dropTableInternal();
+ }
+
+ private void dropTableInternal() throws IOException {
+ assertEquals("Should have expected rows",
+ ImmutableList.of(row(1, "test")), sql("SELECT * FROM %s", tableName));
+
+ List<String> manifestAndFiles = manifestsAndFiles();
+ Assert.assertEquals("There should be 2 files for manifests and files", 2, manifestAndFiles.size());
+ Assert.assertTrue("All files should be existed", checkFilesExist(manifestAndFiles, true));
+
+ sql("DROP TABLE %s", tableName);
+ Assert.assertFalse("Table should not exist", validationCatalog.tableExists(tableIdent));
+
+ if (catalogName.equals("testhadoop")) {
+ // HadoopCatalog deletes the base table location even when a table is dropped without purge.
+ Assert.assertTrue("All files should be deleted", checkFilesExist(manifestAndFiles, false));
+ } else {
+ Assert.assertTrue("All files should not be deleted", checkFilesExist(manifestAndFiles, true));
+ }
+ }
+
+ @Test
+ public void testPurgeTable() throws IOException {
+ assertEquals("Should have expected rows",
+ ImmutableList.of(row(1, "test")), sql("SELECT * FROM %s", tableName));
+
+ List<String> manifestAndFiles = manifestsAndFiles();
+ Assert.assertEquals("There should be 2 files for manifests and files", 2, manifestAndFiles.size());
+ Assert.assertTrue("All files should exist", checkFilesExist(manifestAndFiles, true));
+
+ sql("DROP TABLE %s PURGE", tableName);
+ Assert.assertFalse("Table should not exist", validationCatalog.tableExists(tableIdent));
+ Assert.assertTrue("All files should be deleted", checkFilesExist(manifestAndFiles, false));
+ }
+
+ @Test
+ public void testPurgeTableGCDisabled() throws IOException {
+ sql("ALTER TABLE %s SET TBLPROPERTIES (gc.enabled = false)", tableName);
+
+ assertEquals("Should have expected rows",
+ ImmutableList.of(row(1, "test")), sql("SELECT * FROM %s", tableName));
+
+ List<String> manifestAndFiles = manifestsAndFiles();
+ Assert.assertEquals("There totally should have 2 files for manifests and files", 2, manifestAndFiles.size());
+ Assert.assertTrue("All files should be existed", checkFilesExist(manifestAndFiles, true));
+
+ AssertHelpers.assertThrows("Purge table is not allowed when GC is disabled", ValidationException.class,
+ "Cannot purge table: GC is disabled (deleting files may corrupt other tables",
+ () -> sql("DROP TABLE %s PURGE", tableName));
+
+ Assert.assertTrue("Table should not been dropped", validationCatalog.tableExists(tableIdent));
+ Assert.assertTrue("All files should not be deleted", checkFilesExist(manifestAndFiles, true));
+ }
+
+ private List<String> manifestsAndFiles() {
+ List<Object[]> files = sql("SELECT file_path FROM %s.%s", tableName, MetadataTableType.FILES);
+ List<Object[]> manifests = sql("SELECT path FROM %s.%s", tableName, MetadataTableType.MANIFESTS);
+ return Streams.concat(files.stream(), manifests.stream()).map(row -> (String) row[0]).collect(Collectors.toList());
+ }
+
+ private boolean checkFilesExist(List<String> files, boolean shouldExist) throws IOException {
+ boolean mask = !shouldExist;
+ if (files.isEmpty()) {
+ return mask;
+ }
+
+ FileSystem fs = new Path(files.get(0)).getFileSystem(hiveConf);
+ return files.stream().allMatch(file -> {
+ try {
+ return fs.exists(new Path(file)) ^ mask;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+}