You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/05/05 16:06:10 UTC

[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #4629: Spark: Add positional and equality delete file count to ExpireSnapshot results

aokolnychyi commented on code in PR #4629:
URL: https://github.com/apache/iceberg/pull/4629#discussion_r866006688


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java:
##########
@@ -243,23 +246,30 @@ private BaseExpireSnapshotsActionResult deleteFiles(Iterator<Row> expired) {
           String file = fileInfo.getString(0);
           String type = fileInfo.getString(1);
           deleteFunc.accept(file);
-          switch (type) {
-            case CONTENT_FILE:
-              dataFileCount.incrementAndGet();
-              LOG.trace("Deleted Content File: {}", file);
-              break;
-            case MANIFEST:
-              manifestCount.incrementAndGet();
-              LOG.debug("Deleted Manifest: {}", file);
-              break;
-            case MANIFEST_LIST:
-              manifestListCount.incrementAndGet();
-              LOG.debug("Deleted Manifest List: {}", file);
-              break;
+
+          if (FileContent.DATA.name().equalsIgnoreCase(type)) {
+            dataFileCount.incrementAndGet();
+            LOG.trace("Deleted Data File: {}", file);
+          } else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
+            posDeleteFileCount.incrementAndGet();
+            LOG.trace("Deleted Positional Delete File: {}", file);
+          } else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
+            eqDeleteFileCount.incrementAndGet();
+            LOG.trace("Deleted Equality Delete File: {}", file);
+          } else if (type.equalsIgnoreCase(MANIFEST)) {
+            manifestCount.incrementAndGet();
+            LOG.debug("Deleted Manifest: {}", file);
+          } else if (type.equalsIgnoreCase(MANIFEST_LIST)) {
+            manifestListCount.incrementAndGet();
+            LOG.debug("Deleted Manifest List: {}", file);
+          } else {
+            throw new ValidationException("Illegal file type: %s", type);
           }
         });
 
-    LOG.info("Deleted {} total files", dataFileCount.get() + manifestCount.get() + manifestListCount.get());
-    return new BaseExpireSnapshotsActionResult(dataFileCount.get(), manifestCount.get(), manifestListCount.get());
+    long contentFileCount = dataFileCount.get() + posDeleteFileCount.get() + eqDeleteFileCount.get();
+    LOG.info("Deleted {} total files", contentFileCount + manifestCount.get() + manifestListCount.get());
+    return new BaseExpireSnapshotsActionResult(dataFileCount.get(), posDeleteFileCount.get(),

Review Comment:
   nit: I feel like an empty line before `return` would be appropriate to separate the blocks.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -122,18 +125,25 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
     return new BaseTable(ops, metadataFileLocation);
   }
 
-  // builds a DF of delete and data file locations by reading all manifests
-  protected Dataset<Row> buildValidContentFileDF(Table table) {
+  // builds a DF of delete and data file path and type by reading all manifests
+  protected Dataset<Row> buildValidContentFileTypeDF(Table table) {
     JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
-    Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));
+    Broadcast<Table> tableBroadcast = context.broadcast(SerializableTable.copyOf(table));
 
     Dataset<ManifestFileBean> allManifests = loadMetadataTable(table, ALL_MANIFESTS)
         .selectExpr("path", "length", "partition_spec_id as partitionSpecId", "added_snapshot_id as addedSnapshotId")
         .dropDuplicates("path")
         .repartition(spark.sessionState().conf().numShufflePartitions()) // avoid adaptive execution combining tasks
         .as(Encoders.bean(ManifestFileBean.class));
 
-    return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF(FILE_PATH);
+    return allManifests.flatMap(
+        new ReadManifest(tableBroadcast), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))

Review Comment:
   nit: I think the formatting is a bit off. I'd do this:
   
   ```
   return allManifests
       .flatMap(new ReadManifest(tableBroadcast), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
       .toDF(FILE_PATH, FILE_TYPE);
   ```
   
   



