You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/03/03 05:45:29 UTC

[iceberg] branch master updated: Spark 3.2: Support PURGE flag in DROP TABLE (#3056)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dc245b  Spark 3.2: Support PURGE flag in DROP TABLE (#3056)
3dc245b is described below

commit 3dc245b33b0e3bf4edf93f4e6a3262eff22ac7ce
Author: Xianyang Liu <li...@hotmail.com>
AuthorDate: Thu Mar 3 13:45:15 2022 +0800

    Spark 3.2: Support PURGE flag in DROP TABLE (#3056)
---
 .../extensions/TestRemoveOrphanFilesProcedure.java |   7 +-
 .../org/apache/iceberg/spark/SparkCatalog.java     |  44 ++++++-
 .../apache/iceberg/spark/SparkSessionCatalog.java  |   7 ++
 .../apache/iceberg/spark/sql/TestDropTable.java    | 140 +++++++++++++++++++++
 4 files changed, 193 insertions(+), 5 deletions(-)

diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index d73f69c..9bc82bb 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -60,8 +60,8 @@ public class TestRemoveOrphanFilesProcedure extends SparkExtensionsTestBase {
 
   @After
   public void removeTable() {
-    sql("DROP TABLE IF EXISTS %s", tableName);
-    sql("DROP TABLE IF EXISTS p", tableName);
+    sql("DROP TABLE IF EXISTS %s PURGE", tableName);
+    sql("DROP TABLE IF EXISTS p PURGE");
   }
 
   @Test
@@ -200,6 +200,9 @@ public class TestRemoveOrphanFilesProcedure extends SparkExtensionsTestBase {
     AssertHelpers.assertThrows("Should reject call",
         ValidationException.class, "Cannot remove orphan files: GC is disabled",
         () -> sql("CALL %s.system.remove_orphan_files('%s')", catalogName, tableIdent));
+
+    // reset the property to enable the table purging in removeTable.
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('%s' 'true')", tableName, GC_ENABLED);
   }
 
   @Test
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index d80b9bf..b8e9817 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CachingCatalog;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -39,6 +40,7 @@ import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -49,6 +51,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.iceberg.spark.source.StagedSparkTable;
 import org.apache.iceberg.util.Pair;
@@ -71,6 +74,9 @@ import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
+import static org.apache.iceberg.TableProperties.GC_ENABLED;
+import static org.apache.iceberg.TableProperties.GC_ENABLED_DEFAULT;
+
 /**
  * A Spark TableCatalog implementation that wraps an Iceberg {@link Catalog}.
  * <p>
@@ -244,15 +250,47 @@ public class SparkCatalog extends BaseCatalog {
 
   @Override
   public boolean dropTable(Identifier ident) {
+    return dropTableWithoutPurging(ident);
+  }
+
+  @Override
+  public boolean purgeTable(Identifier ident) {
     try {
-      return isPathIdentifier(ident) ?
-          tables.dropTable(((PathIdentifier) ident).location()) :
-          icebergCatalog.dropTable(buildIdentifier(ident));
+      Table table = load(ident).first();
+      ValidationException.check(
+          PropertyUtil.propertyAsBoolean(table.properties(), GC_ENABLED, GC_ENABLED_DEFAULT),
+          "Cannot purge table: GC is disabled (deleting files may corrupt other tables)");
+      String metadataFileLocation = ((HasTableOperations) table).operations().current().metadataFileLocation();
+
+      boolean dropped = dropTableWithoutPurging(ident);
+
+      if (dropped) {
+        // We should check whether the metadata file exists. Because the HadoopCatalog/HadoopTables will drop the
+        // warehouse directly and ignore the `purge` argument.
+        boolean metadataFileExists = table.io().newInputFile(metadataFileLocation).exists();
+
+        if (metadataFileExists) {
+          SparkActions.get()
+              .deleteReachableFiles(metadataFileLocation)
+              .io(table.io())
+              .execute();
+        }
+      }
+
+      return dropped;
     } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
       return false;
     }
   }
 
+  private boolean dropTableWithoutPurging(Identifier ident) {
+    if (isPathIdentifier(ident)) {
+      return tables.dropTable(((PathIdentifier) ident).location(), false /* don't purge data */);
+    } else {
+      return icebergCatalog.dropTable(buildIdentifier(ident), false /* don't purge data */);
+    }
+  }
+
   @Override
   public void renameTable(Identifier from, Identifier to) throws NoSuchTableException, TableAlreadyExistsException {
     try {
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
index c6310fc..8245ef1 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -238,6 +238,13 @@ public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
   }
 
   @Override
+  public boolean purgeTable(Identifier ident) {
+    // no need to check table existence to determine which catalog to use. if a table doesn't exist then both are
+    // required to return false.
+    return icebergCatalog.purgeTable(ident) || getSessionCatalog().purgeTable(ident);
+  }
+
+  @Override
   public void renameTable(Identifier from, Identifier to) throws NoSuchTableException, TableAlreadyExistsException {
     // rename is not supported by HadoopCatalog. to avoid UnsupportedOperationException for session catalog tables,
     // check table existence first to ensure that the table belongs to the Iceberg catalog.
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestDropTable.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestDropTable.java
new file mode 100644
index 0000000..535cd39
--- /dev/null
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/sql/TestDropTable.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDropTable extends SparkCatalogTestBase {
+
+  public TestDropTable(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  public void createTable() {
+    sql("CREATE TABLE %s (id INT, name STRING) USING iceberg", tableName);
+    sql("INSERT INTO %s VALUES (1, 'test')", tableName);
+  }
+
+  @After
+  public void removeTable() throws IOException {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDropTable() throws IOException {
+    dropTableInternal();
+  }
+
+  @Test
+  public void testDropTableGCDisabled() throws IOException {
+    sql("ALTER TABLE %s SET TBLPROPERTIES (gc.enabled = false)", tableName);
+    dropTableInternal();
+  }
+
+  private void dropTableInternal() throws IOException {
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1, "test")), sql("SELECT * FROM %s", tableName));
+
+    List<String> manifestAndFiles = manifestsAndFiles();
+    Assert.assertEquals("There should be 2 files for manifests and files", 2, manifestAndFiles.size());
+    Assert.assertTrue("All files should be existed", checkFilesExist(manifestAndFiles, true));
+
+    sql("DROP TABLE %s", tableName);
+    Assert.assertFalse("Table should not exist", validationCatalog.tableExists(tableIdent));
+
+    if (catalogName.equals("testhadoop")) {
+      // HadoopCatalog drop table without purge will delete the base table location.
+      Assert.assertTrue("All files should be deleted", checkFilesExist(manifestAndFiles, false));
+    } else {
+      Assert.assertTrue("All files should not be deleted", checkFilesExist(manifestAndFiles, true));
+    }
+  }
+
+  @Test
+  public void testPurgeTable() throws IOException {
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1, "test")), sql("SELECT * FROM %s", tableName));
+
+    List<String> manifestAndFiles = manifestsAndFiles();
+    Assert.assertEquals("There should be 2 files for manifests and files", 2, manifestAndFiles.size());
+    Assert.assertTrue("All files should exist", checkFilesExist(manifestAndFiles, true));
+
+    sql("DROP TABLE %s PURGE", tableName);
+    Assert.assertFalse("Table should not exist", validationCatalog.tableExists(tableIdent));
+    Assert.assertTrue("All files should be deleted", checkFilesExist(manifestAndFiles, false));
+  }
+
+  @Test
+  public void testPurgeTableGCDisabled() throws IOException {
+    sql("ALTER TABLE %s SET TBLPROPERTIES (gc.enabled = false)", tableName);
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1, "test")), sql("SELECT * FROM %s", tableName));
+
+    List<String> manifestAndFiles = manifestsAndFiles();
+    Assert.assertEquals("There totally should have 2 files for manifests and files", 2, manifestAndFiles.size());
+    Assert.assertTrue("All files should be existed", checkFilesExist(manifestAndFiles, true));
+
+    AssertHelpers.assertThrows("Purge table is not allowed when GC is disabled", ValidationException.class,
+        "Cannot purge table: GC is disabled (deleting files may corrupt other tables",
+        () -> sql("DROP TABLE %s PURGE", tableName));
+
+    Assert.assertTrue("Table should not been dropped", validationCatalog.tableExists(tableIdent));
+    Assert.assertTrue("All files should not be deleted", checkFilesExist(manifestAndFiles, true));
+  }
+
+  private List<String> manifestsAndFiles() {
+    List<Object[]> files = sql("SELECT file_path FROM %s.%s", tableName, MetadataTableType.FILES);
+    List<Object[]> manifests = sql("SELECT path FROM %s.%s", tableName, MetadataTableType.MANIFESTS);
+    return Streams.concat(files.stream(), manifests.stream()).map(row -> (String) row[0]).collect(Collectors.toList());
+  }
+
+  private boolean checkFilesExist(List<String> files, boolean shouldExist) throws IOException {
+    boolean mask = !shouldExist;
+    if (files.isEmpty()) {
+      return mask;
+    }
+
+    FileSystem fs = new Path(files.get(0)).getFileSystem(hiveConf);
+    return files.stream().allMatch(file -> {
+      try {
+        return fs.exists(new Path(file)) ^ mask;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+}