You are viewing a plain text version of this content. The canonical version is the copy held in the commits@iceberg.apache.org mailing-list archive.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/12/19 23:58:40 UTC
[iceberg] branch master updated: Core: Fix file cleaning in transactions with unknown commit state (#3733)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 6453245 Core: Fix file cleaning in transactions with unknown commit state (#3733)
6453245 is described below
commit 6453245af821f6dbdb3c46a18d89e4677784c7d3
Author: Tim Jiang <13...@users.noreply.github.com>
AuthorDate: Sun Dec 19 15:58:28 2021 -0800
Core: Fix file cleaning in transactions with unknown commit state (#3733)
---
.../java/org/apache/iceberg/BaseTransaction.java | 10 +++
.../java/org/apache/iceberg/TableTestBase.java | 5 ++
.../org/apache/iceberg/TestReplaceTransaction.java | 81 ++++++++++++++++++++++
.../test/java/org/apache/iceberg/TestTables.java | 28 ++++++--
.../java/org/apache/iceberg/TestTransaction.java | 23 ++++++
5 files changed, 143 insertions(+), 4 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
index 7e779bc..50550ab 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.LocationProvider;
@@ -245,6 +246,9 @@ class BaseTransaction implements Transaction {
try {
ops.commit(null, current);
+ } catch (CommitStateUnknownException e) {
+ throw e;
+
} catch (RuntimeException e) {
// the commit failed and no files were committed. clean up each update.
Tasks.foreach(updates)
@@ -298,6 +302,9 @@ class BaseTransaction implements Transaction {
underlyingOps.commit(base, current);
});
+ } catch (CommitStateUnknownException e) {
+ throw e;
+
} catch (RuntimeException e) {
// the commit failed and no files were committed. clean up each update.
Tasks.foreach(updates)
@@ -356,6 +363,9 @@ class BaseTransaction implements Transaction {
underlyingOps.commit(base, current);
});
+ } catch (CommitStateUnknownException e) {
+ throw e;
+
} catch (RuntimeException e) {
// the commit failed and no files were committed. clean up each update.
Tasks.foreach(updates)
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index 1b8a3ca..8fce621 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -193,6 +194,10 @@ public class TableTestBase {
!name.startsWith("snap") && Files.getFileExtension(name).equalsIgnoreCase("avro")));
}
+ public static long countAllMetadataFiles(File tableDir) {
+ return Arrays.stream(new File(tableDir, "metadata").listFiles()).filter(f -> f.isFile()).count();
+ }
+
protected TestTables.TestTable create(Schema schema, PartitionSpec spec) {
return TestTables.create(tableDir, "test", schema, spec, formatVersion);
}
diff --git a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
index e79e297..a27124d 100644
--- a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
@@ -25,6 +25,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.transforms.Transform;
@@ -396,6 +398,85 @@ public class TestReplaceTransaction extends TableTestBase {
validateSnapshot(null, meta.currentSnapshot(), FILE_A, FILE_B);
}
+ @Test
+ public void testReplaceTransactionWithUnknownState() {
+ Schema newSchema = new Schema(
+ required(4, "id", Types.IntegerType.get()),
+ required(5, "data", Types.StringType.get()));
+
+ Snapshot start = table.currentSnapshot();
+ Schema schema = table.schema();
+
+ table.newAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ Assert.assertEquals("Version should be 1", 1L, (long) version());
+ validateSnapshot(start, table.currentSnapshot(), FILE_A);
+
+ TestTables.TestTableOperations ops = TestTables.opsWithCommitSucceedButStateUnknown(tableDir, "test");
+ Transaction replace = TestTables.beginReplace(tableDir, "test", newSchema, unpartitioned(),
+ SortOrder.unsorted(), ImmutableMap.of(), ops);
+
+ replace.newAppend()
+ .appendFile(FILE_B)
+ .commit();
+
+ AssertHelpers.assertThrows("Transaction commit should fail with CommitStateUnknownException",
+ CommitStateUnknownException.class, "datacenter on fire", () -> replace.commitTransaction());
+
+ table.refresh();
+
+ Assert.assertEquals("Version should be 2", 2L, (long) version());
+ Assert.assertNotNull("Table should have a current snapshot", table.currentSnapshot());
+ Assert.assertEquals("Schema should use new schema, not compatible with previous",
+ schema.asStruct(), table.schema().asStruct());
+ Assert.assertEquals("Should have 4 files in metadata", 4, countAllMetadataFiles(tableDir));
+ validateSnapshot(null, table.currentSnapshot(), FILE_B);
+ }
+
+ @Test
+ public void testCreateTransactionWithUnknownState() throws IOException {
+ File tableDir = temp.newFolder();
+ Assert.assertTrue(tableDir.delete());
+
+ // this table doesn't exist.
+ TestTables.TestTableOperations ops = TestTables.opsWithCommitSucceedButStateUnknown(tableDir, "test_append");
+ Transaction replace = TestTables.beginReplace(tableDir, "test_append", SCHEMA, unpartitioned(),
+ SortOrder.unsorted(), ImmutableMap.of(), ops);
+
+ Assert.assertNull("Starting a create transaction should not commit metadata",
+ TestTables.readMetadata("test_append"));
+ Assert.assertNull("Should have no metadata version", TestTables.metadataVersion("test_append"));
+
+ Assert.assertTrue("Should return a transaction table", replace.table() instanceof BaseTransaction.TransactionTable);
+
+ replace.newAppend()
+ .appendFile(FILE_A)
+ .appendFile(FILE_B)
+ .commit();
+
+ Assert.assertNull("Appending in a transaction should not commit metadata",
+ TestTables.readMetadata("test_append"));
+ Assert.assertNull("Should have no metadata version",
+ TestTables.metadataVersion("test_append"));
+
+ AssertHelpers.assertThrows("Transaction commit should fail with CommitStateUnknownException",
+ CommitStateUnknownException.class, "datacenter on fire", () -> replace.commitTransaction());
+
+ TableMetadata meta = TestTables.readMetadata("test_append");
+ Assert.assertNotNull("Table metadata should be created after transaction commits", meta);
+ Assert.assertEquals("Should have metadata version 0", 0, (int) TestTables.metadataVersion("test_append"));
+ Assert.assertEquals("Should have 1 manifest file", 1, listManifestFiles(tableDir).size());
+ Assert.assertEquals("Should have 2 files in metadata", 2, countAllMetadataFiles(tableDir));
+ Assert.assertEquals("Table schema should match with reassigned IDs", assignFreshIds(SCHEMA).asStruct(),
+ meta.schema().asStruct());
+ Assert.assertEquals("Table spec should match", unpartitioned(), meta.spec());
+ Assert.assertEquals("Table should have one snapshot", 1, meta.snapshots().size());
+
+ validateSnapshot(null, meta.currentSnapshot(), FILE_A, FILE_B);
+ }
+
private static Schema assignFreshIds(Schema schema) {
AtomicInteger lastColumnId = new AtomicInteger(0);
return TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet);
diff --git a/core/src/test/java/org/apache/iceberg/TestTables.java b/core/src/test/java/org/apache/iceberg/TestTables.java
index a24f94f..6f47b0d 100644
--- a/core/src/test/java/org/apache/iceberg/TestTables.java
+++ b/core/src/test/java/org/apache/iceberg/TestTables.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.util.Map;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
@@ -80,14 +81,18 @@ public class TestTables {
}
public static Transaction beginReplace(File temp, String name, Schema schema, PartitionSpec spec) {
- return beginReplace(temp, name, schema, spec, SortOrder.unsorted(), ImmutableMap.of());
+ return beginReplace(temp, name, schema, spec, SortOrder.unsorted(), ImmutableMap.of(),
+ new TestTableOperations(name, temp));
}
public static Transaction beginReplace(File temp, String name, Schema schema, PartitionSpec spec,
- SortOrder sortOrder, Map<String, String> properties) {
- TestTableOperations ops = new TestTableOperations(name, temp);
- TableMetadata current = ops.current();
+ SortOrder sortOrder, Map<String, String> properties) {
+ return beginReplace(temp, name, schema, spec, sortOrder, properties, new TestTableOperations(name, temp));
+ }
+ public static Transaction beginReplace(File temp, String name, Schema schema, PartitionSpec spec,
+ SortOrder sortOrder, Map<String, String> properties, TestTableOperations ops) {
+ TableMetadata current = ops.current();
TableMetadata metadata;
if (current != null) {
metadata = current.buildReplacement(schema, spec, sortOrder, current.location(), properties);
@@ -103,6 +108,21 @@ public class TestTables {
return new TestTable(ops, name);
}
+ public static TestTable tableWithCommitSucceedButStateUnknown(File temp, String name) {
+ TestTableOperations ops = opsWithCommitSucceedButStateUnknown(temp, name);
+ return new TestTable(ops, name);
+ }
+
+ public static TestTableOperations opsWithCommitSucceedButStateUnknown(File temp, String name) {
+ return new TestTableOperations(name, temp) {
+ @Override
+ public void commit(TableMetadata base, TableMetadata updatedMetadata) {
+ super.commit(base, updatedMetadata);
+ throw new CommitStateUnknownException(new RuntimeException("datacenter on fire"));
+ }
+ };
+ }
+
public static class TestTable extends BaseTable {
private final TestTableOperations ops;
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java
index 08b5cf5..2b46118 100644
--- a/core/src/test/java/org/apache/iceberg/TestTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.UUID;
import org.apache.iceberg.ManifestEntry.Status;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -699,4 +700,26 @@ public class TestTransaction extends TableTestBase {
Assert.assertFalse("Append manifest should be deleted on expiry", new File(newManifest.path()).exists());
}
+
+ @Test
+ public void testSimpleTransactionNotDeletingMetadataOnUnknownSate() throws IOException {
+ Table table = TestTables.tableWithCommitSucceedButStateUnknown(tableDir, "test");
+
+ Transaction transaction = table.newTransaction();
+ transaction.newAppend()
+ .appendFile(FILE_A)
+ .commit();
+
+ AssertHelpers.assertThrows(
+ "Transaction commit should fail with CommitStateUnknownException",
+ CommitStateUnknownException.class, "datacenter on fire",
+ () -> transaction.commitTransaction());
+
+ // Make sure metadata files still exist
+ Snapshot current = table.currentSnapshot();
+ List<ManifestFile> manifests = current.allManifests();
+ Assert.assertEquals("Should have 1 manifest file", 1, manifests.size());
+ Assert.assertTrue("Manifest file should exist", new File(manifests.get(0).path()).exists());
+ Assert.assertEquals("Should have 2 files in metadata", 2, countAllMetadataFiles(tableDir));
+ }
}