You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by op...@apache.org on 2021/09/24 02:59:09 UTC
[iceberg] branch master updated: Core: Avoid throwing IOException in new write methods (#3170)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 31efe35 Core: Avoid throwing IOException in new write methods (#3170)
31efe35 is described below
commit 31efe350218a52a0db14993284425109f0529b61
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Thu Sep 23 19:58:57 2021 -0700
Core: Avoid throwing IOException in new write methods (#3170)
---
.../iceberg/deletes/EqualityDeleteWriter.java | 2 +-
.../iceberg/deletes/PositionDeleteWriter.java | 2 +-
.../org/apache/iceberg/io/ClusteredWriter.java | 11 ++++--
.../java/org/apache/iceberg/io/FanoutWriter.java | 2 +-
.../java/org/apache/iceberg/io/FileWriter.java | 7 ++--
.../org/apache/iceberg/io/PartitioningWriter.java | 4 +--
.../org/apache/iceberg/io/RollingFileWriter.java | 11 ++++--
.../apache/iceberg/io/SortedPosDeleteWriter.java | 2 +-
.../apache/iceberg/io/TestPartitioningWriters.java | 41 ++++------------------
9 files changed, 30 insertions(+), 52 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java
index 7e0a826..c914ad2 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java
@@ -58,7 +58,7 @@ public class EqualityDeleteWriter<T> implements FileWriter<T, DeleteWriteResult>
}
@Override
- public void write(T row) throws IOException {
+ public void write(T row) {
appender.add(row);
}
diff --git a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
index 7532420..a7dff07 100644
--- a/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/deletes/PositionDeleteWriter.java
@@ -57,7 +57,7 @@ public class PositionDeleteWriter<T> implements FileWriter<PositionDelete<T>, De
}
@Override
- public void write(PositionDelete<T> positionDelete) throws IOException {
+ public void write(PositionDelete<T> positionDelete) {
referencedDataFiles.add(positionDelete.path());
appender.add(positionDelete);
}
diff --git a/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
index 8729fd1..61a6f9f 100644
--- a/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.io;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.Set;
import org.apache.iceberg.PartitionSpec;
@@ -63,7 +64,7 @@ abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
protected abstract R aggregatedResult();
@Override
- public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+ public void write(T row, PartitionSpec spec, StructLike partition) {
if (!spec.equals(currentSpec)) {
if (currentSpec != null) {
closeCurrentWriter();
@@ -110,9 +111,13 @@ abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
}
}
- private void closeCurrentWriter() throws IOException {
+ private void closeCurrentWriter() {
if (currentWriter != null) {
- currentWriter.close();
+ try {
+ currentWriter.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close current writer", e);
+ }
addResult(currentWriter.result());
diff --git a/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
index 122a25d..631fc0a 100644
--- a/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/FanoutWriter.java
@@ -49,7 +49,7 @@ abstract class FanoutWriter<T, R> implements PartitioningWriter<T, R> {
protected abstract R aggregatedResult();
@Override
- public void write(T row, PartitionSpec spec, StructLike partition) throws IOException {
+ public void write(T row, PartitionSpec spec, StructLike partition) {
FileWriter<T, R> writer = writer(spec, partition);
writer.write(row);
}
diff --git a/core/src/main/java/org/apache/iceberg/io/FileWriter.java b/core/src/main/java/org/apache/iceberg/io/FileWriter.java
index cbaca4f..6f0c4ab 100644
--- a/core/src/main/java/org/apache/iceberg/io/FileWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/FileWriter.java
@@ -20,7 +20,6 @@
package org.apache.iceberg.io;
import java.io.Closeable;
-import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
@@ -41,9 +40,8 @@ public interface FileWriter<T, R> extends Closeable {
* Writes rows to a predefined spec/partition.
*
* @param rows data or delete records
- * @throws IOException in case of an error during the write process
*/
- default void write(Iterable<T> rows) throws IOException {
+ default void write(Iterable<T> rows) {
for (T row : rows) {
write(row);
}
@@ -53,9 +51,8 @@ public interface FileWriter<T, R> extends Closeable {
* Writes a row to a predefined spec/partition.
*
* @param row a data or delete record
- * @throws IOException in case of an error during the write process
*/
- void write(T row) throws IOException;
+ void write(T row);
/**
* Returns the number of bytes that were currently written by this writer.
diff --git a/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java b/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java
index 329e68c..4afdd21 100644
--- a/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/PartitioningWriter.java
@@ -20,7 +20,6 @@
package org.apache.iceberg.io;
import java.io.Closeable;
-import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionSpec;
@@ -46,9 +45,8 @@ public interface PartitioningWriter<T, R> extends Closeable {
* @param row a data or delete record
* @param spec a partition spec
* @param partition a partition or null if the spec is unpartitioned
- * @throws IOException in case of an error during the write process
*/
- void write(T row, PartitionSpec spec, StructLike partition) throws IOException;
+ void write(T row, PartitionSpec spec, StructLike partition);
/**
* Returns a result that contains information about written {@link DataFile}s or {@link DeleteFile}s.
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
index ed35933..24a6ce3 100644
--- a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.io;
import java.io.IOException;
+import java.io.UncheckedIOException;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptedOutputFile;
@@ -81,7 +82,7 @@ abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements Fi
}
@Override
- public void write(T row) throws IOException {
+ public void write(T row) {
currentWriter.write(row);
currentFileRows++;
@@ -111,9 +112,13 @@ abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements Fi
}
}
- private void closeCurrentWriter() throws IOException {
+ private void closeCurrentWriter() {
if (currentWriter != null) {
- currentWriter.close();
+ try {
+ currentWriter.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to close current writer", e);
+ }
if (currentFileRows == 0L) {
io.deleteFile(currentFile.encryptingOutputFile());
diff --git a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java
index f1c9e52..36a0313 100644
--- a/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/SortedPosDeleteWriter.java
@@ -79,7 +79,7 @@ class SortedPosDeleteWriter<T> implements FileWriter<PositionDelete<T>, DeleteWr
}
@Override
- public void write(PositionDelete<T> payload) throws IOException {
+ public void write(PositionDelete<T> payload) {
delete(payload.path(), payload.pos(), payload.row());
}
diff --git a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
index b01ccc5..054881a 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestPartitioningWriters.java
@@ -21,7 +21,6 @@ package org.apache.iceberg.io;
import java.io.File;
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.util.List;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
@@ -152,13 +151,7 @@ public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
AssertHelpers.assertThrows("Should fail to write out of order partitions",
IllegalStateException.class, "Encountered records that belong to already closed files",
- () -> {
- try {
- writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa"));
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- });
+ () -> writer.write(toRow(6, "aaa"), spec, partitionKey(spec, "aaa")));
writer.close();
}
@@ -303,23 +296,11 @@ public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
AssertHelpers.assertThrows("Should fail to write out of order partitions",
IllegalStateException.class, "Encountered records that belong to already closed files",
- () -> {
- try {
- writer.write(toRow(7, "ccc"), identitySpec, partitionKey(identitySpec, "ccc"));
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- });
+ () -> writer.write(toRow(7, "ccc"), identitySpec, partitionKey(identitySpec, "ccc")));
AssertHelpers.assertThrows("Should fail to write out of order specs",
IllegalStateException.class, "Encountered records that belong to already closed files",
- () -> {
- try {
- writer.write(toRow(7, "aaa"), unpartitionedSpec, null);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
- });
+ () -> writer.write(toRow(7, "aaa"), unpartitionedSpec, null));
writer.close();
}
@@ -459,23 +440,15 @@ public abstract class TestPartitioningWriters<T> extends WriterTestBase<T> {
AssertHelpers.assertThrows("Should fail to write out of order partitions",
IllegalStateException.class, "Encountered records that belong to already closed files",
() -> {
- try {
- PositionDelete<T> positionDelete = positionDelete("file-5.parquet", 1L, null);
- writer.write(positionDelete, identitySpec, partitionKey(identitySpec, "ccc"));
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
+ PositionDelete<T> positionDelete = positionDelete("file-5.parquet", 1L, null);
+ writer.write(positionDelete, identitySpec, partitionKey(identitySpec, "ccc"));
});
AssertHelpers.assertThrows("Should fail to write out of order specs",
IllegalStateException.class, "Encountered records that belong to already closed files",
() -> {
- try {
- PositionDelete<T> positionDelete = positionDelete("file-1.parquet", 3L, null);
- writer.write(positionDelete, unpartitionedSpec, null);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
+ PositionDelete<T> positionDelete = positionDelete("file-1.parquet", 3L, null);
+ writer.write(positionDelete, unpartitionedSpec, null);
});
writer.close();