Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/01/13 20:18:07 UTC
[iceberg] branch master updated: Parquet: Lazily initialize the underlying writer in ParquetWriter (#3780)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 1a6a60d Parquet: Lazily initialize the underlying writer in ParquetWriter (#3780)
1a6a60d is described below
commit 1a6a60db519b20d41a3f1adfe02b4108853b3861
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Thu Jan 13 12:17:55 2022 -0800
Parquet: Lazily initialize the underlying writer in ParquetWriter (#3780)
Co-authored-by: Tim Steinbach <ti...@shopify.com>
---
.../java/org/apache/iceberg/io/BaseTaskWriter.java | 7 +-
.../org/apache/iceberg/io/RollingFileWriter.java | 6 +-
.../iceberg/flink/sink/TestDeltaTaskWriter.java | 12 ++-
.../iceberg/flink/sink/TestDeltaTaskWriter.java | 12 ++-
.../iceberg/flink/sink/TestDeltaTaskWriter.java | 12 ++-
.../org/apache/iceberg/parquet/ParquetWriter.java | 96 ++++++++++++++--------
6 files changed, 95 insertions(+), 50 deletions(-)
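A note on the change as a whole before the per-file hunks: constructing a ParquetFileWriter creates the output file immediately, so a rolling or task writer that ended up writing zero records still created a file on storage that then had to be deleted. This commit defers construction of the underlying writer until the first row group is actually flushed, so the file is never created in the first place and the delete becomes best-effort. A minimal, self-contained sketch of that pattern, with illustrative names rather than the Iceberg API:

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.ArrayList;
    import java.util.List;

    class LazyAppender implements AutoCloseable {
      private final Path path;
      private final List<String> buffer = new ArrayList<>();
      private BufferedWriter writer; // stays null until the first flush

      LazyAppender(Path path) {
        this.path = path; // no file is created here
      }

      void add(String record) {
        buffer.add(record);
      }

      private void ensureWriterInitialized() {
        if (writer == null) {
          try {
            // the output file exists only from this point on
            this.writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8);
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
        }
      }

      private void flush() throws IOException {
        if (!buffer.isEmpty()) {
          ensureWriterInitialized();
          for (String record : buffer) {
            writer.write(record);
            writer.newLine();
          }
          buffer.clear();
        }
      }

      @Override
      public void close() throws IOException {
        flush();
        if (writer != null) { // still null when nothing was ever written
          writer.close();
        }
      }
    }

Closing a LazyAppender that never received a record performs no I/O at all, which is the property the Iceberg change is after.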
diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index d787c7c..2d39acb 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.io;
import java.io.Closeable;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.DataFile;
@@ -281,7 +282,11 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
currentWriter.close();
if (currentRows == 0L) {
- io.deleteFile(currentFile.encryptingOutputFile());
+ try {
+ io.deleteFile(currentFile.encryptingOutputFile());
+ } catch (UncheckedIOException e) {
+ // the file may not have been created, and it isn't worth failing the job to clean up, skip deleting
+ }
} else {
complete(currentWriter);
}
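With the writer now created lazily, the cleanup path above may ask the FileIO to delete a file that was never created; depending on the FileIO implementation that can surface as an UncheckedIOException, and failing the whole job over best-effort cleanup is not worth it, hence the new try/catch. The same guard is added to RollingFileWriter in the next hunk. The equivalent best-effort pattern in plain java.nio, for illustration only:

    // best effort: with lazy initialization the file may never have existed
    static void deleteQuietly(java.nio.file.Path path) {
      try {
        java.nio.file.Files.deleteIfExists(path);
      } catch (java.io.IOException e) {
        // not worth failing the job over cleanup
      }
    }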
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
index 24a6ce3..80a589b 100644
--- a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
@@ -121,7 +121,11 @@ abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements Fi
}
if (currentFileRows == 0L) {
- io.deleteFile(currentFile.encryptingOutputFile());
+ try {
+ io.deleteFile(currentFile.encryptingOutputFile());
+ } catch (UncheckedIOException e) {
+ // the file may not have been created, and it isn't worth failing the job to clean up, skip deleting
+ }
} else {
addResult(currentWriter.result());
}
diff --git a/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index e96e929..79009de 100644
--- a/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -91,6 +92,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
}
table.updateProperties()
+ .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(8 * 1024))
.defaultFormat(format)
.commit();
}
@@ -217,11 +219,13 @@ public class TestDeltaTaskWriter extends TableTestBase {
taskWriterFactory.initialize(1, 1);
TaskWriter<RowData> writer = taskWriterFactory.create();
- writer.write(createUpdateBefore(1, "aaa"));
- writer.write(createUpdateAfter(1, "bbb"));
+ for (int i = 0; i < 8_000; i += 2) {
+ writer.write(createUpdateBefore(i + 1, "aaa"));
+ writer.write(createUpdateAfter(i + 1, "aaa"));
- writer.write(createUpdateBefore(2, "aaa"));
- writer.write(createUpdateAfter(2, "bbb"));
+ writer.write(createUpdateBefore(i + 2, "bbb"));
+ writer.write(createUpdateAfter(i + 2, "bbb"));
+ }
// Assert the current data/delete file count.
List<Path> files = Files.walk(Paths.get(tableDir.getPath(), "data"))
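For readers of the test hunks (the v1.13 and v1.14 modules below receive the identical change): the setup now pins the Parquet row-group target to 8 * 1024 = 8192 bytes, and the test writes update pairs in a loop instead of two by hand. The loop runs 4,000 iterations of four writes each, so 16,000 writes in total; even at only a few bytes per encoded record that comfortably exceeds a single 8 KB row group, forcing each task writer to flush several row groups through the lazily created ParquetFileWriter and covering both the "writer exists" and "writer not yet created" paths.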
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 058058f..280e148 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -92,6 +93,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
}
table.updateProperties()
+ .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(8 * 1024))
.defaultFormat(format)
.commit();
}
@@ -218,11 +220,13 @@ public class TestDeltaTaskWriter extends TableTestBase {
taskWriterFactory.initialize(1, 1);
TaskWriter<RowData> writer = taskWriterFactory.create();
- writer.write(createUpdateBefore(1, "aaa"));
- writer.write(createUpdateAfter(1, "bbb"));
+ for (int i = 0; i < 8_000; i += 2) {
+ writer.write(createUpdateBefore(i + 1, "aaa"));
+ writer.write(createUpdateAfter(i + 1, "aaa"));
- writer.write(createUpdateBefore(2, "aaa"));
- writer.write(createUpdateAfter(2, "bbb"));
+ writer.write(createUpdateBefore(i + 2, "bbb"));
+ writer.write(createUpdateAfter(i + 2, "bbb"));
+ }
// Assert the current data/delete file count.
List<Path> files = Files.walk(Paths.get(tableDir.getPath(), "data"))
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 058058f..280e148 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -92,6 +93,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
}
table.updateProperties()
+ .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(8 * 1024))
.defaultFormat(format)
.commit();
}
@@ -218,11 +220,13 @@ public class TestDeltaTaskWriter extends TableTestBase {
taskWriterFactory.initialize(1, 1);
TaskWriter<RowData> writer = taskWriterFactory.create();
- writer.write(createUpdateBefore(1, "aaa"));
- writer.write(createUpdateAfter(1, "bbb"));
+ for (int i = 0; i < 8_000; i += 2) {
+ writer.write(createUpdateBefore(i + 1, "aaa"));
+ writer.write(createUpdateAfter(i + 1, "aaa"));
- writer.write(createUpdateBefore(2, "aaa"));
- writer.write(createUpdateAfter(2, "bbb"));
+ writer.write(createUpdateBefore(i + 2, "bbb"));
+ writer.write(createUpdateAfter(i + 2, "bbb"));
+ }
// Assert the current data/delete file count.
List<Path> files = Files.walk(Paths.get(tableDir.getPath(), "data"))
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 3900126..81a9c58 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -46,14 +46,16 @@ import org.apache.parquet.schema.MessageType;
class ParquetWriter<T> implements FileAppender<T>, Closeable {
- private static DynConstructors.Ctor<PageWriteStore> pageStoreCtorParquet = DynConstructors
- .builder(PageWriteStore.class)
- .hiddenImpl("org.apache.parquet.hadoop.ColumnChunkPageWriteStore",
- CodecFactory.BytesCompressor.class,
- MessageType.class,
- ByteBufferAllocator.class,
- int.class)
- .build();
+ private static final Metrics EMPTY_METRICS = new Metrics(0L, null, null, null, null);
+
+ private static final DynConstructors.Ctor<PageWriteStore> pageStoreCtorParquet = DynConstructors
+ .builder(PageWriteStore.class)
+ .hiddenImpl("org.apache.parquet.hadoop.ColumnChunkPageWriteStore",
+ CodecFactory.BytesCompressor.class,
+ MessageType.class,
+ ByteBufferAllocator.class,
+ int.class)
+ .build();
private static final DynMethods.UnboundMethod flushToWriter = DynMethods
.builder("flushToFileWriter")
@@ -66,16 +68,18 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private final CodecFactory.BytesCompressor compressor;
private final MessageType parquetSchema;
private final ParquetValueWriter<T> model;
- private final ParquetFileWriter writer;
private final MetricsConfig metricsConfig;
private final int columnIndexTruncateLength;
+ private final ParquetFileWriter.Mode writeMode;
+ private final OutputFile output;
+ private final Configuration conf;
private DynMethods.BoundMethod flushPageStoreToWriter;
private ColumnWriteStore writeStore;
- private long nextRowGroupSize = 0;
private long recordCount = 0;
private long nextCheckRecordCount = 10;
private boolean closed;
+ private ParquetFileWriter writer;
private static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
private static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
@@ -96,21 +100,28 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
this.model = (ParquetValueWriter<T>) createWriterFunc.apply(parquetSchema);
this.metricsConfig = metricsConfig;
this.columnIndexTruncateLength = conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
+ this.writeMode = writeMode;
+ this.output = output;
+ this.conf = conf;
- try {
- this.writer = new ParquetFileWriter(ParquetIO.file(output, conf), parquetSchema,
- writeMode, rowGroupSize, 0);
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to create Parquet file");
- }
+ startRowGroup();
+ }
- try {
- writer.start();
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to start Parquet file writer");
- }
+ private void ensureWriterInitialized() {
+ if (writer == null) {
+ try {
+ this.writer = new ParquetFileWriter(
+ ParquetIO.file(output, conf), parquetSchema, writeMode, targetRowGroupSize, 0);
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to create Parquet file");
+ }
- startRowGroup();
+ try {
+ writer.start();
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to start Parquet file writer");
+ }
+ }
}
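Note that ensureWriterInitialized() is only reached from flushRowGroup() when recordCount > 0 (see the flushRowGroup hunk further down), so an appender that never receives a record never creates a file. A hypothetical usage sketch of the resulting behavior (illustrative names, not an actual Iceberg test):

    // Closing an appender without writing must not touch storage:
    FileAppender<Record> appender = newAppender(outputFile); // illustrative factory
    appender.close();
    // no Parquet file is created; metrics() falls back to EMPTY_METRICS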
@Override
@@ -123,7 +134,11 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
@Override
public Metrics metrics() {
- return ParquetUtil.footerMetrics(writer.getFooter(), model.metrics(), metricsConfig);
+ Preconditions.checkState(closed, "Cannot return metrics for unclosed writer");
+ if (writer != null) {
+ return ParquetUtil.footerMetrics(writer.getFooter(), model.metrics(), metricsConfig);
+ }
+ return EMPTY_METRICS;
}
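Two behavioral points in the metrics() hunk: footer metrics are only complete once the file footer exists, so metrics() now asserts that the writer is closed; and when no record was ever written there is no footer at all, so the zero-record EMPTY_METRICS constant (new Metrics(0L, null, null, null, null), declared at the top of the class) is returned instead. splitOffsets() gets the same null-writer guard two hunks down, returning null when no file exists.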
/**
@@ -138,11 +153,19 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
@Override
public long length() {
try {
- if (closed) {
- return writer.getPos();
- } else {
- return writer.getPos() + (writeStore.isColumnFlushNeeded() ? writeStore.getBufferedSize() : 0);
+ long length = 0L;
+
+ if (writer != null) {
+ length += writer.getPos();
}
+
+ if (!closed && recordCount > 0) {
+ // recordCount > 0 when there are records in the write store that have not been flushed to the Parquet file
+ length += writeStore.getBufferedSize();
+ }
+
+ return length;
+
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to get file length");
}
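length() now composes two parts: bytes already persisted (writer.getPos(), absent while the writer is still null) plus bytes buffered in the write store for the open row group. Illustrative arithmetic only, with hypothetical values:

    long flushed = 1_048_576L;  // writer.getPos() after some flushed row groups
    long buffered = 262_144L;   // writeStore.getBufferedSize() for the open row group
    long length = flushed + buffered; // 1_310_720 is what length() reports

Before the first record is written, both terms are absent and length() simply returns 0 instead of failing on a null writer.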
@@ -150,7 +173,10 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
@Override
public List<Long> splitOffsets() {
- return ParquetUtil.getSplitOffsets(writer.getFooter());
+ if (writer != null) {
+ return ParquetUtil.getSplitOffsets(writer.getFooter());
+ }
+ return null;
}
private void checkSize() {
@@ -158,10 +184,10 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
long bufferedSize = writeStore.getBufferedSize();
double avgRecordSize = ((double) bufferedSize) / recordCount;
- if (bufferedSize > (nextRowGroupSize - 2 * avgRecordSize)) {
+ if (bufferedSize > (targetRowGroupSize - 2 * avgRecordSize)) {
flushRowGroup(false);
} else {
- long remainingSpace = nextRowGroupSize - bufferedSize;
+ long remainingSpace = targetRowGroupSize - bufferedSize;
long remainingRecords = (long) (remainingSpace / avgRecordSize);
this.nextCheckRecordCount = recordCount + Math.min(Math.max(remainingRecords / 2, 100), 10000);
}
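Because the writer may not exist when a row group starts, the old per-group limit (nextRowGroupSize, previously capped by writer.getNextRowGroupSize() in startRowGroup()) is dropped in favor of the fixed targetRowGroupSize. The scheduling heuristic itself is unchanged: flush when the buffer is within two average records of the target, otherwise re-check after half the estimated remaining records, clamped to [100, 10000]. A worked example with hypothetical numbers:

    long target = 134_217_728L;  // e.g. a 128 MB row-group target
    long buffered = 100_000L;    // hypothetical buffered bytes so far
    long records = 1_000L;       // hypothetical records buffered so far
    double avg = (double) buffered / records;  // 100.0 bytes per record
    long remaining = (long) ((target - buffered) / avg);  // ~1,341,177 records
    long nextCheck = records + Math.min(Math.max(remaining / 2, 100), 10000);
    // nextCheck == 11_000: re-check the size after at most 10,000 more records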
@@ -171,6 +197,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private void flushRowGroup(boolean finished) {
try {
if (recordCount > 0) {
+ ensureWriterInitialized();
writer.startBlock(recordCount);
writeStore.flush();
flushPageStoreToWriter.invoke(writer);
@@ -187,11 +214,6 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private void startRowGroup() {
Preconditions.checkState(!closed, "Writer is closed");
- try {
- this.nextRowGroupSize = Math.min(writer.getNextRowGroupSize(), targetRowGroupSize);
- } catch (IOException e) {
- throw new RuntimeIOException(e);
- }
this.nextCheckRecordCount = Math.min(Math.max(recordCount / 2, 100), 10000);
this.recordCount = 0;
@@ -210,7 +232,9 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
this.closed = true;
flushRowGroup(true);
writeStore.close();
- writer.end(metadata);
+ if (writer != null) {
+ writer.end(metadata);
+ }
}
}
}
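Taken together at close(): flushRowGroup(true) initializes the writer only if there are buffered records, and writer.end(metadata) is now guarded by the null check, so closing an appender that never saw a record writes nothing to storage. Combined with the best-effort deletes added to BaseTaskWriter and RollingFileWriter, empty task files are neither created nor left behind.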