You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/04/18 00:56:15 UTC

[iceberg] branch master updated: Spark: Use new actions entry point in procedures (#2489)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 49d8ade  Spark: Use new actions entry point in procedures (#2489)
49d8ade is described below

commit 49d8ade09b24fce3b95450f7bd1ba1cb517261b0
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Sat Apr 17 17:56:04 2021 -0700

    Spark: Use new actions entry point in procedures (#2489)
---
 .../iceberg/spark/procedures/BaseProcedure.java    | 10 ++++++
 .../spark/procedures/ExpireSnapshotsProcedure.java | 24 ++++++++-------
 .../procedures/RemoveOrphanFilesProcedure.java     | 36 +++++++++++++---------
 .../procedures/RewriteManifestsProcedure.java      | 24 ++++++++-------
 4 files changed, 57 insertions(+), 37 deletions(-)

diff --git a/spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java b/spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
index 1e2b239..eafa6cf 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
@@ -24,6 +24,7 @@ import org.apache.arrow.util.Preconditions;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
+import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.spark.sql.SparkSession;
@@ -47,6 +48,8 @@ abstract class BaseProcedure implements Procedure {
   private final SparkSession spark;
   private final TableCatalog tableCatalog;
 
+  private SparkActions actions;  // created on the first call to actions(), then reused
+
   protected BaseProcedure(TableCatalog tableCatalog) {
     this.spark = SparkSession.active();
     this.tableCatalog = tableCatalog;
@@ -56,6 +59,13 @@ abstract class BaseProcedure implements Procedure {
     return this.spark;
   }
 
+  protected SparkActions actions() {  // entry point for table actions, bound to spark
+    if (actions == null) {  // lazy init, not synchronized; later calls reuse the instance
+      this.actions = SparkActions.get(spark);
+    }
+    return actions;
+  }
+
   protected TableCatalog tableCatalog() {
     return this.tableCatalog;
   }
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index f4eac08..ba1e6b0 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -20,8 +20,7 @@
 package org.apache.iceberg.spark.procedures;
 
 import org.apache.iceberg.actions.Actions;
-import org.apache.iceberg.actions.ExpireSnapshotsAction;
-import org.apache.iceberg.actions.ExpireSnapshotsActionResult;
+import org.apache.iceberg.actions.ExpireSnapshots;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -82,9 +81,7 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
     Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
 
     return modifyIcebergTable(tableIdent, table -> {
-      Actions actions = Actions.forTable(table);
-
-      ExpireSnapshotsAction action = actions.expireSnapshots();
+      ExpireSnapshots action = actions().expireSnapshots(table);
 
       if (olderThanMillis != null) {
         action.expireOlderThan(olderThanMillis);
@@ -94,16 +91,21 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
         action.retainLast(retainLastNum);
       }
 
-      ExpireSnapshotsActionResult result = action.execute();
+      ExpireSnapshots.Result result = action.execute();
 
-      InternalRow outputRow = newInternalRow(
-          result.dataFilesDeleted(),
-          result.manifestFilesDeleted(),
-          result.manifestListsDeleted());
-      return new InternalRow[]{outputRow};
+      return toOutputRows(result);
     });
   }
 
+  private InternalRow[] toOutputRows(ExpireSnapshots.Result result) {  // single summary row
+    InternalRow row = newInternalRow(  // deleted data files, manifests, manifest lists
+        result.deletedDataFilesCount(),
+        result.deletedManifestsCount(),
+        result.deletedManifestListsCount()
+    );
+    return new InternalRow[]{row};
+  }
+
   @Override
   public String description() {
     return "ExpireSnapshotProcedure";
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark3/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
index 9b95e26..eea4951 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java
@@ -19,12 +19,11 @@
 
 package org.apache.iceberg.spark.procedures;
 
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.actions.Actions;
-import org.apache.iceberg.actions.RemoveOrphanFilesAction;
+import org.apache.iceberg.actions.RemoveOrphanFiles;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
-import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.connector.catalog.Identifier;
@@ -85,13 +84,10 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
     boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
 
     return withIcebergTable(tableIdent, table -> {
-      SparkSession spark = SparkSession.active();
-      Actions actions = Actions.forTable(spark, table);
-
-      RemoveOrphanFilesAction action = actions.removeOrphanFiles();
+      RemoveOrphanFiles action = actions().removeOrphanFiles(table);
 
       if (olderThanMillis != null) {
-        boolean isTesting = Boolean.parseBoolean(spark.conf().get("spark.testing", "false"));
+        boolean isTesting = Boolean.parseBoolean(spark().conf().get("spark.testing", "false"));
         if (!isTesting) {
           validateInterval(olderThanMillis);
         }
@@ -106,17 +102,27 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
         action.deleteWith(file -> { });
       }
 
-      List<String> orphanFileLocations = action.execute();
+      RemoveOrphanFiles.Result result = action.execute();
 
-      InternalRow[] outputRows = new InternalRow[orphanFileLocations.size()];
-      for (int index = 0; index < orphanFileLocations.size(); index++) {
-        String fileLocation = orphanFileLocations.get(index);
-        outputRows[index] = newInternalRow(UTF8String.fromString(fileLocation));
-      }
-      return outputRows;
+      return toOutputRows(result);
     });
   }
 
+  private InternalRow[] toOutputRows(RemoveOrphanFiles.Result result) {  // one row per location
+    Iterable<String> orphanFileLocations = result.orphanFileLocations();
+    // walked twice below (size, then fill), so the result must be re-iterable
+    int orphanFileLocationsCount = Iterables.size(orphanFileLocations);
+    InternalRow[] rows = new InternalRow[orphanFileLocationsCount];
+
+    int index = 0;
+    for (String fileLocation : orphanFileLocations) {
+      rows[index] = newInternalRow(UTF8String.fromString(fileLocation));
+      index++;
+    }
+
+    return rows;
+  }
+
   private void validateInterval(long olderThanMillis) {
     long intervalMillis = System.currentTimeMillis() - olderThanMillis;
     if (intervalMillis < TimeUnit.DAYS.toMillis(1)) {
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java b/spark3/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
index 93b7923..49e523a 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/procedures/RewriteManifestsProcedure.java
@@ -20,8 +20,8 @@
 package org.apache.iceberg.spark.procedures;
 
 import org.apache.iceberg.actions.Actions;
-import org.apache.iceberg.actions.RewriteManifestsAction;
-import org.apache.iceberg.actions.RewriteManifestsActionResult;
+import org.apache.iceberg.actions.RewriteManifests;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
@@ -81,23 +81,25 @@ class RewriteManifestsProcedure extends BaseProcedure {
     Boolean useCaching = args.isNullAt(1) ? null : args.getBoolean(1);
 
     return modifyIcebergTable(tableIdent, table -> {
-      Actions actions = Actions.forTable(table);
-
-      RewriteManifestsAction action = actions.rewriteManifests();
+      RewriteManifests action = actions().rewriteManifests(table);

       if (useCaching != null) {
-        action.useCaching(useCaching);
+        action.option("use-caching", useCaching.toString());  // forward caller's value, as before
       }

-      RewriteManifestsActionResult result = action.execute();
+      RewriteManifests.Result result = action.execute();

-      int numRewrittenManifests = result.deletedManifests().size();
-      int numAddedManifests = result.addedManifests().size();
-      InternalRow outputRow = newInternalRow(numRewrittenManifests, numAddedManifests);
-      return new InternalRow[]{outputRow};
+      return toOutputRows(result);
     });
   }
 
+  private InternalRow[] toOutputRows(RewriteManifests.Result result) {  // one row of counts
+    int rewrittenManifestsCount = Iterables.size(result.rewrittenManifests());
+    int addedManifestsCount = Iterables.size(result.addedManifests());
+    InternalRow row = newInternalRow(rewrittenManifestsCount, addedManifestsCount);
+    return new InternalRow[]{row};
+  }
+
   @Override
   public String description() {
     return "RewriteManifestsProcedure";