You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/04/18 00:56:15 UTC
[iceberg] branch master updated: Spark: Use new actions entry point in procedures (#2489)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 49d8ade Spark: Use new actions entry point in procedures (#2489)
49d8ade is described below
commit 49d8ade09b24fce3b95450f7bd1ba1cb517261b0
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Sat Apr 17 17:56:04 2021 -0700
Spark: Use new actions entry point in procedures (#2489)
---
.../iceberg/spark/procedures/BaseProcedure.java | 10 ++++++
.../spark/procedures/ExpireSnapshotsProcedure.java | 24 ++++++++-------
.../procedures/RemoveOrphanFilesProcedure.java | 36 +++++++++++++---------
.../procedures/RewriteManifestsProcedure.java | 24 ++++++++-------
4 files changed, 57 insertions(+), 37 deletions(-)
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java b/spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
index 1e2b239..eafa6cf 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
@@ -24,6 +24,7 @@ import org.apache.arrow.util.Preconditions;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.spark.source.SparkTable;
import org.apache.spark.sql.SparkSession;
@@ -47,6 +48,8 @@ abstract class BaseProcedure implements Procedure {
private final SparkSession spark;
private final TableCatalog tableCatalog;
+ private SparkActions actions;
+
protected BaseProcedure(TableCatalog tableCatalog) {
this.spark = SparkSession.active();
this.tableCatalog = tableCatalog;
@@ -56,6 +59,13 @@ abstract class BaseProcedure implements Procedure {
return this.spark;
}
+ /**
+ * Returns the {@link SparkActions} entry point for this procedure, created lazily from the
+ * session captured at construction time and reused on subsequent calls.
+ */
+ protected SparkActions actions() {
+ if (this.actions != null) {
+ return this.actions;
+ }
+ this.actions = SparkActions.get(spark);
+ return this.actions;
+ }
+
protected TableCatalog tableCatalog() {
return this.tableCatalog;
}
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index f4eac08..ba1e6b0 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -20,8 +20,7 @@
package org.apache.iceberg.spark.procedures;
import org.apache.iceberg.actions.Actions;
-import org.apache.iceberg.actions.ExpireSnapshotsAction;
-import org.apache.iceberg.actions.ExpireSnapshotsActionResult;
+import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -82,9 +81,7 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
return modifyIcebergTable(tableIdent, table -> {
- Actions actions = Actions.forTable(table);
-
- ExpireSnapshotsAction action = actions.expireSnapshots();
+ ExpireSnapshots action = actions().expireSnapshots(table);
if (olderThanMillis != null) {
action.expireOlderThan(olderThanMillis);
@@ -94,16 +91,21 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
action.retainLast(retainLastNum);
}
- ExpireSnapshotsActionResult result = action.execute();
+ ExpireSnapshots.Result result = action.execute();
- InternalRow outputRow = newInternalRow(
- result.dataFilesDeleted(),
- result.manifestFilesDeleted(),
- result.manifestListsDeleted());
- return new InternalRow[]{outputRow};
+ return toOutputRows(result);
});
}
+ /**
+ * Packs the counters reported by the expire-snapshots action into the procedure's single
+ * output row, in the order: deleted data files, deleted manifests, deleted manifest lists.
+ */
+ private InternalRow[] toOutputRows(ExpireSnapshots.Result result) {
+ return new InternalRow[]{
+ newInternalRow(
+ result.deletedDataFilesCount(),
+ result.deletedManifestsCount(),
+ result.deletedManifestListsCount())
+ };
+ }
+
@Override
public String description() {
return "ExpireSnapshotProcedure";
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark3/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index 9b95e26..eea4951 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -19,12 +19,11 @@
package org.apache.iceberg.spark.procedures;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.actions.Actions;
-import org.apache.iceberg.actions.RemoveOrphanFilesAction;
+import org.apache.iceberg.actions.RemoveOrphanFiles;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
-import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -85,13 +84,10 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
return withIcebergTable(tableIdent, table -> {
- SparkSession spark = SparkSession.active();
- Actions actions = Actions.forTable(spark, table);
-
- RemoveOrphanFilesAction action = actions.removeOrphanFiles();
+ RemoveOrphanFiles action = actions().removeOrphanFiles(table);
if (olderThanMillis != null) {
- boolean isTesting = Boolean.parseBoolean(spark.conf().get("spark.testing", "false"));
+ boolean isTesting = Boolean.parseBoolean(spark().conf().get("spark.testing", "false"));
if (!isTesting) {
validateInterval(olderThanMillis);
}
@@ -106,17 +102,27 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
action.deleteWith(file -> { });
}
- List<String> orphanFileLocations = action.execute();
+ RemoveOrphanFiles.Result result = action.execute();
- InternalRow[] outputRows = new InternalRow[orphanFileLocations.size()];
- for (int index = 0; index < orphanFileLocations.size(); index++) {
- String fileLocation = orphanFileLocations.get(index);
- outputRows[index] = newInternalRow(UTF8String.fromString(fileLocation));
- }
- return outputRows;
+ return toOutputRows(result);
});
}
+ /**
+ * Converts the orphan file locations reported by the action into one output row per
+ * location, in the order the result yields them.
+ */
+ private InternalRow[] toOutputRows(RemoveOrphanFiles.Result result) {
+ Iterable<String> locations = result.orphanFileLocations();
+ // NOTE(review): the Iterable is walked twice (once to size the array, once to fill it);
+ // this assumes the result's Iterable is re-iterable and cheap to traverse -- confirm.
+ InternalRow[] rows = new InternalRow[Iterables.size(locations)];
+
+ int pos = 0;
+ for (String location : locations) {
+ rows[pos++] = newInternalRow(UTF8String.fromString(location));
+ }
+
+ return rows;
+ }
+
private void validateInterval(long olderThanMillis) {
long intervalMillis = System.currentTimeMillis() - olderThanMillis;
if (intervalMillis < TimeUnit.DAYS.toMillis(1)) {
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java b/spark3/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
index 93b7923..49e523a 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
@@ -20,8 +20,8 @@
package org.apache.iceberg.spark.procedures;
import org.apache.iceberg.actions.Actions;
-import org.apache.iceberg.actions.RewriteManifestsAction;
-import org.apache.iceberg.actions.RewriteManifestsActionResult;
+import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -81,23 +81,25 @@ class RewriteManifestsProcedure extends BaseProcedure {
Boolean useCaching = args.isNullAt(1) ? null : args.getBoolean(1);
return modifyIcebergTable(tableIdent, table -> {
- Actions actions = Actions.forTable(table);
-
- RewriteManifestsAction action = actions.rewriteManifests();
+ RewriteManifests action = actions().rewriteManifests(table);
if (useCaching != null) {
- action.useCaching(useCaching);
+ // Forward the caller's value as-is; hard-coding "true" would ignore use_caching => false.
+ action.option("use-caching", useCaching.toString());
}
- RewriteManifestsActionResult result = action.execute();
+ RewriteManifests.Result result = action.execute();
- int numRewrittenManifests = result.deletedManifests().size();
- int numAddedManifests = result.addedManifests().size();
- InternalRow outputRow = newInternalRow(numRewrittenManifests, numAddedManifests);
- return new InternalRow[]{outputRow};
+ return toOutputRows(result);
});
}
+ /**
+ * Builds the procedure's single output row: the number of manifests rewritten by the
+ * action followed by the number of manifests it added.
+ */
+ private InternalRow[] toOutputRows(RewriteManifests.Result result) {
+ InternalRow row = newInternalRow(
+ Iterables.size(result.rewrittenManifests()),
+ Iterables.size(result.addedManifests()));
+ return new InternalRow[]{row};
+ }
+
@Override
public String description() {
return "RewriteManifestsProcedure";