You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/04/09 00:16:32 UTC

[incubator-iceberg] branch master updated: Delete temporary metadata file when rename fails in HadoopTableOperations (#144)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 84675a8  Delete temporary metadata file when rename fails in HadoopTableOperations (#144)
84675a8 is described below

commit 84675a8ed5d600921adb886d50d42acc6d5252f8
Author: Xiao <xi...@users.noreply.github.com>
AuthorDate: Mon Apr 8 17:16:28 2019 -0700

    Delete temporary metadata file when rename fails in HadoopTableOperations (#144)
---
 .../iceberg/hadoop/HadoopTableOperations.java      | 57 +++++++++++++---
 .../apache/iceberg/hadoop/HadoopTableTestBase.java |  4 ++
 .../apache/iceberg/hadoop/TestHadoopCommits.java   | 76 +++++++++++++++++++++-
 3 files changed, 126 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
index 9d4bb68..c5033be 100644
--- a/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
+++ b/core/src/main/java/org/apache/iceberg/hadoop/HadoopTableOperations.java
@@ -136,16 +136,8 @@ public class HadoopTableOperations implements TableOperations {
           "Failed to check if next version exists: " + finalMetadataFile);
     }
 
-    try {
-      // this rename operation is the atomic commit operation
-      if (!fs.rename(tempMetadataFile, finalMetadataFile)) {
-        throw new CommitFailedException(
-            "Failed to commit changes using rename: %s", finalMetadataFile);
-      }
-    } catch (IOException e) {
-      throw new CommitFailedException(e,
-          "Failed to commit changes using rename: %s", finalMetadataFile);
-    }
+    // this rename operation is the atomic commit operation
+    renameToFinal(fs, tempMetadataFile, finalMetadataFile);
 
     // update the best-effort version pointer
     writeVersionHint(nextVersion);
@@ -212,6 +204,51 @@ public class HadoopTableOperations implements TableOperations {
     }
   }
 
+  /**
+   * Renames the source file to the destination using the provided file system; this rename is the
+   * atomic commit operation. If the rename fails, a best-effort attempt is made to delete the source.
+   *
+   * @param fs the filesystem used for the rename
+   * @param src the source file
+   * @param dst the destination file
+   * @throws CommitFailedException if the rename returns false or throws an IOException; a failure
+   *     to delete the source file is attached to it as a suppressed exception
+   */
+  private void renameToFinal(FileSystem fs, Path src, Path dst) {
+    CommitFailedException failure;
+    try {
+      if (fs.rename(src, dst)) {
+        return;
+      }
+      failure = new CommitFailedException(
+          "Failed to commit changes using rename: %s", dst);
+    } catch (IOException e) {
+      failure = new CommitFailedException(e,
+          "Failed to commit changes using rename: %s", dst);
+    }
+    // single cleanup path for both failure modes; a delete failure must not mask the commit failure
+    RuntimeException deleteFailure = tryDelete(src);
+    if (deleteFailure != null) {
+      failure.addSuppressed(deleteFailure);
+    }
+    throw failure;
+  }
+
+  /**
+   * Deletes the file through {@code io()}, returning rather than throwing any RuntimeException.
+   *
+   * @param location the file to delete
+   * @return the RuntimeException raised by the delete, or null if the delete succeeded
+   */
+  private RuntimeException tryDelete(Path location) {
+    try {
+      io().deleteFile(location.toString());
+    } catch (RuntimeException deleteFailure) {
+      return deleteFailure;
+    }
+    return null;
+  }
+
   protected FileSystem getFS(Path path, Configuration conf) {
     return Util.getFS(path, conf);
   }
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java b/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
index 70d7401..ef0e5ab 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/HadoopTableTestBase.java
@@ -120,6 +120,10 @@ public class HadoopTableTestBase {
         !name.startsWith("snap") && Files.getFileExtension(name).equalsIgnoreCase("avro")));
   }
 
