You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by sz...@apache.org on 2022/07/22 00:37:52 UTC

[iceberg] branch master updated: Core: Add MetadataLogs metadata table (#5063)

This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new ae96bdfef3 Core: Add MetadataLogs metadata table (#5063)
ae96bdfef3 is described below

commit ae96bdfef3099f8f24347204823d786444e0b432
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Fri Jul 22 06:07:45 2022 +0530

    Core: Add MetadataLogs metadata table (#5063)
---
 .../java/org/apache/iceberg/MetadataLogsTable.java | 115 +++++++++++++++++++++
 .../java/org/apache/iceberg/MetadataTableType.java |   1 +
 .../org/apache/iceberg/MetadataTableUtils.java     |   2 +
 .../spark/extensions/TestMetadataTables.java       |  87 ++++++++++++++++
 .../spark/extensions/TestMetadataTables.java       |  87 ++++++++++++++++
 5 files changed, 292 insertions(+)

diff --git a/core/src/main/java/org/apache/iceberg/MetadataLogsTable.java b/core/src/main/java/org/apache/iceberg/MetadataLogsTable.java
new file mode 100644
index 0000000000..4f29894ed9
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/MetadataLogsTable.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+
+public class MetadataLogsTable extends BaseMetadataTable {
+
+  private static final Schema METADATA_LOGS_SCHEMA = new Schema(
+      Types.NestedField.required(1, "timestamp_millis", Types.LongType.get()),
+      Types.NestedField.required(2, "file", Types.StringType.get()),
+      Types.NestedField.optional(3, "latest_snapshot_id", Types.LongType.get()),
+      Types.NestedField.optional(4, "latest_schema_id", Types.IntegerType.get()),
+      Types.NestedField.optional(5, "latest_sequence_number", Types.LongType.get())
+  );
+
+  MetadataLogsTable(TableOperations ops, Table table) {
+    this(ops, table, table.name() + ".metadata_logs");
+  }
+
+  MetadataLogsTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.METADATA_LOGS;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new MetadataLogScan(operations(), table());
+  }
+
+  @Override
+  public Schema schema() {
+    return METADATA_LOGS_SCHEMA;
+  }
+
+  private DataTask task(TableScan scan) {
+    TableOperations ops = operations();
+    List<TableMetadata.MetadataLogEntry> metadataLogEntries =
+        Lists.newArrayList(ops.current().previousFiles().listIterator());
+    metadataLogEntries.add(new TableMetadata.MetadataLogEntry(ops.current().lastUpdatedMillis(),
+        ops.current().metadataFileLocation()));
+    return StaticDataTask.of(
+        ops.io().newInputFile(ops.current().metadataFileLocation()),
+        schema(),
+        scan.schema(),
+        metadataLogEntries,
+        metadataLogEntry -> MetadataLogsTable.metadataLogToRow(metadataLogEntry, table())
+    );
+  }
+
+  private class MetadataLogScan extends StaticTableScan {
+    MetadataLogScan(TableOperations ops, Table table) {
+      super(ops, table, METADATA_LOGS_SCHEMA, MetadataTableType.METADATA_LOGS, MetadataLogsTable.this::task);
+    }
+
+    MetadataLogScan(TableOperations ops, Table table, TableScanContext context) {
+      super(ops, table, METADATA_LOGS_SCHEMA, MetadataTableType.METADATA_LOGS, MetadataLogsTable.this::task, context);
+    }
+
+    @Override
+    protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+      return new MetadataLogScan(ops, table, context);
+    }
+
+    @Override
+    public CloseableIterable<FileScanTask> planFiles() {
+      return CloseableIterable.withNoopClose(MetadataLogsTable.this.task(this));
+    }
+  }
+
+  private static StaticDataTask.Row metadataLogToRow(TableMetadata.MetadataLogEntry metadataLogEntry, Table table) {
+    Long latestSnapshotId = null;
+    Snapshot latestSnapshot = null;
+    try {
+      latestSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table, metadataLogEntry.timestampMillis());
+      latestSnapshot = table.snapshot(latestSnapshotId);
+    } catch (IllegalArgumentException ignored) {
+      // implies this metadata file was created at table creation
+    }
+
+    return StaticDataTask.Row.of(
+        metadataLogEntry.timestampMillis(),
+        metadataLogEntry.file(),
+        // latest snapshot in this file corresponding to the log entry
+        latestSnapshotId,
+        latestSnapshot != null ? latestSnapshot.schemaId() : null,
+        latestSnapshot != null ? latestSnapshot.sequenceNumber() : null
+    );
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableType.java b/core/src/main/java/org/apache/iceberg/MetadataTableType.java
index c1c4bf9c75..ce3ee2202b 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataTableType.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataTableType.java
@@ -27,6 +27,7 @@ public enum MetadataTableType {
   DATA_FILES,
   DELETE_FILES,
   HISTORY,
+  METADATA_LOGS,
   SNAPSHOTS,
   MANIFESTS,
   PARTITIONS,
diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
index 847b415178..7246d18a31 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
@@ -64,6 +64,8 @@ public class MetadataTableUtils {
         return new HistoryTable(ops, baseTable, metadataTableName);
       case SNAPSHOTS:
         return new SnapshotsTable(ops, baseTable, metadataTableName);
+      case METADATA_LOGS:
+        return new MetadataLogsTable(ops, baseTable, metadataTableName);
       case MANIFESTS:
         return new ManifestsTable(ops, baseTable, metadataTableName);
       case PARTITIONS:
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index 60c57e12a3..c33cbfcce0 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -28,12 +28,16 @@ import java.util.stream.Stream;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.commons.collections.ListUtils;
 import org.apache.iceberg.FileContent;
+import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.data.TestHelpers;
@@ -319,6 +323,89 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
     TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedFiles, actualFiles);
   }
 
+  @Test
+  public void testMetadataLogs() throws Exception {
+    // Create table and insert data
+    sql("CREATE TABLE %s (id bigint, data string) " +
+        "USING iceberg " +
+        "PARTITIONED BY (data) " +
+        "TBLPROPERTIES " +
+        "('format-version'='2')", tableName);
+
+    List<SimpleRecord> recordsA = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "a")
+    );
+    spark.createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+        .writeTo(tableName)
+        .append();
+
+    List<SimpleRecord> recordsB = Lists.newArrayList(
+        new SimpleRecord(1, "b"),
+        new SimpleRecord(2, "b")
+    );
+    spark.createDataset(recordsB, Encoders.bean(SimpleRecord.class))
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    Long currentSnapshotId = table.currentSnapshot().snapshotId();
+    TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();
+    Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+    Snapshot parentSnapshot = table.snapshot(currentSnapshot.parentId());
+    List<TableMetadata.MetadataLogEntry> metadataLogEntries = Lists.newArrayList(tableMetadata.previousFiles());
+
+    // Check metadataLog table
+    List<Object[]> metadataLogs = sql("SELECT * FROM %s.metadata_logs", tableName);
+    assertEquals("MetadataLogsTable result should match the metadataLog entries",
+        ImmutableList.of(
+            row(
+                metadataLogEntries.get(0).timestampMillis(),
+                metadataLogEntries.get(0).file(),
+                null,
+                null,
+                null
+            ),
+            row(
+                metadataLogEntries.get(1).timestampMillis(),
+                metadataLogEntries.get(1).file(),
+                parentSnapshot.snapshotId(),
+                parentSnapshot.schemaId(),
+                parentSnapshot.sequenceNumber()
+            ),
+            row(
+                currentSnapshot.timestampMillis(),
+                tableMetadata.metadataFileLocation(),
+                currentSnapshot.snapshotId(),
+                currentSnapshot.schemaId(),
+                currentSnapshot.sequenceNumber()
+            )),
+        metadataLogs);
+
+    // test filtering
+    List<Object[]> metadataLogWithFilters =
+        sql("SELECT * FROM %s.metadata_logs WHERE latest_snapshot_id = %s", tableName, currentSnapshotId);
+    Assert.assertEquals("metadataLog table should return 1 row", 1, metadataLogWithFilters.size());
+    assertEquals("Result should match the latest snapshot entry",
+        ImmutableList.of(row(
+            tableMetadata.currentSnapshot().timestampMillis(),
+            tableMetadata.metadataFileLocation(),
+            tableMetadata.currentSnapshot().snapshotId(),
+            tableMetadata.currentSnapshot().schemaId(),
+            tableMetadata.currentSnapshot().sequenceNumber())),
+        metadataLogWithFilters);
+
+    // test projection
+    List<String> metadataFiles =
+        metadataLogEntries.stream().map(TableMetadata.MetadataLogEntry::file).collect(Collectors.toList());
+    metadataFiles.add(tableMetadata.metadataFileLocation());
+    List<Object[]> metadataLogWithProjection = sql("SELECT file FROM %s.metadata_logs", tableName);
+    Assert.assertEquals("metadataLog table should return 3 rows", 3, metadataLogWithProjection.size());
+    assertEquals("metadataLog entry should be of same file",
+        metadataFiles.stream().map(this::row).collect(Collectors.toList()),
+        metadataLogWithProjection);
+  }
+
   /**
    * Find matching manifest entries of an Iceberg table
    * @param table iceberg table
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index 60c57e12a3..c33cbfcce0 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -28,12 +28,16 @@ import java.util.stream.Stream;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.commons.collections.ListUtils;
 import org.apache.iceberg.FileContent;
+import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.data.TestHelpers;
@@ -319,6 +323,89 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
     TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedFiles, actualFiles);
   }
 
+  @Test
+  public void testMetadataLogs() throws Exception {
+    // Create table and insert data
+    sql("CREATE TABLE %s (id bigint, data string) " +
+        "USING iceberg " +
+        "PARTITIONED BY (data) " +
+        "TBLPROPERTIES " +
+        "('format-version'='2')", tableName);
+
+    List<SimpleRecord> recordsA = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "a")
+    );
+    spark.createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+        .writeTo(tableName)
+        .append();
+
+    List<SimpleRecord> recordsB = Lists.newArrayList(
+        new SimpleRecord(1, "b"),
+        new SimpleRecord(2, "b")
+    );
+    spark.createDataset(recordsB, Encoders.bean(SimpleRecord.class))
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    Long currentSnapshotId = table.currentSnapshot().snapshotId();
+    TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();
+    Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+    Snapshot parentSnapshot = table.snapshot(currentSnapshot.parentId());
+    List<TableMetadata.MetadataLogEntry> metadataLogEntries = Lists.newArrayList(tableMetadata.previousFiles());
+
+    // Check metadataLog table
+    List<Object[]> metadataLogs = sql("SELECT * FROM %s.metadata_logs", tableName);
+    assertEquals("MetadataLogsTable result should match the metadataLog entries",
+        ImmutableList.of(
+            row(
+                metadataLogEntries.get(0).timestampMillis(),
+                metadataLogEntries.get(0).file(),
+                null,
+                null,
+                null
+            ),
+            row(
+                metadataLogEntries.get(1).timestampMillis(),
+                metadataLogEntries.get(1).file(),
+                parentSnapshot.snapshotId(),
+                parentSnapshot.schemaId(),
+                parentSnapshot.sequenceNumber()
+            ),
+            row(
+                currentSnapshot.timestampMillis(),
+                tableMetadata.metadataFileLocation(),
+                currentSnapshot.snapshotId(),
+                currentSnapshot.schemaId(),
+                currentSnapshot.sequenceNumber()
+            )),
+        metadataLogs);
+
+    // test filtering
+    List<Object[]> metadataLogWithFilters =
+        sql("SELECT * FROM %s.metadata_logs WHERE latest_snapshot_id = %s", tableName, currentSnapshotId);
+    Assert.assertEquals("metadataLog table should return 1 row", 1, metadataLogWithFilters.size());
+    assertEquals("Result should match the latest snapshot entry",
+        ImmutableList.of(row(
+            tableMetadata.currentSnapshot().timestampMillis(),
+            tableMetadata.metadataFileLocation(),
+            tableMetadata.currentSnapshot().snapshotId(),
+            tableMetadata.currentSnapshot().schemaId(),
+            tableMetadata.currentSnapshot().sequenceNumber())),
+        metadataLogWithFilters);
+
+    // test projection
+    List<String> metadataFiles =
+        metadataLogEntries.stream().map(TableMetadata.MetadataLogEntry::file).collect(Collectors.toList());
+    metadataFiles.add(tableMetadata.metadataFileLocation());
+    List<Object[]> metadataLogWithProjection = sql("SELECT file FROM %s.metadata_logs", tableName);
+    Assert.assertEquals("metadataLog table should return 3 rows", 3, metadataLogWithProjection.size());
+    assertEquals("metadataLog entry should be of same file",
+        metadataFiles.stream().map(this::row).collect(Collectors.toList()),
+        metadataLogWithProjection);
+  }
+
   /**
    * Find matching manifest entries of an Iceberg table
    * @param table iceberg table