##########
api/src/main/java/org/apache/iceberg/actions/ExpireSnapshots.java:
##########
@@ -101,6 +101,16 @@ interface Result {
      */
     long deletedDataFilesCount();
 
+    /**
+     * Returns the number of deleted equality delete files.
+     */
+    long deletedEqualityDeleteFilesCount();
+
+    /**
+     * Returns the number of deleted positional delete files.
+     */
+    long deletedPositionalDeleteFilesCount();

Review Comment:
   I think we use `positionDeleteFiles`, not `positionalDeleteFiles`. Better be consistent.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +186,35 @@ protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator());
+    public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+      return new ClosingIterator(entries(manifest));
+    }
+
+    public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean manifest) {
+      switch (manifest.content()) {
+        case DATA:

Review Comment:
   It would be nice to use `all_files` but we can't for performance reasons. If I remember correctly, we had to migrate to the current implementation to avoid reading all manifest list files on the driver. That was slow for tables with a lot of snapshots.
   
   I think we should fix `ManifestFileBean` by adding `content` field and handling it everywhere. Then we can project it while reading.
   
   
   ```
   private static final String[] MANIFEST_FILE_BEAN_PROJECTION = {
       "path", "length", "partition_spec_id as partitionSpecId", "content", "added_snapshot_id as addedSnapshotId"
   };
   
   ...
   
   Dataset<ManifestFileBean> allManifests = loadMetadataTable(table, ALL_MANIFESTS)
       .selectExpr(MANIFEST_FILE_BEAN_PROJECTION)
       .dropDuplicates("path")
       .repartition(spark.sessionState().conf().numShufflePartitions()) // avoid adaptive execution combining tasks
       .as(Encoders.bean(ManifestFileBean.class));
   ```
   



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java:
##########
@@ -52,6 +52,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
 
   private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
       new StructField("deleted_data_files_count", DataTypes.LongType, true, Metadata.empty()),
+      new StructField("deleted_positional_delete_files_count", DataTypes.LongType, true, Metadata.empty()),

Review Comment:
   This should also be `position` vs `positional`.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +186,35 @@ protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator());
+    public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+      return new ClosingIterator(entries(manifest));
+    }
+
+    public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean manifest) {
+      switch (manifest.content()) {
+        case DATA:
+          return CloseableIterator.transform(
+              ManifestFiles.read(manifest, table.getValue().io(), table.getValue().specs()).iterator(),

Review Comment:
   If I understand correctly, we can't use `readPaths` as we need to fetch the content file type. That being said, I think we have to add a projection. Right now, we will read all columns whereas we read only paths before.
   
   Will something like this work?
   
   ```
       public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean manifest) {
         FileIO io = table.getValue().io();
         Map<Integer, PartitionSpec> specs = table.getValue().specs();
         ImmutableList<String> projection = ImmutableList.of(DataFile.FILE_PATH.name(), DataFile.CONTENT.name());
   
         switch (manifest.content()) {
           case DATA:
             return CloseableIterator.transform(
                 ManifestFiles.read(manifest, io, specs).select(projection).iterator(),
                 ReadManifest::contentFileWithType);
           case DELETES:
             return CloseableIterator.transform(
                 ManifestFiles.readDeleteManifest(manifest, io, specs).select(projection).iterator(),
                 ReadManifest::contentFileWithType);
           default:
             throw new IllegalArgumentException("Unsupported manifest content type:" + manifest.content());
         }
       }
   ```



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +186,35 @@ protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator());
+    public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+      return new ClosingIterator(entries(manifest));
+    }
+
+    public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean manifest) {
+      switch (manifest.content()) {
+        case DATA:
+          return CloseableIterator.transform(
+              ManifestFiles.read(manifest, table.getValue().io(), table.getValue().specs()).iterator(),
+              ReadManifest::contentFileType);
+        case DELETES:
+          return CloseableIterator.transform(
+              ManifestFiles.readDeleteManifest(manifest, table.getValue().io(), table.getValue().specs()).iterator(),
+              ReadManifest::contentFileType);
+        default:
+          throw new IllegalArgumentException("Unsupported manifest content type:" + manifest.content());
+      }
+    }
+
+    static Tuple2<String, String> contentFileType(ContentFile file) {

Review Comment:
   nit: `contentFileWithType`?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +186,35 @@ protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator());
