You are viewing a plain-text version of this content; the link to the canonical version is not included in this text.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/07/10 23:47:56 UTC

[iceberg] branch master updated: REST: Create commit catalog handler fix (#5235)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 643ecc2ca REST: Create commit catalog handler fix (#5235)
643ecc2ca is described below

commit 643ecc2ca777e8268b78adf4a77f5b4ff86fb03a
Author: Bryan Keller <br...@gmail.com>
AuthorDate: Sun Jul 10 16:47:51 2022 -0700

    REST: Create commit catalog handler fix (#5235)
---
 .../java/org/apache/iceberg/TableMetadata.java     | 54 ++++++++++--
 .../org/apache/iceberg/rest/CatalogHandlers.java   |  6 +-
 .../apache/iceberg/rest/RESTSessionCatalog.java    |  8 +-
 .../org/apache/iceberg/catalog/CatalogTests.java   | 96 ++++++++++++++++++++++
 .../rest/responses/TestLoadTableResponse.java      |  1 +
 5 files changed, 151 insertions(+), 14 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 7946ddc24..8c1d8f5db 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -120,13 +120,14 @@ public class TableMetadata implements Serializable {
     // break existing tables.
     MetricsConfig.fromProperties(properties).validateReferencedColumns(schema);
 
-    return new TableMetadata(null, formatVersion, UUID.randomUUID().toString(), location,
-        INITIAL_SEQUENCE_NUMBER, System.currentTimeMillis(),
-        lastColumnId.get(), freshSchema.schemaId(), ImmutableList.of(freshSchema),
-        freshSpec.specId(), ImmutableList.of(freshSpec), freshSpec.lastAssignedFieldId(),
-        freshSortOrderId, ImmutableList.of(freshSortOrder),
-        ImmutableMap.copyOf(properties), -1, ImmutableList.of(),
-        ImmutableList.of(), ImmutableList.of(), ImmutableMap.of(), ImmutableList.of());
+    return new Builder() // fresh Builder(): new random UUID and INITIAL_SEQUENCE_NUMBER
+        .upgradeFormatVersion(formatVersion) // NOTE(review): assumes formatVersion >= DEFAULT_TABLE_FORMAT_VERSION
+        .setCurrentSchema(freshSchema, lastColumnId.get())
+        .setDefaultPartitionSpec(freshSpec)
+        .setDefaultSortOrder(freshSortOrder)
+        .setLocation(location)
+        .setProperties(properties)
+        .build();
   }
 
   public static class SnapshotLogEntry implements HistoryEntry {
@@ -755,6 +756,10 @@ public class TableMetadata implements Serializable {
     return new Builder(base);
   }
 
+  public static Builder buildFromEmpty() { // starts from no base metadata (base == null)
+    return new Builder();
+  }
+
   public static class Builder {
     private static final int LAST_ADDED = -1;
 
@@ -797,6 +802,28 @@ public class TableMetadata implements Serializable {
     private final Map<Integer, PartitionSpec> specsById;
     private final Map<Integer, SortOrder> sortOrdersById;
 
+    private Builder() { // empty state; used by newTableMetadata and buildFromEmpty()
+      this.base = null;
+      this.formatVersion = DEFAULT_TABLE_FORMAT_VERSION;
+      this.lastSequenceNumber = INITIAL_SEQUENCE_NUMBER;
+      this.uuid = UUID.randomUUID().toString();
+      this.schemas = Lists.newArrayList();
+      this.specs = Lists.newArrayList();
+      this.sortOrders = Lists.newArrayList();
+      this.properties = Maps.newHashMap();
+      this.snapshots = Lists.newArrayList();
+      this.currentSnapshotId = -1;
+      this.changes = Lists.newArrayList();
+      this.startingChangeCount = 0;
+      this.snapshotLog = Lists.newArrayList();
+      this.previousFiles = Lists.newArrayList();
+      this.refs = Maps.newHashMap();
+      this.snapshotsById = Maps.newHashMap();
+      this.schemasById = Maps.newHashMap();
+      this.specsById = Maps.newHashMap();
+      this.sortOrdersById = Maps.newHashMap();
+    }
+
     private Builder(TableMetadata base) {
       this.base = base;
       this.formatVersion = base.formatVersion;
@@ -985,6 +1012,10 @@ public class TableMetadata implements Serializable {
         return this;
       }
 
+      ValidationException.check(!schemas.isEmpty(), "Attempting to add a snapshot before a schema is added");
+      ValidationException.check(!specs.isEmpty(), "Attempting to add a snapshot before a partition spec is added");
+      ValidationException.check(!sortOrders.isEmpty(), "Attempting to add a snapshot before a sort order is added");
+      // a buildFromEmpty() builder has no schema, spec, or sort order until one is added; checked above
       ValidationException.check(!snapshotsById.containsKey(snapshot.snapshotId()),
           "Snapshot already exists for id: %s",
           snapshot.snapshotId());
@@ -1187,8 +1218,13 @@ public class TableMetadata implements Serializable {
       PartitionSpec.checkCompatibility(specsById.get(defaultSpecId), schema);
       SortOrder.checkCompatibility(sortOrdersById.get(defaultSortOrderId), schema);
 
-      List<MetadataLogEntry> metadataHistory = addPreviousFile(
-          previousFiles, previousFileLocation, base.lastUpdatedMillis(), properties);
+      List<MetadataLogEntry> metadataHistory;
+      if (base == null) { // no base (buildFromEmpty); addPreviousFile needs base.lastUpdatedMillis()
+        metadataHistory = Lists.newArrayList();
+      } else {
+        metadataHistory = addPreviousFile(
+            previousFiles, previousFileLocation, base.lastUpdatedMillis(), properties);
+      }
       List<HistoryEntry> newSnapshotLog = updateSnapshotLog(snapshotLog, snapshotsById, currentSnapshotId, changes);
 
       return new TableMetadata(
diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
index 19a308606..4a96a26b1 100644
--- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -244,7 +244,7 @@ public class CatalogHandlers {
       Transaction transaction = catalog.buildTable(ident, EMPTY_SCHEMA).createOrReplaceTransaction();
       if (transaction instanceof BaseTransaction) {
         BaseTransaction baseTransaction = (BaseTransaction) transaction;
-        finalMetadata = create(baseTransaction.underlyingOps(), baseTransaction.startMetadata(), request);
+        finalMetadata = create(baseTransaction.underlyingOps(), request); // start metadata is not used
       } else {
         throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTransaction");
       }
@@ -283,11 +283,11 @@ public class CatalogHandlers {
     return isCreate;
   }
 
-  private static TableMetadata create(TableOperations ops, TableMetadata start, UpdateTableRequest request) {
+  private static TableMetadata create(TableOperations ops, UpdateTableRequest request) {
     // the only valid requirement is that the table will be created
     request.requirements().forEach(requirement -> requirement.validate(ops.current()));
 
-    TableMetadata.Builder builder = TableMetadata.buildFrom(start);
+    TableMetadata.Builder builder = TableMetadata.buildFromEmpty(); // table defined only by request.updates()
     request.updates().forEach(update -> update.applyTo(builder));
 
     // create transactions do not retry. if the table exists, retrying is not a solution
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index 4a5c8a3a7..e232908c4 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -579,14 +579,18 @@ public class RESTSessionCatalog extends BaseSessionCatalog implements Configurab
     PartitionSpec spec = meta.spec();
     if (spec != null && spec.isPartitioned()) {
       changes.add(new MetadataUpdate.AddPartitionSpec(spec));
-      changes.add(new MetadataUpdate.SetDefaultPartitionSpec(-1));
+    } else { // always add a spec, even unpartitioned, so a default spec can be set below
+      changes.add(new MetadataUpdate.AddPartitionSpec(PartitionSpec.unpartitioned()));
     }
+    changes.add(new MetadataUpdate.SetDefaultPartitionSpec(-1)); // -1: presumably Builder.LAST_ADDED
 
     SortOrder order = meta.sortOrder();
     if (order != null && order.isSorted()) {
       changes.add(new MetadataUpdate.AddSortOrder(order));
-      changes.add(new MetadataUpdate.SetDefaultSortOrder(-1));
+    } else { // always add an order, even unsorted, so a default order can be set below
+      changes.add(new MetadataUpdate.AddSortOrder(SortOrder.unsorted()));
     }
+    changes.add(new MetadataUpdate.SetDefaultSortOrder(-1)); // -1: presumably Builder.LAST_ADDED
 
     String location = meta.location();
     if (location != null) {
diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
index b5a458ecd..fbc871831 100644
--- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
+++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
@@ -1276,6 +1276,79 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
       Assert.assertEquals("Table location should match requested", "file:/tmp/ns/table", table.location());
     }
     assertFiles(table, FILE_A);
+    assertFilesPartitionSpec(table); // every scanned file should use the table's current spec
+    assertPreviousMetadataFileCount(table, 0);
+  }
+
+  @Test
+  public void testCompleteCreateTransactionMultipleSchemas() { // also evolves spec and sort order
+    C catalog = catalog();
+
+    Map<String, String> properties = ImmutableMap.of("user", "someone", "created-at", "2022-02-25T00:38:19");
+    Transaction create = catalog.buildTable(TABLE, SCHEMA)
+        .withLocation("file:/tmp/ns/table")
+        .withPartitionSpec(SPEC)
+        .withSortOrder(WRITE_ORDER)
+        .withProperties(properties)
+        .createTransaction();
+
+    Assert.assertFalse("Table should not exist after createTransaction", catalog.tableExists(TABLE));
+
+    create.newFastAppend().appendFile(FILE_A).commit();
+
+    UpdateSchema updateSchema = create.updateSchema().addColumn("new_col", Types.LongType.get());
+    Schema newSchema = updateSchema.apply();
+    updateSchema.commit();
+
+    UpdatePartitionSpec updateSpec = create.updateSpec().addField("new_col");
+    PartitionSpec newSpec = updateSpec.apply();
+    updateSpec.commit();
+
+    ReplaceSortOrder replaceSortOrder = create.replaceSortOrder().asc("new_col");
+    SortOrder newSortOrder = replaceSortOrder.apply();
+    replaceSortOrder.commit();
+
+    DataFile anotherFile = DataFiles.builder(newSpec)
+        .withPath("/path/to/data-b.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("id_bucket=0/new_col=0") // easy way to set partition data for now
+        .withRecordCount(2) // needs at least one record or else metrics will filter it out
+        .build();
+
+    create.newFastAppend().appendFile(anotherFile).commit();
+
+    Assert.assertFalse("Table should not exist after append commit", catalog.tableExists(TABLE));
+
+    create.commitTransaction();
+
+    Assert.assertTrue("Table should exist after append commit", catalog.tableExists(TABLE));
+    Table table = catalog.loadTable(TABLE);
+
+    // initial IDs taken from TableMetadata constants
+    final int initialSchemaId = 0;
+    final int initialSpecId = 0;
+    final int initialOrderId = 1; // NOTE(review): presumably 0 is reserved for the unsorted order
+    final int updateSchemaId = initialSchemaId + 1;
+    final int updateSpecId = initialSpecId + 1;
+    final int updateOrderId = initialOrderId + 1;
+
+    Assert.assertEquals("Table schema should match the new schema",
+        newSchema.asStruct(), table.schema().asStruct());
+    Assert.assertEquals("Table schema should match the new schema ID",
+        updateSchemaId, table.schema().schemaId());
+    Assert.assertEquals("Table should have updated partition spec", newSpec.fields(), table.spec().fields());
+    Assert.assertEquals("Table should have updated partition spec ID", updateSpecId, table.spec().specId());
+    Assert.assertEquals("Table should have updated sort order", newSortOrder.fields(), table.sortOrder().fields());
+    Assert.assertEquals("Table should have updated sort order ID", updateOrderId, table.sortOrder().orderId());
+    Assert.assertEquals("Table properties should be a superset of the requested properties",
+        properties.entrySet(),
+        Sets.intersection(properties.entrySet(), table.properties().entrySet()));
+    if (!overridesRequestedLocation()) {
+      Assert.assertEquals("Table location should match requested", "file:/tmp/ns/table", table.location());
+    }
+    assertFiles(table, FILE_A, anotherFile);
+    assertFilePartitionSpec(table, FILE_A, initialSpecId); // FILE_A was appended before the spec update
+    assertFilePartitionSpec(table, anotherFile, updateSpecId); // anotherFile was built with newSpec
     assertPreviousMetadataFileCount(table, 0);
   }
 
@@ -1322,6 +1395,7 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
       Assert.assertEquals("Table location should match requested", "file:/tmp/ns/table", table.location());
     }
     assertFiles(table, FILE_A);
+    assertFilesPartitionSpec(table); // every scanned file should use the table's current spec
     assertPreviousMetadataFileCount(table, 0);
   }
 
@@ -1412,6 +1486,7 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
       Assert.assertEquals("Table location should match requested", "file:/tmp/ns/table", table.location());
     }
     assertFiles(table, FILE_A);
+    assertFilesPartitionSpec(table); // every scanned file should use the table's current spec
     assertPreviousMetadataFileCount(table, 0);
   }
 
@@ -2069,6 +2144,38 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {
     }
   }
 
+  /**
+   * Asserts that a scan of {@code table} contains {@code dataFile} and that it was written with partition spec
+   * {@code specId}. Fails when the file is missing, so the check cannot pass vacuously.
+   */
+  public void assertFilePartitionSpec(Table table, DataFile dataFile, int specId) {
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      int matched = 0;
+      for (FileScanTask task : tasks) {
+        DataFile file = task.file();
+        // compare as strings so the check does not depend on the CharSequence type returned by path()
+        if (file.path().toString().equals(dataFile.path().toString())) {
+          Assert.assertEquals("Data file should use the expected partition spec ID", specId, file.specId());
+          matched += 1;
+        }
+      }
+      Assert.assertTrue("Table should contain data file: " + dataFile.path(), matched > 0);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /** Asserts that every data file in a scan of {@code table} uses the table's current partition spec. */
+  public void assertFilesPartitionSpec(Table table) {
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      Streams.stream(tasks)
+          .map(FileScanTask::file)
+          .forEach(file -> Assert.assertEquals(table.spec().specId(), file.specId()));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
   private List<Namespace> concat(List<Namespace> starting, Namespace... additional) {
     List<Namespace> namespaces = Lists.newArrayList();
     namespaces.addAll(starting);
diff --git a/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponse.java b/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponse.java
index 34ae22f10..3fa1bdb0b 100644
--- a/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponse.java
+++ b/core/src/test/java/org/apache/iceberg/rest/responses/TestLoadTableResponse.java
@@ -78,6 +78,7 @@ public class TestLoadTableResponse extends RequestResponseTestBase<LoadTableResp
         TableMetadata
             .buildFrom(
                 TableMetadata.newTableMetadata(SCHEMA_7, SPEC_5, SORT_ORDER_3, TEST_TABLE_LOCATION, TABLE_PROPS))
+            .discardChanges()
             .withMetadataLocation(TEST_METADATA_LOCATION)
             .build();