Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/01/05 22:07:06 UTC

[iceberg] branch master updated: Parquet: Return correct length after writer is closed (#2001)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 78495a2  Parquet: Return correct length after writer is closed (#2001)
78495a2 is described below

commit 78495a20bfc207e4f9c3f6cae2b9be14923f3562
Author: Roman Pleshkov <rp...@mediamath.com>
AuthorDate: Tue Jan 5 17:06:54 2021 -0500

    Parquet: Return correct length after writer is closed (#2001)
---
 .../org/apache/iceberg/parquet/ParquetWriter.java  | 28 +++++++++++--
 .../iceberg/parquet/ParquetWritingTestUtils.java   | 48 ++++++++++++++++------
 .../org/apache/iceberg/parquet/TestParquet.java    | 46 ++++++++++++++++++---
 3 files changed, 99 insertions(+), 23 deletions(-)
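
This change makes FileAppender#length() return the exact on-disk size once the writer has been closed; while the writer is still open it keeps returning the flushed position plus any buffered row-group data, so the value is only approximate. A minimal usage sketch of that contract (hypothetical caller code, not part of this commit; it assumes a schema, an output file, and a list of records are already in hand, and reuses the builder calls shown in the diff below):

    FileAppender<GenericData.Record> appender = Parquet.write(localOutput(file))
        .schema(schema)
        .createWriterFunc(ParquetAvroWriter::buildWriter)
        .build();
    appender.addAll(records);
    long approxLen = appender.length();  // open writer: flushed bytes plus buffered data
    appender.close();
    long exactLen = appender.length();   // closed writer: matches the file size on disk

Callers that record file sizes in metadata can therefore take the length after close() rather than re-reading it from the file system.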

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 8b5d000..3900126 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.column.ColumnWriteStore;
@@ -74,6 +75,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   private long nextRowGroupSize = 0;
   private long recordCount = 0;
   private long nextCheckRecordCount = 10;
+  private boolean closed;
 
   private static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
   private static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
@@ -124,10 +126,23 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
     return ParquetUtil.footerMetrics(writer.getFooter(), model.metrics(), metricsConfig);
   }
 
+  /**
+   * Returns the approximate length of the output file produced by this writer.
+   * <p>
+   * Prior to calling {@link ParquetWriter#close}, the result is approximate. After calling close, the length is
+   * exact.
+   *
+   * @return the approximate length of the output file produced by this writer or the exact length if this writer is
+   * closed.
+   */
   @Override
   public long length() {
     try {
-      return writer.getPos() + (writeStore.isColumnFlushNeeded() ? writeStore.getBufferedSize() : 0);
+      if (closed) {
+        return writer.getPos();
+      } else {
+        return writer.getPos() + (writeStore.isColumnFlushNeeded() ? writeStore.getBufferedSize() : 0);
+      }
     } catch (IOException e) {
       throw new RuntimeIOException(e, "Failed to get file length");
     }
@@ -170,6 +185,8 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
   }
 
   private void startRowGroup() {
+    Preconditions.checkState(!closed, "Writer is closed");
+
     try {
       this.nextRowGroupSize = Math.min(writer.getNextRowGroupSize(), targetRowGroupSize);
     } catch (IOException e) {
@@ -189,8 +206,11 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
 
   @Override
   public void close() throws IOException {
-    flushRowGroup(true);
-    writeStore.close();
-    writer.end(metadata);
+    if (!closed) {
+      this.closed = true;
+      flushRowGroup(true);
+      writeStore.close();
+      writer.end(metadata);
+    }
   }
 }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java b/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
index 2a0f6a4..24effa7 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.parquet;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
@@ -53,20 +54,41 @@ class ParquetWritingTestUtils {
   }
 
   static File writeRecords(
-      TemporaryFolder temp,
-      Schema schema, Map<String, String> properties,
-      Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
-      GenericData.Record... records) throws IOException {
-    File tmpFolder = temp.newFolder("parquet");
-    String filename = UUID.randomUUID().toString();
-    File file = new File(tmpFolder, FileFormat.PARQUET.addExtension(filename));
-    try (FileAppender<GenericData.Record> writer = Parquet.write(localOutput(file))
-        .schema(schema)
-        .setAll(properties)
-        .createWriterFunc(createWriterFunc)
-        .build()) {
+          TemporaryFolder temp,
+          Schema schema, Map<String, String> properties,
+          Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
+          GenericData.Record... records) throws IOException {
+    File file = createTempFile(temp);
+    write(file, schema, properties, createWriterFunc, records);
+    return file;
+  }
+
+  static long write(File file, Schema schema, Map<String, String> properties,
+                    Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
+                    GenericData.Record... records) throws IOException {
+
+    long len = 0;
+
+    FileAppender<GenericData.Record> writer = Parquet.write(localOutput(file))
+            .schema(schema)
+            .setAll(properties)
+            .createWriterFunc(createWriterFunc)
+            .build();
+
+    try (Closeable toClose = writer) {
       writer.addAll(Lists.newArrayList(records));
+      len = writer.length(); // in deprecated adapter we need to get the length first and then close the writer
     }
-    return file;
+
+    if (writer instanceof ParquetWriter) {
+      len = writer.length();
+    }
+    return len;
+  }
+
+  static File createTempFile(TemporaryFolder temp) throws IOException {
+    File tmpFolder = temp.newFolder("parquet");
+    String filename = UUID.randomUUID().toString();
+    return new File(tmpFolder, FileFormat.PARQUET.addExtension(filename));
   }
 }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
index d9d5eb0..c931e26 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.parquet;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
 import org.apache.avro.generic.GenericData;
@@ -29,6 +30,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.util.Pair;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.schema.MessageType;
 import org.junit.Assert;
@@ -38,7 +40,8 @@ import org.junit.rules.TemporaryFolder;
 
 import static org.apache.iceberg.Files.localInput;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
-import static org.apache.iceberg.parquet.ParquetWritingTestUtils.writeRecords;
+import static org.apache.iceberg.parquet.ParquetWritingTestUtils.createTempFile;
+import static org.apache.iceberg.parquet.ParquetWritingTestUtils.write;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 
 public class TestParquet {
@@ -49,7 +52,7 @@ public class TestParquet {
   @Test
   public void testRowGroupSizeConfigurable() throws IOException {
     // Without an explicit writer function
-    File parquetFile = generateFileWithTwoRowGroups(null);
+    File parquetFile = generateFileWithTwoRowGroups(null).first();
 
     try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
       Assert.assertEquals(2, reader.getRowGroups().size());
@@ -58,14 +61,43 @@ public class TestParquet {
 
   @Test
   public void testRowGroupSizeConfigurableWithWriter() throws IOException {
-    File parquetFile = generateFileWithTwoRowGroups(ParquetAvroWriter::buildWriter);
+    File parquetFile = generateFileWithTwoRowGroups(ParquetAvroWriter::buildWriter).first();
 
     try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
       Assert.assertEquals(2, reader.getRowGroups().size());
     }
   }
 
-  private File generateFileWithTwoRowGroups(Function<MessageType, ParquetValueWriter<?>> createWriterFunc)
+  @Test
+  public void testNumberOfBytesWritten() throws IOException {
+    Schema schema = new Schema(
+        optional(1, "intCol", IntegerType.get())
+    );
+
+    // this value was specifically derived to reproduce iss1980
+    // record count grow factor is 10000 (hardcoded)
+    // total 10 checkSize method calls
+    // for the 10th time (the last call of the checkSize method) nextCheckRecordCount == 100100
+    // 100099 + 1 >= 100100
+    int recordCount = 100099;
+    File file = createTempFile(temp);
+
+    List<GenericData.Record> records = new ArrayList<>(recordCount);
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+    for (int i = 1; i <= recordCount; i++) {
+      GenericData.Record record = new GenericData.Record(avroSchema);
+      record.put("intCol", i);
+      records.add(record);
+    }
+
+    long actualSize = write(file, schema, Collections.emptyMap(), ParquetAvroWriter::buildWriter,
+        records.toArray(new GenericData.Record[]{}));
+
+    long expectedSize = ParquetIO.file(localInput(file)).getLength();
+    Assert.assertEquals(expectedSize, actualSize);
+  }
+
+  private Pair<File, Long> generateFileWithTwoRowGroups(Function<MessageType, ParquetValueWriter<?>> createWriterFunc)
       throws IOException {
     Schema schema = new Schema(
         optional(1, "intCol", IntegerType.get())
@@ -85,12 +117,14 @@ public class TestParquet {
     // Force multiple row groups by making the byte size very small
     // Note there's also minimumRowGroupRecordCount which cannot be configured so we have to write
     // at least that many records for a new row group to occur
-    return writeRecords(temp,
+    File file = createTempFile(temp);
+    long size = write(file,
         schema,
         ImmutableMap.of(
             PARQUET_ROW_GROUP_SIZE_BYTES,
             Integer.toString(minimumRowGroupRecordCount * Integer.BYTES)),
         createWriterFunc,
-        records.toArray(new GenericData.Record[] {}));
+        records.toArray(new GenericData.Record[]{}));
+    return Pair.of(file, size);
   }
 }
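
The writer-side changes above also make close() idempotent: a closed flag is set on the first call, startRowGroup() refuses to start a new row group on a closed writer via Preconditions.checkState, and length() switches to the exact file position. A generic sketch of that close-guard pattern (illustration only, not the committed ParquetWriter code):

    import java.io.Closeable;
    import java.io.IOException;
    import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

    class GuardedWriter implements Closeable {
      private boolean closed = false;

      void write(Object record) {
        // Fail fast instead of appending to a finished file.
        Preconditions.checkState(!closed, "Writer is closed");
        // ... buffer the record, flushing a row group when it grows large enough ...
      }

      @Override
      public void close() throws IOException {
        if (!closed) {
          closed = true;  // flip the flag first so a second close() is a no-op
          // ... flush remaining buffered data and write the file footer ...
        }
      }
    }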