+    public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+      return new ClosingIterator(entries(manifest));
+    }
+
+    public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean manifest) {
+      switch (manifest.content()) {
+        case DATA:
+          return CloseableIterator.transform(
+              ManifestFiles.read(manifest, table.getValue().io(), table.getValue().specs()).iterator(),
+              ReadManifest::contentFileType);
+        case DELETES:
+          return CloseableIterator.transform(
+              ManifestFiles.readDeleteManifest(manifest, table.getValue().io(), table.getValue().specs()).iterator(),
+              ReadManifest::contentFileType);
+        default:
+          throw new IllegalArgumentException("Unsupported manifest content type:" + manifest.content());
+      }
+    }
+
+    static Tuple2<String, String> contentFileType(ContentFile file) {
+      return new Tuple2(file.path().toString(), file.content().toString());

Review Comment:
   nit: `Tuple2<>`



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -122,18 +125,25 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
     return new BaseTable(ops, metadataFileLocation);
   }
 
-  // builds a DF of delete and data file locations by reading all manifests
-  protected Dataset<Row> buildValidContentFileDF(Table table) {
+  // builds a DF of delete and data file path and type by reading all manifests
+  protected Dataset<Row> buildValidContentFileTypeDF(Table table) {

Review Comment:
   nit: what about `buildValidContentFileWithTypeDF`?



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +186,35 @@ protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator());
+    public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+      return new ClosingIterator(entries(manifest));

Review Comment:
   nit: raw type, let's use `ClosingIterator<>(...)`.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +186,35 @@ protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator());
+    public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+      return new ClosingIterator(entries(manifest));
+    }
+
+    public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean manifest) {
+      switch (manifest.content()) {
+        case DATA:
+          return CloseableIterator.transform(
+              ManifestFiles.read(manifest, table.getValue().io(), table.getValue().specs()).iterator(),
+              ReadManifest::contentFileType);
+        case DELETES:
+          return CloseableIterator.transform(
+              ManifestFiles.readDeleteManifest(manifest, table.getValue().io(), table.getValue().specs()).iterator(),
+              ReadManifest::contentFileType);
+        default:
+          throw new IllegalArgumentException("Unsupported manifest content type:" + manifest.content());
+      }
+    }
+
+    static Tuple2<String, String> contentFileType(ContentFile file) {

Review Comment:
   nit: `ContentFile<?>`



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java:
##########
@@ -176,16 +186,35 @@ protected Dataset<Row> loadMetadataTable(Table table, MetadataTableType type) {
     return SparkTableUtil.loadMetadataTable(spark, table, type);
   }
 
-  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, String> {
-    private final Broadcast<FileIO> io;
+  private static class ReadManifest implements FlatMapFunction<ManifestFileBean, Tuple2<String, String>> {
+    private final Broadcast<Table> table;
 
-    ReadManifest(Broadcast<FileIO> io) {
-      this.io = io;
+    ReadManifest(Broadcast<Table> table) {
+      this.table = table;
     }
 
     @Override
-    public Iterator<String> call(ManifestFileBean manifest) {
-      return new ClosingIterator<>(ManifestFiles.readPaths(manifest, io.getValue()).iterator());
+    public Iterator<Tuple2<String, String>> call(ManifestFileBean manifest) {
+      return new ClosingIterator(entries(manifest));
+    }
+
+    public CloseableIterator<Tuple2<String, String>> entries(ManifestFileBean manifest) {
+      switch (manifest.content()) {
+        case DATA:

Review Comment:
   I am afraid we don't project `content` in manifests-related metadata tables but we should probably fix that. I discovered that in another use case. I can submit a PR for that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org