You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this rendering.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/11/02 11:43:33 UTC

[hudi] branch master updated: [HUDI-2515] Add close when producing records failed (#3746)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f239187  [HUDI-2515] Add close when producing records failed (#3746)
f239187 is described below

commit f239187da8052680bde355efbaa33149d37b4280
Author: 董可伦 <do...@inspur.com>
AuthorDate: Tue Nov 2 19:43:20 2021 +0800

    [HUDI-2515] Add close when producing records failed (#3746)
---
 .../org/apache/hudi/common/util/ParquetReaderIterator.java | 14 ++++++++------
 .../hudi/common/util/queue/BoundedInMemoryQueue.java       |  5 +++--
 .../apache/hudi/common/util/TestParquetReaderIterator.java |  7 +++++--
 3 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
index 20c79dd..5970e02 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetReaderIterator.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.common.util;
 
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueue;
-import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieException;
 
 import org.apache.parquet.hadoop.ParquetReader;
 
@@ -49,8 +49,9 @@ public class ParquetReaderIterator<T> implements Iterator<T> {
         this.next = parquetReader.read();
       }
       return this.next != null;
-    } catch (IOException io) {
-      throw new HoodieIOException("unable to read next record from parquet file ", io);
+    } catch (Exception e) {
+      FileIOUtils.closeQuietly(parquetReader);
+      throw new HoodieException("unable to read next record from parquet file ", e);
     }
   }
 
@@ -60,14 +61,15 @@ public class ParquetReaderIterator<T> implements Iterator<T> {
       // To handle case when next() is called before hasNext()
       if (this.next == null) {
         if (!hasNext()) {
-          throw new HoodieIOException("No more records left to read from parquet file");
+          throw new HoodieException("No more records left to read from parquet file");
         }
       }
       T retVal = this.next;
       this.next = parquetReader.read();
       return retVal;
-    } catch (IOException io) {
-      throw new HoodieIOException("unable to read next record from parquet file ", io);
+    } catch (Exception e) {
+      FileIOUtils.closeQuietly(parquetReader);
+      throw new HoodieException("unable to read next record from parquet file ", e);
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java
index 4d55249..dfe33b4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java
@@ -172,7 +172,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
   /**
    * Inserts record into queue after applying transformation.
    *
-   * @param t Item to be queueed
+   * @param t Item to be queued
    */
   public void insertRecord(I t) throws Exception {
     // If already closed, throw exception
@@ -222,7 +222,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
         throw new HoodieException(e);
       }
     }
-    // Check one more time here as it is possible producer errored out and closed immediately
+    // Check one more time here as it is possible producer erred out and closed immediately
     throwExceptionIfFailed();
 
     if (newRecord != null && newRecord.isPresent()) {
@@ -244,6 +244,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
 
   private void throwExceptionIfFailed() {
     if (this.hasFailed.get() != null) {
+      close();
       throw new HoodieException("operation has failed", this.hasFailed.get());
     }
   }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java
index 799ed24..37fead4 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetReaderIterator.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.common.util;
 
-import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieException;
 
 import org.apache.parquet.hadoop.ParquetReader;
 import org.junit.jupiter.api.Test;
@@ -30,6 +30,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestParquetReaderIterator {
@@ -59,6 +61,7 @@ public class TestParquetReaderIterator {
     assertEquals(1, iterator.next());
     // no more entries to iterate on
     assertFalse(iterator.hasNext());
-    assertThrows(HoodieIOException.class, iterator::next, "should throw an exception since there is only 1 record");
+    assertThrows(HoodieException.class, iterator::next, "should throw an exception since there is only 1 record");
+    verify(reader, times(1)).close();
   }
 }