Posted to commits@iceberg.apache.org by bl...@apache.org on 2023/02/22 18:19:01 UTC

[iceberg] branch master updated: Core: Handle statistics file clean up from expireSnapshots (#6090)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 8592dee5b8 Core: Handle statistics file clean up from expireSnapshots (#6090)
8592dee5b8 is described below

commit 8592dee5b896050aaf25af3e56d82895e889ddde
Author: Ajantha Bhat <aj...@gmail.com>
AuthorDate: Wed Feb 22 23:48:54 2023 +0530

    Core: Handle statistics file clean up from expireSnapshots (#6090)
---
 .../org/apache/iceberg/FileCleanupStrategy.java    |  23 ++++
 .../org/apache/iceberg/IncrementalFileCleanup.java |   9 +-
 .../org/apache/iceberg/ReachableFileCleanup.java   |   5 +
 .../java/org/apache/iceberg/TableMetadata.java     |   1 +
 .../org/apache/iceberg/TestRemoveSnapshots.java    | 126 +++++++++++++++++++++
 5 files changed, 161 insertions(+), 3 deletions(-)
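
For context, the user-facing flow this change affects looks roughly like the following (a minimal sketch, not taken from this commit; it assumes an existing Table handle named "table" and a previously written StatisticsFile named "statisticsFile"):

    // Attach a statistics file to the current snapshot, then expire old snapshots.
    // With this commit, statistics files referenced only by expired snapshots are
    // deleted from the file system along with manifests and manifest lists.
    table.updateStatistics()
        .setStatistics(statisticsFile.snapshotId(), statisticsFile)
        .commit();

    long olderThanMillis = System.currentTimeMillis();
    table.expireSnapshots()
        .expireOlderThan(olderThanMillis)
        .commit();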

diff --git a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
index 73429a62da..c0a113898a 100644
--- a/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
+++ b/core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java
@@ -21,10 +21,12 @@ package org.apache.iceberg;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,4 +85,25 @@ abstract class FileCleanupStrategy {
             (file, thrown) -> LOG.warn("Delete failed for {} file: {}", fileType, file, thrown))
         .run(deleteFunc::accept);
   }
+
+  protected Set<String> expiredStatisticsFilesLocations(
+      TableMetadata beforeExpiration, TableMetadata afterExpiration) {
+    Set<String> statsFileLocationsBeforeExpiration = statsFileLocations(beforeExpiration);
+    Set<String> statsFileLocationsAfterExpiration = statsFileLocations(afterExpiration);
+
+    return Sets.difference(statsFileLocationsBeforeExpiration, statsFileLocationsAfterExpiration);
+  }
+
+  private Set<String> statsFileLocations(TableMetadata tableMetadata) {
+    Set<String> statsFileLocations = Sets.newHashSet();
+
+    if (tableMetadata.statisticsFiles() != null) {
+      statsFileLocations =
+          tableMetadata.statisticsFiles().stream()
+              .map(StatisticsFile::path)
+              .collect(Collectors.toSet());
+    }
+
+    return statsFileLocations;
+  }
 }
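
The helper above computes expired statistics file locations as the set difference between the table metadata before and after expiration, so a stats file that is still referenced by a retained snapshot is never deleted (see testExpireWithStatisticsFilesWithReuse below). A minimal illustration of the same Sets.difference semantics, using made-up locations that are not part of this commit:

    Set<String> before = Sets.newHashSet(
        "s3://bucket/metadata/stats-1.puffin",   // referenced only by an expired snapshot
        "s3://bucket/metadata/stats-2.puffin");  // still referenced after expiration
    Set<String> after = Sets.newHashSet("s3://bucket/metadata/stats-2.puffin");

    // Only stats-1.puffin is eligible for deletion.
    Set<String> expired = Sets.difference(before, after);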
diff --git a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
index d894dcbf36..f0a46f1576 100644
--- a/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
+++ b/core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java
@@ -27,7 +27,6 @@ import java.util.function.Consumer;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.PropertyUtil;
@@ -260,10 +259,14 @@ class IncrementalFileCleanup extends FileCleanupStrategy {
         findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration);
 
     deleteFiles(filesToDelete, "data");
-    LOG.warn("Manifests to delete: {}", Joiner.on(", ").join(manifestsToDelete));
-    LOG.warn("Manifests Lists to delete: {}", Joiner.on(", ").join(manifestListsToDelete));
     deleteFiles(manifestsToDelete, "manifest");
     deleteFiles(manifestListsToDelete, "manifest list");
+
+    if (!beforeExpiration.statisticsFiles().isEmpty()) {
+      Set<String> expiredStatisticsFilesLocations =
+          expiredStatisticsFilesLocations(beforeExpiration, afterExpiration);
+      deleteFiles(expiredStatisticsFilesLocations, "statistics files");
+    }
   }
 
   private Set<String> findFilesToDelete(
diff --git a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java
index ccbee78e27..6b15e9147d 100644
--- a/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java
+++ b/core/src/main/java/org/apache/iceberg/ReachableFileCleanup.java
@@ -81,6 +81,11 @@ class ReachableFileCleanup extends FileCleanupStrategy {
     }
 
     deleteFiles(manifestListsToDelete, "manifest list");
+
+    if (!beforeExpiration.statisticsFiles().isEmpty()) {
+      deleteFiles(
+          expiredStatisticsFilesLocations(beforeExpiration, afterExpiration), "statistics files");
+    }
   }
 
   private Set<ManifestFile> pruneReferencedManifests(
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 2cb223db0f..f332f84fc8 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -1255,6 +1255,7 @@ public class TableMetadata implements Serializable {
         if (idsToRemove.contains(snapshotId)) {
           snapshotsById.remove(snapshotId);
           changes.add(new MetadataUpdate.RemoveSnapshot(snapshotId));
+          removeStatistics(snapshotId);
         } else {
           retainedSnapshots.add(snapshot);
         }
diff --git a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
index 53e5af520d..bad0635293 100644
--- a/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
+++ b/core/src/test/java/org/apache/iceberg/TestRemoveSnapshots.java
@@ -18,9 +18,13 @@
  */
 package org.apache.iceberg;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -28,10 +32,15 @@ import java.util.stream.Collectors;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.puffin.Blob;
+import org.apache.iceberg.puffin.Puffin;
+import org.apache.iceberg.puffin.PuffinWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
@@ -1234,6 +1243,81 @@ public class TestRemoveSnapshots extends TableTestBase {
                 .commit());
   }
 
+  @Test
+  public void testExpireWithStatisticsFiles() throws IOException {
+    table.newAppend().appendFile(FILE_A).commit();
+    String statsFileLocation1 = statsFileLocation(table.location());
+    StatisticsFile statisticsFile1 =
+        writeStatsFile(
+            table.currentSnapshot().snapshotId(),
+            table.currentSnapshot().sequenceNumber(),
+            statsFileLocation1,
+            table.io());
+    commitStats(table, statisticsFile1);
+
+    table.newAppend().appendFile(FILE_B).commit();
+    String statsFileLocation2 = statsFileLocation(table.location());
+    StatisticsFile statisticsFile2 =
+        writeStatsFile(
+            table.currentSnapshot().snapshotId(),
+            table.currentSnapshot().sequenceNumber(),
+            statsFileLocation2,
+            table.io());
+    commitStats(table, statisticsFile2);
+    Assert.assertEquals("Should have 2 statistics file", 2, table.statisticsFiles().size());
+
+    long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis());
+    removeSnapshots(table).expireOlderThan(tAfterCommits).commit();
+
+    // only the current snapshot and its stats file should be retained
+    Assert.assertEquals("Should keep 1 snapshot", 1, Iterables.size(table.snapshots()));
+    Assertions.assertThat(table.statisticsFiles())
+        .hasSize(1)
+        .extracting(StatisticsFile::snapshotId)
+        .as("Should contain only the statistics file of snapshot2")
+        .isEqualTo(Lists.newArrayList(statisticsFile2.snapshotId()));
+
+    Assertions.assertThat(new File(statsFileLocation1).exists()).isFalse();
+    Assertions.assertThat(new File(statsFileLocation2).exists()).isTrue();
+  }
+
+  @Test
+  public void testExpireWithStatisticsFilesWithReuse() throws IOException {
+    table.newAppend().appendFile(FILE_A).commit();
+    String statsFileLocation1 = statsFileLocation(table.location());
+    StatisticsFile statisticsFile1 =
+        writeStatsFile(
+            table.currentSnapshot().snapshotId(),
+            table.currentSnapshot().sequenceNumber(),
+            statsFileLocation1,
+            table.io());
+    commitStats(table, statisticsFile1);
+
+    table.newAppend().appendFile(FILE_B).commit();
+    // If an expired snapshot's stats file is reused for some reason by the live snapshots,
+    // that stats file should not get deleted from the file system as the live snapshots still
+    // reference it.
+    StatisticsFile statisticsFile2 =
+        reuseStatsFile(table.currentSnapshot().snapshotId(), statisticsFile1);
+    commitStats(table, statisticsFile2);
+
+    Assert.assertEquals("Should have 2 statistics file", 2, table.statisticsFiles().size());
+
+    long tAfterCommits = waitUntilAfter(table.currentSnapshot().timestampMillis());
+    removeSnapshots(table).expireOlderThan(tAfterCommits).commit();
+
+    // only the current snapshot and its stats file (reused from previous snapshot) should be
+    // retained
+    Assert.assertEquals("Should keep 1 snapshot", 1, Iterables.size(table.snapshots()));
+    Assertions.assertThat(table.statisticsFiles())
+        .hasSize(1)
+        .extracting(StatisticsFile::snapshotId)
+        .as("Should contain only the statistics file of snapshot2")
+        .isEqualTo(Lists.newArrayList(statisticsFile2.snapshotId()));
+    // the reused stats file should exist.
+    Assertions.assertThat(new File(statsFileLocation1).exists()).isTrue();
+  }
+
   @Test
   public void testFailRemovingSnapshotWhenStillReferencedByBranch() {
     table.newAppend().appendFile(FILE_A).commit();
@@ -1515,4 +1599,46 @@ public class TestRemoveSnapshots extends TableTestBase {
     RemoveSnapshots removeSnapshots = (RemoveSnapshots) table.expireSnapshots();
     return (RemoveSnapshots) removeSnapshots.withIncrementalCleanup(incrementalCleanup);
   }
+
+  private StatisticsFile writeStatsFile(
+      long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
+      throws IOException {
+    try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
+      puffinWriter.add(
+          new Blob(
+              "some-blob-type",
+              ImmutableList.of(1),
+              snapshotId,
+              snapshotSequenceNumber,
+              ByteBuffer.wrap("blob content".getBytes(StandardCharsets.UTF_8))));
+      puffinWriter.finish();
+
+      return new GenericStatisticsFile(
+          snapshotId,
+          statsLocation,
+          puffinWriter.fileSize(),
+          puffinWriter.footerSize(),
+          puffinWriter.writtenBlobsMetadata().stream()
+              .map(GenericBlobMetadata::from)
+              .collect(ImmutableList.toImmutableList()));
+    }
+  }
+
+  private StatisticsFile reuseStatsFile(long snapshotId, StatisticsFile statisticsFile) {
+    return new GenericStatisticsFile(
+        snapshotId,
+        statisticsFile.path(),
+        statisticsFile.fileSizeInBytes(),
+        statisticsFile.fileFooterSizeInBytes(),
+        statisticsFile.blobMetadata());
+  }
+
+  private void commitStats(Table table, StatisticsFile statisticsFile) {
+    table.updateStatistics().setStatistics(statisticsFile.snapshotId(), statisticsFile).commit();
+  }
+
+  private String statsFileLocation(String tableLocation) {
+    String statsFileName = "stats-file-" + UUID.randomUUID();
+    return tableLocation + "/metadata/" + statsFileName;
+  }
 }