You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/12/19 23:58:40 UTC

[iceberg] branch master updated: Core: Fix file cleaning in transactions with unknown commit state (#3733)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6453245  Core: Fix file cleaning in transactions with unknown commit state (#3733)
6453245 is described below

commit 6453245af821f6dbdb3c46a18d89e4677784c7d3
Author: Tim Jiang <13...@users.noreply.github.com>
AuthorDate: Sun Dec 19 15:58:28 2021 -0800

    Core: Fix file cleaning in transactions with unknown commit state (#3733)
---
 .../java/org/apache/iceberg/BaseTransaction.java   | 10 +++
 .../java/org/apache/iceberg/TableTestBase.java     |  5 ++
 .../org/apache/iceberg/TestReplaceTransaction.java | 81 ++++++++++++++++++++++
 .../test/java/org/apache/iceberg/TestTables.java   | 28 ++++++--
 .../java/org/apache/iceberg/TestTransaction.java   | 23 ++++++
 5 files changed, 143 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 7e779bc..50550ab 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
@@ -245,6 +246,9 @@ class BaseTransaction implements Transaction {
     try {
       ops.commit(null, current);
 
+    } catch (CommitStateUnknownException e) {
+      throw e; // state unknown: the commit may have succeeded, so skip the cleanup below
+
     } catch (RuntimeException e) {
       // the commit failed and no files were committed. clean up each update.
       Tasks.foreach(updates)
@@ -298,6 +302,9 @@ class BaseTransaction implements Transaction {
             underlyingOps.commit(base, current);
           });
 
+    } catch (CommitStateUnknownException e) {
+      throw e; // state unknown: the commit may have succeeded, so skip the cleanup below
+
     } catch (RuntimeException e) {
       // the commit failed and no files were committed. clean up each update.
       Tasks.foreach(updates)
@@ -356,6 +363,9 @@ class BaseTransaction implements Transaction {
             underlyingOps.commit(base, current);
           });
 
+    } catch (CommitStateUnknownException e) {
+      throw e; // state unknown: the commit may have succeeded, so skip the cleanup below
+
     } catch (RuntimeException e) {
       // the commit failed and no files were committed. clean up each update.
       Tasks.foreach(updates)
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 1b8a3ca..8fce621 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -193,6 +194,17 @@ public class TableTestBase {
         !name.startsWith("snap") && Files.getFileExtension(name).equalsIgnoreCase("avro")));
   }
 
+  /**
+   * Counts the regular files (not directories) directly inside the {@code metadata} folder of {@code tableDir}.
+   *
+   * @param tableDir directory that contains the table's {@code metadata} folder
+   * @return the number of files, or 0 if the metadata folder is missing or cannot be listed
+   */
+  public static long countAllMetadataFiles(File tableDir) {
+    File[] files = new File(tableDir, "metadata").listFiles();
+    return files == null ? 0 : Arrays.stream(files).filter(File::isFile).count();
+  }
+
   protected TestTables.TestTable create(Schema schema, PartitionSpec spec) {
     return TestTables.create(tableDir, "test", schema, spec, formatVersion);
   }
diff --git a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
index e79e297..a27124d 100644
--- a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
@@ -25,6 +25,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.transforms.Transform;
@@ -396,6 +398,89 @@ public class TestReplaceTransaction extends TableTestBase {
     validateSnapshot(null, meta.currentSnapshot(), FILE_A, FILE_B);
   }
 