+  List<File> listMetadataJsonFiles() {
+    return Lists.newArrayList(metadataDir.listFiles((parent, fileName) -> fileName.endsWith(".metadata.json")));
+  }
+
   File version(int i) {
     return new File(metadataDir, "v" + i + getFileExtension(new Configuration()));
   }
diff --git a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
index f3d1b62..fb15b56 100644
--- a/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
+++ b/core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
@@ -20,23 +20,38 @@
 package org.apache.iceberg.hadoop;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.io.File;
+import java.io.IOException;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestHadoopCommits extends HadoopTableTestBase {
 
@@ -293,4 +308,63 @@ public class TestHadoopCommits extends HadoopTableTestBase {
     Assert.assertEquals("Current snapshot should contain 1 merged manifest",
         1, metadata.currentSnapshot().manifests().size());
   }
+
+  @Test
+  public void testRenameReturnFalse() throws Exception {
+    FileSystem failingFs = mock(FileSystem.class);
+    when(failingFs.rename(any(), any())).thenReturn(false);
+    when(failingFs.exists(any())).thenReturn(false);
+    testRenameWithFS(failingFs);
+  }
+
+  @Test
+  public void testRenameThrow() throws Exception {
+    FileSystem throwingFs = mock(FileSystem.class);
+    when(throwingFs.rename(any(), any())).thenThrow(new IOException("test injected"));
+    when(throwingFs.exists(any())).thenReturn(false);
+    testRenameWithFS(throwingFs);
+  }
+
+  /**
+   * Commits via {@link HadoopTableOperations#commit(TableMetadata, TableMetadata)} with the given {@link FileSystem}
+   * injected through getFS, expects a rename failure, and checks no temporary metadata file is left behind.
+   */
+  private void testRenameWithFS(FileSystem mockFS) throws Exception {
+    assertTrue("Should create v1 metadata",
+        version(1).exists() && version(1).isFile());
+    assertFalse("Should not create v2 or newer versions",
+        version(2).exists());
+    assertTrue("Table should be a BaseTable", table instanceof BaseTable);
+    BaseTable baseTable = (BaseTable) table;
+    // v1's metadata is re-committed below as the *new* metadata on top of v2 (not as the rename destination)
+    TableMetadata meta1 = baseTable.operations().current();
+    // create v2 so that the later commit has a base (v2) that differs from its new metadata (v1)
+    // and therefore proceeds all the way to the rename step
+    table.updateSchema()
+        .addColumn("n", Types.IntegerType.get())
+        .commit();
+    assertTrue("Should create v2 for the update",
+        version(2).exists() && version(2).isFile());
+    assertEquals("Should write the current version to the hint file",
+        2, readVersionHint());
+
+    // spy on the real operations and inject mockFS as the file system returned by getFS
+    TableOperations tops = baseTable.operations();
+    assertTrue("Operations should be HadoopTableOperations", tops instanceof HadoopTableOperations);
+    HadoopTableOperations spyOps = Mockito.spy((HadoopTableOperations) tops);
+    doReturn(mockFS).when(spyOps).getFS(any(), any());
+    try {
+      spyOps.commit(spyOps.current(), meta1);
+      fail("Commit should fail due to mock file system");
+    } catch (CommitFailedException e) {
+      assertTrue("Commit should fail at the rename step: " + e.getMessage(),
+          e.getMessage().startsWith("Failed to commit changes using rename"));
+    }
+    // the failure must come from the injected rename, not from an earlier step of commit()
+    Mockito.verify(mockFS).rename(any(), any());
+    // verify that no temporary metadata.json files are left behind after the rename failure
+    Set<String> actual = listMetadataJsonFiles().stream().map(File::getName).collect(Collectors.toSet());
+    Set<String> expected = Sets.newHashSet("v1.metadata.json", "v2.metadata.json");
+    assertEquals("only v1 and v2 metadata.json should exist.", expected, actual);
+  }
 }