Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/08/12 15:43:06 UTC

[iceberg] branch master updated: Spark 3.1: Port #4198 to Spark 3.1 (#5499)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new c21918a919 Spark 3.1: Port #4198 to Spark 3.1 (#5499)
c21918a919 is described below

commit c21918a9195f0caa8ca6edf910dc99b75c0164cd
Author: liliwei <li...@huawei.com>
AuthorDate: Fri Aug 12 23:43:01 2022 +0800

    Spark 3.1: Port #4198 to Spark 3.1 (#5499)
---
 .../spark/actions/BaseDeleteOrphanFilesSparkAction.java        |  8 ++++----
 .../spark/actions/BaseDeleteReachableFilesSparkAction.java     |  8 ++++----
 .../iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java  | 10 +++++-----
 .../java/org/apache/iceberg/spark/actions/BaseSparkAction.java |  3 ++-
 4 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
index a79f075ef4..72b6268026 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
@@ -60,9 +60,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * An action that removes orphan metadata and data files by listing a given location and comparing
- * the actual files in that location with data and metadata files referenced by all valid snapshots.
- * The location must be accessible for listing via the Hadoop {@link FileSystem}.
+ * An action that removes orphan metadata, data and delete files by listing a given location and
+ * comparing the actual files in that location with content and metadata files referenced by all
+ * valid snapshots. The location must be accessible for listing via the Hadoop {@link FileSystem}.
  *
  * <p>By default, this action cleans up the table location returned by {@link Table#location()} and
  * removes unreachable files that are older than 3 days using {@link Table#io()}. The behavior can
@@ -169,7 +169,7 @@ public class BaseDeleteOrphanFilesSparkAction
   }
 
   private DeleteOrphanFiles.Result doExecute() {
-    Dataset<Row> validDataFileDF = buildValidDataFileDF(table);
+    Dataset<Row> validDataFileDF = buildValidContentFileDF(table);
     Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
     Dataset<Row> validFileDF = validDataFileDF.union(validMetadataFileDF);
     Dataset<Row> actualFileDF = buildActualFileDF();
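
For context, the public API that this class backs can be exercised as in the
minimal sketch below. The `table` variable is assumed to be an already-loaded
org.apache.iceberg.Table, and the three-day cutoff simply mirrors the default
described in the Javadoc; after this change the comparison also accounts for
delete files when deciding what is orphaned.

    import java.util.concurrent.TimeUnit;
    import org.apache.iceberg.actions.DeleteOrphanFiles;
    import org.apache.iceberg.spark.actions.SparkActions;

    // Remove files under the table location that no valid snapshot references.
    // "Referenced" now covers data, delete, and metadata files alike.
    DeleteOrphanFiles.Result result =
        SparkActions.get()
            .deleteOrphanFiles(table)
            .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3))
            .execute();

    result.orphanFileLocations().forEach(loc -> System.out.println("Removed: " + loc));
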
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
index 1431ae5d78..a1bc19d7dc 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
@@ -60,7 +60,7 @@ public class BaseDeleteReachableFilesSparkAction
   private static final Logger LOG =
       LoggerFactory.getLogger(BaseDeleteReachableFilesSparkAction.class);
 
-  private static final String DATA_FILE = "Data File";
+  private static final String CONTENT_FILE = "Content File";
   private static final String MANIFEST = "Manifest";
   private static final String MANIFEST_LIST = "Manifest List";
   private static final String OTHERS = "Others";
@@ -140,7 +140,7 @@ public class BaseDeleteReachableFilesSparkAction
 
   private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
     Table staticTable = newStaticTable(metadata, io);
-    return projectFilePathWithType(buildValidDataFileDF(staticTable), DATA_FILE)
+    return projectFilePathWithType(buildValidContentFileDF(staticTable), CONTENT_FILE)
         .union(projectFilePathWithType(buildManifestFileDF(staticTable), MANIFEST))
         .union(projectFilePathWithType(buildManifestListDF(staticTable), MANIFEST_LIST))
         .union(projectFilePathWithType(buildOtherMetadataFileDF(staticTable), OTHERS));
@@ -183,9 +183,9 @@ public class BaseDeleteReachableFilesSparkAction
               String type = fileInfo.getString(1);
               removeFunc.accept(file);
               switch (type) {
-                case DATA_FILE:
+                case CONTENT_FILE:
                   dataFileCount.incrementAndGet();
-                  LOG.trace("Deleted Data File: {}", file);
+                  LOG.trace("Deleted Content File: {}", file);
                   break;
                 case MANIFEST:
                   manifestCount.incrementAndGet();
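
The reachable-files action this file implements is typically used when purging
a table starting from a metadata pointer rather than a catalog entry. A minimal
sketch, assuming `metadataLocation` is the path to a version metadata.json and
`io` is the FileIO to issue deletes through (both names are illustrative):

    import org.apache.iceberg.actions.DeleteReachableFiles;
    import org.apache.iceberg.spark.actions.SparkActions;

    // Deletes everything reachable from the metadata file: content files
    // (data and delete files, per the renamed CONTENT_FILE type above),
    // manifests, manifest lists, and other metadata files.
    DeleteReachableFiles.Result result =
        SparkActions.get()
            .deleteReachableFiles(metadataLocation)
            .io(io)
            .execute();
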
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
index 2e1f0c079e..da9907fe32 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
  *
  * <p>This action first leverages {@link org.apache.iceberg.ExpireSnapshots} to expire snapshots and
  * then uses metadata tables to find files that can be safely deleted. This is done by anti-joining
- * two Datasets that contain all manifest and data files before and after the expiration. The
+ * two Datasets that contain all manifest and content files before and after the expiration. The
  * snapshot expiration will be fully committed before any deletes are issued.
  *
  * <p>This operation performs a shuffle so the parallelism can be controlled through
@@ -72,7 +72,7 @@ public class BaseExpireSnapshotsSparkAction
 
   public static final String STREAM_RESULTS = "stream-results";
 
-  private static final String DATA_FILE = "Data File";
+  private static final String CONTENT_FILE = "Content File";
   private static final String MANIFEST = "Manifest";
   private static final String MANIFEST_LIST = "Manifest List";
 
@@ -233,7 +233,7 @@ public class BaseExpireSnapshotsSparkAction
 
   private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
     Table staticTable = newStaticTable(metadata, this.table.io());
-    return appendTypeString(buildValidDataFileDF(staticTable), DATA_FILE)
+    return appendTypeString(buildValidContentFileDF(staticTable), CONTENT_FILE)
         .union(appendTypeString(buildManifestFileDF(staticTable), MANIFEST))
         .union(appendTypeString(buildManifestListDF(staticTable), MANIFEST_LIST));
   }
@@ -266,9 +266,9 @@ public class BaseExpireSnapshotsSparkAction
               String type = fileInfo.getString(1);
               deleteFunc.accept(file);
               switch (type) {
-                case DATA_FILE:
+                case CONTENT_FILE:
                   dataFileCount.incrementAndGet();
-                  LOG.trace("Deleted Data File: {}", file);
+                  LOG.trace("Deleted Content File: {}", file);
                   break;
                 case MANIFEST:
                   manifestCount.incrementAndGet();
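
The STREAM_RESULTS flag defined above is passed through the generic
Action.option(...) mechanism. A minimal sketch, assuming `table` is a loaded
org.apache.iceberg.Table and with a seven-day horizon chosen purely for
illustration:

    import java.util.concurrent.TimeUnit;
    import org.apache.iceberg.actions.ExpireSnapshots;
    import org.apache.iceberg.spark.actions.SparkActions;

    ExpireSnapshots.Result result =
        SparkActions.get()
            .expireSnapshots(table)
            .expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7))
            .option("stream-results", "true")  // delete as rows stream in, not from a collected list
            .execute();

    // With this change, the data-file counter covers content (data + delete) files.
    System.out.println("Deleted content files: " + result.deletedDataFilesCount());
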
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index c9d93ce9de..5abfcc4482 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -110,7 +110,8 @@ abstract class BaseSparkAction<ThisT, R> implements Action<ThisT, R> {
     return new BaseTable(ops, metadataFileLocation);
   }
 
-  protected Dataset<Row> buildValidDataFileDF(Table table) {
+  // builds a DF of delete and data file locations by reading all manifests
+  protected Dataset<Row> buildValidContentFileDF(Table table) {
     JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
     Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));
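
The two lines above show a common Spark pattern: the table's FileIO is wrapped
in a serializable form and broadcast so that executor-side tasks can open
manifest files themselves. A generic, self-contained illustration of that
pattern (the names here are not Iceberg's):

    import java.util.Arrays;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder().master("local").getOrCreate();
    JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());

    // Any Serializable value can be shipped to executors once and then read
    // by every task via value(), instead of being re-serialized per task.
    Broadcast<String> shared = context.broadcast("read-only state");
    long count = context.parallelize(Arrays.asList(1, 2, 3))
        .filter(x -> shared.value() != null)
        .count();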