You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by na...@apache.org on 2020/01/29 23:28:59 UTC

[incubator-hudi] branch master updated: HUDI-117 Close file handle before throwing an exception due to append failure. Add test cases to handle/verify stage failure scenarios.

This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f34be1  HUDI-117 Close file handle before throwing an exception due to append failure. Add test cases to handle/verify stage failure scenarios.
6f34be1 is described below

commit 6f34be1b8dd55304317c40565dad23c09ddda5bf
Author: Balajee Nagasubramaniam <ba...@uber.com>
AuthorDate: Tue Jan 28 15:53:57 2020 -0800

    HUDI-117 Close file handle before throwing an exception due to append failure.
    Add test cases to handle/verify stage failure scenarios.
---
 .../common/table/log/HoodieLogFormatWriter.java    | 42 +++++++++-
 .../hudi/common/table/log/TestHoodieLogFormat.java | 93 ++++++++++++++++++++++
 2 files changed, 134 insertions(+), 1 deletion(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index b8d5f89..201b879 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -72,6 +72,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
     this.replication = replication;
     this.logWriteToken = logWriteToken;
     this.rolloverLogWriteToken = rolloverLogWriteToken;
+    addShutDownHook();
     Path path = logFile.getPath();
     if (fs.exists(path)) {
       boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
@@ -87,6 +88,11 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
             // may still happen if scheme is viewfs.
             isAppendSupported = false;
           } else {
+            /*
+             * Before throwing an exception, close the outputstream,
+             * to ensure that the lease on the log file is released.
+             */
+            close();
             throw ioe;
           }
         }
@@ -221,6 +227,24 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
     return output.getPos();
   }
 
+  /**
+   * Registers a JVM shutdown hook that closes the output stream, if one is open, on JVM exit.
+   */
+  private void addShutDownHook() {
+    Runnable closeOnExit = () -> {
+      if (output == null) {
+        return;
+      }
+      try {
+        close();
+      } catch (Exception e) {
+        // best effort: report the failure and let the JVM shutdown proceed
+        LOG.warn("unable to close output stream for log file " + logFile, e);
+      }
+    };
+    Runtime.getRuntime().addShutdownHook(new Thread(closeOnExit));
+  }
+
   private void handleAppendExceptionOrRecoverLease(Path path, RemoteException e)
       throws IOException, InterruptedException {
     if (e.getMessage().contains(APPEND_UNAVAILABLE_EXCEPTION_MESSAGE)) {
@@ -256,7 +280,23 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
         throw new HoodieException(e);
       }
     } else {
-      throw new HoodieIOException("Failed to open an append stream ", e);
+      // When fs.append() has failed and an exception is thrown, by closing the output stream
+      // we shall force hdfs to release the lease on the log file. When Spark retries this task (with
+      // new attemptId, say taskId.1) it will be able to acquire lease on the log file (as output stream was
+      // closed properly by taskId.0).
+      //
+      // If close() call were to fail throwing an exception, our best bet is to rollover to a new log file.
+      try {
+        close();
+        // output stream has been successfully closed and lease on the log file has been released,
+        // before throwing an exception for the append failure.
+        throw new HoodieIOException("Failed to append to the output stream ", e);
+      } catch (Exception ce) {
+        LOG.warn("Failed to close the output stream for " + fs.getClass().getName() + " on path " + path
+            + ". Rolling over to a new log file.");
+        this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
+        createNewFile();
+      }
     }
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
index f0a4902..c4853cb 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/log/TestHoodieLogFormat.java
@@ -1113,6 +1113,99 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     assertEquals("We would read 0 records", 0, scanner.getTotalLogRecords());
   }
 
