You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by op...@apache.org on 2021/09/24 02:59:09 UTC

[iceberg] branch master updated: Core: Avoid throwing IOException in new write methods (#3170)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 31efe35  Core: Avoid throwing IOException in new write methods (#3170)
31efe35 is described below

commit 31efe350218a52a0db14993284425109f0529b61
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Thu Sep 23 19:58:57 2021 -0700

    Core: Avoid throwing IOException in new write methods (#3170)
---
 .../iceberg/deletes/EqualityDeleteWriter.java      |  2 +-
 .../iceberg/deletes/PositionDeleteWriter.java      |  2 +-
 .../org/apache/iceberg/io/ClusteredWriter.java     | 11 ++++--
 .../java/org/apache/iceberg/io/FanoutWriter.java   |  2 +-
 .../java/org/apache/iceberg/io/FileWriter.java     |  7 ++--
 .../org/apache/iceberg/io/PartitioningWriter.java  |  4 +--
 .../org/apache/iceberg/io/RollingFileWriter.java   | 11 ++++--
 .../apache/iceberg/io/SortedPosDeleteWriter.java   |  2 +-
 .../apache/iceberg/io/TestPartitioningWriters.java | 41 ++++------------------
 9 files changed, 30 insertions(+), 52 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java
index 7e0a826..c914ad2 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java
@@ -58,7 +58,7 @@ public class EqualityDeleteWriter<T> implements FileWriter<T, DeleteWriteResult>
   }
 
   @Override
-  public void write(T row) throws IOException {
+  public void write(T row) {
     appender.add(row);
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
index 7532420..a7dff07 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
@@ -57,7 +57,7 @@ public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, De
   }
 
   @Override
-  public void write(PositionDelete<T> positionDelete) throws IOException {
+  public void write(PositionDelete<T> positionDelete) {
     referencedDataFiles.add(positionDelete.path());
     appender.add(positionDelete);
   }
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
index 8729fd1..61a6f9f 100644
--- a/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.io;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Comparator;
 import java.util.Set;
 import org.apache.iceberg.PartitionSpec;
@@ -63,7 +64,7 @@ abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
   protected abstract R aggregatedResult();
 
   @Override
-  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+  public void write(T row, PartitionSpec spec, StructLike partition) {
     if (!spec.equals(currentSpec)) {
       if (currentSpec != null) {
         closeCurrentWriter();
@@ -110,9 +111,13 @@ abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
     }
   }
 
-  private void closeCurrentWriter() throws IOException {
+  private void closeCurrentWriter() {
     if (currentWriter != null) {
-      currentWriter.close();
+      try {
+        currentWriter.close();
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to close current writer", e);
+      }
 
       addResult(currentWriter.result());
 
diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
index 122a25d..631fc0a 100644
--- a/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
@@ -49,7 +49,7 @@ abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
   protected abstract R aggregatedResult();
 
   @Override
-  public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+  public void write(T row, PartitionSpec spec, StructLike partition) {
     FileWriter<T, R> writer = writer(spec, partition);
     writer.write(row);
   }
diff --git a/core/src/main/java/org/apache/iceberg/io/FileWriter.java b/core/src/main/java/org/apache/iceberg/io/FileWriter.java
index cbaca4f..6f0c4ab 100644
--- a/core/src/main/java/org/apache/iceberg/io/FileWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/FileWriter.java
@@ -20,7 +20,6 @@
 package org.apache.iceberg.io;
 
 import java.io.Closeable;
-import java.io.IOException;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 
@@ -41,9 +40,8 @@ public interface FileWriter<T, R> extends Closeable {
    * Writes rows to a predefined spec/partition.
    *
    * @param rows data or delete records
-   * @throws IOException in case of an error during the write process
    */
-  default void write(Iterable<T> rows) throws IOException {
+  default void write(Iterable<T> rows) {
     for (T row : rows) {
       write(row);
     }
@@ -53,9 +51,8 @@ public interface FileWriter<T, R> extends Closeable {
    * Writes a row to a predefined spec/partition.
    *
    * @param row a data or delete record
-   * @throws IOException in case of an error during the write process
    */
-  void write(T row) throws IOException;
+  void write(T row);
 
   /**
    * Returns the number of bytes that were currently written by this writer.
diff --git a/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java
index 329e68c..4afdd21 100644
--- a/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java
@@ -20,7 +20,6 @@
 package org.apache.iceberg.io;
 
 import java.io.Closeable;
-import java.io.IOException;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.PartitionSpec;
@@ -46,9 +45,8 @@ public interface PartitioningWriter<T, R> extends Closeable {
    * @param row a data or delete record
    * @param spec a partition spec
    * @param partition a partition or null if the spec is unpartitioned
-   * @throws IOException in case of an error during the write process
    */
-  void write(T row, PartitionSpec spec, StructLike partition) throws IOException;
+  void write(T row, PartitionSpec spec, StructLike partition);
 
   /**
    * Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s.
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
index ed35933..24a6ce3 100644
--- a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.io;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
@@ -81,7 +82,7 @@ abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements Fi
   }
 
   @Override
-  public void write(T row) throws IOException {
+  public void write(T row) {
     currentWriter.write(row);
     currentFileRows++;
 
@@ -111,9 +112,13 @@ abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements Fi
     }
   }
 
-  private void closeCurrentWriter() throws IOException {
+  private void closeCurrentWriter() {
     if (currentWriter != null) {
-      currentWriter.close();
+      try {
+        currentWriter.close();
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failed to close current writer", e);
+      }
 
       if (currentFileRows == 0L) {
         io.deleteFile(currentFile.encryptingOutputFile());
diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java
index f1c9e52..36a0313 100644
--- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java
@@ -79,7 +79,7 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr
   }
 
   @Override
-  public void write(PositionDelete<T> payload) throws IOException {
+  public void write(PositionDelete<T> payload) {
     delete(payload.path(), payload.pos(), payload.row());
   }
 
diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
index b01ccc5..054881a 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.io;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.List;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
@@ -152,13 +151,7 @@ public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
 
     AssertHelpers.assertThrows("Should fail to write out of order partitions",
         IllegalStateException.class, "Encountered records that belong to already closed files",
-        () -> {
-          try {
-            writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa"));
-          } catch (IOException e) {
-            throw new UncheckedIOException(e);
-          }
-        });
+        () -> writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa")));
 
     writer.close();
   }
@@ -303,23 +296,11 @@ public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
 
     AssertHelpers.assertThrows("Should fail to write out of order partitions",
         IllegalStateException.class, "Encountered records that belong to already closed files",
-        () -> {
-          try {
-            writer.write(toRow(7, "ccc"), identitySpec, partitionKey(identitySpec, "ccc"));
-          } catch (IOException e) {
-            throw new UncheckedIOException(e);
-          }
-        });
+        () -> writer.write(toRow(7, "ccc"), identitySpec, partitionKey(identitySpec, "ccc")));
 
     AssertHelpers.assertThrows("Should fail to write out of order specs",
         IllegalStateException.class, "Encountered records that belong to already closed files",
-        () -> {
-          try {
-            writer.write(toRow(7, "aaa"), unpartitionedSpec, null);
-          } catch (IOException e) {
-            throw new UncheckedIOException(e);
-          }
-        });
+        () -> writer.write(toRow(7, "aaa"), unpartitionedSpec, null));
 
     writer.close();
   }
@@ -459,23 +440,15 @@ public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
     AssertHelpers.assertThrows("Should fail to write out of order partitions",
         IllegalStateException.class, "Encountered records that belong to already closed files",
         () -> {
-          try {
-            PositionDelete<T> positionDelete = positionDelete("file-5.parquet", 1L, null);
-            writer.write(positionDelete, identitySpec, partitionKey(identitySpec, "ccc"));
-          } catch (IOException e) {
-            throw new UncheckedIOException(e);
-          }
+          PositionDelete<T> positionDelete = positionDelete("file-5.parquet", 1L, null);
+          writer.write(positionDelete, identitySpec, partitionKey(identitySpec, "ccc"));
         });
 
     AssertHelpers.assertThrows("Should fail to write out of order specs",
         IllegalStateException.class, "Encountered records that belong to already closed files",
         () -> {
-          try {
-            PositionDelete<T> positionDelete = positionDelete("file-1.parquet", 3L, null);
-            writer.write(positionDelete, unpartitionedSpec, null);
-          } catch (IOException e) {
-            throw new UncheckedIOException(e);
-          }
+          PositionDelete<T> positionDelete = positionDelete("file-1.parquet", 3L, null);
+          writer.write(positionDelete, unpartitionedSpec, null);
         });
 
     writer.close();