You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/08/12 15:43:06 UTC
[iceberg] branch master updated: Spark 3.1: Port #4198 to Spark 3.1 (#5499)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new c21918a919 Spark 3.1: Port #4198 to Spark 3.1 (#5499)
c21918a919 is described below
commit c21918a9195f0caa8ca6edf910dc99b75c0164cd
Author: liliwei <li...@huawei.com>
AuthorDate: Fri Aug 12 23:43:01 2022 +0800
Spark 3.1: Port #4198 to Spark 3.1 (#5499)
---
.../spark/actions/BaseDeleteOrphanFilesSparkAction.java | 8 ++++----
.../spark/actions/BaseDeleteReachableFilesSparkAction.java | 8 ++++----
.../iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java | 10 +++++-----
.../java/org/apache/iceberg/spark/actions/BaseSparkAction.java | 3 ++-
4 files changed, 15 insertions(+), 14 deletions(-)
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
index a79f075ef4..72b6268026 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java
@@ -60,9 +60,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * An action that removes orphan metadata and data files by listing a given location and comparing
- * the actual files in that location with data and metadata files referenced by all valid snapshots.
- * The location must be accessible for listing via the Hadoop {@link FileSystem}.
+ * An action that removes orphan metadata, data and delete files by listing a given location and
+ * comparing the actual files in that location with content and metadata files referenced by all
+ * valid snapshots. The location must be accessible for listing via the Hadoop {@link FileSystem}.
*
* <p>By default, this action cleans up the table location returned by {@link Table#location()} and
* removes unreachable files that are older than 3 days using {@link Table#io()}. The behavior can
@@ -169,7 +169,7 @@ public class BaseDeleteOrphanFilesSparkAction
}
private DeleteOrphanFiles.Result doExecute() {
- Dataset<Row> validDataFileDF = buildValidDataFileDF(table);
+ Dataset<Row> validDataFileDF = buildValidContentFileDF(table);
Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
Dataset<Row> validFileDF = validDataFileDF.union(validMetadataFileDF);
Dataset<Row> actualFileDF = buildActualFileDF();
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
index 1431ae5d78..a1bc19d7dc 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteReachableFilesSparkAction.java
@@ -60,7 +60,7 @@ public class BaseDeleteReachableFilesSparkAction
private static final Logger LOG =
LoggerFactory.getLogger(BaseDeleteReachableFilesSparkAction.class);
- private static final String DATA_FILE = "Data File";
+ private static final String CONTENT_FILE = "Content File";
private static final String MANIFEST = "Manifest";
private static final String MANIFEST_LIST = "Manifest List";
private static final String OTHERS = "Others";
@@ -140,7 +140,7 @@ public class BaseDeleteReachableFilesSparkAction
private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
Table staticTable = newStaticTable(metadata, io);
- return projectFilePathWithType(buildValidDataFileDF(staticTable), DATA_FILE)
+ return projectFilePathWithType(buildValidContentFileDF(staticTable), CONTENT_FILE)
.union(projectFilePathWithType(buildManifestFileDF(staticTable), MANIFEST))
.union(projectFilePathWithType(buildManifestListDF(staticTable), MANIFEST_LIST))
.union(projectFilePathWithType(buildOtherMetadataFileDF(staticTable), OTHERS));
@@ -183,9 +183,9 @@ public class BaseDeleteReachableFilesSparkAction
String type = fileInfo.getString(1);
removeFunc.accept(file);
switch (type) {
- case DATA_FILE:
+ case CONTENT_FILE:
dataFileCount.incrementAndGet();
- LOG.trace("Deleted Data File: {}", file);
+ LOG.trace("Deleted Content File: {}", file);
break;
case MANIFEST:
manifestCount.incrementAndGet();
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
index 2e1f0c079e..da9907fe32 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
@@ -57,7 +57,7 @@ import org.slf4j.LoggerFactory;
*
* <p>This action first leverages {@link org.apache.iceberg.ExpireSnapshots} to expire snapshots and
* then uses metadata tables to find files that can be safely deleted. This is done by anti-joining
- * two Datasets that contain all manifest and data files before and after the expiration. The
+ * two Datasets that contain all manifest and content files before and after the expiration. The
* snapshot expiration will be fully committed before any deletes are issued.
*
* <p>This operation performs a shuffle so the parallelism can be controlled through
@@ -72,7 +72,7 @@ public class BaseExpireSnapshotsSparkAction
public static final String STREAM_RESULTS = "stream-results";
- private static final String DATA_FILE = "Data File";
+ private static final String CONTENT_FILE = "Content File";
private static final String MANIFEST = "Manifest";
private static final String MANIFEST_LIST = "Manifest List";
@@ -233,7 +233,7 @@ public class BaseExpireSnapshotsSparkAction
private Dataset<Row> buildValidFileDF(TableMetadata metadata) {
Table staticTable = newStaticTable(metadata, this.table.io());
- return appendTypeString(buildValidDataFileDF(staticTable), DATA_FILE)
+ return appendTypeString(buildValidContentFileDF(staticTable), CONTENT_FILE)
.union(appendTypeString(buildManifestFileDF(staticTable), MANIFEST))
.union(appendTypeString(buildManifestListDF(staticTable), MANIFEST_LIST));
}
@@ -266,9 +266,9 @@ public class BaseExpireSnapshotsSparkAction
String type = fileInfo.getString(1);
deleteFunc.accept(file);
switch (type) {
- case DATA_FILE:
+ case CONTENT_FILE:
dataFileCount.incrementAndGet();
- LOG.trace("Deleted Data File: {}", file);
+ LOG.trace("Deleted Content File: {}", file);
break;
case MANIFEST:
manifestCount.incrementAndGet();
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
index c9d93ce9de..5abfcc4482 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java
@@ -110,7 +110,8 @@ abstract class BaseSparkAction<ThisT, R> implements Action<ThisT, R> {
return new BaseTable(ops, metadataFileLocation);
}
- protected Dataset<Row> buildValidDataFileDF(Table table) {
+ // builds a DF of delete and data file locations by reading all manifests
+ protected Dataset<Row> buildValidContentFileDF(Table table) {
JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
Broadcast<FileIO> ioBroadcast = context.broadcast(SparkUtil.serializableFileIO(table));