You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/04/09 00:16:32 UTC
[incubator-iceberg] branch master updated: Delete temporary metadata file when rename fails in HadoopTableOperations (#144)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 84675a8 Delete temporary metadata file when rename fails in HadoopTableOperations (#144)
84675a8 is described below
commit 84675a8ed5d600921adb886d50d42acc6d5252f8
Author: Xiao <xi...@users.noreply.github.com>
AuthorDate: Mon Apr 8 17:16:28 2019 -0700
Delete temporary metadata file when rename fails in HadoopTableOperations (#144)
---
.../iceberg/hadoop/HadoopTableOperations.java | 57 +++++++++++++---
.../apache/iceberg/hadoop/HadoopTableTestBase.java | 4 ++
.../apache/iceberg/hadoop/TestHadoopCommits.java | 76 +++++++++++++++++++++-
3 files changed, 126 insertions(+), 11 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
index 9d4bb68..c5033be 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
@@ -136,16 +136,8 @@ public class HadoopTableOperations implements TableOperations {
"Failed to check if next version exists: " + finalMetadataFile);
}
- try {
- // this rename operation is the atomic commit operation
- if (!fs.rename(tempMetadataFile, finalMetadataFile)) {
- throw new CommitFailedException(
- "Failed to commit changes using rename: %s", finalMetadataFile);
- }
- } catch (IOException e) {
- throw new CommitFailedException(e,
- "Failed to commit changes using rename: %s", finalMetadataFile);
- }
+ // this rename operation is the atomic commit operation
+ renameToFinal(fs, tempMetadataFile, finalMetadataFile);
// update the best-effort version pointer
writeVersionHint(nextVersion);
@@ -212,6 +204,51 @@ public class HadoopTableOperations implements TableOperations {
}
}
+  /**
+   * Moves {@code src} to {@code dst} with the given file system. When the rename does not succeed,
+   * whether it returns {@code false} or throws an {@link IOException}, a best-effort delete of
+   * {@code src} is attempted before the failure is reported.
+   *
+   * @param fs the filesystem used for the rename
+   * @param src the source file
+   * @param dst the destination file
+   * @throws CommitFailedException if the rename fails; a RuntimeException raised while deleting
+   *         {@code src} is attached to it as a suppressed exception
+   */
+  private void renameToFinal(FileSystem fs, Path src, Path dst) {
+    CommitFailedException failure = null;
+    try {
+      if (!fs.rename(src, dst)) {
+        failure = new CommitFailedException(
+            "Failed to commit changes using rename: %s", dst);
+      }
+    } catch (IOException e) {
+      failure = new CommitFailedException(e,
+          "Failed to commit changes using rename: %s", dst);
+    }
+
+    if (failure == null) {
+      return;
+    }
+
+    // the rename did not happen: clean up the source file, keeping any cleanup error as suppressed
+    RuntimeException deleteFailure = tryDelete(src);
+    if (deleteFailure != null) {
+      failure.addSuppressed(deleteFailure);
+    }
+    throw failure;
+  }
+
+  /**
+   * Attempts to delete the given file through {@code io()}. A RuntimeException raised by the
+   * delete is not propagated; it is handed back to the caller instead.
+   *
+   * @param location the file to be deleted.
+   * @return the RuntimeException raised by the delete, or null if the delete completed normally.
+   */
+  private RuntimeException tryDelete(Path location) {
+    RuntimeException caught = null;
+    try {
+      io().deleteFile(location.toString());
+    } catch (RuntimeException e) {
+      caught = e;
+    }
+    return caught;
+  }
+
protected FileSystem getFS(Path path, Configuration conf) {
return Util.getFS(path, conf);
}
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java b/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
index 70d7401..ef0e5ab 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
@@ -120,6 +120,10 @@ public class HadoopTableTestBase {
!name.startsWith("snap") && Files.getFileExtension(name).equalsIgnoreCase("avro")));
}
+  /**
+   * Lists the files directly under {@code metadataDir} whose names end with ".metadata.json".
+   *
+   * @return a new list of the matching files; empty if {@code metadataDir} cannot be listed
+   */
+  List<File> listMetadataJsonFiles() {
+    File[] jsonFiles = metadataDir.listFiles((dir, name) -> name.endsWith(".metadata.json"));
+    // File.listFiles returns null (not an empty array) when the path is not a directory or an
+    // I/O error occurs; passing that null to Lists.newArrayList would throw a NullPointerException
+    if (jsonFiles == null) {
+      return Lists.newArrayList();
+    }
+    return Lists.newArrayList(jsonFiles);
+  }
+
File version(int i) {
return new File(metadataDir, "v" + i + getFileExtension(new Configuration()));
}
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
index f3d1b62..fb15b56 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
@@ -20,23 +20,38 @@
package org.apache.iceberg.hadoop;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import java.io.File;
+import java.io.IOException;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.BaseTable;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestHadoopCommits extends HadoopTableTestBase {
@@ -293,4 +308,63 @@ public class TestHadoopCommits extends HadoopTableTestBase {
Assert.assertEquals("Current snapshot should contain 1 merged manifest",
1, metadata.currentSnapshot().manifests().size());
}
+
+  /**
+   * Runs the rename-failure scenario with a file system whose rename returns {@code false}.
+   */
+  @Test
+  public void testRenameReturnFalse() throws Exception {
+    FileSystem failingFS = mock(FileSystem.class);
+    doReturn(false).when(failingFS).exists(any());
+    doReturn(false).when(failingFS).rename(any(), any());
+    testRenameWithFS(failingFS);
+  }
+
+  /**
+   * Runs the rename-failure scenario with a file system whose rename throws an IOException.
+   */
+  @Test
+  public void testRenameThrow() throws Exception {
+    FileSystem throwingFS = mock(FileSystem.class);
+    doReturn(false).when(throwingFS).exists(any());
+    Mockito.doThrow(new IOException("test injected")).when(throwingFS).rename(any(), any());
+    testRenameWithFS(throwingFS);
+  }
+
+  /**
+   * Test rename during {@link HadoopTableOperations#commit(TableMetadata, TableMetadata)} with the provided
+   * {@link FileSystem} object. The provided FileSystem will be injected for the commit call.
+   *
+   * <p>The commit is expected to fail with a {@link CommitFailedException}, the rename must have been
+   * attempted on the provided file system, and no metadata.json file other than v1 and v2 may remain.
+   *
+   * @param mockFS a Mockito mock of FileSystem whose rename is stubbed to fail
+   */
+  private void testRenameWithFS(FileSystem mockFS) throws Exception {
+    assertTrue("Should create v1 metadata",
+        version(1).exists() && version(1).isFile());
+    assertFalse("Should not create v2 or newer versions",
+        version(2).exists());
+    assertTrue(table instanceof BaseTable);
+    BaseTable baseTable = (BaseTable) table;
+    // keep the v1 metadata; it is committed again below on top of v2
+    TableMetadata meta1 = baseTable.operations().current();
+
+    // create v2 metafile as base. This is solely for the convenience of rename testing later
+    // (so that we have 2 valid and different metadata files, which will reach the rename part during commit)
+    table.updateSchema()
+        .addColumn("n", Types.IntegerType.get())
+        .commit();
+    assertTrue("Should create v2 for the update",
+        version(2).exists() && version(2).isFile());
+    assertEquals("Should write the current version to the hint file",
+        2, readVersionHint());
+
+    // mock / spy the classes for testing
+    TableOperations tops = baseTable.operations();
+    assertTrue(tops instanceof HadoopTableOperations);
+    HadoopTableOperations spyOps = Mockito.spy((HadoopTableOperations) tops);
+
+    // inject the mockFS into the TableOperations
+    doReturn(mockFS).when(spyOps).getFS(any(), any());
+    try {
+      spyOps.commit(spyOps.current(), meta1);
+      fail("Commit should fail due to mock file system");
+    } catch (CommitFailedException expected) {
+      // expected: the injected file system makes the commit fail
+    }
+
+    // Guard against a vacuous pass: the failure must come from the rename itself, not from an
+    // earlier step of commit that would never have written a temporary file to clean up.
+    Mockito.verify(mockFS, Mockito.atLeastOnce()).rename(any(), any());
+
+    // Verifies that there is no temporary metadata.json files left on rename failures.
+    Set<String> actual = listMetadataJsonFiles().stream().map(File::getName).collect(Collectors.toSet());
+    Set<String> expected = Sets.newHashSet("v1.metadata.json", "v2.metadata.json");
+    assertEquals("only v1 and v2 metadata.json should exist.", expected, actual);
+  }
}