You are viewing a plain-text version of this content; the link to the canonical version is not included in this plain-text rendering.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/07/10 23:47:56 UTC
[iceberg] branch master updated: REST: Create commit catalog handler fix (#5235)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 643ecc2ca REST: Create commit catalog handler fix (#5235)
643ecc2ca is described below
commit 643ecc2ca777e8268b78adf4a77f5b4ff86fb03a
Author: Bryan Keller <br...@gmail.com>
AuthorDate: Sun Jul 10 16:47:51 2022 -0700
REST: Create commit catalog handler fix (#5235)
---
.../java/org/apache/iceberg/TableMetadata.java | 54 ++++++++++--
.../org/apache/iceberg/rest/CatalogHandlers.java | 6 +-
.../apache/iceberg/rest/RESTSessionCatalog.java | 8 +-
.../org/apache/iceberg/catalog/CatalogTests.java | 96 ++++++++++++++++++++++
.../rest/responses/TestLoadTableResponse.java | 1 +
5 files changed, 151 insertions(+), 14 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 7946ddc24..8c1d8f5db 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -120,13 +120,14 @@ public class TableMetadata implements Serializable {
// break existing tables.
MetricsConfig.fromProperties(properties).validateReferencedColumns(schema);
- return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location,
- INITIAL_SEQUENCE_NUMBER, System.currentTimeMillis(),
- lastColumnId.get(), freshSchema.schemaId(), ImmutableList.of(freshSchema),
- freshSpec.specId(), ImmutableList.of(freshSpec), freshSpec.lastAssignedFieldId(),
- freshSortOrderId, ImmutableList.of(freshSortOrder),
- ImmutableMap.copyOf(properties), -1, ImmutableList.of(),
- ImmutableList.of(), ImmutableList.of(), ImmutableMap.of(), ImmutableList.of());
+ return new Builder()
+ .upgradeFormatVersion(formatVersion)
+ .setCurrentSchema(freshSchema, lastColumnId.get())
+ .setDefaultPartitionSpec(freshSpec)
+ .setDefaultSortOrder(freshSortOrder)
+ .setLocation(location)
+ .setProperties(properties)
+ .build();
}
public static class SnapshotLogEntry implements HistoryEntry {
@@ -755,6 +756,10 @@ public class TableMetadata implements Serializable {
return new Builder(base);
}
+ public static Builder buildFromEmpty() {
+ return new Builder();
+ }
+
public static class Builder {
private static final int LAST_ADDED = -1;
@@ -797,6 +802,28 @@ public class TableMetadata implements Serializable {
private final Map<Integer, PartitionSpec> specsById;
private final Map<Integer, SortOrder> sortOrdersById;
+ private Builder() {
+ this.base = null;
+ this.formatVersion = DEFAULT_TABLE_FORMAT_VERSION;
+ this.lastSequenceNumber = INITIAL_SEQUENCE_NUMBER;
+ this.uuid = UUID.randomUUID().toString();
+ this.schemas = Lists.newArrayList();
+ this.specs = Lists.newArrayList();
+ this.sortOrders = Lists.newArrayList();
+ this.properties = Maps.newHashMap();
+ this.snapshots = Lists.newArrayList();
+ this.currentSnapshotId = -1;
+ this.changes = Lists.newArrayList();
+ this.startingChangeCount = 0;
+ this.snapshotLog = Lists.newArrayList();
+ this.previousFiles = Lists.newArrayList();
+ this.refs = Maps.newHashMap();
+ this.snapshotsById = Maps.newHashMap();
+ this.schemasById = Maps.newHashMap();
+ this.specsById = Maps.newHashMap();
+ this.sortOrdersById = Maps.newHashMap();
+ }
+
private Builder(TableMetadata base) {
this.base = base;
this.formatVersion = base.formatVersion;
@@ -985,6 +1012,10 @@ public class TableMetadata implements Serializable {
return this;
}
+ ValidationException.check(!schemas.isEmpty(), "Attempting to add a snapshot before a schema is added");
+ ValidationException.check(!specs.isEmpty(), "Attempting to add a snapshot before a partition spec is added");
+ ValidationException.check(!sortOrders.isEmpty(), "Attempting to add a snapshot before a sort order is added");
+
ValidationException.check(!snapshotsById.containsKey(snapshot.snapshotId()),
"Snapshot already exists for id: %s",
snapshot.snapshotId());
@@ -1187,8 +1218,13 @@ public class TableMetadata implements Serializable {
PartitionSpec.checkCompatibility(specsById.get(defaultSpecId), schema);
SortOrder.checkCompatibility(sortOrdersById.get(defaultSortOrderId), schema);
- List<MetadataLogEntry> metadataHistory = addPreviousFile(
- previousFiles, previousFileLocation, base.lastUpdatedMillis(), properties);
+ List<MetadataLogEntry> metadataHistory;
+ if (base == null) {
+ metadataHistory = Lists.newArrayList();
+ } else {
+ metadataHistory = addPreviousFile(
+ previousFiles, previousFileLocation, base.lastUpdatedMillis(), properties);
+ }
List<HistoryEntry> newSnapshotLog = updateSnapshotLog(snapshotLog, snapshotsById, currentSnapshotId, changes);
return new TableMetadata(
diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
index 19a308606..4a96a26b1 100644
--- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -244,7 +244,7 @@ public class CatalogHandlers {
Transaction transaction = catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction();
if (transaction instanceof BaseTransaction) {
BaseTransaction baseTransaction = (BaseTransaction) transaction;
- finalMetadata = create(baseTransaction.underlyingOps(), baseTransaction.startMetadata(), request);
+ finalMetadata = create(baseTransaction.underlyingOps(), request);
} else {
throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTransaction");
}
@@ -283,11 +283,11 @@ public class CatalogHandlers {
return isCreate;
}
- private static TableMetadata create(TableOperations ops, TableMetadata start, UpdateTableRequest request) {
+ private static TableMetadata create(TableOperations ops, UpdateTableRequest request) {
// the only valid requirement is that the table will be created
request.requirements().forEach(requirement -> requirement.validate(ops.current()));
- TableMetadata.Builder builder = TableMetadata.buildFrom(start);
+ TableMetadata.Builder builder = TableMetadata.buildFromEmpty();
request.updates().forEach(update -> update.applyTo(builder));
// create transactions do not retry. if the table exists, retrying is not a solution
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index 4a5c8a3a7..e232908c4 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -579,14 +579,18 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
PartitionSpec spec = meta.spec();
if (spec != null && spec.isPartitioned()) {
changes.add(new MetadataUpdate.AddPartitionSpec(spec));
- changes.add(new MetadataUpdate.SetDefaultPartitionSpec(-1));
+ } else {
+ changes.add(new MetadataUpdate.AddPartitionSpec(PartitionSpec.unpartitioned()));
}
+ changes.add(new MetadataUpdate.SetDefaultPartitionSpec(-1));
SortOrder order = meta.sortOrder();
if (order != null && order.isSorted()) {
changes.add(new MetadataUpdate.AddSortOrder(order));
- changes.add(new MetadataUpdate.SetDefaultSortOrder(-1));
+ } else {
+ changes.add(new MetadataUpdate.AddSortOrder(SortOrder.unsorted()));
}
+ changes.add(new MetadataUpdate.SetDefaultSortOrder(-1));
String location = meta.location();
if (location != null) {
diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
index b5a458ecd..fbc871831 100644
--- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
+++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
@@ -1276,6 +1276,79 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
Assert.assertEquals("Table location should match requested", "file:/tmp/ns/table", table.location());
}
assertFiles(table, FILE_A);
+ assertFilesPartitionSpec(table);
+ assertPreviousMetadataFileCount(table, 0);
+ }
+
+ @Test
+ public void testCompleteCreateTransactionMultipleSchemas() {
+ C catalog = catalog();
+
+ Map<String, String> properties = ImmutableMap.of("user", "someone", "created-at", "2022-02-25T00:38:19");
+ Transaction create = catalog.buildTable(TABLE, SCHEMA)
+ .withLocation("file:/tmp/ns/table")
+ .withPartitionSpec(SPEC)
+ .withSortOrder(WRITE_ORDER)
+ .withProperties(properties)
+ .createTransaction();
+
+ Assert.assertFalse("Table should not exist after createTransaction", catalog.tableExists(TABLE));
+
+ create.newFastAppend().appendFile(FILE_A).commit();
+
+ UpdateSchema updateSchema = create.updateSchema().addColumn("new_col", Types.LongType.get());
+ Schema newSchema = updateSchema.apply();
+ updateSchema.commit();
+
+ UpdatePartitionSpec updateSpec = create.updateSpec().addField("new_col");
+ PartitionSpec newSpec = updateSpec.apply();
+ updateSpec.commit();
+
+ ReplaceSortOrder replaceSortOrder = create.replaceSortOrder().asc("new_col");
+ SortOrder newSortOrder = replaceSortOrder.apply();
+ replaceSortOrder.commit();
+
+ DataFile anotherFile = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-b.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("id_bucket=0/new_col=0") // easy way to set partition data for now
+ .withRecordCount(2) // needs at least one record or else metrics will filter it out
+ .build();
+
+ create.newFastAppend().appendFile(anotherFile).commit();
+
+ Assert.assertFalse("Table should not exist after append commit", catalog.tableExists(TABLE));
+
+ create.commitTransaction();
+
+ Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE));
+ Table table = catalog.loadTable(TABLE);
+
+ // initial IDs taken from TableMetadata constants
+ final int initialSchemaId = 0;
+ final int initialSpecId = 0;
+ final int initialOrderId = 1;
+ final int updateSchemaId = initialSchemaId + 1;
+ final int updateSpecId = initialSpecId + 1;
+ final int updateOrderId = initialOrderId + 1;
+
+ Assert.assertEquals("Table schema should match the new schema",
+ newSchema.asStruct(), table.schema().asStruct());
+ Assert.assertEquals("Table schema should match the new schema ID",
+ updateSchemaId, table.schema().schemaId());
+ Assert.assertEquals("Table should have updated partition spec", newSpec.fields(), table.spec().fields());
+ Assert.assertEquals("Table should have updated partition spec ID", updateSpecId, table.spec().specId());
+ Assert.assertEquals("Table should have updated sort order", newSortOrder.fields(), table.sortOrder().fields());
+ Assert.assertEquals("Table should have updated sort order ID", updateOrderId, table.sortOrder().orderId());
+ Assert.assertEquals("Table properties should be a superset of the requested properties",
+ properties.entrySet(),
+ Sets.intersection(properties.entrySet(), table.properties().entrySet()));
+ if (!overridesRequestedLocation()) {
+ Assert.assertEquals("Table location should match requested", "file:/tmp/ns/table", table.location());
+ }
+ assertFiles(table, FILE_A, anotherFile);
+ assertFilePartitionSpec(table, FILE_A, initialSpecId);
+ assertFilePartitionSpec(table, anotherFile, updateSpecId);
assertPreviousMetadataFileCount(table, 0);
}
@@ -1322,6 +1395,7 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
Assert.assertEquals("Table location should match requested", "file:/tmp/ns/table", table.location());
}
assertFiles(table, FILE_A);
+ assertFilesPartitionSpec(table);
assertPreviousMetadataFileCount(table, 0);
}
@@ -1412,6 +1486,7 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
Assert.assertEquals("Table location should match requested", "file:/tmp/ns/table", table.location());
}
assertFiles(table, FILE_A);
+ assertFilesPartitionSpec(table);
assertPreviousMetadataFileCount(table, 0);
}
@@ -2069,6 +2144,27 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
}
}
+ public void assertFilePartitionSpec(Table table, DataFile dataFile, int specId) {
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ Streams.stream(tasks)
+ .map(FileScanTask::file)
+ .filter(file -> file.path().equals(dataFile.path()))
+ .forEach(file -> Assert.assertEquals(specId, file.specId()));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ public void assertFilesPartitionSpec(Table table) {
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ Streams.stream(tasks)
+ .map(FileScanTask::file)
+ .forEach(file -> Assert.assertEquals(table.spec().specId(), file.specId()));
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
private List<Namespace> concat(List<Namespace> starting, Namespace... additional) {
List<Namespace> namespaces = Lists.newArrayList();
namespaces.addAll(starting);
diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponse.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponse.java
index 34ae22f10..3fa1bdb0b 100644
--- a/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponse.java
+++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponse.java
@@ -78,6 +78,7 @@ public class TestLoadTableResponse extends RequestResponseTestBase<LoadTableResp
TableMetadata
.buildFrom(
TableMetadata.newTableMetadata(SCHEMA_7, SPEC_5, SORT_ORDER_3, TEST_TABLE_LOCATION, TABLE_PROPS))
+ .discardChanges()
.withMetadataLocation(TEST_METADATA_LOCATION)
.build();