Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/01/13 20:18:07 UTC

[iceberg] branch master updated: Parquet: Lazily initialize the underlying writer in ParquetWriter (#3780)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a6a60d  Parquet: Lazily initialize the underlying writer in ParquetWriter (#3780)
1a6a60d is described below

commit 1a6a60db519b20d41a3f1adfe02b4108853b3861
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Thu Jan 13 12:17:55 2022 -0800

    Parquet: Lazily initialize the underlying writer in ParquetWriter (#3780)
    
    Co-authored-by: Tim Steinbach <ti...@shopify.com>
---
 .../java/org/apache/iceberg/io/BaseTaskWriter.java |  7 +-
 .../org/apache/iceberg/io/RollingFileWriter.java   |  6 +-
 .../iceberg/flink/sink/TestDeltaTaskWriter.java    | 12 ++-
 .../iceberg/flink/sink/TestDeltaTaskWriter.java    | 12 ++-
 .../iceberg/flink/sink/TestDeltaTaskWriter.java    | 12 ++-
 .../org/apache/iceberg/parquet/ParquetWriter.java  | 96 ++++++++++++++--------
 6 files changed, 95 insertions(+), 50 deletions(-)
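
The change defers creating the underlying ParquetFileWriter until the first
row group is flushed, so an appender that never receives any rows never
creates a file on disk. A minimal sketch of that lazy-initialization pattern,
using hypothetical names rather than the Iceberg classes:

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.io.Writer;
    import java.nio.file.Files;
    import java.nio.file.Path;

    // Hypothetical appender that creates its output file only when the first record arrives.
    class LazyAppender implements AutoCloseable {
      private final Path location;
      private Writer delegate;       // null until the first write
      private long recordCount = 0;

      LazyAppender(Path location) {
        this.location = location;    // the constructor does no I/O
      }

      private void ensureInitialized() {
        if (delegate == null) {
          try {
            delegate = Files.newBufferedWriter(location);   // the file is created here, on first use
          } catch (IOException e) {
            throw new UncheckedIOException("Failed to create " + location, e);
          }
        }
      }

      void add(String record) {
        ensureInitialized();
        try {
          delegate.write(record);
          delegate.write('\n');
          recordCount += 1;
        } catch (IOException e) {
          throw new UncheckedIOException("Failed to write record", e);
        }
      }

      long recordCount() {
        return recordCount;
      }

      @Override
      public void close() throws IOException {
        if (delegate != null) {      // nothing to close (or clean up) when no record was written
          delegate.close();
        }
      }
    }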

diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
index d787c7c..2d39acb 100644
--- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.io;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.DataFile;
@@ -281,7 +282,11 @@ public abstract class BaseTaskWriter<T> implements TaskWriter<T> {
         currentWriter.close();
 
         if (currentRows == 0L) {
-          io.deleteFile(currentFile.encryptingOutputFile());
+          try {
+            io.deleteFile(currentFile.encryptingOutputFile());
+          } catch (UncheckedIOException e) {
+            // the file may not have been created, and it isn't worth failing the job to clean up, skip deleting
+          }
         } else {
           complete(currentWriter);
         }
diff --git a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
index 24a6ce3..80a589b 100644
--- a/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
+++ b/core/src/main/java/org/apache/iceberg/io/RollingFileWriter.java
@@ -121,7 +121,11 @@ abstract class RollingFileWriter<T, W extends FileWriter<T, R>, R> implements Fi
       }
 
       if (currentFileRows == 0L) {
-        io.deleteFile(currentFile.encryptingOutputFile());
+        try {
+          io.deleteFile(currentFile.encryptingOutputFile());
+        } catch (UncheckedIOException e) {
+          // the file may not have been created, and it isn't worth failing the job to clean up, skip deleting
+        }
       } else {
         addResult(currentWriter.result());
       }
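
Both hunks above make the empty-file cleanup tolerant: with the writer now
initialized lazily, the file may never have been created, so the delete call
can surface an UncheckedIOException that is not worth failing the task over.
A small sketch of that caller-side pattern, with hypothetical names standing
in for the Iceberg FileIO API:

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    final class TolerantCleanup {
      private TolerantCleanup() {
      }

      // Stand-in for a FileIO-style delete: I/O failures surface as unchecked exceptions.
      static void deleteFile(String location) {
        try {
          Files.delete(Paths.get(location));
        } catch (IOException e) {
          throw new UncheckedIOException("Failed to delete: " + location, e);
        }
      }

      // Caller-side pattern from the hunks above: swallow the failure for empty files.
      static void deleteIfPossible(String location) {
        try {
          deleteFile(location);
        } catch (UncheckedIOException e) {
          // the file may never have been created; skip cleanup rather than fail the job
        }
      }
    }
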
diff --git a/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index e96e929..79009de 100644
--- a/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/v1.12/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableTestBase;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -91,6 +92,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
     }
 
     table.updateProperties()
+        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(8 * 1024))
         .defaultFormat(format)
         .commit();
   }
@@ -217,11 +219,13 @@ public class TestDeltaTaskWriter extends TableTestBase {
     taskWriterFactory.initialize(1, 1);
 
     TaskWriter<RowData> writer = taskWriterFactory.create();
-    writer.write(createUpdateBefore(1, "aaa"));
-    writer.write(createUpdateAfter(1, "bbb"));
+    for (int i = 0; i < 8_000; i += 2) {
+      writer.write(createUpdateBefore(i + 1, "aaa"));
+      writer.write(createUpdateAfter(i + 1, "aaa"));
 
-    writer.write(createUpdateBefore(2, "aaa"));
-    writer.write(createUpdateAfter(2, "bbb"));
+      writer.write(createUpdateBefore(i + 2, "bbb"));
+      writer.write(createUpdateAfter(i + 2, "bbb"));
+    }
 
     // Assert the current data/delete file count.
     List<Path> files = Files.walk(Paths.get(tableDir.getPath(), "data"))
diff --git a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 058058f..280e148 100644
--- a/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/v1.13/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableTestBase;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -92,6 +93,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
     }
 
     table.updateProperties()
+        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(8 * 1024))
         .defaultFormat(format)
         .commit();
   }
@@ -218,11 +220,13 @@ public class TestDeltaTaskWriter extends TableTestBase {
     taskWriterFactory.initialize(1, 1);
 
     TaskWriter<RowData> writer = taskWriterFactory.create();
-    writer.write(createUpdateBefore(1, "aaa"));
-    writer.write(createUpdateAfter(1, "bbb"));
+    for (int i = 0; i < 8_000; i += 2) {
+      writer.write(createUpdateBefore(i + 1, "aaa"));
+      writer.write(createUpdateAfter(i + 1, "aaa"));
 
-    writer.write(createUpdateBefore(2, "aaa"));
-    writer.write(createUpdateAfter(2, "bbb"));
+      writer.write(createUpdateBefore(i + 2, "bbb"));
+      writer.write(createUpdateAfter(i + 2, "bbb"));
+    }
 
     // Assert the current data/delete file count.
     List<Path> files = Files.walk(Paths.get(tableDir.getPath(), "data"))
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 058058f..280e148 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableTestBase;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -92,6 +93,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
     }
 
     table.updateProperties()
+        .set(TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES, String.valueOf(8 * 1024))
         .defaultFormat(format)
         .commit();
   }
@@ -218,11 +220,13 @@ public class TestDeltaTaskWriter extends TableTestBase {
     taskWriterFactory.initialize(1, 1);
 
     TaskWriter<RowData> writer = taskWriterFactory.create();
-    writer.write(createUpdateBefore(1, "aaa"));
-    writer.write(createUpdateAfter(1, "bbb"));
+    for (int i = 0; i < 8_000; i += 2) {
+      writer.write(createUpdateBefore(i + 1, "aaa"));
+      writer.write(createUpdateAfter(i + 1, "aaa"));
 
-    writer.write(createUpdateBefore(2, "aaa"));
-    writer.write(createUpdateAfter(2, "bbb"));
+      writer.write(createUpdateBefore(i + 2, "bbb"));
+      writer.write(createUpdateAfter(i + 2, "bbb"));
+    }
 
     // Assert the current data/delete file count.
     List<Path> files = Files.walk(Paths.get(tableDir.getPath(), "data"))
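
All three Flink versions get the same test change: the table's Parquet
row-group size is lowered to 8 KB and the test writes 8,000 update pairs
(16,000 rows) instead of two, which is enough to flush row groups while the
writer is still open. A rough, illustrative calculation of the effect; the
per-record size is an assumption, not something measured from the test:

    // Illustrative only: a small row-group target plus many records forces repeated
    // row-group flushes. The assumed record size is hypothetical.
    class RowGroupEstimate {
      public static void main(String[] args) {
        long targetRowGroupSize = 8 * 1024;   // 8 KB, as set in the tests above
        long assumedRecordSize = 100;         // assumed buffered bytes per record
        long records = 16_000;                // 4,000 loop iterations x 4 writes each

        long recordsPerRowGroup = targetRowGroupSize / assumedRecordSize;   // ~81
        long expectedFlushes = records / recordsPerRowGroup;                // ~197

        System.out.printf("~%d records per row group, ~%d row-group flushes%n",
            recordsPerRowGroup, expectedFlushes);
      }
    }
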
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 3900126..81a9c58 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -46,14 +46,16 @@ import org.apache.parquet.schema.MessageType;
 
 class ParquetWriter<T> implements FileAppender<T>, Closeable {
 
-  private static DynConstructors.Ctor<PageWriteStore> pageStoreCtorParquet = DynConstructors
-          .builder(PageWriteStore.class)
-          .hiddenImpl("org.apache.parquet.hadoop.ColumnChunkPageWriteStore",
-              CodecFactory.BytesCompressor.class,
-              MessageType.class,
-              ByteBufferAllocator.class,
-              int.class)
-          .build();
+  private static final Metrics EMPTY_METRICS = new Metrics(0L, null, null, null, null);
+
+  private static final DynConstructors.Ctor<PageWriteStore> pageStoreCtorParquet = DynConstructors
+      .builder(PageWriteStore.class)
+      .hiddenImpl("org.apache.parquet.hadoop.ColumnChunkPageWriteStore",
+          CodecFactory.BytesCompressor.class,
+          MessageType.class,
+          ByteBufferAllocator.class,
+          int.class)
+      .build();
 
   private static final DynMethods.UnboundMethod flushToWriter = DynMethods
       .builder("flushToFileWriter")
@@ -66,16 +68,18 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private final CodecFactory.BytesCompressor compressor;
   private final MessageType parquetSchema;
   private final ParquetValueWriter<T> model;
-  private final ParquetFileWriter writer;
   private final MetricsConfig metricsConfig;
   private final int columnIndexTruncateLength;
+  private final ParquetFileWriter.Mode writeMode;
+  private final OutputFile output;
+  private final Configuration conf;
 
   private DynMethods.BoundMethod flushPageStoreToWriter;
   private ColumnWriteStore writeStore;
-  private long nextRowGroupSize = 0;
   private long recordCount = 0;
   private long nextCheckRecordCount = 10;
   private boolean closed;
+  private ParquetFileWriter writer;
 
   private static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
   private static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
@@ -96,21 +100,28 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
     this.model = (ParquetValueWriter<T>) createWriterFunc.apply(parquetSchema);
     this.metricsConfig = metricsConfig;
     this.columnIndexTruncateLength = conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
+    this.writeMode = writeMode;
+    this.output = output;
+    this.conf = conf;
 
-    try {
-      this.writer = new ParquetFileWriter(ParquetIO.file(output, conf), parquetSchema,
-         writeMode, rowGroupSize, 0);
-    } catch (IOException e) {
-      throw new RuntimeIOException(e, "Failed to create Parquet file");
-    }
+    startRowGroup();
+  }
 
-    try {
-      writer.start();
-    } catch (IOException e) {
-      throw new RuntimeIOException(e, "Failed to start Parquet file writer");
-    }
+  private void ensureWriterInitialized() {
+    if (writer == null) {
+      try {
+        this.writer = new ParquetFileWriter(
+            ParquetIO.file(output, conf), parquetSchema, writeMode, targetRowGroupSize, 0);
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to create Parquet file");
+      }
 
-    startRowGroup();
+      try {
+        writer.start();
+      } catch (IOException e) {
+        throw new RuntimeIOException(e, "Failed to start Parquet file writer");
+      }
+    }
   }
 
   @Override
@@ -123,7 +134,11 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
 
   @Override
   public Metrics metrics() {
-    return ParquetUtil.footerMetrics(writer.getFooter(), model.metrics(), metricsConfig);
+    Preconditions.checkState(closed, "Cannot return metrics for unclosed writer");
+    if (writer != null) {
+      return ParquetUtil.footerMetrics(writer.getFooter(), model.metrics(), metricsConfig);
+    }
+    return EMPTY_METRICS;
   }
 
   /**
@@ -138,11 +153,19 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   @Override
   public long length() {
     try {
-      if (closed) {
-        return writer.getPos();
-      } else {
-        return writer.getPos() + (writeStore.isColumnFlushNeeded() ? writeStore.getBufferedSize() : 0);
+      long length = 0L;
+
+      if (writer != null) {
+        length += writer.getPos();
       }
+
+      if (!closed && recordCount > 0) {
+        // recordCount > 0 when there are records in the write store that have not been flushed to the Parquet file
+        length += writeStore.getBufferedSize();
+      }
+
+      return length;
+
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to get file length");
     }
@@ -150,7 +173,10 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
 
   @Override
   public List<Long> splitOffsets() {
-    return ParquetUtil.getSplitOffsets(writer.getFooter());
+    if (writer != null) {
+      return ParquetUtil.getSplitOffsets(writer.getFooter());
+    }
+    return null;
   }
 
   private void checkSize() {
@@ -158,10 +184,10 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
       long bufferedSize = writeStore.getBufferedSize();
       double avgRecordSize = ((double) bufferedSize) / recordCount;
 
-      if (bufferedSize > (nextRowGroupSize - 2 * avgRecordSize)) {
+      if (bufferedSize > (targetRowGroupSize - 2 * avgRecordSize)) {
         flushRowGroup(false);
       } else {
-        long remainingSpace = nextRowGroupSize - bufferedSize;
+        long remainingSpace = targetRowGroupSize - bufferedSize;
         long remainingRecords = (long) (remainingSpace / avgRecordSize);
         this.nextCheckRecordCount = recordCount + Math.min(Math.max(remainingRecords / 2, 100), 10000);
       }
@@ -171,6 +197,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private void flushRowGroup(boolean finished) {
     try {
       if (recordCount > 0) {
+        ensureWriterInitialized();
         writer.startBlock(recordCount);
         writeStore.flush();
         flushPageStoreToWriter.invoke(writer);
@@ -187,11 +214,6 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private void startRowGroup() {
     Preconditions.checkState(!closed, "Writer is closed");
 
-    try {
-      this.nextRowGroupSize = Math.min(writer.getNextRowGroupSize(), targetRowGroupSize);
-    } catch (IOException e) {
-      throw new RuntimeIOException(e);
-    }
     this.nextCheckRecordCount = Math.min(Math.max(recordCount / 2, 100), 10000);
     this.recordCount = 0;
 
@@ -210,7 +232,9 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
       this.closed = true;
       flushRowGroup(true);
       writeStore.close();
-      writer.end(metadata);
+      if (writer != null) {
+        writer.end(metadata);
+      }
     }
   }
 }
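
With the ParquetFileWriter created lazily, the accessors above also have to
handle the case where no row group was ever flushed: metrics() returns
EMPTY_METRICS, length() falls back to the bytes still buffered in the write
store, and splitOffsets() returns null. A hedged caller-side sketch
(hypothetical helper, not part of the Iceberg API) of how that empty case
might be interpreted:

    import java.util.List;

    // Hypothetical helper: null split offsets mean no row group was flushed and
    // therefore no Parquet file exists on disk.
    final class AppenderResultExample {
      private AppenderResultExample() {
      }

      static String describe(long length, List<Long> splitOffsets) {
        if (splitOffsets == null) {
          return "empty appender, buffered length = " + length;
        }
        return splitOffsets.size() + " row group(s), file length = " + length;
      }
    }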