Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/04/26 12:14:41 UTC

[GitHub] [iceberg] coolderli opened a new pull request #2518: Spark: Remove delete files when expiring snapshots.

coolderli opened a new pull request #2518:
URL: https://github.com/apache/iceberg/pull/2518


   When expiring snapshots, the delete files should also be removed.
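
   A usage sketch (not part of the PR): with this change, the Spark expire action's result should report removed delete files next to the existing counters. This assumes the pre-1.0 Actions entry point from the Spark module and a deleteFilesDeleted() getter mirroring the new field in ExpireSnapshotsActionResult:

       // sketch: expire old snapshots and read back the per-type counts;
       // deleteFilesDeleted() is assumed to mirror the new field added here
       ExpireSnapshotsActionResult result = Actions.forTable(table)
           .expireSnapshots()
           .expireOlderThan(System.currentTimeMillis())
           .execute();

       System.out.println("data files removed:   " + result.dataFilesDeleted());
       System.out.println("delete files removed: " + result.deleteFilesDeleted());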



[GitHub] [iceberg] coolderli commented on a change in pull request #2518: Spark: Remove delete files when expiring snapshots.

Posted by GitBox <gi...@apache.org>.
coolderli commented on a change in pull request #2518:
URL: https://github.com/apache/iceberg/pull/2518#discussion_r620873348



##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java
##########
@@ -1053,7 +1055,7 @@ public void testUseLocalIterator() {
     int jobsAfter = spark.sparkContext().dagScheduler().nextJobId().get();
     int totalJobsRun = jobsAfter - jobsBefore;
 
-    checkExpirationResults(1L, 1L, 2L, results);
+    checkExpirationResults(1L, 0L, 1L, 2L, results);

Review comment:
       Thanks, I will add the unit tests.






[GitHub] [iceberg] jackye1995 commented on a change in pull request #2518: Spark: Remove delete files when expiring snapshots.

Posted by GitBox <gi...@apache.org>.
jackye1995 commented on a change in pull request #2518:
URL: https://github.com/apache/iceberg/pull/2518#discussion_r620837748



##########
File path: spark/src/main/java/org/apache/iceberg/actions/ExpireSnapshotsActionResult.java
##########
@@ -23,18 +23,22 @@
 public class ExpireSnapshotsActionResult {
 
   private final Long dataFilesDeleted;
+  private final Long deleteFilesDeleted;
   private final Long manifestFilesDeleted;
   private final Long manifestListsDeleted;
 
   static ExpireSnapshotsActionResult wrap(ExpireSnapshots.Result result) {
     return new ExpireSnapshotsActionResult(
         result.deletedDataFilesCount(),
+        result.deletedDeleteFilesCount(),
         result.deletedManifestsCount(),
         result.deletedManifestListsCount());
   }
 
-  public ExpireSnapshotsActionResult(Long dataFilesDeleted, Long manifestFilesDeleted, Long manifestListsDeleted) {
+  public ExpireSnapshotsActionResult(Long dataFilesDeleted, Long deleteFilesDeleted,

Review comment:
       For a public class, we should keep the existing constructor and add a new one, so existing callers don't break.
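
       A minimal sketch of that pattern, using the fields from the diff above (the old three-argument constructor stays and delegates with a zero delete-file count):

           public ExpireSnapshotsActionResult(Long dataFilesDeleted, Long manifestFilesDeleted,
                                              Long manifestListsDeleted) {
             // preserve the pre-existing public signature for compatibility
             this(dataFilesDeleted, 0L, manifestFilesDeleted, manifestListsDeleted);
           }

           public ExpireSnapshotsActionResult(Long dataFilesDeleted, Long deleteFilesDeleted,
                                              Long manifestFilesDeleted, Long manifestListsDeleted) {
             this.dataFilesDeleted = dataFilesDeleted;
             this.deleteFilesDeleted = deleteFilesDeleted;
             this.manifestFilesDeleted = manifestFilesDeleted;
             this.manifestListsDeleted = manifestListsDeleted;
           }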

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
##########
@@ -152,14 +154,17 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
   protected Dataset<Row> buildValidDataFileDF(Table table) {
     JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
     Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));
+    return loadAllManifestFileBean(table).filter((FilterFunction<ManifestFileBean>) manifest ->
+        manifest.content() == ManifestContent.DATA)

Review comment:
       should compare using `ManifestContent.DATA.equals(...)`
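
       Presumably the point is null-safety: `==` is fine for enum identity, but putting the constant first avoids an NPE if `manifest.content()` returns null. A sketch of the rewritten filter:

           // constant-first equals tolerates a null content()
           .filter((FilterFunction<ManifestFileBean>) manifest ->
               ManifestContent.DATA.equals(manifest.content()))

       The same applies to the DELETES filter below.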

##########
File path: spark/src/main/java/org/apache/iceberg/actions/ManifestFileBean.java
##########
@@ -27,6 +27,7 @@
   private String path = null;
   private Long length = null;
   private Integer partitionSpecId = null;
+  private Integer content = null;

Review comment:
       why not just use `ManifestContent` instead of `Integer`?
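
       If the `Integer` is only there so Spark's bean encoder can serialize the field, one option (a sketch, not from the PR) is to keep the stored id but expose the enum to callers:

           // bean property kept as Integer for Encoders.bean(ManifestFileBean.class)
           public Integer getContent() {
             return content;
           }

           public void setContent(Integer content) {
             this.content = content;
           }

           // derived view; assumes enum declaration order matches the stored id
           // (ManifestContent.DATA = 0, ManifestContent.DELETES = 1)
           public ManifestContent content() {
             return content == null ? null : ManifestContent.values()[content];
           }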

##########
File path: spark/src/test/java/org/apache/iceberg/actions/TestExpireSnapshotsAction.java
##########
@@ -1053,7 +1055,7 @@ public void testUseLocalIterator() {
     int jobsAfter = spark.sparkContext().dagScheduler().nextJobId().get();
     int totalJobsRun = jobsAfter - jobsBefore;
 
-    checkExpirationResults(1L, 1L, 2L, results);
+    checkExpirationResults(1L, 0L, 1L, 2L, results);

Review comment:
       I don't see a test that actually exercises a delete file count > 0; could you add one?
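
       A hedged sketch of such a test (fixture names like FILE_A, FILE_B, and FILE_A_DELETES are hypothetical; a format v2 table and the RewriteFiles overload that replaces delete files are assumed):

           table.newAppend().appendFile(FILE_A).commit();            // snapshot 1
           table.newRowDelta().addDeletes(FILE_A_DELETES).commit();  // snapshot 2

           // rewrite FILE_A and its deletes away so only snapshot 3 references FILE_B
           table.newRewrite()
               .rewriteFiles(ImmutableSet.of(FILE_A), ImmutableSet.of(FILE_A_DELETES),
                   ImmutableSet.of(FILE_B), ImmutableSet.of())
               .commit();                                            // snapshot 3

           long end = System.currentTimeMillis();
           ExpireSnapshotsActionResult results =
               Actions.forTable(table).expireSnapshots().expireOlderThan(end).execute();

           // the expired snapshots were the only ones referencing FILE_A_DELETES,
           // so the new counter should be non-zero (getter name assumed)
           Assert.assertEquals("Should remove one delete file",
               1L, (long) results.deleteFilesDeleted());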

##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
##########
@@ -152,14 +154,17 @@ protected Table newStaticTable(TableMetadata metadata, FileIO io) {
   protected Dataset<Row> buildValidDataFileDF(Table table) {
     JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
     Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));
+    return loadAllManifestFileBean(table).filter((FilterFunction<ManifestFileBean>) manifest ->
+        manifest.content() == ManifestContent.DATA)
+        .flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF("file_path");
+  }
 
-    Dataset<ManifestFileBean> allManifests = loadMetadataTable(table, ALL_MANIFESTS)
-        .selectExpr("path", "length", "partition_spec_id as partitionSpecId", "added_snapshot_id as addedSnapshotId")
-        .dropDuplicates("path")
-        .repartition(spark.sessionState().conf().numShufflePartitions()) // avoid adaptive execution combining tasks
-        .as(Encoders.bean(ManifestFileBean.class));
-
-    return allManifests.flatMap(new ReadManifest(ioBroadcast), Encoders.STRING()).toDF("file_path");
+  protected Dataset<Row> buildValidDeleteFileDF(Table table) {
+    JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
+    Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));
+    return loadAllManifestFileBean(table).filter((FilterFunction<ManifestFileBean>) manifest ->
+        manifest.content() == ManifestContent.DELETES)

Review comment:
       should compare using `ManifestContent.DELETES.equals(...)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org