You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/10/11 22:00:04 UTC

[GitHub] [iceberg] szehon-ho commented on a diff in pull request #3457: Spark: Improve performance of expire snapshot by not double-scanning retained Snapshots

szehon-ho commented on code in PR #3457:
URL: https://github.com/apache/iceberg/pull/3457#discussion_r992812006


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -151,17 +160,27 @@ protected Dataset<FileInfo> contentFileDS(Table table) {
             .repartition(numShufflePartitions) // avoid adaptive execution combining tasks
             .as(ManifestFileBean.ENCODER);
 
-    return allManifests.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
+    return allManifestsBean.flatMap(new ReadManifest(tableBroadcast), FileInfo.ENCODER);
   }
 
   protected Dataset<FileInfo> manifestDS(Table table) {
-    return loadMetadataTable(table, ALL_MANIFESTS)
-        .select(col("path"), lit(MANIFEST).as("type"))
-        .as(FileInfo.ENCODER);
+    return manifestDS(table, null);
+  }
+
+  protected Dataset<FileInfo> manifestDS(Table table, Set<Long> snapshots) {
+    Dataset<Row> allManifests = loadMetadataTable(table, ALL_MANIFESTS);
+    if (snapshots != null) {
+      allManifests = filterAllManifests(allManifests, snapshots);

Review Comment:
   Good point, renamed.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -296,6 +315,11 @@ public long totalFilesCount() {
     }
   }
 
+  protected Dataset<Row> filterAllManifests(Dataset<Row> allManifestDF, Set<Long> snapshots) {

Review Comment:
   Refactored snapshots -> snapshotIds in these files



##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java:
##########
@@ -808,4 +810,11 @@ public static Set<DeleteFile> deleteFiles(Table table) {
 
     return deleteFiles;
   }
+
+  public static Set<String> reachableManifestPaths(Table table) {
+    return StreamSupport.stream(table.snapshots().spliterator(), false)

Review Comment:
   Not sure if I'm missing something, but it looks like that is for the data files of a manifest, while this is for listing manifests?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -133,14 +135,21 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
     return new BaseTable(ops, metadataFileLocation);
   }
 
-  // builds a DF of delete and data file path and type by reading all manifests
   protected Dataset<FileInfo> contentFileDS(Table table) {
-    Table serializableTable = SerializableTableWithSize.copyOf(table);
-    Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
+    return contentFileDS(table, null);
+  }
+
+  protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshots) {
+    Broadcast<Table> tableBroadcast =
+        sparkContext.broadcast(SerializableTableWithSize.copyOf(table));

Review Comment:
   Reverted



##########
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java:
##########
@@ -1250,4 +1252,70 @@ public void testExpireAfterExecute() {
     List<Row> untypedExpiredFiles = action.expire().collectAsList();
     Assert.assertEquals("Expired results must match", 1, untypedExpiredFiles.size());
   }
+
+  @Test
+  public void testExpireFileDeletionMostExpired() {
+    testExpireFilesAreDeleted(5, 2);
+  }
+
+  @Test
+  public void testExpireFileDeletionMostRetained() {
+    testExpireFilesAreDeleted(2, 5);
+  }
+
+  public void testExpireFilesAreDeleted(int dataFilesExpired, int dataFilesRetained) {

Review Comment:
   You are right, added a test below.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -133,14 +135,21 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
     return new BaseTable(ops, metadataFileLocation);
   }
 
-  // builds a DF of delete and data file path and type by reading all manifests
   protected Dataset<FileInfo> contentFileDS(Table table) {
-    Table serializableTable = SerializableTableWithSize.copyOf(table);
-    Broadcast<Table> tableBroadcast = sparkContext.broadcast(serializableTable);
+    return contentFileDS(table, null);
+  }
+
+  protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshots) {
+    Broadcast<Table> tableBroadcast =
+        sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
     int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
 
-    Dataset<ManifestFileBean> allManifests =
-        loadMetadataTable(table, ALL_MANIFESTS)
+    Dataset<Row> allManifests = loadMetadataTable(table, ALL_MANIFESTS);
+    if (snapshots != null) {
+      allManifests = filterAllManifests(allManifests, snapshots);
+    }
+    Dataset<ManifestFileBean> allManifestsBean =

Review Comment:
   Added



##########
core/src/main/java/org/apache/iceberg/ReachableFileUtil.java:
##########
@@ -103,14 +106,24 @@ private static TableMetadata findFirstExistentPreviousMetadata(
    * @return the location of manifest Lists
    */
   public static List<String> manifestListLocations(Table table) {
-    Iterable<Snapshot> snapshots = table.snapshots();
-    List<String> manifestListLocations = Lists.newArrayList();
-    for (Snapshot snapshot : snapshots) {
-      String manifestListLocation = snapshot.manifestListLocation();
-      if (manifestListLocation != null) {
-        manifestListLocations.add(manifestListLocation);
-      }
+    return manifestListLocations(table, null);
+  }
+
+  /**
+   * Returns locations of manifest lists in a table.
+   *
+   * @param table table for which manifestList needs to be fetched
+   * @param snapshots ids of snapshots for which manifest lists will be returned
+   * @return the location of manifest Lists
+   */
+  public static List<String> manifestListLocations(Table table, Set<Long> snapshots) {
+    Stream<Snapshot> snapshotStream = StreamSupport.stream(table.snapshots().spliterator(), false);

Review Comment:
   Done, used Iterables.filter and restored the old for loop



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org