You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2023/02/22 18:19:01 UTC
[iceberg] branch master updated: Core: Handle statistics file clean up from expireSnapshots (#6090)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 8592dee5b8 Core: Handle statistics file clean up from expireSnapshots (#6090)
8592dee5b8 is described below
commit 8592dee5b896050aaf25af3e56d82895e889ddde
Author: Ajantha Bhat <aj...@gmail.com>
AuthorDate: Wed Feb 22 23:48:54 2023 +0530
Core: Handle statistics file clean up from expireSnapshots (#6090)
---
.../org/apache/iceberg/FileCleanupStrategy.java | 23 ++++
.../org/apache/iceberg/IncrementalFileCleanup.java | 9 +-
.../org/apache/iceberg/ReachableFileCleanup.java | 5 +
.../java/org/apache/iceberg/TableMetadata.java | 1 +
.../org/apache/iceberg/TestRemoveSnapshots.java | 126 +++++++++++++++++++++
5 files changed, 161 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
index 73429a62da..c0a113898a 100644
--- a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
+++ b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
@@ -21,10 +21,12 @@ package org.apache.iceberg;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,4 +85,25 @@ abstract class FileCleanupStrategy {
(file, thrown) -> LOG.warn("Delete failed for {} file: {}", fileType, file, thrown))
.run(deleteFunc::accept);
}
+
+ /** Returns stats file paths listed before expiration that are no longer listed after it. */
+ protected Set<String> expiredStatisticsFilesLocations(
+     TableMetadata beforeExpiration, TableMetadata afterExpiration) {
+   Set<String> retainedLocations = statsFileLocations(afterExpiration);
+   Set<String> previousLocations = statsFileLocations(beforeExpiration);
+   return Sets.difference(previousLocations, retainedLocations);
+ }
+
+ /**
+  * Collects the paths of all statistics files listed in the given table metadata; a null
+  * statistics file list yields an empty set.
+  */
+ private Set<String> statsFileLocations(TableMetadata tableMetadata) {
+   if (tableMetadata.statisticsFiles() == null) {
+     return Sets.newHashSet();
+   }
+   return tableMetadata.statisticsFiles().stream()
+       .map(StatisticsFile::path)
+       .collect(Collectors.toSet());
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
index d894dcbf36..f0a46f1576 100644
--- a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
+++ b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
@@ -27,7 +27,6 @@ import java.util.function.Consumer;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.PropertyUtil;
@@ -260,10 +259,14 @@ class IncrementalFileCleanup extends FileCleanupStrategy {
findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration);
deleteFiles(filesToDelete, "data");
- LOG.warn("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete));
- LOG.warn("Manifests Lists to delete: {}", Joiner.on(", ").join(manifestListsToDelete));
deleteFiles(manifestsToDelete, "manifest");
deleteFiles(manifestListsToDelete, "manifest list");
+
+ if (!beforeExpiration.statisticsFiles().isEmpty()) {
+ Set<String> expiredStatisticsFilesLocations =
+ expiredStatisticsFilesLocations(beforeExpiration, afterExpiration);
+ deleteFiles(expiredStatisticsFilesLocations, "statistics files");
+ }
}
private Set<String> findFilesToDelete(
diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java
index ccbee78e27..6b15e9147d 100644
--- a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java
+++ b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java
@@ -81,6 +81,11 @@ class ReachableFileCleanup extends FileCleanupStrategy {
}
deleteFiles(manifestListsToDelete, "manifest list");
+
+ if (!beforeExpiration.statisticsFiles().isEmpty()) {
+ deleteFiles(
+ expiredStatisticsFilesLocations(beforeExpiration, afterExpiration), "statistics files");
+ }
}
private Set<ManifestFile> pruneReferencedManifests(
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 2cb223db0f..f332f84fc8 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -1255,6 +1255,7 @@ public class TableMetadata implements Serializable {
if (idsToRemove.contains(snapshotId)) {
snapshotsById.remove(snapshotId);
changes.add(new MetadataUpdate.RemoveSnapshot(snapshotId));
+ removeStatistics(snapshotId);
} else {
retainedSnapshots.add(snapshot);
}
diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
index 53e5af520d..bad0635293 100644
--- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
+++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
@@ -18,9 +18,13 @@
*/
package org.apache.iceberg;
+import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@@ -28,10 +32,15 @@ import java.util.stream.Collectors;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
@@ -1234,6 +1243,81 @@ public class TestRemoveSnapshots extends TableTestBase {
.commit());
}
+ @Test
+ public void testExpireWithStatisticsFiles() throws IOException {
+   // first snapshot, with its own statistics file
+   table.newAppend().appendFile(FILE_A).commit();
+   Snapshot snapshot1 = table.currentSnapshot();
+   String statsFileLocation1 = statsFileLocation(table.location());
+   StatisticsFile statisticsFile1 =
+       writeStatsFile(
+           snapshot1.snapshotId(), snapshot1.sequenceNumber(), statsFileLocation1, table.io());
+   commitStats(table, statisticsFile1);
+
+   // second snapshot, with a separate statistics file
+   table.newAppend().appendFile(FILE_B).commit();
+   Snapshot snapshot2 = table.currentSnapshot();
+   String statsFileLocation2 = statsFileLocation(table.location());
+   StatisticsFile statisticsFile2 =
+       writeStatsFile(
+           snapshot2.snapshotId(), snapshot2.sequenceNumber(), statsFileLocation2, table.io());
+   commitStats(table, statisticsFile2);
+   Assert.assertEquals("Should have 2 statistics file", 2, table.statisticsFiles().size());
+
+   // expire using a cutoff derived from the current snapshot's timestamp
+   long expireBefore = waitUntilAfter(table.currentSnapshot().timestampMillis());
+   removeSnapshots(table).expireOlderThan(expireBefore).commit();
+
+   // only the current snapshot and its stats file should be retained
+   Assert.assertEquals("Should keep 1 snapshot", 1, Iterables.size(table.snapshots()));
+   Assertions.assertThat(table.statisticsFiles())
+       .hasSize(1)
+       .extracting(StatisticsFile::snapshotId)
+       .as("Should contain only the statistics file of snapshot2")
+       .isEqualTo(Lists.newArrayList(statisticsFile2.snapshotId()));
+
+   // stats file 1 is expected to be deleted, stats file 2 to still exist
+   Assertions.assertThat(new File(statsFileLocation1).exists()).isFalse();
+   Assertions.assertThat(new File(statsFileLocation2).exists()).isTrue();
+ }
+
+ @Test
+ public void testExpireWithStatisticsFilesWithReuse() throws IOException {
+   // first snapshot, with its own statistics file
+   table.newAppend().appendFile(FILE_A).commit();
+   Snapshot snapshot1 = table.currentSnapshot();
+   String statsFileLocation1 = statsFileLocation(table.location());
+   StatisticsFile statisticsFile1 =
+       writeStatsFile(
+           snapshot1.snapshotId(), snapshot1.sequenceNumber(), statsFileLocation1, table.io());
+   commitStats(table, statisticsFile1);
+
+   table.newAppend().appendFile(FILE_B).commit();
+   // The second snapshot's statistics entry points at the file written for the first
+   // snapshot. Because a live snapshot still references that path, expiring the first
+   // snapshot must leave the file on disk.
+   long snapshot2Id = table.currentSnapshot().snapshotId();
+   StatisticsFile statisticsFile2 = reuseStatsFile(snapshot2Id, statisticsFile1);
+   commitStats(table, statisticsFile2);
+
+   Assert.assertEquals("Should have 2 statistics file", 2, table.statisticsFiles().size());
+
+   long expireBefore = waitUntilAfter(table.currentSnapshot().timestampMillis());
+   removeSnapshots(table).expireOlderThan(expireBefore).commit();
+
+   // one snapshot should remain, and the only statistics entry left should be the one
+   // registered for it (which reuses the first snapshot's file)
+   Assert.assertEquals("Should keep 1 snapshot", 1, Iterables.size(table.snapshots()));
+   Assertions.assertThat(table.statisticsFiles())
+       .hasSize(1)
+       .extracting(StatisticsFile::snapshotId)
+       .as("Should contain only the statistics file of snapshot2")
+       .isEqualTo(Lists.newArrayList(statisticsFile2.snapshotId()));
+
+   // the shared file is expected to still exist after expiration
+   Assertions.assertThat(new File(statsFileLocation1).exists()).isTrue();
+ }
+
@Test
public void testFailRemovingSnapshotWhenStillReferencedByBranch() {
table.newAppend().appendFile(FILE_A).commit();
@@ -1515,4 +1599,46 @@ public class TestRemoveSnapshots extends TableTestBase {
RemoveSnapshots removeSnapshots = (RemoveSnapshots) table.expireSnapshots();
return (RemoveSnapshots) removeSnapshots.withIncrementalCleanup(incrementalCleanup);
}
+
+ // Writes a one-blob Puffin file to statsLocation and describes it as a StatisticsFile.
+ private StatisticsFile writeStatsFile(
+     long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
+     throws IOException {
+   try (PuffinWriter writer = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
+     ByteBuffer blobData = ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8));
+     Blob blob =
+         new Blob(
+             "some-blob-type", ImmutableList.of(1), snapshotId, snapshotSequenceNumber, blobData);
+     writer.add(blob);
+     writer.finish();
+
+     // sizes and blob metadata are read back from the writer after finish()
+     return new GenericStatisticsFile(
+         snapshotId,
+         statsLocation,
+         writer.fileSize(),
+         writer.footerSize(),
+         writer.writtenBlobsMetadata().stream()
+             .map(GenericBlobMetadata::from)
+             .collect(ImmutableList.toImmutableList()));
+   }
+ }
+
+ private StatisticsFile reuseStatsFile(long snapshotId, StatisticsFile statisticsFile) {
+   // same path, sizes and blob metadata as the source descriptor; only the snapshot id differs
+   String reusedPath = statisticsFile.path();
+   long fileSize = statisticsFile.fileSizeInBytes();
+   long footerSize = statisticsFile.fileFooterSizeInBytes();
+   return new GenericStatisticsFile(
+       snapshotId, reusedPath, fileSize, footerSize, statisticsFile.blobMetadata());
+ }
+
+ // Commits a statistics update that sets statisticsFile for the snapshot id the file reports.
+ private void commitStats(Table table, StatisticsFile statisticsFile) {
+   long statsSnapshotId = statisticsFile.snapshotId();
+   table.updateStatistics().setStatistics(statsSnapshotId, statisticsFile).commit();
+ }
+
+ private String statsFileLocation(String tableLocation) {
+   // a fresh random UUID is appended to the "stats-file-" prefix on every call
+   return tableLocation + "/metadata/" + "stats-file-" + UUID.randomUUID();
+ }
}