You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ru...@apache.org on 2021/10/30 03:07:12 UTC
[iceberg] branch master updated: spark: CALL ancestors_of get all
the snapshot ancestors (#3317)
This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 51e2ec3 spark: CALL ancestors_of get all the snapshot ancestors (#3317)
51e2ec3 is described below
commit 51e2ec310e2b76f3d7260ffbb789755d87a5ef9d
Author: KnightChess <wl...@163.com>
AuthorDate: Sat Oct 30 11:06:55 2021 +0800
spark: CALL ancestors_of get all the snapshot ancestors (#3317)
---
site/docs/spark-procedures.md | 43 ++++++
.../spark/extensions/TestAncestorsOfProcedure.java | 163 +++++++++++++++++++++
.../spark/procedures/AncestorsOfProcedure.java | 105 +++++++++++++
.../iceberg/spark/procedures/SparkProcedures.java | 1 +
4 files changed, 312 insertions(+)
diff --git a/site/docs/spark-procedures.md b/site/docs/spark-procedures.md
index 1aea344..ea78760 100644
--- a/site/docs/spark-procedures.md
+++ b/site/docs/spark-procedures.md
@@ -407,3 +407,46 @@ CALL spark_catalog.system.add_files(
source_table => '`parquet`.`path/to/table`'
)
```
+
+## `Metadata information`
+
+### `ancestors_of`
+
+Report the live snapshot IDs of a specified snapshot and its ancestors (parents)
+
+#### Usage
+
+| Argument Name | Required? | Type | Description |
+|---------------|-----------|------|-------------|
+| `table` | ✔️ | string | Name of the table to report live snapshot IDs |
+| `snapshot_id` | ️ | long | Use a specified snapshot to get the live snapshot IDs of parents |
+
+> tip : Using snapshot_id
+>
+> Given snapshots history with roll back to B and addition of C' -> D'
+> ```shell
+> A -> B -> C -> D
+>       \ -> C' -> (D')
+> ```
+> Not specifying the snapshot ID would return A -> B -> C' -> D', while providing the snapshot ID of
+> D as an argument would return A -> B -> C -> D
+
+#### Output
+
+| Output Name | Type | Description |
+| ------------|------|-------------|
+| `snapshot_id` | long | the ancestor snapshot id |
+| `timestamp` | long | snapshot creation time in milliseconds |
+
+#### Examples
+
+Get all the snapshot ancestors of the current snapshot (default)
+```sql
+CALL spark_catalog.system.ancestors_of('db.tbl')
+```
+
+Get all the snapshot ancestors of a particular snapshot
+```sql
+CALL spark_catalog.system.ancestors_of('db.tbl', 1)
+CALL spark_catalog.system.ancestors_of(snapshot_id => 1, table => 'db.tbl')
+```
diff --git a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java
new file mode 100644
index 0000000..baf464d
--- /dev/null
+++ b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestAncestorsOfProcedure extends SparkExtensionsTestBase { // exercises the ancestors_of procedure through Spark SQL CALL statements
+
+ public TestAncestorsOfProcedure(String catalogName, String implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @After
+ public void removeTables() { // drops the test table, if it exists, after each test
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testAncestorOfUsingEmptyArgs() { // calls the procedure with only the table argument
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Long currentSnapshotId = table.currentSnapshot().snapshotId();
+ Long currentTimestamp = table.currentSnapshot().timestampMillis();
+ Long preSnapshotId = table.currentSnapshot().parentId(); // parent of the current snapshot
+ Long preTimeStamp = table.snapshot(table.currentSnapshot().parentId()).timestampMillis();
+
+ List<Object[]> output = sql("CALL %s.system.ancestors_of('%s')",
+ catalogName, tableIdent);
+
+ assertEquals(
+ "Procedure output must match",
+ ImmutableList.of( // expected order: current snapshot first, then its parent
+ row(currentSnapshotId, currentTimestamp),
+ row(preSnapshotId, preTimeStamp)),
+ output);
+ }
+
+ @Test
+ public void testAncestorOfUsingSnapshotId() { // passes snapshot_id as the second positional argument
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Long currentSnapshotId = table.currentSnapshot().snapshotId();
+ Long currentTimestamp = table.currentSnapshot().timestampMillis();
+ Long preSnapshotId = table.currentSnapshot().parentId();
+ Long preTimeStamp = table.snapshot(table.currentSnapshot().parentId()).timestampMillis();
+
+ assertEquals(
+ "Procedure output must match",
+ ImmutableList.of(
+ row(currentSnapshotId, currentTimestamp),
+ row(preSnapshotId, preTimeStamp)),
+ sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, currentSnapshotId));
+
+ assertEquals(
+ "Procedure output must match",
+ ImmutableList.of(row(preSnapshotId, preTimeStamp)), // starting from the parent yields only the parent row
+ sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, preSnapshotId));
+ }
+
+ @Test
+ public void testAncestorOfWithRollBack() { // checks both lineages after a rollback followed by a new insert
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+ Table table = validationCatalog.loadTable(tableIdent);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+ table.refresh(); // reload so currentSnapshot() reflects the insert above
+ Long firstSnapshotId = table.currentSnapshot().snapshotId();
+ Long firstTimestamp = table.currentSnapshot().timestampMillis();
+ sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+ table.refresh();
+ Long secondSnapshotId = table.currentSnapshot().snapshotId();
+ Long secondTimestamp = table.currentSnapshot().timestampMillis();
+ sql("INSERT INTO TABLE %s VALUES (3, 'c')", tableName);
+ table.refresh();
+ Long thirdSnapshotId = table.currentSnapshot().snapshotId();
+ Long thirdTimestamp = table.currentSnapshot().timestampMillis();
+
+ // roll back
+ sql("CALL %s.system.rollback_to_snapshot('%s', %dL)",
+ catalogName, tableIdent, secondSnapshotId);
+
+ sql("INSERT INTO TABLE %s VALUES (4, 'd')", tableName); // written after the rollback to the second snapshot
+ table.refresh();
+ Long fourthSnapshotId = table.currentSnapshot().snapshotId();
+ Long fourthTimestamp = table.currentSnapshot().timestampMillis();
+
+ assertEquals(
+ "Procedure output must match",
+ ImmutableList.of( // lineage of the fourth snapshot does not include the third
+ row(fourthSnapshotId, fourthTimestamp),
+ row(secondSnapshotId, secondTimestamp),
+ row(firstSnapshotId, firstTimestamp)),
+ sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, fourthSnapshotId));
+
+ assertEquals(
+ "Procedure output must match",
+ ImmutableList.of( // the rolled-back third snapshot still reports its own lineage
+ row(thirdSnapshotId, thirdTimestamp),
+ row(secondSnapshotId, secondTimestamp),
+ row(firstSnapshotId, firstTimestamp)),
+ sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, thirdSnapshotId));
+ }
+
+ @Test
+ public void testAncestorOfUsingNamedArgs() { // named arguments, with snapshot_id given before table
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ Long firstSnapshotId = table.currentSnapshot().snapshotId();
+ Long firstTimestamp = table.currentSnapshot().timestampMillis();
+
+ assertEquals(
+ "Procedure output must match",
+ ImmutableList.of(row(firstSnapshotId, firstTimestamp)),
+ sql("CALL %s.system.ancestors_of(snapshot_id => %dL, table => '%s')",
+ catalogName, firstSnapshotId, tableIdent));
+ }
+
+ @Test
+ public void testInvalidAncestorOfCases() { // each call below is expected to throw with the given message fragment
+ AssertHelpers.assertThrows("Should reject calls without all required args",
+ AnalysisException.class, "Missing required parameters",
+ () -> sql("CALL %s.system.ancestors_of()", catalogName));
+
+ AssertHelpers.assertThrows("Should reject calls with empty table identifier",
+ IllegalArgumentException.class, "Cannot handle an empty identifier for argument table",
+ () -> sql("CALL %s.system.ancestors_of('')", catalogName));
+
+ AssertHelpers.assertThrows("Should reject calls with invalid arg types",
+ AnalysisException.class, "Wrong arg type for snapshot_id: cannot cast",
+ () -> sql("CALL %s.system.ancestors_of('%s', 1.1)", catalogName, tableIdent));
+ }
+}
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java
new file mode 100644
index 0000000..d912769
--- /dev/null
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.List;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+public class AncestorsOfProcedure extends BaseProcedure { // returns one (snapshot_id, timestamp) row per snapshot ID found by walking back from a given snapshot
+
+ private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { // positional order: table, snapshot_id
+ ProcedureParameter.required("table", DataTypes.StringType),
+ ProcedureParameter.optional("snapshot_id", DataTypes.LongType),
+ };
+
+ private static final StructType OUTPUT_TYPE = new StructType(new StructField[] { // both columns are declared nullable
+ new StructField("snapshot_id", DataTypes.LongType, true, Metadata.empty()),
+ new StructField("timestamp", DataTypes.LongType, true, Metadata.empty())
+ });
+
+ private AncestorsOfProcedure(TableCatalog tableCatalog) { // private: instances are created through builder()
+ super(tableCatalog);
+ }
+
+ public static SparkProcedures.ProcedureBuilder builder() { // factory referenced as AncestorsOfProcedure::builder when the procedure is registered
+ return new BaseProcedure.Builder<AncestorsOfProcedure>() {
+ @Override
+ protected AncestorsOfProcedure doBuild() {
+ return new AncestorsOfProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) { // args follow PARAMETERS order: 0 = table, 1 = snapshot_id
+ Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+ Long toSnapshotId = args.isNullAt(1) ? null : args.getLong(1); // null when snapshot_id was not supplied
+
+ SparkTable sparkTable = loadSparkTable(tableIdent);
+ Table icebergTable = sparkTable.table();
+
+ if (toSnapshotId == null) { // default to the current snapshot, or -1 when the table has none
+ toSnapshotId = icebergTable.currentSnapshot() != null ? icebergTable.currentSnapshot().snapshotId() : -1;
+ }
+
+ List<Long> snapshotIds = SnapshotUtil.snapshotIdsBetween(icebergTable, 0L, toSnapshotId); // NOTE(review): 0L looks like a placeholder lower bound - confirm against SnapshotUtil
+
+ return toOutputRow(icebergTable, snapshotIds);
+ }
+
+ @Override
+ public String description() {
+ return "AncestorsOf";
+ }
+
+ private InternalRow[] toOutputRow(Table table, List<Long> snapshotIds) { // builds one output row per ID, in list order
+ if (snapshotIds.isEmpty()) { // nothing to report: return an empty result
+ return new InternalRow[0];
+ }
+
+ InternalRow[] internalRows = new InternalRow[snapshotIds.size()];
+ for (int i = 0; i < snapshotIds.size(); i++) {
+ Long snapshotId = snapshotIds.get(i);
+ internalRows[i] = newInternalRow(snapshotId, table.snapshot(snapshotId).timestampMillis()); // timestamp column is the snapshot's timestampMillis()
+ }
+
+ return internalRows;
+ }
+}
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index fc9d4ae..42545ab 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -51,6 +51,7 @@ public class SparkProcedures {
mapBuilder.put("migrate", MigrateTableProcedure::builder);
mapBuilder.put("snapshot", SnapshotTableProcedure::builder);
mapBuilder.put("add_files", AddFilesProcedure::builder);
+ mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
return mapBuilder.build();
}