You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by sz...@apache.org on 2022/07/22 00:37:52 UTC
[iceberg] branch master updated: Core: Add MetadataLogs metadata table (#5063)
This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new ae96bdfef3 Core: Add MetadataLogs metadata table (#5063)
ae96bdfef3 is described below
commit ae96bdfef3099f8f24347204823d786444e0b432
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Fri Jul 22 06:07:45 2022 +0530
Core: Add MetadataLogs metadata table (#5063)
---
.../java/org/apache/iceberg/MetadataLogsTable.java | 115 +++++++++++++++++++++
.../java/org/apache/iceberg/MetadataTableType.java | 1 +
.../org/apache/iceberg/MetadataTableUtils.java | 2 +
.../spark/extensions/TestMetadataTables.java | 87 ++++++++++++++++
.../spark/extensions/TestMetadataTables.java | 87 ++++++++++++++++
5 files changed, 292 insertions(+)
diff --git a/core/src/main/java/org/apache/iceberg/MetadataLogsTable.java b/core/src/main/java/org/apache/iceberg/MetadataLogsTable.java
new file mode 100644
index 0000000000..4f29894ed9
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/MetadataLogsTable.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.util.List;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.SnapshotUtil;
+
+/**
+ * Metadata table that exposes the metadata log of a table as rows.
+ *
+ * <p>One row is produced for every entry returned by {@code TableMetadata.previousFiles()} of the current
+ * table metadata, followed by one row describing the current metadata file itself.
+ */
+public class MetadataLogsTable extends BaseMetadataTable {
+
+  /**
+   * Schema of the rows produced by this table. The {@code latest_*} columns are optional because a log entry
+   * may have no snapshot associated with it, in which case they are null.
+   */
+  private static final Schema METADATA_LOGS_SCHEMA = new Schema(
+      Types.NestedField.required(1, "timestamp_millis", Types.LongType.get()),
+      Types.NestedField.required(2, "file", Types.StringType.get()),
+      Types.NestedField.optional(3, "latest_snapshot_id", Types.LongType.get()),
+      Types.NestedField.optional(4, "latest_schema_id", Types.IntegerType.get()),
+      Types.NestedField.optional(5, "latest_sequence_number", Types.LongType.get())
+  );
+
+  /**
+   * Creates the metadata table using the default name {@code <table name>.metadata_logs}.
+   *
+   * @param ops operations of the base table
+   * @param table the base table
+   */
+  MetadataLogsTable(TableOperations ops, Table table) {
+    this(ops, table, table.name() + ".metadata_logs");
+  }
+
+  /**
+   * Creates the metadata table with an explicit name.
+   *
+   * @param ops operations of the base table
+   * @param table the base table
+   * @param name name of this metadata table
+   */
+  MetadataLogsTable(TableOperations ops, Table table, String name) {
+    super(ops, table, name);
+  }
+
+  @Override
+  MetadataTableType metadataTableType() {
+    return MetadataTableType.METADATA_LOGS;
+  }
+
+  @Override
+  public TableScan newScan() {
+    return new MetadataLogScan(operations(), table());
+  }
+
+  @Override
+  public Schema schema() {
+    return METADATA_LOGS_SCHEMA;
+  }
+
+  /**
+   * Builds the static data task that backs a scan of this table.
+   *
+   * @param scan the scan whose schema is handed to the task as the projected schema
+   * @return a data task holding one row per metadata log entry, the current metadata file last
+   */
+  private DataTask task(TableScan scan) {
+    TableOperations ops = operations();
+    // read the current metadata once so that every value below comes from the same table state
+    TableMetadata current = ops.current();
+    String currentLocation = current.metadataFileLocation();
+
+    List<TableMetadata.MetadataLogEntry> metadataLogEntries = Lists.newArrayList(current.previousFiles());
+    // append an entry for the current metadata file after the previous ones
+    metadataLogEntries.add(new TableMetadata.MetadataLogEntry(current.lastUpdatedMillis(), currentLocation));
+
+    return StaticDataTask.of(
+        ops.io().newInputFile(currentLocation),
+        schema(),
+        scan.schema(),
+        metadataLogEntries,
+        metadataLogEntry -> MetadataLogsTable.metadataLogToRow(metadataLogEntry, table())
+    );
+  }
+
+  /**
+   * Scan over the metadata log. Planning always yields the single task built by
+   * {@link MetadataLogsTable#task(TableScan)}.
+   */
+  private class MetadataLogScan extends StaticTableScan {
+    MetadataLogScan(TableOperations ops, Table table) {
+      super(ops, table, METADATA_LOGS_SCHEMA, MetadataTableType.METADATA_LOGS, MetadataLogsTable.this::task);
+    }
+
+    MetadataLogScan(TableOperations ops, Table table, TableScanContext context) {
+      super(ops, table, METADATA_LOGS_SCHEMA, MetadataTableType.METADATA_LOGS, MetadataLogsTable.this::task, context);
+    }
+
+    // the schema argument is not used: the refined scan is rebuilt from the context alone
+    @Override
+    protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
+      return new MetadataLogScan(ops, table, context);
+    }
+
+    @Override
+    public CloseableIterable<FileScanTask> planFiles() {
+      return CloseableIterable.withNoopClose(MetadataLogsTable.this.task(this));
+    }
+  }
+
+  /**
+   * Converts one metadata log entry into a row matching {@link #METADATA_LOGS_SCHEMA}.
+   *
+   * @param metadataLogEntry the log entry to convert
+   * @param table the base table, used to resolve the snapshot as of the entry's timestamp
+   * @return a row of timestamp, file, and the id, schema id and sequence number of the resolved snapshot;
+   *     the last three are null when no snapshot can be resolved
+   */
+  private static StaticDataTask.Row metadataLogToRow(TableMetadata.MetadataLogEntry metadataLogEntry, Table table) {
+    Long latestSnapshotId = null;
+    try {
+      // only the time lookup is guarded, so unrelated IllegalArgumentExceptions are not swallowed
+      latestSnapshotId = SnapshotUtil.snapshotIdAsOfTime(table, metadataLogEntry.timestampMillis());
+    } catch (IllegalArgumentException ignored) {
+      // implies this metadata file was created at table creation
+    }
+
+    Snapshot latestSnapshot = latestSnapshotId != null ? table.snapshot(latestSnapshotId) : null;
+
+    return StaticDataTask.Row.of(
+        metadataLogEntry.timestampMillis(),
+        metadataLogEntry.file(),
+        // latest snapshot in this file corresponding to the log entry
+        latestSnapshotId,
+        latestSnapshot != null ? latestSnapshot.schemaId() : null,
+        latestSnapshot != null ? latestSnapshot.sequenceNumber() : null
+    );
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableType.java b/core/src/main/java/org/apache/iceberg/MetadataTableType.java
index c1c4bf9c75..ce3ee2202b 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataTableType.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataTableType.java
@@ -27,6 +27,7 @@ public enum MetadataTableType {
DATA_FILES,
DELETE_FILES,
HISTORY,
+ METADATA_LOGS,
SNAPSHOTS,
MANIFESTS,
PARTITIONS,
diff --git a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
index 847b415178..7246d18a31 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataTableUtils.java
@@ -64,6 +64,8 @@ public class MetadataTableUtils {
return new HistoryTable(ops, baseTable, metadataTableName);
case SNAPSHOTS:
return new SnapshotsTable(ops, baseTable, metadataTableName);
+ case METADATA_LOGS:
+ return new MetadataLogsTable(ops, baseTable, metadataTableName);
case MANIFESTS:
return new ManifestsTable(ops, baseTable, metadataTableName);
case PARTITIONS:
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index 60c57e12a3..c33cbfcce0 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -28,12 +28,16 @@ import java.util.stream.Stream;
import org.apache.avro.generic.GenericData.Record;
import org.apache.commons.collections.ListUtils;
import org.apache.iceberg.FileContent;
+import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.data.TestHelpers;
@@ -319,6 +323,89 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedFiles, actualFiles);
}
+  /**
+   * Checks the full contents, a filtered read and a projected read of the {@code metadata_logs} metadata
+   * table after creating a table and appending to it twice.
+   */
+  @Test
+  public void testMetadataLogs() throws Exception {
+    // Create table and insert data
+    sql("CREATE TABLE %s (id bigint, data string) " +
+        "USING iceberg " +
+        "PARTITIONED BY (data) " +
+        "TBLPROPERTIES " +
+        "('format-version'='2')", tableName);
+
+    List<SimpleRecord> recordsA = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "a")
+    );
+    spark.createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+        .writeTo(tableName)
+        .append();
+
+    List<SimpleRecord> recordsB = Lists.newArrayList(
+        new SimpleRecord(1, "b"),
+        new SimpleRecord(2, "b")
+    );
+    spark.createDataset(recordsB, Encoders.bean(SimpleRecord.class))
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();
+    Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+    long currentSnapshotId = currentSnapshot.snapshotId();
+    Snapshot parentSnapshot = table.snapshot(currentSnapshot.parentId());
+    List<TableMetadata.MetadataLogEntry> metadataLogEntries = Lists.newArrayList(tableMetadata.previousFiles());
+
+    // Check metadataLog table
+    List<Object[]> metadataLogs = sql("SELECT * FROM %s.metadata_logs", tableName);
+    assertEquals("MetadataLogsTable result should match the metadataLog entries",
+        ImmutableList.of(
+            // first metadata file: written before any snapshot existed
+            row(
+                metadataLogEntries.get(0).timestampMillis(),
+                metadataLogEntries.get(0).file(),
+                null,
+                null,
+                null
+            ),
+            row(
+                metadataLogEntries.get(1).timestampMillis(),
+                metadataLogEntries.get(1).file(),
+                parentSnapshot.snapshotId(),
+                parentSnapshot.schemaId(),
+                parentSnapshot.sequenceNumber()
+            ),
+            // the current metadata file is reported with the metadata's last-updated time
+            row(
+                tableMetadata.lastUpdatedMillis(),
+                tableMetadata.metadataFileLocation(),
+                currentSnapshot.snapshotId(),
+                currentSnapshot.schemaId(),
+                currentSnapshot.sequenceNumber()
+            )),
+        metadataLogs);
+
+    // test filtering
+    List<Object[]> metadataLogWithFilters =
+        sql("SELECT * FROM %s.metadata_logs WHERE latest_snapshot_id = %s", tableName, currentSnapshotId);
+    Assert.assertEquals("metadataLog table should return 1 row", 1, metadataLogWithFilters.size());
+    assertEquals("Result should match the latest snapshot entry",
+        ImmutableList.of(row(
+            tableMetadata.lastUpdatedMillis(),
+            tableMetadata.metadataFileLocation(),
+            currentSnapshot.snapshotId(),
+            currentSnapshot.schemaId(),
+            currentSnapshot.sequenceNumber())),
+        metadataLogWithFilters);
+
+    // test projection
+    List<String> metadataFiles =
+        metadataLogEntries.stream().map(TableMetadata.MetadataLogEntry::file).collect(Collectors.toList());
+    metadataFiles.add(tableMetadata.metadataFileLocation());
+    List<Object[]> metadataLogWithProjection = sql("SELECT file FROM %s.metadata_logs", tableName);
+    Assert.assertEquals("metadataLog table should return 3 rows", 3, metadataLogWithProjection.size());
+    assertEquals("metadataLog entry should be of same file",
+        metadataFiles.stream().map(this::row).collect(Collectors.toList()),
+        metadataLogWithProjection);
+  }
+
+
/**
* Find matching manifest entries of an Iceberg table
* @param table iceberg table
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index 60c57e12a3..c33cbfcce0 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -28,12 +28,16 @@ import java.util.stream.Stream;
import org.apache.avro.generic.GenericData.Record;
import org.apache.commons.collections.ListUtils;
import org.apache.iceberg.FileContent;
+import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.data.TestHelpers;
@@ -319,6 +323,89 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
TestHelpers.assertEqualsSafe(filesTableSchema.asStruct(), expectedFiles, actualFiles);
}
+  /**
+   * Checks the full contents, a filtered read and a projected read of the {@code metadata_logs} metadata
+   * table after creating a table and appending to it twice.
+   */
+  @Test
+  public void testMetadataLogs() throws Exception {
+    // Create table and insert data
+    sql("CREATE TABLE %s (id bigint, data string) " +
+        "USING iceberg " +
+        "PARTITIONED BY (data) " +
+        "TBLPROPERTIES " +
+        "('format-version'='2')", tableName);
+
+    List<SimpleRecord> recordsA = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "a")
+    );
+    spark.createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+        .writeTo(tableName)
+        .append();
+
+    List<SimpleRecord> recordsB = Lists.newArrayList(
+        new SimpleRecord(1, "b"),
+        new SimpleRecord(2, "b")
+    );
+    spark.createDataset(recordsB, Encoders.bean(SimpleRecord.class))
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    TableMetadata tableMetadata = ((HasTableOperations) table).operations().current();
+    Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+    long currentSnapshotId = currentSnapshot.snapshotId();
+    Snapshot parentSnapshot = table.snapshot(currentSnapshot.parentId());
+    List<TableMetadata.MetadataLogEntry> metadataLogEntries = Lists.newArrayList(tableMetadata.previousFiles());
+
+    // Check metadataLog table
+    List<Object[]> metadataLogs = sql("SELECT * FROM %s.metadata_logs", tableName);
+    assertEquals("MetadataLogsTable result should match the metadataLog entries",
+        ImmutableList.of(
+            // first metadata file: written before any snapshot existed
+            row(
+                metadataLogEntries.get(0).timestampMillis(),
+                metadataLogEntries.get(0).file(),
+                null,
+                null,
+                null
+            ),
+            row(
+                metadataLogEntries.get(1).timestampMillis(),
+                metadataLogEntries.get(1).file(),
+                parentSnapshot.snapshotId(),
+                parentSnapshot.schemaId(),
+                parentSnapshot.sequenceNumber()
+            ),
+            // the current metadata file is reported with the metadata's last-updated time
+            row(
+                tableMetadata.lastUpdatedMillis(),
+                tableMetadata.metadataFileLocation(),
+                currentSnapshot.snapshotId(),
+                currentSnapshot.schemaId(),
+                currentSnapshot.sequenceNumber()
+            )),
+        metadataLogs);
+
+    // test filtering
+    List<Object[]> metadataLogWithFilters =
+        sql("SELECT * FROM %s.metadata_logs WHERE latest_snapshot_id = %s", tableName, currentSnapshotId);
+    Assert.assertEquals("metadataLog table should return 1 row", 1, metadataLogWithFilters.size());
+    assertEquals("Result should match the latest snapshot entry",
+        ImmutableList.of(row(
+            tableMetadata.lastUpdatedMillis(),
+            tableMetadata.metadataFileLocation(),
+            currentSnapshot.snapshotId(),
+            currentSnapshot.schemaId(),
+            currentSnapshot.sequenceNumber())),
+        metadataLogWithFilters);
+
+    // test projection
+    List<String> metadataFiles =
+        metadataLogEntries.stream().map(TableMetadata.MetadataLogEntry::file).collect(Collectors.toList());
+    metadataFiles.add(tableMetadata.metadataFileLocation());
+    List<Object[]> metadataLogWithProjection = sql("SELECT file FROM %s.metadata_logs", tableName);
+    Assert.assertEquals("metadataLog table should return 3 rows", 3, metadataLogWithProjection.size());
+    assertEquals("metadataLog entry should be of same file",
+        metadataFiles.stream().map(this::row).collect(Collectors.toList()),
+        metadataLogWithProjection);
+  }
+
+
/**
* Find matching manifest entries of an Iceberg table
* @param table iceberg table