You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ru...@apache.org on 2021/10/30 03:07:12 UTC

[iceberg] branch master updated: spark: CALL ancestors_of get all the snapshot ancestors (#3317)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 51e2ec3  spark: CALL ancestors_of get all the snapshot ancestors (#3317)
51e2ec3 is described below

commit 51e2ec310e2b76f3d7260ffbb789755d87a5ef9d
Author: KnightChess <wl...@163.com>
AuthorDate: Sat Oct 30 11:06:55 2021 +0800

    spark: CALL ancestors_of get all the snapshot ancestors (#3317)
---
 site/docs/spark-procedures.md                      |  43 ++++++
 .../spark/extensions/TestAncestorsOfProcedure.java | 163 +++++++++++++++++++++
 .../spark/procedures/AncestorsOfProcedure.java     | 105 +++++++++++++
 .../iceberg/spark/procedures/SparkProcedures.java  |   1 +
 4 files changed, 312 insertions(+)

diff --git a/site/docs/spark-procedures.md b/site/docs/spark-procedures.md
index 1aea344..ea78760 100644
--- a/site/docs/spark-procedures.md
+++ b/site/docs/spark-procedures.md
@@ -407,3 +407,46 @@ CALL spark_catalog.system.add_files(
   source_table => '`parquet`.`path/to/table`'
 )
 ```
+
+## Metadata information
+
+### `ancestors_of`
+
+Report the snapshot IDs of a specified snapshot and all of its live ancestors.
+
+#### Usage
+
+| Argument Name | Required? | Type | Description |
+|---------------|-----------|------|-------------|
+| `table`       | ✔️  | string | Name of the table whose snapshot ancestors are reported |
+| `snapshot_id` |    | long | ID of the snapshot whose ancestors are reported; defaults to the table's current snapshot |
+
+> Tip: using `snapshot_id`
+>
+> Given a snapshot history with a rollback to B followed by the addition of C' -> D'
+> ```shell
+> A -> B -> C -> D
+>       \ -> C' -> (D')
+> ```
+> Not specifying the snapshot ID would return A -> B -> C' -> D', while providing the snapshot ID of
+> D as an argument would return A -> B -> C -> D
+
+#### Output
+
+| Output Name | Type | Description |
+| ------------|------|-------------|
+| `snapshot_id` | long | ID of the reported snapshot (the starting snapshot or one of its ancestors) |
+| `timestamp` | long | Snapshot creation time, in milliseconds since the Unix epoch |
+
+#### Examples
+
+Get all the ancestors of the current snapshot (the default)
+```sql
+CALL spark_catalog.system.ancestors_of('db.tbl')
+```
+
+Get all the ancestors of a particular snapshot
+```sql
+CALL spark_catalog.system.ancestors_of('db.tbl', 1)
+CALL spark_catalog.system.ancestors_of(snapshot_id => 1, table => 'db.tbl')
+```
diff --git a/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java
new file mode 100644
index 0000000..baf464d
--- /dev/null
+++ b/spark/v3.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAncestorsOfProcedure.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.After;
+import org.junit.Test;
+
+public class TestAncestorsOfProcedure extends SparkExtensionsTestBase {
+
+  public TestAncestorsOfProcedure(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testAncestorOfUsingEmptyArgs() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Long headId = table.currentSnapshot().snapshotId();
+    Long headMillis = table.currentSnapshot().timestampMillis();
+    Long parentId = table.currentSnapshot().parentId();
+    Long parentMillis = table.snapshot(parentId).timestampMillis();
+
+    // no snapshot_id argument: expect the current snapshot followed by its parent
+    List<Object[]> expected = ImmutableList.of(row(headId, headMillis), row(parentId, parentMillis));
+    assertEquals("Procedure output must match", expected,
+        sql("CALL %s.system.ancestors_of('%s')", catalogName, tableIdent));
+  }
+
+  @Test
+  public void testAncestorOfUsingSnapshotId() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Long headId = table.currentSnapshot().snapshotId();
+    Long headMillis = table.currentSnapshot().timestampMillis();
+    Long parentId = table.currentSnapshot().parentId();
+    Long parentMillis = table.snapshot(parentId).timestampMillis();
+
+    // starting from the current snapshot yields that snapshot followed by its parent
+    List<Object[]> fromHead = sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, headId);
+    assertEquals(
+        "Procedure output must match",
+        ImmutableList.of(row(headId, headMillis), row(parentId, parentMillis)),
+        fromHead);
+
+    // starting from the parent yields only the parent, which has no ancestors of its own
+    List<Object[]> fromParent = sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, parentId);
+    assertEquals(
+        "Procedure output must match",
+        ImmutableList.of(row(parentId, parentMillis)),
+        fromParent);
+  }
+
+  @Test
+  public void testAncestorOfWithRollBack() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+    table.refresh();
+    Long firstId = table.currentSnapshot().snapshotId();
+    Long firstMillis = table.currentSnapshot().timestampMillis();
+
+    sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Long secondId = table.currentSnapshot().snapshotId();
+    Long secondMillis = table.currentSnapshot().timestampMillis();
+
+    sql("INSERT INTO TABLE %s VALUES (3, 'c')", tableName);
+    table.refresh();
+    Long thirdId = table.currentSnapshot().snapshotId();
+    Long thirdMillis = table.currentSnapshot().timestampMillis();
+
+    // roll back to the second snapshot, then commit again so the history forks after it
+    sql("CALL %s.system.rollback_to_snapshot('%s', %dL)", catalogName, tableIdent, secondId);
+    sql("INSERT INTO TABLE %s VALUES (4, 'd')", tableName);
+    table.refresh();
+    Long fourthId = table.currentSnapshot().snapshotId();
+    Long fourthMillis = table.currentSnapshot().timestampMillis();
+
+    // the new branch skips the rolled-back third snapshot
+    List<Object[]> newBranch = ImmutableList.of(
+        row(fourthId, fourthMillis),
+        row(secondId, secondMillis),
+        row(firstId, firstMillis));
+    assertEquals(
+        "Procedure output must match",
+        newBranch,
+        sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, fourthId));
+
+    // the abandoned branch is still reported when its snapshot ID is passed explicitly
+    List<Object[]> oldBranch = ImmutableList.of(
+        row(thirdId, thirdMillis),
+        row(secondId, secondMillis),
+        row(firstId, firstMillis));
+    assertEquals(
+        "Procedure output must match",
+        oldBranch,
+        sql("CALL %s.system.ancestors_of('%s', %dL)", catalogName, tableIdent, thirdId));
+  }
+
+  @Test
+  public void testAncestorOfUsingNamedArgs() {
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Long onlyId = table.currentSnapshot().snapshotId();
+    Long onlyMillis = table.currentSnapshot().timestampMillis();
+
+    // named arguments are passed here in the reverse of their declared order
+    List<Object[]> actual = sql("CALL %s.system.ancestors_of(snapshot_id => %dL, table => '%s')",
+        catalogName, onlyId, tableIdent);
+    assertEquals("Procedure output must match", ImmutableList.of(row(onlyId, onlyMillis)), actual);
+  }
+
+  @Test
+  public void testInvalidAncestorOfCases() {
+    AssertHelpers.assertThrows("Should reject calls without all required args",
+        AnalysisException.class, "Missing required parameters",
+        () -> sql("CALL %s.system.ancestors_of()", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with empty table identifier",
+        IllegalArgumentException.class, "Cannot handle an empty identifier for argument table",
+        () -> sql("CALL %s.system.ancestors_of('')", catalogName));
+
+    AssertHelpers.assertThrows("Should reject calls with invalid arg types",
+        AnalysisException.class, "Wrong arg type for snapshot_id: cannot cast",
+        () -> sql("CALL %s.system.ancestors_of('%s', 1.1)", catalogName, tableIdent));
+  }
+}
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java
new file mode 100644
index 0000000..d912769
--- /dev/null
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/AncestorsOfProcedure.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.procedures;
+
+import java.util.List;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/** Spark procedure that reports a snapshot together with its ancestors, one output row per snapshot. */
+public class AncestorsOfProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] {
+      ProcedureParameter.required("table", DataTypes.StringType),
+      ProcedureParameter.optional("snapshot_id", DataTypes.LongType),
+      };
+
+  private static final StructType OUTPUT_TYPE = new StructType(new StructField[] {
+      new StructField("snapshot_id", DataTypes.LongType, true, Metadata.empty()),
+      new StructField("timestamp", DataTypes.LongType, true, Metadata.empty())
+  });
+
+  private AncestorsOfProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  /** Returns a builder whose {@code doBuild()} creates this procedure with the builder's table catalog. */
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<AncestorsOfProcedure>() {
+      @Override
+      protected AncestorsOfProcedure doBuild() {
+        return new AncestorsOfProcedure(tableCatalog());
+      }
+    };
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public String description() {
+    return "AncestorsOf";
+  }
+
+  /**
+   * Reports the snapshot named by the optional second argument, or the current snapshot, with its ancestors.
+   */
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+    Long requestedId = args.isNullAt(1) ? null : args.getLong(1);
+    Table icebergTable = loadSparkTable(tableIdent).table();
+
+    long toSnapshotId;
+    if (requestedId != null) {
+      toSnapshotId = requestedId;
+    } else if (icebergTable.currentSnapshot() != null) {
+      toSnapshotId = icebergTable.currentSnapshot().snapshotId();
+    } else {
+      toSnapshotId = -1;  // placeholder ID used when the table has no current snapshot
+    }
+
+    List<Long> ancestorIds = SnapshotUtil.snapshotIdsBetween(icebergTable, 0L, toSnapshotId);
+    return toOutputRow(icebergTable, ancestorIds);
+  }
+
+  /** Builds one (snapshot ID, snapshot timestamp in millis) row per ID, in list order. */
+  private InternalRow[] toOutputRow(Table table, List<Long> snapshotIds) {
+    return snapshotIds.stream()
+        .map(id -> newInternalRow(id, table.snapshot(id).timestampMillis()))
+        .toArray(InternalRow[]::new);
+  }
+}
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index fc9d4ae..42545ab 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -51,6 +51,7 @@ public class SparkProcedures {
     mapBuilder.put("migrate", MigrateTableProcedure::builder);
     mapBuilder.put("snapshot", SnapshotTableProcedure::builder);
     mapBuilder.put("add_files", AddFilesProcedure::builder);
+    mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
     return mapBuilder.build();
   }