+  @Test
+  public void testReplaceTransactionWithUnknownState() {
+    Schema newSchema = new Schema(
+        required(4, "id", Types.IntegerType.get()),
+        required(5, "data", Types.StringType.get()));
+
+    Snapshot start = table.currentSnapshot();
+    Schema schema = table.schema();
+
+    table.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    Assert.assertEquals("Version should be 1", 1L, (long) version());
+    validateSnapshot(start, table.currentSnapshot(), FILE_A);
+
+    // these ops apply the commit and then throw CommitStateUnknownException
+    TestTables.TestTableOperations ops = TestTables.opsWithCommitSucceedButStateUnknown(tableDir, "test");
+    Transaction replace = TestTables.beginReplace(tableDir, "test", newSchema, unpartitioned(),
+        SortOrder.unsorted(), ImmutableMap.of(), ops);
+
+    replace.newAppend()
+        .appendFile(FILE_B)
+        .commit();
+
+    AssertHelpers.assertThrows("Transaction commit should fail with CommitStateUnknownException",
+        CommitStateUnknownException.class, "datacenter on fire", replace::commitTransaction);
+
+    // the replace really was committed, so its files must still exist and be visible after a refresh
+    table.refresh();
+
+    Assert.assertEquals("Version should be 2", 2L, (long) version());
+    Assert.assertNotNull("Table should have a current snapshot", table.currentSnapshot());
+    Assert.assertEquals("Schema should use new schema, not compatible with previous",
+        schema.asStruct(), table.schema().asStruct());
+    Assert.assertEquals("Should have 4 files in metadata", 4, countAllMetadataFiles(tableDir));
+    validateSnapshot(null, table.currentSnapshot(), FILE_B);
+  }
+
+  @Test
+  public void testCreateTransactionWithUnknownState() throws IOException {
+    File tableDir = temp.newFolder();
+    Assert.assertTrue(tableDir.delete());
+
+    // this table doesn't exist.
+    TestTables.TestTableOperations ops = TestTables.opsWithCommitSucceedButStateUnknown(tableDir, "test_append");
+    Transaction create = TestTables.beginReplace(tableDir, "test_append", SCHEMA, unpartitioned(),
+        SortOrder.unsorted(), ImmutableMap.of(), ops);
+
+    Assert.assertNull("Starting a create transaction should not commit metadata",
+        TestTables.readMetadata("test_append"));
+    Assert.assertNull("Should have no metadata version", TestTables.metadataVersion("test_append"));
+
+    Assert.assertTrue("Should return a transaction table",
+        create.table() instanceof BaseTransaction.TransactionTable);
+
+    create.newAppend()
+        .appendFile(FILE_A)
+        .appendFile(FILE_B)
+        .commit();
+
+    Assert.assertNull("Appending in a transaction should not commit metadata",
+        TestTables.readMetadata("test_append"));
+    Assert.assertNull("Should have no metadata version",
+        TestTables.metadataVersion("test_append"));
+
+    AssertHelpers.assertThrows("Transaction commit should fail with CommitStateUnknownException",
+        CommitStateUnknownException.class, "datacenter on fire", create::commitTransaction);
+
+    // the create really was committed, so the table metadata and the files it references must still exist
+    TableMetadata meta = TestTables.readMetadata("test_append");
+    Assert.assertNotNull("Table metadata should be created after transaction commits", meta);
+    Assert.assertEquals("Should have metadata version 0", 0, (int) TestTables.metadataVersion("test_append"));
+    Assert.assertEquals("Should have 1 manifest file", 1, listManifestFiles(tableDir).size());
+    Assert.assertEquals("Should have 2 files in metadata", 2, countAllMetadataFiles(tableDir));
+    Assert.assertEquals("Table schema should match with reassigned IDs", assignFreshIds(SCHEMA).asStruct(),
+        meta.schema().asStruct());
+    Assert.assertEquals("Table spec should match", unpartitioned(), meta.spec());
+    Assert.assertEquals("Table should have one snapshot", 1, meta.snapshots().size());
+
+    validateSnapshot(null, meta.currentSnapshot(), FILE_A, FILE_B);
+  }
+
   private static Schema assignFreshIds(Schema schema) {
     AtomicInteger lastColumnId = new AtomicInteger(0);
     return TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet);
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index a24f94f..6f47b0d 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.util.Map;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
@@ -80,14 +81,20 @@ public class TestTables {
   }
 
   public static Transaction beginReplace(File temp, String name, Schema schema, PartitionSpec spec) {
     return beginReplace(temp, name, schema, spec, SortOrder.unsorted(), ImmutableMap.of());
   }
 
   public static Transaction beginReplace(File temp, String name, Schema schema, PartitionSpec spec,
-                                         SortOrder sortOrder, Map<String, String> properties) {
-    TestTableOperations ops = new TestTableOperations(name, temp);
-    TableMetadata current = ops.current();
+      SortOrder sortOrder, Map<String, String> properties) {
+    return beginReplace(temp, name, schema, spec, sortOrder, properties, new TestTableOperations(name, temp));
+  }
 
+  /**
+   * Variant of the overloads above that uses the given {@code ops}, so tests can control how commits behave.
+   */
+  public static Transaction beginReplace(File temp, String name, Schema schema, PartitionSpec spec,
+      SortOrder sortOrder, Map<String, String> properties, TestTableOperations ops) {
+    TableMetadata current = ops.current();
     TableMetadata metadata;
     if (current != null) {
       metadata = current.buildReplacement(schema, spec, sortOrder, current.location(), properties);
@@ -103,6 +108,28 @@ public class TestTables {
     return new TestTable(ops, name);
   }
 
+  /**
+   * Creates a table whose commits are applied but then reported as a {@link CommitStateUnknownException}.
+   */
+  public static TestTable tableWithCommitSucceedButStateUnknown(File temp, String name) {
+    return new TestTable(opsWithCommitSucceedButStateUnknown(temp, name), name);
+  }
+
+  /**
+   * Creates table operations that perform every commit normally and then throw a
+   * {@link CommitStateUnknownException} whose cause carries the message "datacenter on fire".
+   */
+  public static TestTableOperations opsWithCommitSucceedButStateUnknown(File temp, String name) {
+    return new TestTableOperations(name, temp) {
+      @Override
+      public void commit(TableMetadata base, TableMetadata updatedMetadata) {
+        super.commit(base, updatedMetadata);
+        RuntimeException cause = new RuntimeException("datacenter on fire");
+        throw new CommitStateUnknownException(cause);
+      }
+    };
+  }
+
   public static class TestTable extends BaseTable {
     private final TestTableOperations ops;
 
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java
index 08b5cf5..2b46118 100644
--- a/core/src/test/java/org/apache/iceberg/TestTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.UUID;
 import org.apache.iceberg.ManifestEntry.Status;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -699,4 +700,26 @@ public class TestTransaction extends TableTestBase {
 
     Assert.assertFalse("Append manifest should be deleted on expiry", new File(newManifest.path()).exists());
   }
+
+  @Test
+  public void testSimpleTransactionNotDeletingMetadataOnUnknownState() {
+    Table table = TestTables.tableWithCommitSucceedButStateUnknown(tableDir, "test");
+
+    Transaction transaction = table.newTransaction();
+    transaction.newAppend()
+        .appendFile(FILE_A)
+        .commit();
+
+    AssertHelpers.assertThrows(
+        "Transaction commit should fail with CommitStateUnknownException",
+        CommitStateUnknownException.class, "datacenter on fire",
+        transaction::commitTransaction);
+
+    // the commit was applied before the exception, so the metadata files it wrote must still exist
+    Snapshot current = table.currentSnapshot();
+    List<ManifestFile> manifests = current.allManifests();
+    Assert.assertEquals("Should have 1 manifest file", 1, manifests.size());
+    Assert.assertTrue("Manifest file should exist", new File(manifests.get(0).path()).exists());
+    Assert.assertEquals("Should have 2 files in metadata", 2, countAllMetadataFiles(tableDir));
+  }
 }