You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/03/01 10:13:55 UTC
[iceberg] branch master updated: Spark 3.2: Add stream-results param to expire_snapshots procedure (#4152)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 72e9b50 Spark 3.2: Add stream-results param to expire_snapshots procedure (#4152)
72e9b50 is described below
commit 72e9b50bec768d49b1787d0a285b578ba5d0532e
Author: 0xffmeta <98...@users.noreply.github.com>
AuthorDate: Tue Mar 1 18:13:31 2022 +0800
Spark 3.2: Add stream-results param to expire_snapshots procedure (#4152)
---
.../extensions/TestExpireSnapshotsProcedure.java | 25 ++++++++++++++++++++++
.../actions/BaseExpireSnapshotsSparkAction.java | 4 ++--
.../spark/procedures/ExpireSnapshotsProcedure.java | 9 +++++++-
.../spark/actions/TestExpireSnapshotsAction.java | 22 +++++++++----------
4 files changed, 46 insertions(+), 14 deletions(-)
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
index eb97e37..2443fb1 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestExpireSnapshotsProcedure.java
@@ -283,4 +283,29 @@ public class TestExpireSnapshotsProcedure extends SparkExtensionsTestBase {
Assert.assertFalse("Delete manifest should be removed", localFs.exists(deleteManifestPath));
Assert.assertFalse("Delete file should be removed", localFs.exists(deleteFilePath));
}
+
+ @Test
+ public void testExpireSnapshotWithStreamResultsEnabled() {
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ Assert.assertEquals("Should be 2 snapshots", 2, Iterables.size(table.snapshots()));
+
+ waitUntilAfter(table.currentSnapshot().timestampMillis());
+
+ Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
+
+ List<Object[]> output = sql(
+ "CALL %s.system.expire_snapshots(" +
+ "older_than => TIMESTAMP '%s'," +
+ "table => '%s'," +
+ "retain_last => 1, " +
+ "stream_results => true)",
+ catalogName, currentTimestamp, tableIdent);
+ assertEquals("Procedure output must match", ImmutableList.of(row(0L, 0L, 1L)), output);
+ }
}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
index 7541653..ff919cd 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseExpireSnapshotsSparkAction.java
@@ -70,12 +70,12 @@ public class BaseExpireSnapshotsSparkAction
extends BaseSparkAction<ExpireSnapshots, ExpireSnapshots.Result> implements ExpireSnapshots {
private static final Logger LOG = LoggerFactory.getLogger(BaseExpireSnapshotsSparkAction.class);
+ public static final String STREAM_RESULTS = "stream-results";
+
private static final String CONTENT_FILE = "Content File";
private static final String MANIFEST = "Manifest";
private static final String MANIFEST_LIST = "Manifest List";
- private static final String STREAM_RESULTS = "stream-results";
-
// Creates an executor service that runs each task in the thread that invokes execute/submit.
private static final ExecutorService DEFAULT_DELETE_EXECUTOR_SERVICE = null;
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
index 2ebff0d..8cf4d27 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/ExpireSnapshotsProcedure.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.spark.procedures;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.actions.BaseExpireSnapshotsSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.DateTimeUtil;
@@ -45,7 +46,8 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
ProcedureParameter.required("table", DataTypes.StringType),
ProcedureParameter.optional("older_than", DataTypes.TimestampType),
ProcedureParameter.optional("retain_last", DataTypes.IntegerType),
- ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType)
+ ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
+ ProcedureParameter.optional("stream_results", DataTypes.BooleanType)
};
private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
@@ -83,6 +85,7 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
Integer retainLastNum = args.isNullAt(2) ? null : args.getInt(2);
Integer maxConcurrentDeletes = args.isNullAt(3) ? null : args.getInt(3);
+ Boolean streamResult = args.isNullAt(4) ? null : args.getBoolean(4);
Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
"max_concurrent_deletes should have value > 0, value: " + maxConcurrentDeletes);
@@ -102,6 +105,10 @@ public class ExpireSnapshotsProcedure extends BaseProcedure {
action.executeDeleteWith(executorService(maxConcurrentDeletes, "expire-snapshots"));
}
+ if (streamResult != null) {
+ action.option(BaseExpireSnapshotsSparkAction.STREAM_RESULTS, Boolean.toString(streamResult));
+ }
+
ExpireSnapshots.Result result = action.execute();
return toOutputRows(result);
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index 92792eb..c4445e9 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -41,6 +41,7 @@ import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.ExpireSnapshots;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -1041,20 +1042,19 @@ public class TestExpireSnapshotsAction extends SparkTestBase {
long end = rightAfterSnapshot();
- int jobsBefore = spark.sparkContext().dagScheduler().nextJobId().get();
+ int jobsBeforeStreamResults = spark.sparkContext().dagScheduler().nextJobId().get();
- ExpireSnapshots.Result results =
- SparkActions.get().expireSnapshots(table).expireOlderThan(end).execute();
+ withSQLConf(ImmutableMap.of("spark.sql.adaptive.enabled", "false"), () -> {
+ ExpireSnapshots.Result results = SparkActions.get().expireSnapshots(table).expireOlderThan(end)
+ .option("stream-results", "true").execute();
- Assert.assertEquals("Table does not have 1 snapshot after expiration", 1, Iterables.size(table.snapshots()));
-
- int jobsAfter = spark.sparkContext().dagScheduler().nextJobId().get();
- int totalJobsRun = jobsAfter - jobsBefore;
+ int jobsAfterStreamResults = spark.sparkContext().dagScheduler().nextJobId().get();
+ int jobsRunDuringStreamResults = jobsAfterStreamResults - jobsBeforeStreamResults;
- checkExpirationResults(1L, 1L, 2L, results);
+ checkExpirationResults(1L, 1L, 2L, results);
- Assert.assertTrue(
- String.format("Expected more than %d jobs when using local iterator, ran %d", SHUFFLE_PARTITIONS, totalJobsRun),
- totalJobsRun > SHUFFLE_PARTITIONS);
+ Assert.assertEquals("Expected total number of jobs with stream-results should match the expected number",
+ 5L, jobsRunDuringStreamResults);
+ });
}
}