Posted to issues@iceberg.apache.org by "nastra (via GitHub)" <gi...@apache.org> on 2023/03/13 14:00:05 UTC

[GitHub] [iceberg] nastra opened a new pull request, #6948: API, Core: Multi-Table transactions API and support for REST

nastra opened a new pull request, #6948:
URL: https://github.com/apache/iceberg/pull/6948

   I have written a design doc, which is available [here](https://docs.google.com/document/d/1UxXifU8iqP_byaW4E2RuKZx1nobxmAvc5urVcWas1B8/edit?usp=sharing).
   
   I think we'll eventually want to split this into two PRs so that the new APIs can be reviewed independently of the REST-specific changes.
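To make the idea of a multi-table transaction concrete, here is a minimal in-memory sketch under simplifying assumptions; it is not the implementation in this PR or the design doc. Each table carries a version number, a transaction remembers the version of every table it changes, and the commit applies all changes only if none of those tables moved in the meantime (optimistic concurrency). `InMemoryCatalog`, `MultiTableTransaction`, `stageUpdate`, and `commitAll` are hypothetical names.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory catalog (not Iceberg's API): each table name maps to a version
// number that increases by one on every successful commit to that table.
class InMemoryCatalog {
  private final Map<String, Integer> versions = new HashMap<>();

  synchronized void createTable(String name) {
    versions.put(name, 0);
  }

  synchronized int currentVersion(String name) {
    Integer version = versions.get(name);
    if (version == null) {
      throw new IllegalArgumentException("Table does not exist: " + name);
    }
    return version;
  }

  // All-or-nothing commit: every table must still be at the version the transaction
  // started from; if any table has moved, nothing is applied.
  synchronized boolean commitAll(Map<String, Integer> baseVersions) {
    for (Map.Entry<String, Integer> entry : baseVersions.entrySet()) {
      if (currentVersion(entry.getKey()) != entry.getValue()) {
        return false; // another writer committed to this table in the meantime
      }
    }
    for (String name : baseVersions.keySet()) {
      versions.put(name, versions.get(name) + 1);
    }
    return true;
  }
}

// Hypothetical transaction: it only records which tables it changes and the version each
// one had when first touched; the catalog is not modified until commit().
class MultiTableTransaction {
  private final InMemoryCatalog catalog;
  private final Map<String, Integer> baseVersions = new LinkedHashMap<>();
  private boolean commitAttempted = false;

  MultiTableTransaction(InMemoryCatalog catalog) {
    this.catalog = catalog;
  }

  void stageUpdate(String table) {
    // keep the version from the first time this table was touched in the transaction
    baseVersions.putIfAbsent(table, catalog.currentVersion(table));
  }

  boolean commit() {
    if (commitAttempted) {
      throw new IllegalStateException("Transaction has already been committed");
    }
    commitAttempted = true; // at most one commit attempt per transaction
    return catalog.commitAll(baseVersions);
  }
}
```

A commit that finds any table at a newer version returns `false` without touching the other tables, which is the all-or-nothing behavior a multi-table commit needs. In a real catalog the check-and-apply step would also have to be atomic in the catalog backend; here a single `synchronized` method stands in for that.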


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1251278119


##########
core/src/main/java/org/apache/iceberg/BaseScan.java:
##########
@@ -99,7 +99,7 @@ protected Schema tableSchema() {
     return schema;
   }
 
-  protected TableScanContext context() {
+  public TableScanContext context() {

Review Comment:
   This should not be made public. I think we'll need to find a different way to report context.




[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1244026460


##########
core/src/main/java/org/apache/iceberg/catalog/CatalogTransaction.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+public interface CatalogTransaction {
+
+  enum IsolationLevel {
+
+    /**
+     * All reads that are being made will see the last committed values that existed when the table

Review Comment:
   I have doubts about this definition:
   
   > All reads that are being made will see the last committed values that existed when the table was loaded first inside the catalog transaction.
   
   This is different from the definition in other places like [here](https://learn.microsoft.com/en-us/dotnet/framework/data/adonet/sql/snapshot-isolation-in-sql-server):
   
   > SNAPSHOT isolation specifies that data read within a transaction will never reflect changes made by other simultaneous transactions.
   
   The key difference is **when the table was loaded first inside the catalog transaction**, which means that two tables in the same transaction may be loaded at different points in time.
   
   Consider the following case of 2 processes:
   
   process 1:
   
   ```
   t0: CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
       Catalog txCatalog = catalogTransaction.asCatalog();
   
   t1: Table table1 = txCatalog.loadTable(TableIdentifier.of("db", "table1"));
   t2: Table table2 = txCatalog.loadTable(TableIdentifier.of("db", "table2"));
   ```
   
   process 2:
   
   ```
   t1.5: table2.newAppend().appendFile(...).commit();
   ```
   
   This means that, within the same transaction in process 1, table1 and table2 reflect the state of the catalog at different points in time.
   
   When translated to a real-life SQL use case, it means:
   
   process 1:
   
   ```
   SELECT * FROM table1 JOIN table2 ON ...
   ```
   
   process 2:
   
   ```
   INSERT INTO table2 ...
   ```
   
   Running these concurrently can cause a phantom read in process 1, which clearly violates the isolation guarantee.
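The timeline above can be reproduced with a small, self-contained model. This is a hypothetical sketch (none of these classes are Iceberg APIs) in which an integer per table stands in for its current snapshot. `LoadOnFirstReadTransaction` follows the quoted definition and reads each table when it is first loaded; `PinnedAtStartTransaction` captures every table once when the transaction starts, which is closer to the SQL Server definition of snapshot isolation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical catalog state: table name -> number of committed appends.
// The integer stands in for a table's current snapshot.
class VersionedCatalog {
  private final Map<String, Integer> versions = new HashMap<>();

  synchronized void createTable(String name) {
    versions.put(name, 0);
  }

  synchronized void append(String name) {
    versions.merge(name, 1, Integer::sum);
  }

  synchronized int version(String name) {
    return versions.get(name);
  }

  synchronized Map<String, Integer> copyOfAll() {
    return new HashMap<>(versions);
  }
}

// Reads each table's state the first time it is loaded inside the transaction
// (the quoted definition), then keeps returning that state.
class LoadOnFirstReadTransaction {
  private final VersionedCatalog catalog;
  private final Map<String, Integer> loaded = new HashMap<>();

  LoadOnFirstReadTransaction(VersionedCatalog catalog) {
    this.catalog = catalog;
  }

  int load(String name) {
    return loaded.computeIfAbsent(name, catalog::version);
  }
}

// Captures the state of every table once, when the transaction starts,
// so all reads come from the same point in time.
class PinnedAtStartTransaction {
  private final Map<String, Integer> pinned;

  PinnedAtStartTransaction(VersionedCatalog catalog) {
    this.pinned = catalog.copyOfAll();
  }

  int load(String name) {
    return pinned.get(name);
  }
}

class IsolationTimeline {
  public static void main(String[] args) {
    VersionedCatalog catalog = new VersionedCatalog();
    catalog.createTable("db.table1");
    catalog.createTable("db.table2");

    // t0: process 1 starts its transaction
    LoadOnFirstReadTransaction lazy = new LoadOnFirstReadTransaction(catalog);
    PinnedAtStartTransaction pinned = new PinnedAtStartTransaction(catalog);

    int lazyTable1 = lazy.load("db.table1"); // t1
    catalog.append("db.table2"); // t1.5: process 2 commits an append
    int lazyTable2 = lazy.load("db.table2"); // t2

    // prints "load-on-first-read: table1=0, table2=1"
    System.out.println("load-on-first-read: table1=" + lazyTable1 + ", table2=" + lazyTable2);
    // prints "pinned-at-start: table1=0, table2=0"
    System.out.println("pinned-at-start: table1=" + pinned.load("db.table1")
        + ", table2=" + pinned.load("db.table2"));
  }
}
```

The first strategy sees table2 after process 2's append but table1 from before it, which is the mixed state described above. Pinning everything up front avoids that, but copying the state of every table is only practical in this toy model; a real catalog would presumably need some way to read all tables as of a single point in time.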
   




[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1252368123


##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java:
##########
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SERIALIZABLE;
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SNAPSHOT;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TestCatalogUtil;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class CatalogTransactionTests<
+    C extends SupportsCatalogTransactions & SupportsNamespaces & Catalog> {
+
+  @TempDir protected Path metadataDir;
+
+  protected static final Schema SCHEMA =
+      new Schema(
+          required(3, "id", Types.IntegerType.get()), required(4, "data", Types.StringType.get()));
+
+  // Partition spec used to create tables
+  protected static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+
+  protected static final DataFile FILE_A =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-a.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=0") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_B =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-b.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=1") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_C =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-c.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=2") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_D =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-d.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=3") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
+  protected abstract C catalog();
+
+  @Test
+  public void testNulls() {
+    assertThatThrownBy(() -> new BaseCatalogTransaction(null, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid origin catalog: null");
+
+    assertThatThrownBy(() -> new BaseCatalogTransaction(catalog(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTransactionSupport() {
+    assertThatThrownBy(
+            () -> new BaseCatalogTransaction(new TestCatalogUtil.TestCatalog(), SERIALIZABLE))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Origin catalog does not support catalog transactions");
+  }
+
+  @Test
+  public void multipleCommits() {
+    CatalogTransaction catalogTx = catalog().createTransaction(SERIALIZABLE);
+    catalogTx.commitTransaction();
+    assertThatThrownBy(catalogTx::commitTransaction)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Transaction has already committed changes");
+  }
+
+  @Test
+  public void invalidIsolationLevel() {
+    assertThatThrownBy(() -> catalog().createTransaction(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTxWithSingleOp() {
+    catalogTxWithSingleOp(CatalogTransaction.IsolationLevel.SNAPSHOT);
+  }
+
+  @Test
+  public void catalogTxWithSingleOpWithSerializable() {
+    catalogTxWithSingleOp(SERIALIZABLE);
+  }
+
+  private void catalogTxWithSingleOp(CatalogTransaction.IsolationLevel isolationLevel) {
+    TableIdentifier identifier = TableIdentifier.of("ns", "tx-with-single-op");
+    catalog().createTable(identifier, SCHEMA, SPEC);
+
+    Table one = catalog().loadTable(identifier);
+    TableMetadata base = ((BaseTable) one).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    assertThat(base).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(base.currentSnapshot()).isNull();
+
+    catalogTransaction.commitTransaction();
+
+    TableMetadata updated = ((BaseTable) one).operations().refresh();
+    assertThat(base).isNotSameAs(updated);
+
+    Snapshot snapshot = updated.currentSnapshot();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+  }
+
+  @Test
+  public void txAgainstMultipleTables() {
+    txAgainstMultipleTables(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesWithSerializable() {
+    txAgainstMultipleTables(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTables(CatalogTransaction.IsolationLevel isolationLevel) {
+    List<String> tables = Arrays.asList("a", "b", "c");
+    for (String tbl : tables) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    txCatalog.loadTable(first).newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).isEmpty();
+    }
+
+    catalogTransaction.commitTransaction();
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).hasSizeGreaterThanOrEqualTo(1);
+    }
+
+    one = catalog().loadTable(first);
+    two = catalog().loadTable(second);
+    three = catalog().loadTable(third);
+    assertThat(one.currentSnapshot().allManifests(one.io())).hasSize(1);
+    assertThat(two.currentSnapshot().allManifests(two.io())).hasSize(1);
+    assertThat(three.currentSnapshot().allManifests(three.io())).hasSize(1);
+
+    assertThat(one.currentSnapshot().addedDataFiles(one.io())).hasSize(2);
+    assertThat(two.currentSnapshot().addedDataFiles(two.io())).hasSize(2);
+    assertThat(three.currentSnapshot().addedDataFiles(three.io())).hasSize(1);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflict() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflictWithSerializable() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTablesLastOneSchemaConflict(
+      CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b", "c")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    txCatalog.loadTable(third).updateSchema().renameColumn("data", "new-column").commit();
+
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    // delete the column we're trying to rename in the catalog TX
+    three.updateSchema().deleteColumn("data").commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull();
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessageContaining(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read");

Review Comment:
   You may need a helper to perform schema checks. I think what you'd need to do is validate that the projected columns (by ID) are still present in the latest schema.
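A minimal sketch of such a helper, assuming the schema can be reduced to a map from field ID to field name; `ProjectedColumnCheck` and `missingFieldIds` are hypothetical names and not part of the PR. A real version would presumably operate on `org.apache.iceberg.Schema`, for example by calling `Schema.findField(int)` for each projected ID.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the PR. A schema is modeled as field ID -> field name.
final class ProjectedColumnCheck {
  private ProjectedColumnCheck() {}

  // Returns the projected field IDs that no longer exist in the latest schema, in ascending
  // order. An empty result means the projection is still valid. Names are deliberately
  // ignored, so a rename is not a conflict but a delete is.
  static List<Integer> missingFieldIds(Set<Integer> projectedIds, Map<Integer, String> latestSchema) {
    List<Integer> missing = new ArrayList<>();
    for (Integer id : projectedIds) {
      if (!latestSchema.containsKey(id)) {
        missing.add(id);
      }
    }
    Collections.sort(missing);
    return missing;
  }
}
```

With the test's schema (`id` = 3, `data` = 4) and a projection of both fields, renaming `data` to `new-column` keeps ID 4, so the helper returns an empty list; deleting `data` removes ID 4 and the result is `[4]`, which is the case the SERIALIZABLE test above expects to fail.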




[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1252363188


##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java:
##########
@@ -0,0 +1,736 @@
+  @Test
+  public void catalogTxWithSingleOp() {

Review Comment:
   Style: unnecessary abbreviation makes code harder to read. Generally prefer "transaction" over "tx" or "txn".




[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1252373874


##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java:
##########
@@ -0,0 +1,736 @@
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    txCatalog.loadTable(first).newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).isEmpty();
+    }
+
+    catalogTransaction.commitTransaction();
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).hasSizeGreaterThanOrEqualTo(1);
+    }
+
+    one = catalog().loadTable(first);
+    two = catalog().loadTable(second);
+    three = catalog().loadTable(third);
+    assertThat(one.currentSnapshot().allManifests(one.io())).hasSize(1);
+    assertThat(two.currentSnapshot().allManifests(two.io())).hasSize(1);
+    assertThat(three.currentSnapshot().allManifests(three.io())).hasSize(1);
+
+    assertThat(one.currentSnapshot().addedDataFiles(one.io())).hasSize(2);
+    assertThat(two.currentSnapshot().addedDataFiles(two.io())).hasSize(2);
+    assertThat(three.currentSnapshot().addedDataFiles(three.io())).hasSize(1);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflict() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflictWithSerializable() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTablesLastOneSchemaConflict(
+      CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b", "c")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    txCatalog.loadTable(third).updateSchema().renameColumn("data", "new-column").commit();
+
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    // delete the column we're trying to rename in the catalog TX
+    three.updateSchema().deleteColumn("data").commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull();
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessageContaining(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read");
+    } else {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(CommitFailedException.class)
+          .hasMessageContaining("Requirement failed: current schema changed: expected id 0 != 1");
+    }
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("new-column"))
+        .isNull();
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull();
+    assertThat(((BaseTable) three).operations().refresh().schema().columns()).hasSize(1);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneFails() {
+    txAgainstMultipleTablesLastOneFails(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneFailsWithSerializable() {
+    txAgainstMultipleTablesLastOneFails(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTablesLastOneFails(
+      CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b", "c")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    // perform updates outside the catalog TX
+    three.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+    Snapshot snapshot = ((BaseTable) three).operations().refresh().currentSnapshot();
+    assertThat(snapshot).isNotNull();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessageContaining(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read");
+    } else {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(CommitFailedException.class)
+          .hasMessageContaining("Requirement failed: branch main was created concurrently");
+    }
+
+    // the third update in the catalog TX fails, so we need to make sure that all changes from the
+    // catalog TX are rolled back
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+
+    assertThat(((BaseTable) one).operations().refresh().currentSnapshot()).isNull();
+    assertThat(((BaseTable) two).operations().refresh().currentSnapshot()).isNull();
+    assertThat(((BaseTable) three).operations().refresh().currentSnapshot()).isEqualTo(snapshot);
+  }
+
+  @Test
+  public void schemaUpdateVisibility() {
+    schemaUpdateVisibility(CatalogTransaction.IsolationLevel.SNAPSHOT);
+  }
+
+  @Test
+  public void schemaUpdateVisibilityWithSerializable() {
+    schemaUpdateVisibility(SERIALIZABLE);
+  }
+
+  private void schemaUpdateVisibility(CatalogTransaction.IsolationLevel isolationLevel) {
+    Namespace namespace = Namespace.of("test");
+    TableIdentifier identifier = TableIdentifier.of(namespace, "table");
+
+    catalog().createNamespace(namespace);
+    catalog().createTable(identifier, SCHEMA);
+    assertThat(catalog().tableExists(identifier)).isTrue();
+
+    CatalogTransaction catalogTx = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTx.asCatalog();
+
+    String column = "new_col";
+
+    assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNull();
+    txCatalog
+        .loadTable(identifier)
+        .updateSchema()
+        .addColumn(column, Types.BooleanType.get())
+        .commit();
+
+    // changes inside the catalog TX should be visible
+    assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNotNull();
+
+    // changes made inside the catalog TX should not be visible outside of it
+    assertThat(catalog().loadTable(identifier).schema().findField(column)).isNull();
+
+    catalogTx.commitTransaction();
+
+    assertThat(catalog().loadTable(identifier).schema().findField(column)).isNotNull();
+    assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNotNull();
+  }
+
+  @Test
+  public void readTableAfterLoadTableInsideTx() {
+    readTableAfterLoadTableInsideTx(SNAPSHOT);
+  }
+
+  @Test
+  public void readTableAfterLoadTableInsideTxWithSerializable() {
+    readTableAfterLoadTableInsideTx(SERIALIZABLE);
+  }
+
+  private void readTableAfterLoadTableInsideTx(CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    Table two = catalog().loadTable(second);
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+    assertThat(Iterables.size(txCatalog.loadTable(first).newScan().planFiles())).isEqualTo(2);
+    assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0);
+
+    two.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).appendFile(FILE_D).commit();
+
+    // this should not be allowed with SERIALIZABLE after the table has been already read
+    // within the catalog TX, but is allowed with SNAPSHOT
+    // catalog TX should still see the version of the table it initially read (with 0 files)
+    assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0);
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessage(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.b' after it was read");
+
+      assertThat(Iterables.size(catalog().loadTable(first).newScan().planFiles())).isEqualTo(0);
+      assertThat(Iterables.size(catalog().loadTable(second).newScan().planFiles())).isEqualTo(3);
+    } else {
+      catalogTransaction.commitTransaction();
+
+      assertThat(Iterables.size(catalog().loadTable(first).newScan().planFiles())).isEqualTo(2);
+      assertThat(Iterables.size(catalog().loadTable(second).newScan().planFiles())).isEqualTo(3);
+    }
+  }
+
+  @Test
+  public void concurrentTx() {
+    concurrentTx(SNAPSHOT);
+  }
+
+  @Test
+  public void concurrentTxWithSerializable() {
+    concurrentTx(SERIALIZABLE);
+  }
+
+  private void concurrentTx(CatalogTransaction.IsolationLevel isolationLevel) {
+    TableIdentifier identifier = TableIdentifier.of("ns", "tbl");
+    catalog().createTable(identifier, SCHEMA);
+    Table one = catalog().loadTable(identifier);
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    // perform updates outside the catalog TX but before the table has been read inside it
+    one.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+
+    Snapshot snapshot = ((BaseTable) one).operations().refresh().currentSnapshot();
+    assertThat(snapshot).isNotNull();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP)).isEqualTo("2");
+
+    // this should not fail with any isolation level
+    txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    catalogTransaction.commitTransaction();
+
+    snapshot = ((BaseTable) one).operations().refresh().currentSnapshot();
+    assertThat(snapshot).isNotNull();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP)).isEqualTo("4");
+  }
+
+  @Test
+  public void readOnlyTxWithSerializableShouldNotFail() {
+    for (String tbl : Arrays.asList("a", "b")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(SERIALIZABLE);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    assertThat(Iterables.size(txCatalog.loadTable(first).newScan().planFiles())).isEqualTo(0);
+    assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0);
+
+    // changes happen outside the catalog TX
+    one.newFastAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+    two.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).appendFile(FILE_D).commit();
+
+    // catalog TX should still see the version of the table it initially read (with 0 files)
+    assertThat(Iterables.size(txCatalog.loadTable(first).newScan().planFiles())).isEqualTo(0);
+    assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0);
+
+    // this ends up being a read-only TX, thus no write skew can happen, and it shouldn't fail
+    catalogTransaction.commitTransaction();
+
+    assertThat(Iterables.size(catalog().loadTable(first).newScan().planFiles())).isEqualTo(2);
+    assertThat(Iterables.size(catalog().loadTable(second).newScan().planFiles())).isEqualTo(3);
+  }
+
+  @Test
+  public void readOnlyTxWithSerializableOnBranchShouldNotFail() {
+    String branch = "branch";
+    for (String tbl : Arrays.asList("a", "b")) {
+      Table table = catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+      table.newFastAppend().appendFile(FILE_A).commit();
+      table.manageSnapshots().createBranch(branch, table.currentSnapshot().snapshotId()).commit();
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(SERIALIZABLE);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    assertThat(Iterables.size(txCatalog.loadTable(first).newScan().useRef(branch).planFiles()))
+        .isEqualTo(1);
+    assertThat(Iterables.size(txCatalog.loadTable(second).newScan().useRef(branch).planFiles()))
+        .isEqualTo(1);
+
+    // changes happen outside the catalog TX
+    one.newFastAppend().appendFile(FILE_A).appendFile(FILE_D).toBranch(branch).commit();
+    two.newFastAppend()
+        .appendFile(FILE_B)
+        .appendFile(FILE_C)
+        .appendFile(FILE_D)
+        .toBranch(branch)
+        .commit();
+
+    // catalog TX should still see the version of the branch it initially read (with 1 file)
+    assertThat(Iterables.size(txCatalog.loadTable(first).newScan().useRef(branch).planFiles()))
+        .isEqualTo(1);
+    assertThat(Iterables.size(txCatalog.loadTable(second).newScan().useRef(branch).planFiles()))
+        .isEqualTo(1);
+
+    // this ends up being a read-only TX, thus no write skew can happen, and it shouldn't fail
+    catalogTransaction.commitTransaction();
+
+    assertThat(Iterables.size(catalog().loadTable(first).newScan().useRef(branch).planFiles()))
+        .isEqualTo(3);
+    assertThat(Iterables.size(catalog().loadTable(second).newScan().useRef(branch).planFiles()))
+        .isEqualTo(4);
+  }
+
+  @Test
+  public void concurrentTxOnBranch() {
+    concurrentTxOnBranch(SNAPSHOT);
+  }
+
+  @Test
+  public void concurrentTxOnBranchWithSerializable() {
+    concurrentTxOnBranch(SERIALIZABLE);
+  }
+
+  private void concurrentTxOnBranch(CatalogTransaction.IsolationLevel isolationLevel) {
+    String branch = "branch";
+    TableIdentifier identifier = TableIdentifier.of("ns", "tbl");
+    Table one = catalog().createTable(identifier, SCHEMA);
+    one.newFastAppend().appendFile(FILE_A).commit();
+    one.manageSnapshots().createBranch(branch, one.currentSnapshot().snapshotId()).commit();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    // perform updates outside the catalog TX but before the table has been read inside it
+    one.newAppend().appendFile(FILE_C).appendFile(FILE_D).toBranch(branch).commit();
+
+    TableMetadata metadata = ((BaseTable) one).operations().refresh();
+    Snapshot snapshotOnBranch = metadata.snapshot(metadata.ref(branch).snapshotId());
+    assertThat(snapshotOnBranch).isNotNull();
+    assertThat(snapshotOnBranch.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshotOnBranch.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+    assertThat(snapshotOnBranch.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP))
+        .isEqualTo("3");
+
+    // this should not fail with any isolation level
+    txCatalog
+        .loadTable(identifier)
+        .newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .toBranch(branch)
+        .commit();
+
+    catalogTransaction.commitTransaction();
+
+    metadata = ((BaseTable) one).operations().refresh();
+    snapshotOnBranch = metadata.snapshot(metadata.ref(branch).snapshotId());
+    assertThat(snapshotOnBranch).isNotNull();
+    assertThat(snapshotOnBranch.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshotOnBranch.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+    assertThat(snapshotOnBranch.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP))
+        .isEqualTo("5");
+  }
+
+  @Test
+  public void readTableAfterLoadTableInsideTxOnBranch() {
+    readTableAfterLoadTableInsideTxOnBranch(SNAPSHOT);
+  }
+
+  @Test
+  public void readTableAfterLoadTableInsideTxOnBranchWithSerializable() {
+    readTableAfterLoadTableInsideTxOnBranch(SERIALIZABLE);
+  }
+
+  private void readTableAfterLoadTableInsideTxOnBranch(
+      CatalogTransaction.IsolationLevel isolationLevel) {
+    String branch = "branch";
+    for (String tbl : Arrays.asList("a", "b")) {
+      Table table = catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+      table.newFastAppend().appendFile(FILE_A).commit();
+      table.manageSnapshots().createBranch(branch, table.currentSnapshot().snapshotId()).commit();
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");

Review Comment:
   Some of these could be moved out of test cases and into constants.
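   A minimal sketch of that suggestion, assuming the repeated namespace, table names, and branch name are the values worth hoisting. `TableId` is a hypothetical stand-in for `org.apache.iceberg.catalog.TableIdentifier` so the sketch compiles on its own, and `CatalogTxTestConstants` is a hypothetical holder class; neither is part of the PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for org.apache.iceberg.catalog.TableIdentifier, used only so
// that this sketch compiles without Iceberg on the classpath.
final class TableId {
  private final String namespace;
  private final String name;

  private TableId(String namespace, String name) {
    this.namespace = namespace;
    this.name = name;
  }

  static TableId of(String namespace, String name) {
    return new TableId(namespace, name);
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof TableId)) {
      return false;
    }
    TableId other = (TableId) o;
    return namespace.equals(other.namespace) && name.equals(other.name);
  }

  @Override
  public int hashCode() {
    return Objects.hash(namespace, name);
  }

  @Override
  public String toString() {
    return namespace + "." + name;
  }
}

// Values that several tests rebuild inline, hoisted into shared constants (hypothetical).
final class CatalogTxTestConstants {
  static final String NAMESPACE = "ns";
  static final String BRANCH = "branch";
  static final TableId FIRST = TableId.of(NAMESPACE, "a");
  static final TableId SECOND = TableId.of(NAMESPACE, "b");
  static final TableId THIRD = TableId.of(NAMESPACE, "c");
  static final List<TableId> ALL_TABLES =
      Collections.unmodifiableList(Arrays.asList(FIRST, SECOND, THIRD));

  private CatalogTxTestConstants() {}
}
```

   In the actual test class the same constants would be `TableIdentifier` values (for example `private static final TableIdentifier FIRST = TableIdentifier.of("ns", "a");`), and the setup loops would become `for (TableIdentifier id : ALL_TABLES) { catalog().createTable(id, SCHEMA); }`.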


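   The expectations these tests encode can be summarized with a toy in-memory model. Under SERIALIZABLE, a transaction that writes anything is rejected if any table it read changed after it was read. A read-only transaction never fails. Under SNAPSHOT, only the tables being written are checked. This is a hypothetical simplification, not how `BaseCatalogTransaction` works: `ToyCatalog`, `Tx`, and the integer version counters are illustrative names. The model also treats any concurrent change to a written table as a conflict, which is stricter than the requirement-based checks (current schema id, branch creation) seen in the SNAPSHOT error messages above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of the commit checks the tests expect; NOT how
// BaseCatalogTransaction is implemented. Each table is reduced to a version counter.
final class ToyCatalog {
  enum IsolationLevel {
    SNAPSHOT,
    SERIALIZABLE
  }

  // table name -> version, incremented on every committed change
  private final Map<String, Integer> versions = new HashMap<>();

  void createTable(String name) {
    versions.put(name, 0);
  }

  int version(String name) {
    return versions.get(name);
  }

  // a change committed directly against the catalog, outside any transaction
  void externalCommit(String name) {
    bump(name);
  }

  Tx begin(IsolationLevel level) {
    return new Tx(level);
  }

  private void bump(String name) {
    versions.merge(name, 1, Integer::sum);
  }

  final class Tx {
    private final IsolationLevel level;
    // version of each table when it was first loaded inside this transaction
    private final Map<String, Integer> readVersions = new HashMap<>();
    // tables with staged changes that are not yet visible outside the transaction
    private final Set<String> written = new HashSet<>();
    private boolean committed = false;

    private Tx(IsolationLevel level) {
      this.level = level;
    }

    void read(String name) {
      readVersions.putIfAbsent(name, version(name));
    }

    void write(String name) {
      read(name); // a table is always loaded before it is modified
      written.add(name);
    }

    void commit() {
      if (committed) {
        throw new IllegalStateException("Transaction has already committed changes");
      }
      if (!written.isEmpty()) {
        // SERIALIZABLE validates every table that was read; SNAPSHOT only the written ones.
        // A read-only transaction skips validation entirely, so it cannot fail.
        Set<String> toValidate =
            level == IsolationLevel.SERIALIZABLE ? readVersions.keySet() : written;
        for (String name : toValidate) {
          if (version(name) != readVersions.get(name)) {
            throw new IllegalStateException(
                level + " conflict: table " + name + " changed after it was read");
          }
        }
        // all checks passed: apply every staged change together
        for (String name : written) {
          bump(name);
        }
      }
      committed = true;
    }
  }
}
```

   Because validation runs before any staged change is applied, a rejected commit leaves every table untouched, mirroring the rollback assertions in `txAgainstMultipleTablesLastOneFails`.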


[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1252373364


##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java:
##########
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SERIALIZABLE;
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SNAPSHOT;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TestCatalogUtil;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class CatalogTransactionTests<
+    C extends SupportsCatalogTransactions & SupportsNamespaces & Catalog> {
+
+  @TempDir protected Path metadataDir;
+
+  protected static final Schema SCHEMA =
+      new Schema(
+          required(3, "id", Types.IntegerType.get()), required(4, "data", Types.StringType.get()));
+
+  // Partition spec used to create tables
+  protected static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+
+  protected static final DataFile FILE_A =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-a.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=0") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_B =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-b.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=1") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_C =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-c.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=2") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_D =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-d.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=3") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
+  protected abstract C catalog();
+
+  @Test
+  public void testNulls() {
+    assertThatThrownBy(() -> new BaseCatalogTransaction(null, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid origin catalog: null");
+
+    assertThatThrownBy(() -> new BaseCatalogTransaction(catalog(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTransactionSupport() {
+    assertThatThrownBy(
+            () -> new BaseCatalogTransaction(new TestCatalogUtil.TestCatalog(), SERIALIZABLE))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Origin catalog does not support catalog transactions");
+  }
+
+  @Test
+  public void multipleCommits() {
+    CatalogTransaction catalogTx = catalog().createTransaction(SERIALIZABLE);
+    catalogTx.commitTransaction();
+    assertThatThrownBy(catalogTx::commitTransaction)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Transaction has already committed changes");
+  }
+
+  @Test
+  public void invalidIsolationLevel() {
+    assertThatThrownBy(() -> catalog().createTransaction(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTxWithSingleOp() {
+    catalogTxWithSingleOp(CatalogTransaction.IsolationLevel.SNAPSHOT);
+  }
+
+  @Test
+  public void catalogTxWithSingleOpWithSerializable() {
+    catalogTxWithSingleOp(SERIALIZABLE);
+  }
+
+  private void catalogTxWithSingleOp(CatalogTransaction.IsolationLevel isolationLevel) {
+    TableIdentifier identifier = TableIdentifier.of("ns", "tx-with-single-op");
+    catalog().createTable(identifier, SCHEMA, SPEC);
+
+    Table one = catalog().loadTable(identifier);
+    TableMetadata base = ((BaseTable) one).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    assertThat(base).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(base.currentSnapshot()).isNull();
+
+    catalogTransaction.commitTransaction();
+
+    TableMetadata updated = ((BaseTable) one).operations().refresh();
+    assertThat(base).isNotSameAs(updated);
+
+    Snapshot snapshot = updated.currentSnapshot();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+  }
+
+  @Test
+  public void txAgainstMultipleTables() {
+    txAgainstMultipleTables(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesWithSerializable() {
+    txAgainstMultipleTables(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTables(CatalogTransaction.IsolationLevel isolationLevel) {
+    List<String> tables = Arrays.asList("a", "b", "c");
+    for (String tbl : tables) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    txCatalog.loadTable(first).newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).isEmpty();
+    }
+
+    catalogTransaction.commitTransaction();
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).hasSizeGreaterThanOrEqualTo(1);
+    }
+
+    one = catalog().loadTable(first);
+    two = catalog().loadTable(second);
+    three = catalog().loadTable(third);
+    assertThat(one.currentSnapshot().allManifests(one.io())).hasSize(1);
+    assertThat(two.currentSnapshot().allManifests(two.io())).hasSize(1);
+    assertThat(three.currentSnapshot().allManifests(three.io())).hasSize(1);
+
+    assertThat(one.currentSnapshot().addedDataFiles(one.io())).hasSize(2);
+    assertThat(two.currentSnapshot().addedDataFiles(two.io())).hasSize(2);
+    assertThat(three.currentSnapshot().addedDataFiles(three.io())).hasSize(1);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflict() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflictWithSerializable() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTablesLastOneSchemaConflict(
+      CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b", "c")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    txCatalog.loadTable(third).updateSchema().renameColumn("data", "new-column").commit();
+
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    // delete the column we're trying to rename in the catalog TX
+    three.updateSchema().deleteColumn("data").commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull();
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessageContaining(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read");
+    } else {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(CommitFailedException.class)
+          .hasMessageContaining("Requirement failed: current schema changed: expected id 0 != 1");
+    }
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("new-column"))
+        .isNull();
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull();
+    assertThat(((BaseTable) three).operations().refresh().schema().columns()).hasSize(1);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneFails() {
+    txAgainstMultipleTablesLastOneFails(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneFailsWithSerializable() {
+    txAgainstMultipleTablesLastOneFails(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTablesLastOneFails(
+      CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b", "c")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    // perform updates outside the catalog TX
+    three.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+    Snapshot snapshot = ((BaseTable) three).operations().refresh().currentSnapshot();
+    assertThat(snapshot).isNotNull();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessageContaining(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read");
+    } else {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(CommitFailedException.class)
+          .hasMessageContaining("Requirement failed: branch main was created concurrently");
+    }
+
+    // the third update in the catalog TX fails, so we need to make sure that all changes from the
+    // catalog TX are rolled back
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+
+    assertThat(((BaseTable) one).operations().refresh().currentSnapshot()).isNull();
+    assertThat(((BaseTable) two).operations().refresh().currentSnapshot()).isNull();
+    assertThat(((BaseTable) three).operations().refresh().currentSnapshot()).isEqualTo(snapshot);
+  }
+
+  @Test
+  public void schemaUpdateVisibility() {
+    schemaUpdateVisibility(CatalogTransaction.IsolationLevel.SNAPSHOT);
+  }
+
+  @Test
+  public void schemaUpdateVisibilityWithSerializable() {
+    schemaUpdateVisibility(SERIALIZABLE);
+  }
+
+  private void schemaUpdateVisibility(CatalogTransaction.IsolationLevel isolationLevel) {
+    Namespace namespace = Namespace.of("test");
+    TableIdentifier identifier = TableIdentifier.of(namespace, "table");
+
+    catalog().createNamespace(namespace);
+    catalog().createTable(identifier, SCHEMA);
+    assertThat(catalog().tableExists(identifier)).isTrue();
+
+    CatalogTransaction catalogTx = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTx.asCatalog();
+
+    String column = "new_col";
+
+    assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNull();
+    txCatalog
+        .loadTable(identifier)
+        .updateSchema()
+        .addColumn(column, Types.BooleanType.get())
+        .commit();
+
+    // changes inside the catalog TX should be visible
+    assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNotNull();
+
+    // changes outside the catalog TX should not be visible
+    assertThat(catalog().loadTable(identifier).schema().findField(column)).isNull();
+
+    catalogTx.commitTransaction();
+
+    assertThat(catalog().loadTable(identifier).schema().findField(column)).isNotNull();
+    assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNotNull();
+  }
+
+  @Test
+  public void readTableAfterLoadTableInsideTx() {
+    readTableAfterLoadTableInsideTx(SNAPSHOT);
+  }
+
+  @Test
+  public void readTableAfterLoadTableInsideTxWithSerializable() {
+    readTableAfterLoadTableInsideTx(SERIALIZABLE);
+  }
+
+  private void readTableAfterLoadTableInsideTx(CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    Table two = catalog().loadTable(second);
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+    assertThat(Iterables.size(txCatalog.loadTable(first).newScan().planFiles())).isEqualTo(2);
+    assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0);
+
+    two.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).appendFile(FILE_D).commit();
+
+    // this should not be allowed with SERIALIZABLE after the table has been already read
+    // within the catalog TX, but is allowed with SNAPSHOT
+    // catalog TX should still see the version of the table it initially read (with 0 files)
+    assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0);
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessage(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.b' after it was read");
+
+      assertThat(Iterables.size(catalog().loadTable(first).newScan().planFiles())).isEqualTo(0);
+      assertThat(Iterables.size(catalog().loadTable(second).newScan().planFiles())).isEqualTo(3);
+    } else {
+      catalogTransaction.commitTransaction();
+
+      assertThat(Iterables.size(catalog().loadTable(first).newScan().planFiles())).isEqualTo(2);
+      assertThat(Iterables.size(catalog().loadTable(second).newScan().planFiles())).isEqualTo(3);
+    }
+  }
+
+  @Test
+  public void concurrentTx() {
+    concurrentTx(SNAPSHOT);
+  }
+
+  @Test
+  public void concurrentTxWithSerializable() {
+    concurrentTx(SERIALIZABLE);
+  }
+
+  private void concurrentTx(CatalogTransaction.IsolationLevel isolationLevel) {
+    TableIdentifier identifier = TableIdentifier.of("ns", "tbl");
+    catalog().createTable(identifier, SCHEMA);
+    Table one = catalog().loadTable(identifier);
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    // perform updates outside catalog TX but before table has been read inside the catalog TX
+    one.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+
+    Snapshot snapshot = ((BaseTable) one).operations().refresh().currentSnapshot();
+    assertThat(snapshot).isNotNull();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP)).isEqualTo("2");
+
+    // this should not fail with any isolation level
+    txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    catalogTransaction.commitTransaction();
+
+    snapshot = ((BaseTable) one).operations().refresh().currentSnapshot();
+    assertThat(snapshot).isNotNull();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP)).isEqualTo("4");
+  }
+
+  @Test
+  public void readOnlyTxWithSerializableShouldNotFail() {
+    for (String tbl : Arrays.asList("a", "b")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(SERIALIZABLE);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    assertThat(Iterables.size(txCatalog.loadTable(first).newScan().planFiles())).isEqualTo(0);
+    assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0);
+
+    // changes happen outside the catalog TX
+    one.newFastAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+    two.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).appendFile(FILE_D).commit();
+
+    // catalog TX should still see the version of the table it initially read (with 0 files)
+    assertThat(Iterables.size(txCatalog.loadTable(first).newScan().planFiles())).isEqualTo(0);
+    assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0);
+
+    // this ends up being a read-only TX, thus no write skew can happen, and it shouldn't fail
+    catalogTransaction.commitTransaction();
+
+    assertThat(Iterables.size(catalog().loadTable(first).newScan().planFiles())).isEqualTo(2);
+    assertThat(Iterables.size(catalog().loadTable(second).newScan().planFiles())).isEqualTo(3);
+  }
+
+  @Test
+  public void readOnlyTxWithSerializableOnBranchShouldNotFail() {

Review Comment:
   We may want to use the same approach as some of the branch tests and have a `commit` method that can use a branch or can use main, making it easy to parameterize the whole suite to run on branches.
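
   A minimal sketch of that pattern. It uses hypothetical stand-ins (`FakeTable`, `FakeAppend`, `BranchCommits`) instead of the real Iceberg classes so that it runs on its own. Tests never call `commit()` directly; they go through one helper that targets either `main` or a named branch, so the same suite can be run once per target. In Iceberg the branch case would presumably be wired through `SnapshotUpdate#toBranch(String)`, but treat the exact wiring as an assumption.
   
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   // Hypothetical stand-in for a table with named refs; NOT the Iceberg Table API.
   class FakeTable {
     static final String MAIN_BRANCH = "main";
     final Map<String, List<String>> filesByRef = new HashMap<>();
   
     List<String> files(String ref) {
       return filesByRef.getOrDefault(ref, new ArrayList<>());
     }
   }
   
   // Hypothetical stand-in for a pending append (a SnapshotUpdate in real code).
   class FakeAppend {
     private final FakeTable table;
     private final List<String> pending = new ArrayList<>();
     private String targetRef = FakeTable.MAIN_BRANCH;
   
     FakeAppend(FakeTable table) {
       this.table = table;
     }
   
     FakeAppend appendFile(String path) {
       pending.add(path);
       return this;
     }
   
     FakeAppend toBranch(String ref) {
       this.targetRef = ref;
       return this;
     }
   
     void commit() {
       table.filesByRef.computeIfAbsent(targetRef, ref -> new ArrayList<>()).addAll(pending);
     }
   }
   
   // The single helper every test would call instead of update.commit().
   final class BranchCommits {
     private BranchCommits() {}
   
     static void commit(FakeAppend update, String branch) {
       if (FakeTable.MAIN_BRANCH.equals(branch)) {
         update.commit();
       } else {
         update.toBranch(branch).commit();
       }
     }
   }
   ```
   
   With JUnit 5, each test could then take the target branch as a parameter (for example via `@ParameterizedTest` and `@ValueSource(strings = {"main", "testBranch"})`) and pass it to the helper and to every read it asserts on.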



[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1251319541


##########
core/src/main/java/org/apache/iceberg/catalog/CatalogTransaction.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+public interface CatalogTransaction {
+
+  enum IsolationLevel {
+
+    /**
+     * All reads that are being made will see the last committed values that existed when the table

Review Comment:
   @jackye1995, I think the behavior for the situation you're describing is an open question for the `snapshot` isolation level. First, though, I want to cover some background on how we think about `serializable` isolation to make sure we are thinking about "simultaneous" the same way.
   
   For `serializable`, the requirement is that there **_exists_** some ordering of transactions that produces the correct result. Say transaction 1 starts and reads from a table, then transaction 2 commits to that table, and finally transaction 1 attempts to commit. Iceberg will allow the commit as long as the result of the commit is not affected by the changes made by transaction 2. That is, Iceberg reorders the transactions if the reads would have produced the same result. This relies on checking conflicts in tables that have been updated. And the idea of when a transaction happens or starts is flexible in this model.
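
   To make the validation idea concrete, here is a deliberately conservative model. The classes are hypothetical and this is not the PR's implementation: each table is reduced to a version counter, the transaction remembers the version it saw the first time it loaded each table, and at commit time `SERIALIZABLE` rejects the transaction if any table it read has changed, while `SNAPSHOT` checks only the tables it wrote. A finer-grained implementation could still allow the reordering described above whenever the concurrent change would not have altered what was read. Read-only transactions are allowed to commit, matching `readOnlyTxWithSerializableShouldNotFail` in the tests.
   
   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;
   
   // Hypothetical in-memory catalog: every table is reduced to a version counter that
   // increments on each commit. Not an Iceberg API.
   class VersionedCatalog {
     private final Map<String, Long> versions = new HashMap<>();
   
     long version(String table) {
       return versions.getOrDefault(table, 0L);
     }
   
     void bump(String table) {
       versions.merge(table, 1L, Long::sum);
     }
   }
   
   enum TxIsolation { SNAPSHOT, SERIALIZABLE }
   
   // Single-threaded model of a multi-table transaction with commit-time validation.
   class CatalogTxModel {
     private final VersionedCatalog catalog;
     private final TxIsolation isolation;
     // version observed the first time each table was loaded inside the TX
     private final Map<String, Long> readVersions = new HashMap<>();
     private final Set<String> writtenTables = new HashSet<>();
   
     CatalogTxModel(VersionedCatalog catalog, TxIsolation isolation) {
       this.catalog = catalog;
       this.isolation = isolation;
     }
   
     void read(String table) {
       readVersions.putIfAbsent(table, catalog.version(table));
     }
   
     void write(String table) {
       read(table); // a table has to be loaded before it can be changed
       writtenTables.add(table);
     }
   
     void commit() {
       // A read-only TX cannot cause write skew, so reads are only validated when writing.
       Set<String> toValidate =
           isolation == TxIsolation.SERIALIZABLE && !writtenTables.isEmpty()
               ? readVersions.keySet()
               : writtenTables;
       for (String table : toValidate) {
         if (catalog.version(table) != readVersions.get(table)) {
           throw new IllegalStateException(
               isolation + " violation: table '" + table + "' changed after it was read");
         }
       }
       // all checks passed: apply every buffered write as one new version per table
       writtenTables.forEach(catalog::bump);
     }
   }
   ```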
   
   Another complication is the idea of a point in time in a catalog. There's no guarantee that catalogs have an order of changes that applies across tables. The only requirement imposed by Iceberg is that each table has a linear history; there isn't a general _happens-before_ relationship between commits across tables. There is, however, a limited relationship between tables involved in transactions.
   
   I think there's a good argument that we can't just use that as a reason not to do anything. After all, there are situations where completely ignoring the point-in-time concern is clearly wrong:
   
   ```sql
   CREATE TABLE a (id bigint);
   CREATE TABLE b (id bigint);
   INSERT INTO a VALUES (1);
   
   -- Time T1
   
   BEGIN TRANSACTION;
     INSERT INTO b SELECT id FROM a;
     TRUNCATE TABLE a;
   COMMIT;
   
   -- Time T2
   ```
   
   If I were to read from `a` at T1 and read from `b` at T2, then I could see `row(id=1)` twice, even though there was never a "time" when it was in both tables because the transaction was atomic. I don't think I'd call this a "dirty read" because that means _uncommitted_ data was read, but it's still clearly a problem.
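
   The anomaly can be replayed in a few lines. In this hypothetical model (`TwoTables` and `MixedPointInTimeRead` are illustrative names only), the reader copies `a` before the atomic transaction commits and reads `b` afterwards; the combined result contains `1` twice even though no committed state ever held it in both tables.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical two-table state used to replay the SQL example above.
   class TwoTables {
     List<Long> a = new ArrayList<>(List.of(1L));
     List<Long> b = new ArrayList<>();
   
     // INSERT INTO b SELECT id FROM a; TRUNCATE TABLE a;
     // single-threaded here, so no reader can observe an intermediate state
     void moveAllFromAToB() {
       b.addAll(a);
       a = new ArrayList<>();
     }
   }
   
   final class MixedPointInTimeRead {
     private MixedPointInTimeRead() {}
   
     // Reads `a` before the transaction (T1) and `b` after it (T2), then combines them.
     static List<Long> readAtT1ThenT2(TwoTables tables) {
       List<Long> seen = new ArrayList<>(tables.a); // copy of `a` at T1
       tables.moveAllFromAToB(); // the atomic multi-table transaction commits
       seen.addAll(tables.b); // `b` at T2
       return seen;
     }
   }
   ```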
   
   I think there are 3 options:
   1. Coordinate table loading with the catalog
   2. Use validations that data read has not changed
   3. Define snapshot isolation differently, or change the name to something else
   
   Right now, the difference between 2 and 3 is basically adding checks so we don't have to decide right away. That's why it's like this today.
   
   ### 1. Coordinate table loading with the catalog
   
   To coordinate table loading, we'd add something like `startTransaction()` to get a transaction state. Then we'd pass that state back to the catalog in order to load tables as of the start time. You can imagine the catalog returning what is essentially a transaction ID (TID) and loading tables using the state at that TID.
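
   A rough sketch of what this would require, assuming a catalog that can assign a catalog-wide sequence number to every commit. `MultiVersionCatalog`, `startTransaction()` and `loadTable(name, tid)` here are hypothetical, not Iceberg `Catalog` methods. Loading both tables from the same TID rules out the duplicate row from the SQL example; the catch is that many catalogs have no such catalog-wide sequence to offer.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.TreeMap;
   
   // Hypothetical catalog that keeps every table state keyed by a catalog-wide
   // commit sequence number. Not an Iceberg API.
   class MultiVersionCatalog {
     private long sequence = 0;
     private final Map<String, TreeMap<Long, String>> history = new HashMap<>();
   
     // Atomically publish new states for one or more tables under one sequence number.
     synchronized void commit(Map<String, String> newStates) {
       sequence += 1;
       for (Map.Entry<String, String> e : newStates.entrySet()) {
         history.computeIfAbsent(e.getKey(), k -> new TreeMap<>()).put(sequence, e.getValue());
       }
     }
   
     // The "transaction ID" is simply the last sequence number visible to the reader.
     synchronized long startTransaction() {
       return sequence;
     }
   
     // Load the newest state of `table` committed at or before `tid`.
     synchronized String loadTable(String table, long tid) {
       TreeMap<Long, String> versions = history.get(table);
       if (versions == null) {
         return null;
       }
       Map.Entry<Long, String> entry = versions.floorEntry(tid);
       return entry == null ? null : entry.getValue();
     }
   }
   ```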
   
   I don't think this is a viable option. It's a lot of work to define the APIs and there are still catalogs that can't implement it. The result is that we'd end up with **inconsistent behavior across catalogs**, where some catalogs just load the latest copy of the table because that's all they can do.
   
   ### 2. Use validations that data read has not changed
   
   This option is similar to how `serializable` is handled. The transaction "start" time floats -- you can use data that was read as long as when the transaction commits, the outcome would be the same.
   
   The main drawback of this approach is that it isn't clear whether this is actually distinct from `serializable`, which basically does the same validation.
   
   ### 3. Define `snapshot` isolation differently or rename it
   
   This option may seem crazy, but there's a good argument for it. If there's little difference between `serializable` and `snapshot` isolation with option 2, then `serializable` should be used for cases like the SQL above. However, we clearly want to be able to relax the constraints for certain cases:
   * Simultaneous `DELETE FROM` and `INSERT INTO` -- it makes little sense to fail a delete because a matching row was just inserted, when a slightly different order (delete commits first) would have been perfectly fine
   * External coordination -- running a scheduled `MERGE INTO` that selects from a source table that's constantly updated should generally succeed, even if the source table is modified. Most of the time, consumption will be incremental and coordinated externally. The important thing is knowing what version of the source table was used, not failing if it is updated during the job.
   
   These two cases align with the existing behavior of `snapshot` isolation in single-table commits -- but it makes sense there because "snapshot" applies to a single table. If the table is simultaneously modified, the original version is used and new data is ignored. However, deletions are validated.
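
   The single-table distinction can be sketched as a commit-time check (hypothetical names, not Iceberg's validation code). Both levels fail if a file the operation read was deleted concurrently, but only `SERIALIZABLE` fails because new data was appended; `SNAPSHOT` keeps the original version and ignores it. The sketch is coarse: it treats any concurrent append as a conflict, whereas a real implementation would only consider files that could match the operation's filter.
   
   ```java
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;
   
   // Hypothetical summary of one commit that landed after the operation read the table.
   final class ConcurrentCommit {
     final Set<String> addedFiles;
     final Set<String> deletedFiles;
   
     ConcurrentCommit(Set<String> addedFiles, Set<String> deletedFiles) {
       this.addedFiles = addedFiles;
       this.deletedFiles = deletedFiles;
     }
   }
   
   enum SingleTableIsolation { SNAPSHOT, SERIALIZABLE }
   
   final class SingleTableValidation {
     private SingleTableValidation() {}
   
     // filesRead: data files the pending operation scanned and based its result on.
     static void validate(
         SingleTableIsolation level, Set<String> filesRead, List<ConcurrentCommit> concurrent) {
       for (ConcurrentCommit commit : concurrent) {
         // Both levels: files the result depends on must not have been deleted.
         Set<String> lost = new HashSet<>(commit.deletedFiles);
         lost.retainAll(filesRead);
         if (!lost.isEmpty()) {
           throw new IllegalStateException("Concurrent delete of files that were read: " + lost);
         }
         // Only SERIALIZABLE rejects new data; SNAPSHOT keeps the original version.
         if (level == SingleTableIsolation.SERIALIZABLE && !commit.addedFiles.isEmpty()) {
           throw new IllegalStateException("Concurrent append: " + commit.addedFiles);
         }
       }
     }
   }
   ```
   
   Under this model, the simultaneous `DELETE FROM` / `INSERT INTO` case passes with `SNAPSHOT`, because the insert shows up only as an added file.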
   
   ### Last comment
   
   I think that captures the background for the decision that we need to make on this. Right now, I think the best approach is to collect use cases and think about how they should be handled, rather than making a decision.



[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1251280270


##########
core/src/main/java/org/apache/iceberg/catalog/SupportsCatalogTransactions.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+import org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel;
+
+public interface SupportsCatalogTransactions {
+
+  /**
+   * Create a new {@link CatalogTransaction} with the given {@link IsolationLevel}.
+   *
+   * @param isolationLevel The isolation level to use.

Review Comment:
   We usually omit the trailing `.` in `@param` descriptions like this.



[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1252363310


##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java:
##########
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SERIALIZABLE;
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SNAPSHOT;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TestCatalogUtil;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class CatalogTransactionTests<
+    C extends SupportsCatalogTransactions & SupportsNamespaces & Catalog> {
+
+  @TempDir protected Path metadataDir;
+
+  protected static final Schema SCHEMA =
+      new Schema(
+          required(3, "id", Types.IntegerType.get()), required(4, "data", Types.StringType.get()));
+
+  // Partition spec used to create tables
+  protected static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+
+  protected static final DataFile FILE_A =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-a.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=0") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_B =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-b.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=1") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_C =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-c.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=2") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_D =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-d.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=3") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
+  protected abstract C catalog();
+
+  @Test
+  public void testNulls() {
+    assertThatThrownBy(() -> new BaseCatalogTransaction(null, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid origin catalog: null");
+
+    assertThatThrownBy(() -> new BaseCatalogTransaction(catalog(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTransactionSupport() {
+    assertThatThrownBy(
+            () -> new BaseCatalogTransaction(new TestCatalogUtil.TestCatalog(), SERIALIZABLE))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Origin catalog does not support catalog transactions");
+  }
+
+  @Test
+  public void multipleCommits() {
+    CatalogTransaction catalogTx = catalog().createTransaction(SERIALIZABLE);
+    catalogTx.commitTransaction();
+    assertThatThrownBy(catalogTx::commitTransaction)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Transaction has already committed changes");
+  }
+
+  @Test
+  public void invalidIsolationLevel() {
+    assertThatThrownBy(() -> catalog().createTransaction(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTxWithSingleOp() {
+    catalogTxWithSingleOp(CatalogTransaction.IsolationLevel.SNAPSHOT);

Review Comment:
   Did you intend to import SNAPSHOT as well as SERIALIZABLE?



[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1252375162


##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java:
##########
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;

Review Comment:
   Many of these tests are good validations that the transaction is isolated. You might want to separate these into more specific suites, like tests of isolation for uncommitted transactions. This is going to get pretty large otherwise.
   
   What's missing are tests of real-world cases that will help us look at the behavior of transactions. For that, I think it would be helpful to refactor and introduce methods with high-level names, like `createTestTable(A_IDENT)` or `removeColumn(t.load(A_IDENT), "data")`. 
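To make both suggestions concrete, here is a minimal, self-contained toy model in plain JDK Java. It does not use Iceberg's classes: `ToyCatalog`, `ToyTransaction`, `TxTestHelpers`, `createTestTable`, `removeColumn` and `A_IDENT` are all hypothetical names. It is only a sketch of how a targeted isolation test reads when it is built from high-level helpers. A change staged in an uncommitted transaction is visible through the transaction, but not through the origin catalog until commit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy catalog (NOT Iceberg): a "table" is just an immutable list of column names.
final class ToyCatalog {
  private final Map<String, List<String>> tables = new HashMap<>();

  List<String> load(String ident) {
    List<String> columns = tables.get(ident);
    if (columns == null) {
      throw new IllegalArgumentException("No such table: " + ident);
    }
    return columns;
  }

  void put(String ident, List<String> columns) {
    tables.put(ident, List.copyOf(columns));
  }

  ToyTransaction newTransaction() {
    return new ToyTransaction(this);
  }
}

// Hypothetical toy transaction: stages per-table changes; the origin sees nothing until commit().
final class ToyTransaction {
  private final ToyCatalog origin;
  private final Map<String, List<String>> staged = new HashMap<>();
  private boolean committed = false;

  ToyTransaction(ToyCatalog origin) {
    this.origin = origin;
  }

  // Reads inside the transaction see its own staged changes first.
  List<String> load(String ident) {
    List<String> columns = staged.get(ident);
    return columns != null ? columns : origin.load(ident);
  }

  void stage(String ident, List<String> columns) {
    staged.put(ident, List.copyOf(columns));
  }

  void commit() {
    if (committed) {
      throw new IllegalStateException("Transaction has already committed changes");
    }
    staged.forEach(origin::put); // publish every staged table at once
    committed = true;
  }
}

// High-level helpers in the spirit of the review comment (hypothetical names and signatures).
final class TxTestHelpers {
  static final String A_IDENT = "ns.a";

  private TxTestHelpers() {}

  static void createTestTable(ToyCatalog catalog, String ident) {
    catalog.put(ident, List.of("id", "data"));
  }

  static void removeColumn(ToyTransaction tx, String ident, String column) {
    List<String> columns = new ArrayList<>(tx.load(ident));
    if (!columns.remove(column)) {
      throw new IllegalArgumentException("No such column: " + column);
    }
    tx.stage(ident, columns);
  }
}
```

A test body then reads almost like prose: `createTestTable(catalog, A_IDENT)`, `removeColumn(tx, A_IDENT, "data")`, assert that the catalog still has both columns, commit, then assert that it has only `id`. Isolation checks like this could live in their own suite, separate from the real-world scenario tests.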



[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1252364174


##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java:
##########
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SERIALIZABLE;
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SNAPSHOT;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TestCatalogUtil;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class CatalogTransactionTests<
+    C extends SupportsCatalogTransactions & SupportsNamespaces & Catalog> {
+
+  @TempDir protected Path metadataDir;
+
+  protected static final Schema SCHEMA =
+      new Schema(
+          required(3, "id", Types.IntegerType.get()), required(4, "data", Types.StringType.get()));
+
+  // Partition spec used to create tables
+  protected static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+
+  protected static final DataFile FILE_A =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-a.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=0") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_B =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-b.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=1") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_C =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-c.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=2") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_D =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-d.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=3") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
+  protected abstract C catalog();
+
+  @Test
+  public void testNulls() {
+    assertThatThrownBy(() -> new BaseCatalogTransaction(null, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid origin catalog: null");
+
+    assertThatThrownBy(() -> new BaseCatalogTransaction(catalog(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTransactionSupport() {
+    assertThatThrownBy(
+            () -> new BaseCatalogTransaction(new TestCatalogUtil.TestCatalog(), SERIALIZABLE))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Origin catalog does not support catalog transactions");
+  }
+
+  @Test
+  public void multipleCommits() {
+    CatalogTransaction catalogTx = catalog().createTransaction(SERIALIZABLE);
+    catalogTx.commitTransaction();
+    assertThatThrownBy(catalogTx::commitTransaction)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Transaction has already committed changes");
+  }
+
+  @Test
+  public void invalidIsolationLevel() {
+    assertThatThrownBy(() -> catalog().createTransaction(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTxWithSingleOp() {
+    catalogTxWithSingleOp(CatalogTransaction.IsolationLevel.SNAPSHOT);
+  }
+
+  @Test
+  public void catalogTxWithSingleOpWithSerializable() {
+    catalogTxWithSingleOp(SERIALIZABLE);
+  }
+
+  private void catalogTxWithSingleOp(CatalogTransaction.IsolationLevel isolationLevel) {
+    TableIdentifier identifier = TableIdentifier.of("ns", "tx-with-single-op");
+    catalog().createTable(identifier, SCHEMA, SPEC);
+
+    Table one = catalog().loadTable(identifier);
+    TableMetadata base = ((BaseTable) one).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    assertThat(base).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(base.currentSnapshot()).isNull();
+
+    catalogTransaction.commitTransaction();
+
+    TableMetadata updated = ((BaseTable) one).operations().refresh();
+    assertThat(base).isNotSameAs(updated);
+
+    Snapshot snapshot = updated.currentSnapshot();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+  }
+
+  @Test
+  public void txAgainstMultipleTables() {
+    txAgainstMultipleTables(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesWithSerializable() {
+    txAgainstMultipleTables(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTables(CatalogTransaction.IsolationLevel isolationLevel) {
+    List<String> tables = Arrays.asList("a", "b", "c");
+    for (String tbl : tables) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    txCatalog.loadTable(first).newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).isEmpty();

Review Comment:
   I don't think that this should cast to `BaseTable` so much. You could use the `Table` API for this assertion. The `Table` API is much more reliable than casting to an assumed implementation.
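This small illustration shows why the interface is the safer thing to assert against. It uses a plain-JDK toy model rather than Iceberg itself: `ToyTable`, `ToyBaseTable`, `ToyWrappedTable` and `SnapshotChecks` are hypothetical. A check written against the interface works for any implementation. A check that casts to one concrete class fails with `ClassCastException` as soon as a catalog returns something else. With the real API, the loop's assertion could presumably become `assertThat(catalog().loadTable(TableIdentifier.of("ns", tbl)).snapshots()).isEmpty()`, since `Table#snapshots()` returns an `Iterable<Snapshot>`.

```java
import java.util.List;

// Hypothetical stand-in for a table interface (NOT org.apache.iceberg.Table).
interface ToyTable {
  void refresh();            // reload state from the backing store
  List<String> snapshots();  // snapshot IDs as of the last refresh
}

// One implementation, with an extra accessor that only this class has.
final class ToyBaseTable implements ToyTable {
  private final List<String> store; // stands in for catalog-side state
  private List<String> loaded;

  ToyBaseTable(List<String> store) {
    this.store = store;
    this.loaded = List.copyOf(store);
  }

  @Override
  public void refresh() {
    loaded = List.copyOf(store);
  }

  @Override
  public List<String> snapshots() {
    return loaded;
  }

  // Implementation detail, comparable to reaching into operations() after a cast.
  List<String> internalState() {
    return loaded;
  }
}

// A different implementation (e.g. a wrapper) that a catalog might return instead.
final class ToyWrappedTable implements ToyTable {
  private final ToyTable delegate;

  ToyWrappedTable(ToyTable delegate) {
    this.delegate = delegate;
  }

  @Override
  public void refresh() {
    delegate.refresh();
  }

  @Override
  public List<String> snapshots() {
    return delegate.snapshots();
  }
}

final class SnapshotChecks {
  private SnapshotChecks() {}

  // Robust: relies only on the interface, so it works for every implementation.
  static boolean hasNoSnapshots(ToyTable table) {
    table.refresh();
    return table.snapshots().isEmpty();
  }

  // Fragile: assumes a concrete class and throws ClassCastException for any other.
  static boolean hasNoSnapshotsViaCast(ToyTable table) {
    return ((ToyBaseTable) table).internalState().isEmpty();
  }
}
```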



[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1252363310


##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java:
##########
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SERIALIZABLE;
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SNAPSHOT;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TestCatalogUtil;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class CatalogTransactionTests<
+    C extends SupportsCatalogTransactions & SupportsNamespaces & Catalog> {
+
+  @TempDir protected Path metadataDir;
+
+  protected static final Schema SCHEMA =
+      new Schema(
+          required(3, "id", Types.IntegerType.get()), required(4, "data", Types.StringType.get()));
+
+  // Partition spec used to create tables
+  protected static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+
+  protected static final DataFile FILE_A =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-a.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=0") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_B =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-b.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=1") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_C =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-c.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=2") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_D =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-d.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=3") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
+  protected abstract C catalog();
+
+  @Test
+  public void testNulls() {
+    assertThatThrownBy(() -> new BaseCatalogTransaction(null, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid origin catalog: null");
+
+    assertThatThrownBy(() -> new BaseCatalogTransaction(catalog(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTransactionSupport() {
+    assertThatThrownBy(
+            () -> new BaseCatalogTransaction(new TestCatalogUtil.TestCatalog(), SERIALIZABLE))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Origin catalog does not support catalog transactions");
+  }
+
+  @Test
+  public void multipleCommits() {
+    CatalogTransaction catalogTx = catalog().createTransaction(SERIALIZABLE);
+    catalogTx.commitTransaction();
+    assertThatThrownBy(catalogTx::commitTransaction)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Transaction has already committed changes");
+  }
+
+  @Test
+  public void invalidIsolationLevel() {
+    assertThatThrownBy(() -> catalog().createTransaction(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTxWithSingleOp() {
+    catalogTxWithSingleOp(CatalogTransaction.IsolationLevel.SNAPSHOT);

Review Comment:
   Did you intend to import SNAPSHOT as well as SERIALIZABLE? Usually, we'd import `IsolationLevel` so there is a bit more context for both: `IsolationLevel.SERIALIZABLE`.



[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1252365023


##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java:
##########
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SERIALIZABLE;
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SNAPSHOT;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TestCatalogUtil;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class CatalogTransactionTests<
+    C extends SupportsCatalogTransactions & SupportsNamespaces & Catalog> {
+
+  @TempDir protected Path metadataDir;
+
+  protected static final Schema SCHEMA =
+      new Schema(
+          required(3, "id", Types.IntegerType.get()), required(4, "data", Types.StringType.get()));
+
+  // Partition spec used to create tables
+  protected static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+
+  protected static final DataFile FILE_A =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-a.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=0") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_B =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-b.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=1") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_C =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-c.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=2") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_D =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-d.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=3") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
+  protected abstract C catalog();
+
+  @Test
+  public void testNulls() {
+    assertThatThrownBy(() -> new BaseCatalogTransaction(null, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid origin catalog: null");
+
+    assertThatThrownBy(() -> new BaseCatalogTransaction(catalog(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTransactionSupport() {
+    assertThatThrownBy(
+            () -> new BaseCatalogTransaction(new TestCatalogUtil.TestCatalog(), SERIALIZABLE))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Origin catalog does not support catalog transactions");
+  }
+
+  @Test
+  public void multipleCommits() {
+    CatalogTransaction catalogTx = catalog().createTransaction(SERIALIZABLE);
+    catalogTx.commitTransaction();
+    assertThatThrownBy(catalogTx::commitTransaction)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Transaction has already committed changes");
+  }
+
+  @Test
+  public void invalidIsolationLevel() {
+    assertThatThrownBy(() -> catalog().createTransaction(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTxWithSingleOp() {
+    catalogTxWithSingleOp(CatalogTransaction.IsolationLevel.SNAPSHOT);
+  }
+
+  @Test
+  public void catalogTxWithSingleOpWithSerializable() {
+    catalogTxWithSingleOp(SERIALIZABLE);
+  }
+
+  private void catalogTxWithSingleOp(CatalogTransaction.IsolationLevel isolationLevel) {
+    TableIdentifier identifier = TableIdentifier.of("ns", "tx-with-single-op");
+    catalog().createTable(identifier, SCHEMA, SPEC);
+
+    Table one = catalog().loadTable(identifier);
+    TableMetadata base = ((BaseTable) one).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    assertThat(base).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(base.currentSnapshot()).isNull();
+
+    catalogTransaction.commitTransaction();
+
+    TableMetadata updated = ((BaseTable) one).operations().refresh();
+    assertThat(base).isNotSameAs(updated);
+
+    Snapshot snapshot = updated.currentSnapshot();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+  }
+
+  @Test
+  public void txAgainstMultipleTables() {
+    txAgainstMultipleTables(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesWithSerializable() {
+    txAgainstMultipleTables(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTables(CatalogTransaction.IsolationLevel isolationLevel) {
+    List<String> tables = Arrays.asList("a", "b", "c");
+    for (String tbl : tables) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    txCatalog.loadTable(first).newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).isEmpty();
+    }
+
+    catalogTransaction.commitTransaction();
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).hasSizeGreaterThanOrEqualTo(1);
+    }
+
+    one = catalog().loadTable(first);
+    two = catalog().loadTable(second);
+    three = catalog().loadTable(third);
+    assertThat(one.currentSnapshot().allManifests(one.io())).hasSize(1);
+    assertThat(two.currentSnapshot().allManifests(two.io())).hasSize(1);
+    assertThat(three.currentSnapshot().allManifests(three.io())).hasSize(1);
+
+    assertThat(one.currentSnapshot().addedDataFiles(one.io())).hasSize(2);
+    assertThat(two.currentSnapshot().addedDataFiles(two.io())).hasSize(2);
+    assertThat(three.currentSnapshot().addedDataFiles(three.io())).hasSize(1);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflict() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflictWithSerializable() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTablesLastOneSchemaConflict(
+      CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b", "c")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();

Review Comment:
   I generally prefer for tests to be more targeted, with just enough changes to exercise the behavior you want to demonstrate. I get wanting to have a multi-table transaction here, but this doesn't require other modifications to `third` and it doesn't require 3 tables.
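Following the suggestion to keep the test targeted, here is a sketch of the smallest multi-table scenario: two tables, one change each, and a concurrent schema change on the second table. It uses a plain-JDK toy model in which `ToyTableState`, `ToyMultiTableTx`, `updateSchema` and `ToySchemaConflictException` are hypothetical, not Iceberg API. The full test body is not shown above, so the expected behavior here is an assumption: a conflict on any one table aborts the whole transaction and leaves the other table untouched.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy (NOT Iceberg): a table is reduced to a schema ID and a data-file count.
final class ToyTableState {
  final int schemaId;
  final int fileCount;

  ToyTableState(int schemaId, int fileCount) {
    this.schemaId = schemaId;
    this.fileCount = fileCount;
  }
}

final class ToySchemaConflictException extends RuntimeException {
  ToySchemaConflictException(String message) {
    super(message);
  }
}

// Stages changes to several tables and applies them all-or-nothing.
final class ToyMultiTableTx {
  private final Map<String, ToyTableState> catalog; // shared catalog state
  private final Map<String, Integer> requiredSchemaIds = new HashMap<>();
  private final Map<String, Integer> addedFiles = new HashMap<>();

  ToyMultiTableTx(Map<String, ToyTableState> catalog) {
    this.catalog = catalog;
  }

  void appendFiles(String table, int count) {
    addedFiles.merge(table, count, Integer::sum);
  }

  // A schema update is only valid against the schema ID this transaction first read.
  void updateSchema(String table) {
    requiredSchemaIds.putIfAbsent(table, catalog.get(table).schemaId);
  }

  void commit() {
    // 1) Validate every requirement before touching any table.
    requiredSchemaIds.forEach((table, expected) -> {
      if (catalog.get(table).schemaId != expected) {
        throw new ToySchemaConflictException("Schema of " + table + " changed concurrently");
      }
    });
    // 2) Apply all staged changes (the toy bumps the schema ID once per table).
    requiredSchemaIds.forEach((table, base) -> {
      ToyTableState s = catalog.get(table);
      catalog.put(table, new ToyTableState(base + 1, s.fileCount));
    });
    addedFiles.forEach((table, n) -> {
      ToyTableState s = catalog.get(table);
      catalog.put(table, new ToyTableState(s.schemaId, s.fileCount + n));
    });
  }
}
```

The test then needs only two assertions: the commit fails, and the first table is unchanged. The extra deletes and appends on a third table add nothing to that check.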



[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1252372997


##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java:
##########
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SERIALIZABLE;
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SNAPSHOT;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TestCatalogUtil;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class CatalogTransactionTests<
+    C extends SupportsCatalogTransactions & SupportsNamespaces & Catalog> {
+
+  @TempDir protected Path metadataDir;
+
+  protected static final Schema SCHEMA =
+      new Schema(
+          required(3, "id", Types.IntegerType.get()), required(4, "data", Types.StringType.get()));
+
+  // Partition spec used to create tables
+  protected static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+
+  protected static final DataFile FILE_A =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-a.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=0") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_B =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-b.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=1") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_C =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-c.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=2") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_D =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-d.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=3") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
+  protected abstract C catalog();
+
+  @Test
+  public void testNulls() {
+    assertThatThrownBy(() -> new BaseCatalogTransaction(null, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid origin catalog: null");
+
+    assertThatThrownBy(() -> new BaseCatalogTransaction(catalog(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTransactionSupport() {
+    assertThatThrownBy(
+            () -> new BaseCatalogTransaction(new TestCatalogUtil.TestCatalog(), SERIALIZABLE))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Origin catalog does not support catalog transactions");
+  }
+
+  @Test
+  public void multipleCommits() {
+    CatalogTransaction catalogTx = catalog().createTransaction(SERIALIZABLE);
+    catalogTx.commitTransaction();
+    assertThatThrownBy(catalogTx::commitTransaction)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Transaction has already committed changes");
+  }
+
+  @Test
+  public void invalidIsolationLevel() {
+    assertThatThrownBy(() -> catalog().createTransaction(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTxWithSingleOp() {
+    catalogTxWithSingleOp(CatalogTransaction.IsolationLevel.SNAPSHOT);
+  }
+
+  @Test
+  public void catalogTxWithSingleOpWithSerializable() {
+    catalogTxWithSingleOp(SERIALIZABLE);
+  }
+
+  private void catalogTxWithSingleOp(CatalogTransaction.IsolationLevel isolationLevel) {
+    TableIdentifier identifier = TableIdentifier.of("ns", "tx-with-single-op");
+    catalog().createTable(identifier, SCHEMA, SPEC);
+
+    Table one = catalog().loadTable(identifier);
+    TableMetadata base = ((BaseTable) one).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    assertThat(base).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(base.currentSnapshot()).isNull();
+
+    catalogTransaction.commitTransaction();
+
+    TableMetadata updated = ((BaseTable) one).operations().refresh();
+    assertThat(base).isNotSameAs(updated);
+
+    Snapshot snapshot = updated.currentSnapshot();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+  }
+
+  @Test
+  public void txAgainstMultipleTables() {
+    txAgainstMultipleTables(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesWithSerializable() {
+    txAgainstMultipleTables(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTables(CatalogTransaction.IsolationLevel isolationLevel) {
+    List<String> tables = Arrays.asList("a", "b", "c");
+    for (String tbl : tables) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    txCatalog.loadTable(first).newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).isEmpty();
+    }
+
+    catalogTransaction.commitTransaction();
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).hasSizeGreaterThanOrEqualTo(1);
+    }
+
+    one = catalog().loadTable(first);
+    two = catalog().loadTable(second);
+    three = catalog().loadTable(third);
+    assertThat(one.currentSnapshot().allManifests(one.io())).hasSize(1);
+    assertThat(two.currentSnapshot().allManifests(two.io())).hasSize(1);
+    assertThat(three.currentSnapshot().allManifests(three.io())).hasSize(1);
+
+    assertThat(one.currentSnapshot().addedDataFiles(one.io())).hasSize(2);
+    assertThat(two.currentSnapshot().addedDataFiles(two.io())).hasSize(2);
+    assertThat(three.currentSnapshot().addedDataFiles(three.io())).hasSize(1);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflict() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflictWithSerializable() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTablesLastOneSchemaConflict(
+      CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b", "c")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    txCatalog.loadTable(third).updateSchema().renameColumn("data", "new-column").commit();
+
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    // delete the column we're trying to rename in the catalog TX
+    three.updateSchema().deleteColumn("data").commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull();
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessageContaining(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read");
+    } else {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(CommitFailedException.class)
+          .hasMessageContaining("Requirement failed: current schema changed: expected id 0 != 1");
+    }
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("new-column"))
+        .isNull();
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull();
+    assertThat(((BaseTable) three).operations().refresh().schema().columns()).hasSize(1);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneFails() {
+    txAgainstMultipleTablesLastOneFails(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneFailsWithSerializable() {
+    txAgainstMultipleTablesLastOneFails(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTablesLastOneFails(
+      CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b", "c")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    // perform updates outside the catalog TX
+    three.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+    Snapshot snapshot = ((BaseTable) three).operations().refresh().currentSnapshot();
+    assertThat(snapshot).isNotNull();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessageContaining(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read");
+    } else {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(CommitFailedException.class)
+          .hasMessageContaining("Requirement failed: branch main was created concurrently");
+    }
+
+    // the third update in the catalog TX fails, so we need to make sure that all changes from the
+    // catalog TX are rolled back
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+
+    assertThat(((BaseTable) one).operations().refresh().currentSnapshot()).isNull();
+    assertThat(((BaseTable) two).operations().refresh().currentSnapshot()).isNull();
+    assertThat(((BaseTable) three).operations().refresh().currentSnapshot()).isEqualTo(snapshot);
+  }
+
+  @Test
+  public void schemaUpdateVisibility() {
+    schemaUpdateVisibility(CatalogTransaction.IsolationLevel.SNAPSHOT);
+  }
+
+  @Test
+  public void schemaUpdateVisibilityWithSerializable() {
+    schemaUpdateVisibility(SERIALIZABLE);
+  }
+
+  private void schemaUpdateVisibility(CatalogTransaction.IsolationLevel isolationLevel) {
+    Namespace namespace = Namespace.of("test");
+    TableIdentifier identifier = TableIdentifier.of(namespace, "table");
+
+    catalog().createNamespace(namespace);
+    catalog().createTable(identifier, SCHEMA);
+    assertThat(catalog().tableExists(identifier)).isTrue();
+
+    CatalogTransaction catalogTx = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTx.asCatalog();
+
+    String column = "new_col";
+
+    assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNull();
+    txCatalog
+        .loadTable(identifier)
+        .updateSchema()
+        .addColumn(column, Types.BooleanType.get())
+        .commit();
+
+    // changes inside the catalog TX should be visible
+    assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNotNull();
+
+    // changes outside the catalog TX should not be visible
+    assertThat(catalog().loadTable(identifier).schema().findField(column)).isNull();
+
+    catalogTx.commitTransaction();
+
+    assertThat(catalog().loadTable(identifier).schema().findField(column)).isNotNull();
+    assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNotNull();
+  }
+
+  @Test
+  public void readTableAfterLoadTableInsideTx() {
+    readTableAfterLoadTableInsideTx(SNAPSHOT);
+  }
+
+  @Test
+  public void readTableAfterLoadTableInsideTxWithSerializable() {
+    readTableAfterLoadTableInsideTx(SERIALIZABLE);
+  }
+
+  private void readTableAfterLoadTableInsideTx(CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    Table two = catalog().loadTable(second);
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+    assertThat(Iterables.size(txCatalog.loadTable(first).newScan().planFiles())).isEqualTo(2);
+    assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0);
+
+    two.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).appendFile(FILE_D).commit();
+
+    // this should not be allowed with SERIALIZABLE after the table has been already read
+    // within the catalog TX, but is allowed with SNAPSHOT
+    // catalog TX should still see the version of the table it initially read (with 0 files)
+    assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0);
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessage(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.b' after it was read");
+
+      assertThat(Iterables.size(catalog().loadTable(first).newScan().planFiles())).isEqualTo(0);
+      assertThat(Iterables.size(catalog().loadTable(second).newScan().planFiles())).isEqualTo(3);
+    } else {
+      catalogTransaction.commitTransaction();
+
+      assertThat(Iterables.size(catalog().loadTable(first).newScan().planFiles())).isEqualTo(2);
+      assertThat(Iterables.size(catalog().loadTable(second).newScan().planFiles())).isEqualTo(3);
+    }
+  }
+
+  @Test
+  public void concurrentTx() {
+    concurrentTx(SNAPSHOT);
+  }
+
+  @Test
+  public void concurrentTxWithSerializable() {
+    concurrentTx(SERIALIZABLE);
+  }
+
+  private void concurrentTx(CatalogTransaction.IsolationLevel isolationLevel) {
+    TableIdentifier identifier = TableIdentifier.of("ns", "tbl");
+    catalog().createTable(identifier, SCHEMA);
+    Table one = catalog().loadTable(identifier);
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    // perform updates outside catalog TX but before table has been read inside the catalog TX
+    one.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+
+    Snapshot snapshot = ((BaseTable) one).operations().refresh().currentSnapshot();
+    assertThat(snapshot).isNotNull();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP)).isEqualTo("2");
+
+    // this should not fail with any isolation level
+    txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();

Review Comment:
   It would be helpful to use names that tie tables and identifiers together, like `oneIdent`, so that it is clear that this is loading table `one`.
   
   Also, this adds the same file to the table twice, which is probably not a good thing to do in tests because it doesn't match real-world behavior.
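   The comments in `readTableAfterLoadTableInsideTx` state the isolation rule: SERIALIZABLE rejects the commit once a table read inside the catalog TX is changed outside it, and SNAPSHOT allows it. The toy model below sketches that rule, naming each identifier variable after the table it loads. `IsolationToyModel` and all of its members are hypothetical and are not Iceberg's API. The toy only compares version counters, whereas real commits validate table metadata and may rebase compatible changes.

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   // Toy model of read validation in a catalog transaction. Hypothetical names; not Iceberg's API.
   final class IsolationToyModel {
     enum IsolationLevel { SNAPSHOT, SERIALIZABLE }

     // Table identifier -> version, bumped on every commit.
     static final class Catalog {
       final Map<String, Integer> versions = new HashMap<>();

       void createTable(String ident) {
         versions.put(ident, 0);
       }

       void bumpVersion(String ident) {
         versions.merge(ident, 1, Integer::sum);
       }
     }

     static final class Transaction {
       private final Catalog catalog;
       private final IsolationLevel level;
       private final Map<String, Integer> readVersions = new HashMap<>();
       private final Set<String> written = new HashSet<>();

       Transaction(Catalog catalog, IsolationLevel level) {
         this.catalog = catalog;
         this.level = level;
       }

       // The first load pins the table version this transaction sees.
       int load(String ident) {
         return readVersions.computeIfAbsent(ident, catalog.versions::get);
       }

       void write(String ident) {
         load(ident);
         written.add(ident);
       }

       // SERIALIZABLE rejects changes to any table read in the transaction; this toy's
       // SNAPSHOT only rejects changes to tables the transaction writes.
       // Returns null on success, otherwise a message naming the conflicting table.
       String commit() {
         for (Map.Entry<String, Integer> read : readVersions.entrySet()) {
           String ident = read.getKey();
           boolean changed = !catalog.versions.get(ident).equals(read.getValue());
           boolean validated = level == IsolationLevel.SERIALIZABLE || written.contains(ident);
           if (changed && validated) {
             return "conflict on table '" + ident + "'";
           }
         }
         written.forEach(catalog::bumpVersion);
         return null;
       }
     }

     // Mirrors readTableAfterLoadTableInsideTx with two tables: write one, read two,
     // then commit to two outside the transaction before committing the transaction.
     static String writeOneReadTwoThenConcurrentWrite(IsolationLevel level) {
       Catalog catalog = new Catalog();
       String oneIdent = "ns.one";
       String twoIdent = "ns.two";
       catalog.createTable(oneIdent);
       catalog.createTable(twoIdent);

       Transaction tx = new Transaction(catalog, level);
       tx.write(oneIdent);
       tx.load(twoIdent);
       catalog.bumpVersion(twoIdent); // concurrent commit outside the transaction
       return tx.commit();
     }
   }
   ```

   Because the version is pinned at first load, a commit that lands before the table is loaded inside the transaction does not conflict at either level. That matches the `concurrentTx` comment that the append "should not fail with any isolation level".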



[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1252371121


##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java:
##########
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SERIALIZABLE;
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SNAPSHOT;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TestCatalogUtil;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class CatalogTransactionTests<
+    C extends SupportsCatalogTransactions & SupportsNamespaces & Catalog> {
+
+  @TempDir protected Path metadataDir;
+
+  protected static final Schema SCHEMA =
+      new Schema(
+          required(3, "id", Types.IntegerType.get()), required(4, "data", Types.StringType.get()));
+
+  // Partition spec used to create tables
+  protected static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+
+  protected static final DataFile FILE_A =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-a.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=0") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_B =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-b.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=1") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_C =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-c.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=2") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_D =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-d.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=3") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
+  protected abstract C catalog();
+
+  @Test
+  public void testNulls() {
+    assertThatThrownBy(() -> new BaseCatalogTransaction(null, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid origin catalog: null");
+
+    assertThatThrownBy(() -> new BaseCatalogTransaction(catalog(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTransactionSupport() {
+    assertThatThrownBy(
+            () -> new BaseCatalogTransaction(new TestCatalogUtil.TestCatalog(), SERIALIZABLE))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Origin catalog does not support catalog transactions");
+  }
+
+  @Test
+  public void multipleCommits() {
+    CatalogTransaction catalogTx = catalog().createTransaction(SERIALIZABLE);
+    catalogTx.commitTransaction();
+    assertThatThrownBy(catalogTx::commitTransaction)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Transaction has already committed changes");
+  }
+
+  @Test
+  public void invalidIsolationLevel() {
+    assertThatThrownBy(() -> catalog().createTransaction(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTxWithSingleOp() {
+    catalogTxWithSingleOp(CatalogTransaction.IsolationLevel.SNAPSHOT);
+  }
+
+  @Test
+  public void catalogTxWithSingleOpWithSerializable() {
+    catalogTxWithSingleOp(SERIALIZABLE);
+  }
+
+  private void catalogTxWithSingleOp(CatalogTransaction.IsolationLevel isolationLevel) {
+    TableIdentifier identifier = TableIdentifier.of("ns", "tx-with-single-op");
+    catalog().createTable(identifier, SCHEMA, SPEC);
+
+    Table one = catalog().loadTable(identifier);
+    TableMetadata base = ((BaseTable) one).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    assertThat(base).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(base.currentSnapshot()).isNull();
+
+    catalogTransaction.commitTransaction();
+
+    TableMetadata updated = ((BaseTable) one).operations().refresh();
+    assertThat(base).isNotSameAs(updated);
+
+    Snapshot snapshot = updated.currentSnapshot();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+  }
+
+  @Test
+  public void txAgainstMultipleTables() {
+    txAgainstMultipleTables(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesWithSerializable() {
+    txAgainstMultipleTables(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTables(CatalogTransaction.IsolationLevel isolationLevel) {
+    List<String> tables = Arrays.asList("a", "b", "c");
+    for (String tbl : tables) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    txCatalog.loadTable(first).newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).isEmpty();
+    }
+
+    catalogTransaction.commitTransaction();
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).hasSizeGreaterThanOrEqualTo(1);
+    }
+
+    one = catalog().loadTable(first);
+    two = catalog().loadTable(second);
+    three = catalog().loadTable(third);
+    assertThat(one.currentSnapshot().allManifests(one.io())).hasSize(1);
+    assertThat(two.currentSnapshot().allManifests(two.io())).hasSize(1);
+    assertThat(three.currentSnapshot().allManifests(three.io())).hasSize(1);
+
+    assertThat(one.currentSnapshot().addedDataFiles(one.io())).hasSize(2);
+    assertThat(two.currentSnapshot().addedDataFiles(two.io())).hasSize(2);
+    assertThat(three.currentSnapshot().addedDataFiles(three.io())).hasSize(1);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflict() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflictWithSerializable() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTablesLastOneSchemaConflict(
+      CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b", "c")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    txCatalog.loadTable(third).updateSchema().renameColumn("data", "new-column").commit();
+
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    // delete the column we're trying to rename in the catalog TX
+    three.updateSchema().deleteColumn("data").commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull();
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessageContaining(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read");
+    } else {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(CommitFailedException.class)
+          .hasMessageContaining("Requirement failed: current schema changed: expected id 0 != 1");
+    }
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("new-column"))
+        .isNull();
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull();
+    assertThat(((BaseTable) three).operations().refresh().schema().columns()).hasSize(1);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneFails() {
+    txAgainstMultipleTablesLastOneFails(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneFailsWithSerializable() {
+    txAgainstMultipleTablesLastOneFails(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTablesLastOneFails(
+      CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b", "c")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    // perform updates outside the catalog TX
+    three.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+    Snapshot snapshot = ((BaseTable) three).operations().refresh().currentSnapshot();
+    assertThat(snapshot).isNotNull();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessageContaining(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read");
+    } else {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(CommitFailedException.class)
+          .hasMessageContaining("Requirement failed: branch main was created concurrently");
+    }
+
+    // the third update in the catalog TX fails, so we need to make sure that all changes from the
+    // catalog TX are rolled back
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+
+    assertThat(((BaseTable) one).operations().refresh().currentSnapshot()).isNull();
+    assertThat(((BaseTable) two).operations().refresh().currentSnapshot()).isNull();
+    assertThat(((BaseTable) three).operations().refresh().currentSnapshot()).isEqualTo(snapshot);
+  }
+
+  @Test
+  public void schemaUpdateVisibility() {
+    schemaUpdateVisibility(CatalogTransaction.IsolationLevel.SNAPSHOT);
+  }
+
+  @Test
+  public void schemaUpdateVisibilityWithSerializable() {
+    schemaUpdateVisibility(SERIALIZABLE);
+  }
+
+  private void schemaUpdateVisibility(CatalogTransaction.IsolationLevel isolationLevel) {
+    Namespace namespace = Namespace.of("test");
+    TableIdentifier identifier = TableIdentifier.of(namespace, "table");
+
+    catalog().createNamespace(namespace);
+    catalog().createTable(identifier, SCHEMA);
+    assertThat(catalog().tableExists(identifier)).isTrue();
+
+    CatalogTransaction catalogTx = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTx.asCatalog();
+
+    String column = "new_col";
+
+    assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNull();
+    txCatalog
+        .loadTable(identifier)
+        .updateSchema()
+        .addColumn(column, Types.BooleanType.get())
+        .commit();
+
+    // changes inside the catalog TX should be visible
+    assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNotNull();
+
+    // changes made inside the catalog TX should not be visible outside of it before commit
+    assertThat(catalog().loadTable(identifier).schema().findField(column)).isNull();
+
+    catalogTx.commitTransaction();
+
+    assertThat(catalog().loadTable(identifier).schema().findField(column)).isNotNull();
+    assertThat(txCatalog.loadTable(identifier).schema().findField(column)).isNotNull();
+  }
+
+  @Test
+  public void readTableAfterLoadTableInsideTx() {
+    readTableAfterLoadTableInsideTx(SNAPSHOT);
+  }
+
+  @Test
+  public void readTableAfterLoadTableInsideTxWithSerializable() {
+    readTableAfterLoadTableInsideTx(SERIALIZABLE);
+  }
+
+  private void readTableAfterLoadTableInsideTx(CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    Table two = catalog().loadTable(second);
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+    assertThat(Iterables.size(txCatalog.loadTable(first).newScan().planFiles())).isEqualTo(2);
+    assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0);
+
+    two.newFastAppend().appendFile(FILE_B).appendFile(FILE_C).appendFile(FILE_D).commit();
+
+    // this should not be allowed with SERIALIZABLE after the table has already been read
+    // within the catalog TX, but is allowed with SNAPSHOT
+    // catalog TX should still see the version of the table it initially read (with 0 files)
+    assertThat(Iterables.size(txCatalog.loadTable(second).newScan().planFiles())).isEqualTo(0);
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessage(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.b' after it was read");
+
+      assertThat(Iterables.size(catalog().loadTable(first).newScan().planFiles())).isEqualTo(0);
+      assertThat(Iterables.size(catalog().loadTable(second).newScan().planFiles())).isEqualTo(3);
+    } else {
+      catalogTransaction.commitTransaction();
+
+      assertThat(Iterables.size(catalog().loadTable(first).newScan().planFiles())).isEqualTo(2);
+      assertThat(Iterables.size(catalog().loadTable(second).newScan().planFiles())).isEqualTo(3);
+    }
+  }
+
+  @Test
+  public void concurrentTx() {
+    concurrentTx(SNAPSHOT);
+  }
+
+  @Test
+  public void concurrentTxWithSerializable() {
+    concurrentTx(SERIALIZABLE);
+  }
+
+  private void concurrentTx(CatalogTransaction.IsolationLevel isolationLevel) {
+    TableIdentifier identifier = TableIdentifier.of("ns", "tbl");
+    catalog().createTable(identifier, SCHEMA);
+    Table one = catalog().loadTable(identifier);
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    // perform updates outside catalog TX but before table has been read inside the catalog TX
+    one.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+
+    Snapshot snapshot = ((BaseTable) one).operations().refresh().currentSnapshot();

Review Comment:
   This can use the `Table` API.




[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1252368649


##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java:
##########
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SERIALIZABLE;
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SNAPSHOT;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TestCatalogUtil;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class CatalogTransactionTests<
+    C extends SupportsCatalogTransactions & SupportsNamespaces & Catalog> {
+
+  @TempDir protected Path metadataDir;
+
+  protected static final Schema SCHEMA =
+      new Schema(
+          required(3, "id", Types.IntegerType.get()), required(4, "data", Types.StringType.get()));
+
+  // Partition spec used to create tables
+  protected static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+
+  protected static final DataFile FILE_A =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-a.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=0") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_B =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-b.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=1") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_C =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-c.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=2") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_D =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-d.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=3") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
+  protected abstract C catalog();
+
+  @Test
+  public void testNulls() {
+    assertThatThrownBy(() -> new BaseCatalogTransaction(null, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid origin catalog: null");
+
+    assertThatThrownBy(() -> new BaseCatalogTransaction(catalog(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTransactionSupport() {
+    assertThatThrownBy(
+            () -> new BaseCatalogTransaction(new TestCatalogUtil.TestCatalog(), SERIALIZABLE))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Origin catalog does not support catalog transactions");
+  }
+
+  @Test
+  public void multipleCommits() {
+    CatalogTransaction catalogTx = catalog().createTransaction(SERIALIZABLE);
+    catalogTx.commitTransaction();
+    assertThatThrownBy(catalogTx::commitTransaction)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Transaction has already committed changes");
+  }
+
+  @Test
+  public void invalidIsolationLevel() {
+    assertThatThrownBy(() -> catalog().createTransaction(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTxWithSingleOp() {
+    catalogTxWithSingleOp(CatalogTransaction.IsolationLevel.SNAPSHOT);
+  }
+
+  @Test
+  public void catalogTxWithSingleOpWithSerializable() {
+    catalogTxWithSingleOp(SERIALIZABLE);
+  }
+
+  private void catalogTxWithSingleOp(CatalogTransaction.IsolationLevel isolationLevel) {
+    TableIdentifier identifier = TableIdentifier.of("ns", "tx-with-single-op");
+    catalog().createTable(identifier, SCHEMA, SPEC);
+
+    Table one = catalog().loadTable(identifier);
+    TableMetadata base = ((BaseTable) one).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    assertThat(base).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(base.currentSnapshot()).isNull();
+
+    catalogTransaction.commitTransaction();
+
+    TableMetadata updated = ((BaseTable) one).operations().refresh();
+    assertThat(base).isNotSameAs(updated);
+
+    Snapshot snapshot = updated.currentSnapshot();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+  }
+
+  @Test
+  public void txAgainstMultipleTables() {
+    txAgainstMultipleTables(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesWithSerializable() {
+    txAgainstMultipleTables(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTables(CatalogTransaction.IsolationLevel isolationLevel) {
+    List<String> tables = Arrays.asList("a", "b", "c");
+    for (String tbl : tables) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    txCatalog.loadTable(first).newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).isEmpty();
+    }
+
+    catalogTransaction.commitTransaction();
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).hasSizeGreaterThanOrEqualTo(1);
+    }
+
+    one = catalog().loadTable(first);
+    two = catalog().loadTable(second);
+    three = catalog().loadTable(third);
+    assertThat(one.currentSnapshot().allManifests(one.io())).hasSize(1);
+    assertThat(two.currentSnapshot().allManifests(two.io())).hasSize(1);
+    assertThat(three.currentSnapshot().allManifests(three.io())).hasSize(1);
+
+    assertThat(one.currentSnapshot().addedDataFiles(one.io())).hasSize(2);
+    assertThat(two.currentSnapshot().addedDataFiles(two.io())).hasSize(2);
+    assertThat(three.currentSnapshot().addedDataFiles(three.io())).hasSize(1);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflict() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflictWithSerializable() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTablesLastOneSchemaConflict(
+      CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b", "c")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    txCatalog.loadTable(third).updateSchema().renameColumn("data", "new-column").commit();
+
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    // delete the column we're trying to rename in the catalog TX
+    three.updateSchema().deleteColumn("data").commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull();
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessageContaining(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read");
+    } else {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(CommitFailedException.class)
+          .hasMessageContaining("Requirement failed: current schema changed: expected id 0 != 1");
+    }
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("new-column"))
+        .isNull();
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull();
+    assertThat(((BaseTable) three).operations().refresh().schema().columns()).hasSize(1);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneFails() {
+    txAgainstMultipleTablesLastOneFails(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneFailsWithSerializable() {
+    txAgainstMultipleTablesLastOneFails(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTablesLastOneFails(
+      CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b", "c")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    // perform updates outside the catalog TX
+    three.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+    Snapshot snapshot = ((BaseTable) three).operations().refresh().currentSnapshot();
+    assertThat(snapshot).isNotNull();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessageContaining(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read");

Review Comment:
   Are these tests just checking that the table was modified at all?
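Judging only from the expected message ("Found table metadata updates to table ... after it was read"), the SERIALIZABLE assertions above appear to reject the commit whenever any table loaded inside the transaction has changed since it was loaded, whether or not that change conflicts with the transaction's own writes. The self-contained sketch below models that behavior, together with the SNAPSHOT behavior expected by `readTableAfterLoadTableInsideTx`, using one version counter per table. `VersionedCatalog`, `SketchTx`, and `Isolation` are hypothetical names; this is a single-threaded model, not the PR's `BaseCatalogTransaction`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model: a table is reduced to a counter that grows by one per commit.
class VersionedCatalog {
  private final Map<String, Long> versions = new HashMap<>();

  long version(String table) {
    return versions.getOrDefault(table, 0L);
  }

  void bump(String table) {
    versions.put(table, version(table) + 1);
  }
}

enum Isolation {
  SNAPSHOT,
  SERIALIZABLE
}

class SketchTx {
  private final VersionedCatalog catalog;
  private final Isolation isolation;
  // version of each table when it was first loaded inside the transaction
  private final Map<String, Long> loadedVersions = new HashMap<>();
  private final Set<String> writtenTables = new HashSet<>();

  SketchTx(VersionedCatalog catalog, Isolation isolation) {
    this.catalog = catalog;
    this.isolation = isolation;
  }

  void read(String table) {
    loadedVersions.putIfAbsent(table, catalog.version(table));
  }

  void write(String table) {
    read(table); // writes are based on the version loaded in the transaction
    writtenTables.add(table);
  }

  // Single-threaded sketch: a real catalog must validate and apply atomically.
  void commit() {
    // validate everything first so that a failed commit applies nothing
    for (Map.Entry<String, Long> entry : loadedVersions.entrySet()) {
      String table = entry.getKey();
      boolean changed = catalog.version(table) != entry.getValue();
      // SNAPSHOT only guards tables this transaction writes;
      // SERIALIZABLE also guards tables it merely read
      boolean guarded = isolation == Isolation.SERIALIZABLE || writtenTables.contains(table);
      if (changed && guarded) {
        throw new IllegalStateException(
            isolation + " violation: table '" + table + "' changed after it was read");
      }
    }
    for (String table : writtenTables) {
      catalog.bump(table);
    }
  }
}
```

Under this model, a concurrent append to a table that was only read makes a SERIALIZABLE commit fail but lets a SNAPSHOT commit succeed, and the check never asks whether the concurrent change actually overlaps with what was read.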




[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1251319541


##########
core/src/main/java/org/apache/iceberg/catalog/CatalogTransaction.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+public interface CatalogTransaction {
+
+  enum IsolationLevel {
+
+    /**
+     * All reads that are being made will see the last committed values that existed when the table

Review Comment:
   @jackye1995, I think the behavior for the situation you're describing is an open question for the `snapshot` isolation level. First, though, I want to cover some background on how we think about `serializable` isolation to make sure we are thinking about "simultaneous" the same way.
   
   For `serializable`, the requirement is that there **_exists_** some ordering of transactions that produces the correct result. Say transaction 1 starts and reads from a table, then transaction 2 commits to that table, and finally transaction 1 attempts to commit. Iceberg will allow the commit as long as the result of the commit is not affected by the changes made by transaction 2. That is, Iceberg reorders the transactions if the reads would have produced the same result. This relies on checking for conflicts in tables that have been updated, so the idea of when a transaction happens or starts is flexible in this model.
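
   A minimal sketch of that reordering rule follows. It assumes a toy model that is not Iceberg's: a table is an append-only list of integer ids, and a transaction remembers the version and filter it read with. `ToyTable` and `SerializableCheck` are hypothetical names, and only concurrent inserts are modeled. Iceberg's real checks operate on data and delete files.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.IntPredicate;

   // Toy model (hypothetical): a table is an append-only list of integer ids, and its version is
   // the number of rows committed so far.
   final class ToyTable {
     private final List<Integer> rows = new ArrayList<>();

     int version() {
       return rows.size();
     }

     void append(int id) {
       rows.add(id);
     }

     // Rows committed after `version`, i.e. the changes made by concurrent transactions.
     List<Integer> rowsAddedSince(int version) {
       return new ArrayList<>(rows.subList(version, rows.size()));
     }
   }

   final class SerializableCheck {
     private SerializableCheck() {}

     // A transaction that read `table` at `readVersion` with `readFilter` can be ordered after the
     // concurrent writers only if none of the rows they added would have matched its read.
     static boolean canCommit(ToyTable table, int readVersion, IntPredicate readFilter) {
       for (int id : table.rowsAddedSince(readVersion)) {
         if (readFilter.test(id)) {
           return false; // re-running the read now would give a different result
         }
       }
       return true;
     }
   }
   ```

   For example, if transaction 1 read with `id < 10` and transaction 2 then appended `50`, the commit is allowed because transaction 1 can be ordered after transaction 2. A concurrent append of `5` would be rejected.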
   
   Another complication is the idea of a point in time in a catalog. There's no guarantee that catalogs have an order of changes that applies across tables. The only requirement Iceberg imposes is a linear history for each individual table; there is no general _happens-before_ relationship between commits across tables. There is, however, a limited relationship between tables involved in transactions.
   
   I think there's a good argument that we can't use that as a reason to do nothing. After all, it's easy to construct situations where completely ignoring the point-in-time concern is clearly wrong:
   
   ```sql
   CREATE TABLE a (id bigint);
   CREATE TABLE b (id bigint);
   INSERT INTO a VALUES (1);
   
   -- Time T1
   
   BEGIN TRANSACTION;
     INSERT INTO b SELECT id FROM a;
     TRUNCATE TABLE a;
   COMMIT;
   
   -- Time T2
   ```
   
   If I were to read from `a` at T1 and read from `b` at T2, then I could see `row(id=1)` twice, even though there was never a "time" when it was in both tables because the transaction was atomic. I don't think I'd call this a "dirty read" because that means _uncommitted_ data was read, but it's still clearly a problem.
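
   The anomaly can be reproduced in a small sketch. It assumes a hypothetical model where each table keeps only its own version history, with no catalog-wide clock, which matches the guarantee described above. `VersionedTable` and `CrossTableAnomaly` are illustrative names, not Iceberg classes.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical model: each table keeps its own linear history of states (lists of ids), and
   // there is no catalog-wide clock, only a per-table version number.
   final class VersionedTable {
     private final List<List<Integer>> history = new ArrayList<>();

     VersionedTable(List<Integer> initial) {
       history.add(List.copyOf(initial));
     }

     int currentVersion() {
       return history.size() - 1;
     }

     List<Integer> readAt(int version) {
       return history.get(version);
     }

     void commit(List<Integer> newState) {
       history.add(List.copyOf(newState));
     }
   }

   final class CrossTableAnomaly {
     private CrossTableAnomaly() {}

     // Reads `a` as of T1 and `b` as of T2 around an atomic transaction that runs
     // "INSERT INTO b SELECT id FROM a; TRUNCATE TABLE a;" and returns every id the reader saw.
     static List<Integer> readAcrossTransaction() {
       VersionedTable a = new VersionedTable(List.of(1));
       VersionedTable b = new VersionedTable(List.of());

       int aAtT1 = a.currentVersion(); // the reader loads `a` before the transaction

       // The transaction commits new versions of both tables with no reader in between.
       List<Integer> newB = new ArrayList<>(b.readAt(b.currentVersion()));
       newB.addAll(a.readAt(a.currentVersion()));
       b.commit(newB);
       a.commit(List.of());

       int bAtT2 = b.currentVersion(); // the reader loads `b` after the transaction

       List<Integer> seen = new ArrayList<>(a.readAt(aAtT1));
       seen.addAll(b.readAt(bAtT2));
       return seen;
     }
   }
   ```

   `readAcrossTransaction()` returns `[1, 1]`: every individual read saw committed data, yet the combined result never existed at any single point in time.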
   
   I think there are 3 options:
   1. Coordinate table loading with the catalog
   2. Use validations that data read has not changed
   3. Define snapshot isolation differently, or change the name to something else
   
   Right now, the difference between 2 and 3 is basically whether we add those checks, and adding them means we don't have to decide right away. That's why the code works this way today.
   
   ### 1. Coordinate table loading with the catalog
   
   To coordinate table loading, we'd add something like `startTransaction()` to get a transaction state. Then we'd pass that state back to the catalog to load tables as of the start time. You can imagine the catalog passing back a transaction ID (TID) and loading tables using the state at that TID.
   
   I don't think this is a viable option. It's a lot of work to define the APIs and there are still catalogs that can't implement it. The result is that we'd end up with **inconsistent behavior across catalogs**, where some catalogs just load the latest copy of the table because that's all they can do.
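
   To make option 1 concrete, here is a minimal sketch of what a catalog would need in order to support it. It assumes the catalog keeps a single ordered commit log shared by all tables. That global log is exactly what many catalogs lack, which is the objection above. `CoordinatedCatalog`, `startTransaction()`, and `loadTable(tid, name)` are hypothetical and not part of Iceberg's `Catalog` API.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   // Hypothetical catalog with a single, ordered commit log shared by all tables, so one transaction
   // ID (TID) identifies a consistent point in time across every table.
   final class CoordinatedCatalog {
     // Entry i is the state of every table after the i-th commit; entry 0 is the empty catalog.
     private final List<Map<String, List<Integer>>> log = new ArrayList<>();

     CoordinatedCatalog() {
       log.add(new HashMap<>());
     }

     // Hypothetical startTransaction(): returns the TID of the latest committed catalog state.
     long startTransaction() {
       return log.size() - 1;
     }

     // Loads a table as of the state identified by `tid`, ignoring any later commits.
     List<Integer> loadTable(long tid, String name) {
       return log.get((int) tid).getOrDefault(name, List.of());
     }

     // Atomically replaces the state of one or more tables, producing a new catalog-wide state.
     void commit(Map<String, List<Integer>> updates) {
       Map<String, List<Integer>> next = new HashMap<>(log.get(log.size() - 1));
       next.putAll(updates);
       log.add(next);
     }
   }
   ```

   With a TID taken before the transaction in the SQL example, both `a` and `b` are read from the same catalog state, so `id=1` can't be seen twice.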
   
   ### 2. Use validations that data read has not changed
   
   This option is similar to how `serializable` is handled. The transaction "start" time floats: you can use data that was read, as long as the outcome would be the same when the transaction commits.
   
   The main drawback of this approach is that it isn't clear whether this is actually distinct from `serializable`, which basically does the same validation.
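
   A minimal sketch of option 2 follows. It assumes each table exposes a version number and takes a deliberately coarse view: any change to a table that was read fails the commit. That is stricter than checking whether the data read actually changed. `ReadSetValidator` is a hypothetical name, and the error message only mimics the wording used in the tests of this PR.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical commit-time validation: remember the version of every table that was read and,
   // at commit time, refuse to commit if any of those tables has moved on since.
   final class ReadSetValidator {
     private final Map<String, Long> versionsRead = new HashMap<>();

     // Records that `table` was read at `version`; the first read of a table wins.
     void recordRead(String table, long version) {
       versionsRead.putIfAbsent(table, version);
     }

     // Throws if a table that was read now has a different version, meaning the data read may
     // have changed and the outcome of the transaction may no longer be the same.
     void validate(Map<String, Long> currentVersions) {
       for (Map.Entry<String, Long> entry : versionsRead.entrySet()) {
         Long current = currentVersions.get(entry.getKey());
         if (!entry.getValue().equals(current)) {
           throw new IllegalStateException(
               "Found updates to table '" + entry.getKey() + "' after it was read");
         }
       }
     }
   }
   ```

   Tables that were only written, and never read, are not validated here. That is also why this check looks much like `serializable`.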
   
   ### 3. Define `snapshot` isolation differently or rename it
   
   This option may seem crazy, but there's a good argument for it. If there's little difference between `serializable` and `snapshot` isolation with option 2, then `serializable` should be used for cases like the SQL above. However, we clearly want to be able to relax the constraints for certain cases:
   * Simultaneous `DELETE FROM` and `INSERT INTO` -- it makes little sense to fail a delete because a matching row was just inserted, when a slightly different order (delete commits first) would have been perfectly fine
   * External coordination -- running a scheduled `MERGE INTO` that selects from a constantly updated source table should generally succeed, even if the source table is modified. Most of the time, consumption will be incremental and coordinated externally. The important thing is knowing which version of the source table was used, not failing if it is updated during the job.
   
   These two cases align with the existing behavior of `snapshot` isolation in single-table commits -- but it makes sense there because "snapshot" applies to a single table. If the table is simultaneously modified, the original version is used and new data is ignored. However, deletions are validated.
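
   The single-table rules can be sketched as a commit check. This assumes a simplified model where the caller already knows which files were deleted, and which ids were added, by commits made after the read. `Isolation`, `SingleTableValidation`, and `canCommit` are hypothetical names. Iceberg's real conflict validation works on data and delete files and is more involved.

   ```java
   import java.util.Set;
   import java.util.function.IntPredicate;

   // Hypothetical isolation levels for a single-table commit check.
   enum Isolation { SNAPSHOT, SERIALIZABLE }

   final class SingleTableValidation {
     private SingleTableValidation() {}

     /**
      * @param filesRead files the transaction read (and may rewrite or delete)
      * @param filesDeletedConcurrently files removed by commits made after the read
      * @param idsAddedConcurrently ids in data added by commits made after the read
      * @param readFilter the predicate the transaction used to read the table
      */
     static boolean canCommit(
         Isolation level,
         Set<String> filesRead,
         Set<String> filesDeletedConcurrently,
         Set<Integer> idsAddedConcurrently,
         IntPredicate readFilter) {
       // Both levels validate deletions: data the transaction read must still exist.
       for (String file : filesRead) {
         if (filesDeletedConcurrently.contains(file)) {
           return false;
         }
       }
       // Snapshot isolation keeps using the version it read and ignores newly added data.
       if (level == Isolation.SNAPSHOT) {
         return true;
       }
       // Serializable isolation also fails if newly added data would have matched the read.
       for (int id : idsAddedConcurrently) {
         if (readFilter.test(id)) {
           return false;
         }
       }
       return true;
     }
   }
   ```

   Under this sketch, a concurrent insert of a matching row is accepted with `SNAPSHOT` but rejected with `SERIALIZABLE`. A concurrent delete of a file that was read is rejected by both.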
   
   ### Last comment
   
   I think that captures the background for the decision that we need to make on this. Right now, I think the best approach is to collect use cases and think about how they should be handled, rather than making a decision.



[GitHub] [iceberg] nastra closed pull request #6948: API, Core: Multi-Table transactions API and support for REST

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra closed pull request #6948: API, Core: Multi-Table transactions API and support for REST
URL: https://github.com/apache/iceberg/pull/6948


[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1252367063


##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java:
##########
@@ -0,0 +1,736 @@
+package org.apache.iceberg.catalog;
+
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SERIALIZABLE;
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SNAPSHOT;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TestCatalogUtil;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class CatalogTransactionTests<
+    C extends SupportsCatalogTransactions & SupportsNamespaces & Catalog> {
+
+  @TempDir protected Path metadataDir;
+
+  protected static final Schema SCHEMA =
+      new Schema(
+          required(3, "id", Types.IntegerType.get()), required(4, "data", Types.StringType.get()));
+
+  // Partition spec used to create tables
+  protected static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+
+  protected static final DataFile FILE_A =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-a.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=0") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_B =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-b.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=1") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_C =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-c.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=2") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_D =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-d.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=3") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
+  protected abstract C catalog();
+
+  @Test
+  public void testNulls() {
+    assertThatThrownBy(() -> new BaseCatalogTransaction(null, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid origin catalog: null");
+
+    assertThatThrownBy(() -> new BaseCatalogTransaction(catalog(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTransactionSupport() {
+    assertThatThrownBy(
+            () -> new BaseCatalogTransaction(new TestCatalogUtil.TestCatalog(), SERIALIZABLE))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Origin catalog does not support catalog transactions");
+  }
+
+  @Test
+  public void multipleCommits() {
+    CatalogTransaction catalogTx = catalog().createTransaction(SERIALIZABLE);
+    catalogTx.commitTransaction();
+    assertThatThrownBy(catalogTx::commitTransaction)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Transaction has already committed changes");
+  }
+
+  @Test
+  public void invalidIsolationLevel() {
+    assertThatThrownBy(() -> catalog().createTransaction(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTxWithSingleOp() {
+    catalogTxWithSingleOp(CatalogTransaction.IsolationLevel.SNAPSHOT);
+  }
+
+  @Test
+  public void catalogTxWithSingleOpWithSerializable() {
+    catalogTxWithSingleOp(SERIALIZABLE);
+  }
+
+  private void catalogTxWithSingleOp(CatalogTransaction.IsolationLevel isolationLevel) {
+    TableIdentifier identifier = TableIdentifier.of("ns", "tx-with-single-op");
+    catalog().createTable(identifier, SCHEMA, SPEC);
+
+    Table one = catalog().loadTable(identifier);
+    TableMetadata base = ((BaseTable) one).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    assertThat(base).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(base.currentSnapshot()).isNull();
+
+    catalogTransaction.commitTransaction();
+
+    TableMetadata updated = ((BaseTable) one).operations().refresh();
+    assertThat(base).isNotSameAs(updated);
+
+    Snapshot snapshot = updated.currentSnapshot();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+  }
+
+  @Test
+  public void txAgainstMultipleTables() {
+    txAgainstMultipleTables(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesWithSerializable() {
+    txAgainstMultipleTables(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTables(CatalogTransaction.IsolationLevel isolationLevel) {
+    List<String> tables = Arrays.asList("a", "b", "c");
+    for (String tbl : tables) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    txCatalog.loadTable(first).newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).isEmpty();
+    }
+
+    catalogTransaction.commitTransaction();
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).hasSizeGreaterThanOrEqualTo(1);
+    }
+
+    one = catalog().loadTable(first);
+    two = catalog().loadTable(second);
+    three = catalog().loadTable(third);
+    assertThat(one.currentSnapshot().allManifests(one.io())).hasSize(1);
+    assertThat(two.currentSnapshot().allManifests(two.io())).hasSize(1);
+    assertThat(three.currentSnapshot().allManifests(three.io())).hasSize(1);
+
+    assertThat(one.currentSnapshot().addedDataFiles(one.io())).hasSize(2);
+    assertThat(two.currentSnapshot().addedDataFiles(two.io())).hasSize(2);
+    assertThat(three.currentSnapshot().addedDataFiles(three.io())).hasSize(1);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflict() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflictWithSerializable() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTablesLastOneSchemaConflict(
+      CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b", "c")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    txCatalog.loadTable(third).updateSchema().renameColumn("data", "new-column").commit();
+
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    // delete the column we're trying to rename in the catalog TX
+    three.updateSchema().deleteColumn("data").commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull();
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessageContaining(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read");
+    } else {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(CommitFailedException.class)
+          .hasMessageContaining("Requirement failed: current schema changed: expected id 0 != 1");
+    }
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("new-column"))
+        .isNull();
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull();
+    assertThat(((BaseTable) three).operations().refresh().schema().columns()).hasSize(1);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneFails() {
+    txAgainstMultipleTablesLastOneFails(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneFailsWithSerializable() {
+    txAgainstMultipleTablesLastOneFails(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTablesLastOneFails(
+      CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b", "c")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    // perform updates outside the catalog TX
+    three.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
+    Snapshot snapshot = ((BaseTable) three).operations().refresh().currentSnapshot();
+    assertThat(snapshot).isNotNull();

Review Comment:
   I think you can assume that normal operations that do not go through `CatalogTransaction` behave the way you expect. Omitting those assertions keeps the tests shorter. It shouldn't be necessary to assert that the latest snapshot is non-null, nor should it be necessary to check that the `data` column was deleted in the example above.



[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1252366342


##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java:
##########
@@ -0,0 +1,736 @@
+package org.apache.iceberg.catalog;
+
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SERIALIZABLE;
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SNAPSHOT;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TestCatalogUtil;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class CatalogTransactionTests<
+    C extends SupportsCatalogTransactions & SupportsNamespaces & Catalog> {
+
+  @TempDir protected Path metadataDir;
+
+  protected static final Schema SCHEMA =
+      new Schema(
+          required(3, "id", Types.IntegerType.get()), required(4, "data", Types.StringType.get()));
+
+  // Partition spec used to create tables
+  protected static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+
+  protected static final DataFile FILE_A =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-a.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=0") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_B =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-b.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=1") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_C =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-c.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=2") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+  protected static final DataFile FILE_D =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-d.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=3") // easy way to set partition data for now
+          .withRecordCount(1)
+          .build();
+
+  protected abstract C catalog();
+
+  @Test
+  public void testNulls() {
+    assertThatThrownBy(() -> new BaseCatalogTransaction(null, null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid origin catalog: null");
+
+    assertThatThrownBy(() -> new BaseCatalogTransaction(catalog(), null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTransactionSupport() {
+    assertThatThrownBy(
+            () -> new BaseCatalogTransaction(new TestCatalogUtil.TestCatalog(), SERIALIZABLE))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Origin catalog does not support catalog transactions");
+  }
+
+  @Test
+  public void multipleCommits() {
+    CatalogTransaction catalogTx = catalog().createTransaction(SERIALIZABLE);
+    catalogTx.commitTransaction();
+    assertThatThrownBy(catalogTx::commitTransaction)
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessage("Transaction has already committed changes");
+  }
+
+  @Test
+  public void invalidIsolationLevel() {
+    assertThatThrownBy(() -> catalog().createTransaction(null))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid isolation level: null");
+  }
+
+  @Test
+  public void catalogTxWithSingleOp() {
+    catalogTxWithSingleOp(CatalogTransaction.IsolationLevel.SNAPSHOT);
+  }
+
+  @Test
+  public void catalogTxWithSingleOpWithSerializable() {
+    catalogTxWithSingleOp(SERIALIZABLE);
+  }
+
+  private void catalogTxWithSingleOp(CatalogTransaction.IsolationLevel isolationLevel) {
+    TableIdentifier identifier = TableIdentifier.of("ns", "tx-with-single-op");
+    catalog().createTable(identifier, SCHEMA, SPEC);
+
+    Table one = catalog().loadTable(identifier);
+    TableMetadata base = ((BaseTable) one).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(identifier).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+
+    assertThat(base).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(base.currentSnapshot()).isNull();
+
+    catalogTransaction.commitTransaction();
+
+    TableMetadata updated = ((BaseTable) one).operations().refresh();
+    assertThat(base).isNotSameAs(updated);
+
+    Snapshot snapshot = updated.currentSnapshot();
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_FILES_PROP)).isEqualTo("2");
+    assertThat(snapshot.summary().get(SnapshotSummary.ADDED_RECORDS_PROP)).isEqualTo("2");
+  }
+
+  @Test
+  public void txAgainstMultipleTables() {
+    txAgainstMultipleTables(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesWithSerializable() {
+    txAgainstMultipleTables(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTables(CatalogTransaction.IsolationLevel isolationLevel) {
+    List<String> tables = Arrays.asList("a", "b", "c");
+    for (String tbl : tables) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+
+    txCatalog.loadTable(first).newFastAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).isEmpty();
+    }
+
+    catalogTransaction.commitTransaction();
+
+    for (String tbl : tables) {
+      TableMetadata current =
+          ((BaseTable) catalog().loadTable(TableIdentifier.of("ns", tbl))).operations().refresh();
+      assertThat(current.snapshots()).hasSizeGreaterThanOrEqualTo(1);
+    }
+
+    one = catalog().loadTable(first);
+    two = catalog().loadTable(second);
+    three = catalog().loadTable(third);
+    assertThat(one.currentSnapshot().allManifests(one.io())).hasSize(1);
+    assertThat(two.currentSnapshot().allManifests(two.io())).hasSize(1);
+    assertThat(three.currentSnapshot().allManifests(three.io())).hasSize(1);
+
+    assertThat(one.currentSnapshot().addedDataFiles(one.io())).hasSize(2);
+    assertThat(two.currentSnapshot().addedDataFiles(two.io())).hasSize(2);
+    assertThat(three.currentSnapshot().addedDataFiles(three.io())).hasSize(1);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflict() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SNAPSHOT);
+  }
+
+  @Test
+  public void txAgainstMultipleTablesLastOneSchemaConflictWithSerializable() {
+    txAgainstMultipleTablesLastOneSchemaConflict(SERIALIZABLE);
+  }
+
+  private void txAgainstMultipleTablesLastOneSchemaConflict(
+      CatalogTransaction.IsolationLevel isolationLevel) {
+    for (String tbl : Arrays.asList("a", "b", "c")) {
+      catalog().createTable(TableIdentifier.of("ns", tbl), SCHEMA);
+    }
+
+    TableIdentifier first = TableIdentifier.of("ns", "a");
+    TableIdentifier second = TableIdentifier.of("ns", "b");
+    TableIdentifier third = TableIdentifier.of("ns", "c");
+    Table one = catalog().loadTable(first);
+    Table two = catalog().loadTable(second);
+    Table three = catalog().loadTable(third);
+
+    TableMetadata baseMetadataOne = ((BaseTable) one).operations().current();
+    TableMetadata baseMetadataTwo = ((BaseTable) two).operations().current();
+    TableMetadata baseMetadataThree = ((BaseTable) three).operations().current();
+
+    CatalogTransaction catalogTransaction = catalog().createTransaction(isolationLevel);
+    Catalog txCatalog = catalogTransaction.asCatalog();
+    txCatalog.loadTable(first).newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+
+    txCatalog.loadTable(second).newFastAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    txCatalog.loadTable(second).newDelete().deleteFile(FILE_C).commit();
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+
+    txCatalog.loadTable(third).newDelete().deleteFile(FILE_A).commit();
+    txCatalog.loadTable(third).newAppend().appendFile(FILE_D).commit();
+
+    txCatalog.loadTable(third).updateSchema().renameColumn("data", "new-column").commit();
+
+    assertThat(baseMetadataThree).isSameAs(((BaseTable) three).operations().refresh());
+
+    // delete the column we're trying to rename in the catalog TX
+    three.updateSchema().deleteColumn("data").commit();
+
+    assertThat(baseMetadataOne).isSameAs(((BaseTable) one).operations().refresh());
+    assertThat(baseMetadataTwo).isSameAs(((BaseTable) two).operations().refresh());
+    assertThat(baseMetadataThree).isNotSameAs(((BaseTable) three).operations().refresh());
+    assertThat(((BaseTable) three).operations().refresh().schema().findField("data")).isNull();
+
+    if (SERIALIZABLE == isolationLevel) {
+      Assertions.assertThatThrownBy(catalogTransaction::commitTransaction)
+          .isInstanceOf(ValidationException.class)
+          .hasMessageContaining(
+              "SERIALIZABLE isolation violation: Found table metadata updates to table 'ns.c' after it was read");

Review Comment:
   Why is this different from the `CommitFailedException`?
   
   There are two possible failures:
   1. The schema update conflicts, so the REST catalog sends back `CommitFailedException`
   2. A scan projected the dropped column _before_ the concurrent schema update, so the REST catalog sends back `CommitFailedException` and the commit then fails on retry because the underlying data has changed
   
   I think the exception here is too broad; it should correspond to one of those two cases.
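Case 1 can be illustrated with a small, self-contained model. Everything below is hypothetical: `FakeRestCatalog` and the local `CommitFailedException` class are stand-ins rather than Iceberg's classes, and the schema-id check only mimics the spirit of the REST spec's `assert-current-schema-id` requirement. The point it shows is that a schema update based on a stale schema surfaces as a commit conflict, not as a generic validation error:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a REST-style catalog rejecting a stale
// schema update. Names are illustrative, not Iceberg's real classes.
public class SchemaConflictSketch {

  /** Local stand-in for Iceberg's CommitFailedException. */
  static class CommitFailedException extends RuntimeException {
    CommitFailedException(String message) {
      super(message);
    }
  }

  /** Server-side state: table name -> current schema id. */
  static class FakeRestCatalog {
    private final Map<String, Integer> currentSchemaIds = new HashMap<>();

    void create(String table) {
      currentSchemaIds.put(table, 0);
    }

    int currentSchemaId(String table) {
      return currentSchemaIds.get(table);
    }

    /** Applies a schema change only if the client's expected schema id still matches. */
    void commitSchemaChange(String table, int expectedSchemaId) {
      int actual = currentSchemaIds.get(table);
      if (actual != expectedSchemaId) {
        throw new CommitFailedException(
            String.format(
                "Requirement failed for %s: expected schema id %d but found %d",
                table, expectedSchemaId, actual));
      }
      currentSchemaIds.put(table, actual + 1);
    }
  }

  public static void main(String[] args) {
    FakeRestCatalog catalog = new FakeRestCatalog();
    catalog.create("ns.c");

    // The catalog transaction reads schema id 0 and plans to rename "data".
    int schemaSeenByTx = catalog.currentSchemaId("ns.c");

    // A concurrent writer drops "data" first, moving the schema to id 1.
    catalog.commitSchemaChange("ns.c", catalog.currentSchemaId("ns.c"));

    // The rename is now based on a stale schema, so the commit conflicts.
    try {
      catalog.commitSchemaChange("ns.c", schemaSeenByTx);
      System.out.println("unexpected success");
    } catch (CommitFailedException e) {
      System.out.println("conflict: " + e.getMessage());
    }
  }
}
```

Running `main` prints a conflict for `ns.c`, because the rename was planned against schema id 0 while the catalog had already moved to schema id 1.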




[GitHub] [iceberg] rdblue commented on a diff in pull request #6948: Core: Add Catalog Transactions API

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1252362711


##########
core/src/test/java/org/apache/iceberg/catalog/CatalogTransactionTests.java:
##########
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SERIALIZABLE;
+import static org.apache.iceberg.catalog.CatalogTransaction.IsolationLevel.SNAPSHOT;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TestCatalogUtil;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class CatalogTransactionTests<
+    C extends SupportsCatalogTransactions & SupportsNamespaces & Catalog> {
+
+  @TempDir protected Path metadataDir;
+
+  protected static final Schema SCHEMA =
+      new Schema(
+          required(3, "id", Types.IntegerType.get()), required(4, "data", Types.StringType.get()));
+
+  // Partition spec used to create tables
+  protected static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
+
+  protected static final DataFile FILE_A =
+      DataFiles.builder(SPEC)
+          .withPath("/path/to/data-a.parquet")
+          .withFileSizeInBytes(10)
+          .withPartitionPath("data_bucket=0") // easy way to set partition data for now

Review Comment:
   I don't think this comment is accurate anymore, although there's no need to change the code. You should now be able to do something like this:
   
   ```java
     .withPartition(TestHelpers.Row.of(0))
   ```




[GitHub] [iceberg] jackye1995 commented on pull request #6948: Core: Add Catalog Transactions API

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#issuecomment-1619156336

   Discussed offline with Ryan, Daniel and Eduard. I think we should be good here. 
   
   Requiring all table snapshots to be consistent as of the transaction's start time is not a hard requirement of either snapshot or serializable isolation.
   
   The non-repeatable read / phantom read issue that was raised can only happen when the same table is read more than once within the same transaction. In the example I gave, table2 is read only once, so whatever state it had when it was loaded from the catalog can be considered valid; that is neither a non-repeatable read nor a phantom read.
   
   There are two cases that could cause a phantom read:
   
   1. a self-join query
   2. multiple reads of the same table in a multi-statement transaction
   
   In the first case, although it is not strictly enforced, most query engines cache table metadata when loading a table from the catalog, so the second scan of the table cannot read new data. Ideally engines should enforce this, but it is definitely not something the catalog should enforce.
   
   In the second case, a multi-statement transaction must implement the proposed transaction API, so we should be good there.
   
   I will continue to think about any other edge cases, but at least we are not blocked by this.
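The self-join argument above can be sketched with a hypothetical model in which a table is represented only by its current snapshot id. The transaction loads each table lazily on first access and caches it, so a table's snapshot is fixed at its first read rather than at transaction start. `Catalog` and `Transaction` here are illustrative classes, not Iceberg's interfaces:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: the catalog maps a table name to its current snapshot id,
// and a transaction pins each table's snapshot the first time it loads it.
public class PinnedReadSketch {

  static class Catalog {
    final Map<String, Long> currentSnapshotIds = new HashMap<>();

    long load(String table) {
      return currentSnapshotIds.get(table);
    }

    void commit(String table, long newSnapshotId) {
      currentSnapshotIds.put(table, newSnapshotId);
    }
  }

  /** Loads each table lazily and caches it, so repeated reads see one state. */
  static class Transaction {
    private final Catalog catalog;
    private final Map<String, Long> pinned = new HashMap<>();

    Transaction(Catalog catalog) {
      this.catalog = catalog;
    }

    long read(String table) {
      // The first access decides the snapshot; later accesses reuse it.
      return pinned.computeIfAbsent(table, catalog::load);
    }
  }

  public static void main(String[] args) {
    Catalog catalog = new Catalog();
    catalog.commit("table1", 1L);
    catalog.commit("table2", 10L);

    Transaction tx = new Transaction(catalog);
    long firstScan = tx.read("table1"); // pins snapshot 1

    // A concurrent writer commits to both tables after the transaction started.
    catalog.commit("table1", 2L);
    catalog.commit("table2", 11L);

    long secondScan = tx.read("table1"); // self-join: still snapshot 1
    long onlyScan = tx.read("table2"); // first read of table2: snapshot 11

    System.out.println(firstScan + " " + secondScan + " " + onlyScan); // prints "1 1 11"
  }
}
```

Both scans of `table1` see snapshot 1, while `table2`, read only once, sees snapshot 11 even though it was committed after the transaction began; under the reasoning above, that single read is not a non-repeatable or phantom read.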
   
   



Re: [PR] Core: Add Catalog Transactions API [iceberg]

Posted by "nastra (via GitHub)" <gi...@apache.org>.
nastra commented on code in PR #6948:
URL: https://github.com/apache/iceberg/pull/6948#discussion_r1425557919


##########
core/src/main/java/org/apache/iceberg/catalog/CatalogTransaction.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.catalog;
+
+public interface CatalogTransaction {
+
+  enum IsolationLevel {
+
+    /**
+     * All reads that are being made will see the last committed values that existed when the table

Review Comment:
   Sorry for my late reply on this topic. 
   
   The current implementation does perform **option 2**: it detects whether data that was read has changed. This is done for both `snapshot` and `serializable` isolation.
   
   The only difference between `snapshot` and `serializable` isolation in the current implementation is that `serializable` also detects and prevents `write skew`.
   
   @jackye1995, did you have a chance to look at the implementation and the tests, or is the definition of `snapshot` isolation confusing?
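To illustrate why serializable isolation additionally catches write skew, here is a simplified, textbook-style model. It is hypothetical and is not the PR's implementation: each table carries a version number, and a transaction records the version it saw for every table it touched. In this model, `SNAPSHOT` validates only the tables a transaction writes, while `SERIALIZABLE` also validates the tables it only read:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified textbook model (hypothetical, not the PR's code): per-table versions
// plus a per-transaction record of the versions it read.
public class IsolationCheckSketch {

  enum IsolationLevel {
    SNAPSHOT,
    SERIALIZABLE
  }

  static class Store {
    final Map<String, Integer> versions = new HashMap<>();

    int version(String table) {
      return versions.getOrDefault(table, 0);
    }

    void bump(String table) {
      versions.put(table, version(table) + 1);
    }
  }

  static class Tx {
    private final Store store;
    private final IsolationLevel level;
    private final Map<String, Integer> readVersions = new HashMap<>();
    private final Set<String> writes = new HashSet<>();

    Tx(Store store, IsolationLevel level) {
      this.store = store;
      this.level = level;
    }

    void read(String table) {
      readVersions.putIfAbsent(table, store.version(table));
    }

    void write(String table) {
      read(table); // a write is based on the state that was read
      writes.add(table);
    }

    /** Returns true and applies the writes if validation passes. */
    boolean commit() {
      for (Map.Entry<String, Integer> e : readVersions.entrySet()) {
        boolean written = writes.contains(e.getKey());
        // SNAPSHOT validates only written tables (write-write conflicts);
        // SERIALIZABLE also validates read-only tables, which catches write skew.
        boolean mustValidate = written || level == IsolationLevel.SERIALIZABLE;
        if (mustValidate && store.version(e.getKey()) != e.getValue()) {
          return false;
        }
      }
      writes.forEach(store::bump);
      return true;
    }
  }

  /** Classic write skew: both transactions read a and b, then write different tables. */
  static boolean[] run(IsolationLevel level) {
    Store store = new Store();
    Tx t1 = new Tx(store, level);
    Tx t2 = new Tx(store, level);
    t1.read("a");
    t1.read("b");
    t1.write("a");
    t2.read("a");
    t2.read("b");
    t2.write("b");
    return new boolean[] {t1.commit(), t2.commit()};
  }

  public static void main(String[] args) {
    for (IsolationLevel level : IsolationLevel.values()) {
      boolean[] r = run(level);
      System.out.println(level + ": t1=" + r[0] + ", t2=" + r[1]);
    }
  }
}
```

Under `SNAPSHOT` both commits succeed because the write sets do not overlap, which is exactly write skew. Under `SERIALIZABLE` the second commit is rejected because table `a`, which it read, changed after the read.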
   