+  /*
+   * During a spark stage failure, when the stage is retried, tasks that are part of the previous attempt
+   * of the stage would continue to run.  As a result two different tasks could be performing the same operation.
+   * When trying to update the log file, only one of the tasks would succeed (one holding lease on the log file).
+   *
+   * In order to make progress in this scenario, second task attempting to update the log file would rollover to
+   * a new version of the log file.  As a result, we might end up with two log files with same set of data records
+   * present in both of them.
+   *
+   * Following unit tests mimic this scenario to ensure that the reader can handle merging multiple log files with
+   * duplicate data.
+   *
+   * This helper writes the first numRecordsInLog1 records through one writer and the first numRecordsInLog2
+   * records (of the same record list) through a second writer, then asserts that the merged scanner reports
+   * max(numRecordsInLog1, numRecordsInLog2) records. Exceptions are propagated, not caught, so that a failure
+   * while writing or scanning fails the calling test instead of letting it pass without running the assertion.
+   */
+  private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1, int numRecordsInLog2)
+      throws IOException, URISyntaxException, InterruptedException {
+    // Write one Data block with same InstantTime (written in same batch)
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+    List<IndexedRecord> records = SchemaTestUtil.generateHoodieTestRecords(0, 101);
+    List<IndexedRecord> records2 = new ArrayList<>(records);
+
+    // Write1 with numRecordsInLog1 records written to log.1
+    Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
+        .overBaseCommit("100").withFs(fs).build();
+
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records.subList(0, numRecordsInLog1), header);
+    writer = writer.appendBlock(dataBlock);
+    // Get the size of the block
+    long size = writer.getCurrentSize();
+    writer.close();
+
+    // write2 with numRecordsInLog2 records written to log.2
+    // (the size threshold is set just below the size reported after write1)
+    Writer writer2 = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
+        .overBaseCommit("100").withFs(fs).withSizeThreshold(size - 1).build();
+
+    Map<HoodieLogBlock.HeaderMetadataType, String> header2 = Maps.newHashMap();
+    header2.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header2.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieAvroDataBlock dataBlock2 = new HoodieAvroDataBlock(records2.subList(0, numRecordsInLog2), header2);
+    writer2 = writer2.appendBlock(dataBlock2);
+    // Nothing else is appended through writer2; close it before reading
+    writer2.close();
+
+    // From the two log files generated, read the records
+    List<String> allLogFiles = FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1",
+        HoodieLogFile.DELTA_EXTENSION, "100").map(s -> s.getPath().toString()).collect(Collectors.toList());
+
+    HoodieMergedLogRecordScanner scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema,
+        "100", 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+
+    assertEquals("We would read 100 records",
+        Math.max(numRecordsInLog1, numRecordsInLog2), scanner.getNumMergedRecordsInLog());
+  }
+
+  @Test
+  public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt()
+      throws IOException, URISyntaxException, InterruptedException {
+    // FIRST_ATTEMPT_FAILED: the original task from the stage attempt failed, but the subsequent
+    // stage retry succeeded; modelled as 77 records in the first log file and 100 in the second.
+    final int recordsInFirstLog = 77;
+    final int recordsInSecondLog = 100;
+    testAvroLogRecordReaderMergingMultipleLogFiles(recordsInFirstLog, recordsInSecondLog);
+  }
+
+  @Test
+  public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt()
+      throws IOException, URISyntaxException, InterruptedException {
+    // SECOND_ATTEMPT_FAILED: the original task from the stage attempt succeeded, but the subsequent
+    // retry attempt failed; modelled as 100 records in the first log file and 66 in the second.
+    final int recordsInFirstLog = 100;
+    final int recordsInSecondLog = 66;
+    testAvroLogRecordReaderMergingMultipleLogFiles(recordsInFirstLog, recordsInSecondLog);
+  }
+
+  @Test
+  public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts()
+      throws IOException, URISyntaxException, InterruptedException {
+    // BOTH_ATTEMPTS_SUCCEEDED: the original task from the stage attempt and the duplicate task from
+    // the stage retry both succeeded; modelled as 100 records in each of the two log files.
+    final int recordsInFirstLog = 100;
+    final int recordsInSecondLog = 100;
+    testAvroLogRecordReaderMergingMultipleLogFiles(recordsInFirstLog, recordsInSecondLog);
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void testBasicAppendAndReadInReverse() throws IOException, URISyntaxException, InterruptedException {