You are viewing a plain-text version of this content. The canonical link for it (a hyperlink on the original page) is not reproduced in this plain-text copy.
Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/02/22 06:02:56 UTC

[iceberg] branch master updated: Add test for remove orphan files with delete files (#4182)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 165b325  Add test for remove orphan files with delete files (#4182)
165b325 is described below

commit 165b325ab5c8bbc5d53993ad9f0929b4ea7e4a27
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Mon Feb 21 22:02:16 2022 -0800

    Add test for remove orphan files with delete files (#4182)
---
 .../extensions/TestExpireSnapshotsProcedure.java   | 32 +++----------
 .../extensions/TestRemoveOrphanFilesProcedure.java | 54 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/data/TestHelpers.java | 20 ++++++++
 3 files changed, 81 insertions(+), 25 deletions(-)

diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index e1f1ff0..eb97e37 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -24,23 +24,19 @@ import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AssertHelpers;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Encoders;
@@ -253,10 +249,10 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
 
     Table table = Spark3Util.loadIcebergTable(spark, tableName);
 
-    Assert.assertEquals("Should have 1 delete manifest", 1, deleteManifests(table).size());
-    Assert.assertEquals("Should have 1 delete file", 1, deleteFiles(table).size());
-    Path deleteManifestPath = new Path(deleteManifests(table).iterator().next().path());
-    Path deleteFilePath = new Path(String.valueOf(deleteFiles(table).iterator().next().path()));
+    Assert.assertEquals("Should have 1 delete manifest", 1, TestHelpers.deleteManifests(table).size());
+    Assert.assertEquals("Should have 1 delete file", 1, TestHelpers.deleteFiles(table).size());
+    Path deleteManifestPath = new Path(TestHelpers.deleteManifests(table).iterator().next().path());
+    Path deleteFilePath = new Path(String.valueOf(TestHelpers.deleteFiles(table).iterator().next().path()));
 
     sql("CALL %s.system.rewrite_data_files(" +
             "table => '%s'," +
@@ -270,8 +266,8 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
     sql("INSERT INTO TABLE %s VALUES (6, 'f')", tableName); // this txn removes the file reference
     table.refresh();
 
-    Assert.assertEquals("Should have no delete manifests", 0, deleteManifests(table).size());
-    Assert.assertEquals("Should have no delete files", 0, deleteFiles(table).size());
+    Assert.assertEquals("Should have no delete manifests", 0, TestHelpers.deleteManifests(table).size());
+    Assert.assertEquals("Should have no delete files", 0, TestHelpers.deleteFiles(table).size());
 
     FileSystem localFs = FileSystem.getLocal(new Configuration());
     Assert.assertTrue("Delete manifest should still exist", localFs.exists(deleteManifestPath));
@@ -287,18 +283,4 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
     Assert.assertFalse("Delete manifest should be removed", localFs.exists(deleteManifestPath));
     Assert.assertFalse("Delete file should be removed", localFs.exists(deleteFilePath));
   }
-
-  private List<ManifestFile> deleteManifests(Table table) {
-    return table.currentSnapshot().deleteManifests();
-  }
-
-  private Set<DeleteFile> deleteFiles(Table table) {
-    Set<DeleteFile> deleteFiles = Sets.newHashSet();
-
-    for (FileScanTask task : table.newScan().planFiles()) {
-      deleteFiles.addAll(task.deletes());
-    }
-
-    return deleteFiles;
-  }
 }
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
index 724e17e..d73f69c 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java
@@ -24,11 +24,21 @@ import java.sql.Timestamp;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
 import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
 import org.junit.After;
 import org.junit.Assert;
@@ -303,4 +313,48 @@ public class TestRemoveOrphanFilesProcedure extends SparkExtensionsTestBase {
             "CALL %s.system.remove_orphan_files(table => '%s', max_concurrent_deletes => %s)",
             catalogName, tableIdent, -1));
   }
+
+  @Test
+  public void testRemoveOrphanFilesWithDeleteFiles() throws Exception {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg TBLPROPERTIES" +
+        "('format-version'='2', 'write.delete.mode'='merge-on-read')", tableName);
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c"),
+        new SimpleRecord(4, "d")
+    );
+    spark.createDataset(records, Encoders.bean(SimpleRecord.class)).coalesce(1).writeTo(tableName).append();
+    sql("DELETE FROM %s WHERE id=1", tableName);
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    Assert.assertEquals("Should have 1 delete manifest", 1, TestHelpers.deleteManifests(table).size());
+    Assert.assertEquals("Should have 1 delete file", 1, TestHelpers.deleteFiles(table).size());
+    Path deleteManifestPath = new Path(TestHelpers.deleteManifests(table).iterator().next().path());
+    Path deleteFilePath = new Path(String.valueOf(TestHelpers.deleteFiles(table).iterator().next().path()));
+
+    // wait to ensure files are old enough
+    waitUntilAfter(System.currentTimeMillis());
+    Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+
+    // delete orphans
+    List<Object[]> output = sql(
+        "CALL %s.system.remove_orphan_files(" +
+            "table => '%s'," +
+            "older_than => TIMESTAMP '%s')",
+        catalogName, tableIdent, currentTimestamp);
+    Assert.assertEquals("Should be no orphan files", 0, output.size());
+
+    FileSystem localFs = FileSystem.getLocal(new Configuration());
+    Assert.assertTrue("Delete manifest should still exist", localFs.exists(deleteManifestPath));
+    Assert.assertTrue("Delete file should still exist", localFs.exists(deleteFilePath));
+
+    records.remove(new SimpleRecord(1, "a"));
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableName);
+    List<SimpleRecord> actualRecords = resultDF
+        .as(Encoders.bean(SimpleRecord.class))
+        .collectAsList();
+    Assert.assertEquals("Rows must match", records, actualRecords);
+  }
 }
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 53d5e87..e74fe6a 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -32,12 +32,18 @@ import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -693,4 +699,18 @@ public class TestHelpers {
           actualValues.isNullAt(i) ? null : actualValues.get(i, valueType));
     }
   }
+
+  public static List<ManifestFile> deleteManifests(Table table) {
+    return table.currentSnapshot().deleteManifests();
+  }
+
+  public static Set<DeleteFile> deleteFiles(Table table) {
+    Set<DeleteFile> deleteFiles = Sets.newHashSet();
+
+    for (FileScanTask task : table.newScan().planFiles()) {
+      deleteFiles.addAll(task.deletes());
+    }
+
+    return deleteFiles;
+  }
 }