Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/03/01 10:13:55 UTC

[iceberg] branch master updated: Spark 3.2: Add stream-results param to expire_snapshots procedure (#4152)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 72e9b50  Spark 3.2: Add stream-results param to expire_snapshots procedure (#4152)
72e9b50 is described below

commit 72e9b50bec768d49b1787d0a285b578ba5d0532e
Author: 0xffmeta <98...@users.noreply.github.com>
AuthorDate: Tue Mar 1 18:13:31 2022 +0800

    Spark 3.2: Add stream-results param to expire_snapshots procedure (#4152)
---
 .../extensions/TestExpireSnapshotsProcedure.java   | 25 ++++++++++++++++++++++
 .../actions/BaseExpireSnapshotsSparkAction.java    |  4 ++--
 .../spark/procedures/ExpireSnapshotsProcedure.java |  9 +++++++-
 .../spark/actions/TestExpireSnapshotsAction.java   | 22 +++++++++----------
 4 files changed, 46 insertions(+), 14 deletions(-)
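
For quick reference, a minimal sketch of the new parameter in use (it mirrors the test added below; assumes a SparkSession "spark" with the Iceberg SQL extensions enabled, and the catalog name, table name, and timestamp here are placeholders, not values from this commit):

    // Hypothetical invocation of the procedure with the new flag.
    spark.sql(
        "CALL spark_catalog.system.expire_snapshots(" +
            "table => 'db.sample', " +
            "older_than => TIMESTAMP '2022-03-01 00:00:00', " +
            "retain_last => 1, " +
            "stream_results => true)");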

diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index eb97e37..2443fb1 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -283,4 +283,29 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
     Assert.assertFalse("Delete manifest should be removed", localFs.exists(deleteManifestPath));
     Assert.assertFalse("Delete file should be removed", localFs.exists(deleteFilePath));
   }
+
+  @Test
+  public void testExpireSnapshotWithStreamResultsEnabled() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Assert.assertEquals("Should be 2 snapshots", 2, Iterables.size(table.snapshots()));
+
+    waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+    Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+
+    List<Object[]> output = sql(
+        "CALL %s.system.expire_snapshots(" +
+            "older_than => TIMESTAMP '%s'," +
+            "table => '%s'," +
+            "retain_last => 1, " +
+            "stream_results => true)",
+        catalogName, currentTimestamp, tableIdent);
+    assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 1L)), output);
+  }
 }
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
index 7541653..ff919cd 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
@@ -70,12 +70,12 @@ public class BaseExpireSnapshotsSparkAction
     extends BaseSparkAction<ExpireSnapshots, ExpireSnapshots.Result> implements ExpireSnapshots {
   private static final Logger LOG = LoggerFactory.getLogger(BaseExpireSnapshotsSparkAction.class);
 
+  public static final String STREAM_RESULTS = "stream-results";
+
   private static final String CONTENT_FILE = "Content File";
   private static final String MANIFEST = "Manifest";
   private static final String MANIFEST_LIST = "Manifest List";
 
-  private static final String STREAM_RESULTS = "stream-results";
-
   // Creates an executor service that runs each task in the thread that invokes execute/submit.
   private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;
 
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index 2ebff0d..8cf4d27 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.spark.procedures;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.actions.ExpireSnapshots;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.actions.BaseExpireSnapshotsSparkAction;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.iceberg.util.DateTimeUtil;
@@ -45,7 +46,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
       ProcedureParameter.required("table", DataTypes.StringType),
       ProcedureParameter.optional("older_than", DataTypes.TimestampType),
       ProcedureParameter.optional("retain_last", DataTypes.IntegerType),
-      ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType)
+      ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
+      ProcedureParameter.optional("stream_results", DataTypes.BooleanType)
   };
 
   private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
@@ -83,6 +85,7 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
     Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
     Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
     Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
+    Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
 
     Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
         "max_concurrent_deletes should have value > 0,  value: " + maxConcurrentDeletes);
@@ -102,6 +105,10 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
         action.executeDeleteWith(executorService(maxConcurrentDeletes, "expire-snapshots"));
       }
 
+      if (streamResult != null) {
+        action.option(BaseExpireSnapshotsSparkAction.STREAM_RESULTS, Boolean.toString(streamResult));
+      }
+
       ExpireSnapshots.Result result = action.execute();
 
       return toOutputRows(result);
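
For context, a sketch of the equivalent call through the action API that this hunk wires up; making STREAM_RESULTS public lets the procedure (and other callers) pass the option by its canonical name. This assumes an org.apache.iceberg.Table handle named "table" is in scope, and the cutoff timestamp is illustrative:

    import org.apache.iceberg.actions.ExpireSnapshots;
    import org.apache.iceberg.spark.actions.BaseExpireSnapshotsSparkAction;
    import org.apache.iceberg.spark.actions.SparkActions;

    // What the procedure does under the hood when stream_results is set:
    // it forwards the flag as the action's "stream-results" option.
    ExpireSnapshots.Result result = SparkActions.get()
        .expireSnapshots(table)                       // table: assumed in scope
        .expireOlderThan(System.currentTimeMillis())  // illustrative cutoff
        .option(BaseExpireSnapshotsSparkAction.STREAM_RESULTS, "true")
        .execute();
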
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index 92792eb..c4445e9 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.ExpireSnapshots;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -1041,20 +1042,19 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
 
     long end = rightAfterSnapshot();
 
-    int jobsBefore = spark.sparkContext().dagScheduler().nextJobId().get();
+    int jobsBeforeStreamResults = spark.sparkContext().dagScheduler().nextJobId().get();
 
-    ExpireSnapshots.Result results =
-        SparkActions.get().expireSnapshots(table).expireOlderThan(end).execute();
+    withSQLConf(ImmutableMap.of("spark.sql.adaptive.enabled", "false"), () -> {
+      ExpireSnapshots.Result results = SparkActions.get().expireSnapshots(table).expireOlderThan(end)
+          .option("stream-results", "true").execute();
 
-    Assert.assertEquals("Table does not have 1 snapshot after expiration", 1, Iterables.size(table.snapshots()));
-
-    int jobsAfter = spark.sparkContext().dagScheduler().nextJobId().get();
-    int totalJobsRun = jobsAfter - jobsBefore;
+      int jobsAfterStreamResults = spark.sparkContext().dagScheduler().nextJobId().get();
+      int jobsRunDuringStreamResults = jobsAfterStreamResults - jobsBeforeStreamResults;
 
-    checkExpirationResults(1L, 1L, 2L, results);
+      checkExpirationResults(1L, 1L, 2L, results);
 
-    Assert.assertTrue(
-        String.format("Expected more than %d jobs when using local iterator, ran %d", SHUFFLE_PARTITIONS, totalJobsRun),
-        totalJobsRun > SHUFFLE_PARTITIONS);
+      Assert.assertEquals("Expected total number of jobs with stream-results should match the expected number",
+          5L, jobsRunDuringStreamResults);
+    });
   }
 }
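
Note on the reworked test above: as the replaced assertion message ("Expected more than %d jobs when using local iterator") indicates, enabling stream-results makes the action pull results through a local iterator rather than a single collect, bounding driver memory at the cost of running additional Spark jobs. The test now pins spark.sql.adaptive.enabled to false so the observed job count is deterministic (5) instead of only asserting that it exceeds SHUFFLE_PARTITIONS.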