You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/01/05 22:07:06 UTC
[iceberg] branch master updated: Parquet: Return correct length after writer is closed (#2001)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 78495a2 Parquet: Return correct length after writer is closed (#2001)
78495a2 is described below
commit 78495a20bfc207e4f9c3f6cae2b9be14923f3562
Author: Roman Pleshkov <rp...@mediamath.com>
AuthorDate: Tue Jan 5 17:06:54 2021 -0500
Parquet: Return correct length after writer is closed (#2001)
---
.../org/apache/iceberg/parquet/ParquetWriter.java | 28 +++++++++++--
.../iceberg/parquet/ParquetWritingTestUtils.java | 48 ++++++++++++++++------
.../org/apache/iceberg/parquet/TestParquet.java | 46 ++++++++++++++++++---
3 files changed, 99 insertions(+), 23 deletions(-)
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
index 8b5d000..3900126 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.column.ColumnWriteStore;
@@ -74,6 +75,7 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
private long nextRowGroupSize = 0;
private long recordCount = 0;
private long nextCheckRecordCount = 10;
+ private boolean closed;
private static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
private static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
@@ -124,10 +126,23 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
return ParquetUtil.footerMetrics(writer.getFooter(), model.metrics(), metricsConfig);
}
+ /**
+ * Returns the approximate length of the output file produced by this writer.
+ * <p>
+ * Prior to calling {@link ParquetWriter#close}, the result is approximate. After calling close, the length is
+ * exact.
+ *
+ * @return the approximate length of the output file produced by this writer or the exact length if this writer is
+ * closed.
+ */
@Override
public long length() {
try {
- return writer.getPos() + (writeStore.isColumnFlushNeeded() ? writeStore.getBufferedSize() : 0);
+ if (closed) {
+ return writer.getPos();
+ } else {
+ return writer.getPos() + (writeStore.isColumnFlushNeeded() ? writeStore.getBufferedSize() : 0);
+ }
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to get file length");
}
@@ -170,6 +185,8 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
}
private void startRowGroup() {
+ Preconditions.checkState(!closed, "Writer is closed");
+
try {
this.nextRowGroupSize = Math.min(writer.getNextRowGroupSize(), targetRowGroupSize);
} catch (IOException e) {
@@ -189,8 +206,11 @@ class ParquetWriter<T> implements FileAppender<T>, Closeable {
@Override
public void close() throws IOException {
- flushRowGroup(true);
- writeStore.close();
- writer.end(metadata);
+ if (!closed) {
+ this.closed = true;
+ flushRowGroup(true);
+ writeStore.close();
+ writer.end(metadata);
+ }
}
}
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java b/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
index 2a0f6a4..24effa7 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/ParquetWritingTestUtils.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.parquet;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
@@ -53,20 +54,41 @@ class ParquetWritingTestUtils {
}
static File writeRecords(
- TemporaryFolder temp,
- Schema schema, Map<String, String> properties,
- Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
- GenericData.Record... records) throws IOException {
- File tmpFolder = temp.newFolder("parquet");
- String filename = UUID.randomUUID().toString();
- File file = new File(tmpFolder, FileFormat.PARQUET.addExtension(filename));
- try (FileAppender<GenericData.Record> writer = Parquet.write(localOutput(file))
- .schema(schema)
- .setAll(properties)
- .createWriterFunc(createWriterFunc)
- .build()) {
+ TemporaryFolder temp,
+ Schema schema, Map<String, String> properties,
+ Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
+ GenericData.Record... records) throws IOException {
+ File file = createTempFile(temp);
+ write(file, schema, properties, createWriterFunc, records);
+ return file;
+ }
+
+ static long write(File file, Schema schema, Map<String, String> properties,
+ Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
+ GenericData.Record... records) throws IOException {
+
+ long len = 0;
+
+ FileAppender<GenericData.Record> writer = Parquet.write(localOutput(file))
+ .schema(schema)
+ .setAll(properties)
+ .createWriterFunc(createWriterFunc)
+ .build();
+
+ try (Closeable toClose = writer) {
writer.addAll(Lists.newArrayList(records));
+ len = writer.length(); // in deprecated adapter we need to get the length first and then close the writer
}
- return file;
+
+ if (writer instanceof ParquetWriter) {
+ len = writer.length();
+ }
+ return len;
+ }
+
+ static File createTempFile(TemporaryFolder temp) throws IOException {
+ File tmpFolder = temp.newFolder("parquet");
+ String filename = UUID.randomUUID().toString();
+ return new File(tmpFolder, FileFormat.PARQUET.addExtension(filename));
}
}
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
index d9d5eb0..c931e26 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.parquet;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import org.apache.avro.generic.GenericData;
@@ -29,6 +30,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.util.Pair;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.schema.MessageType;
import org.junit.Assert;
@@ -38,7 +40,8 @@ import org.junit.rules.TemporaryFolder;
import static org.apache.iceberg.Files.localInput;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
-import static org.apache.iceberg.parquet.ParquetWritingTestUtils.writeRecords;
+import static org.apache.iceberg.parquet.ParquetWritingTestUtils.createTempFile;
+import static org.apache.iceberg.parquet.ParquetWritingTestUtils.write;
import static org.apache.iceberg.types.Types.NestedField.optional;
public class TestParquet {
@@ -49,7 +52,7 @@ public class TestParquet {
@Test
public void testRowGroupSizeConfigurable() throws IOException {
// Without an explicit writer function
- File parquetFile = generateFileWithTwoRowGroups(null);
+ File parquetFile = generateFileWithTwoRowGroups(null).first();
try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
Assert.assertEquals(2, reader.getRowGroups().size());
@@ -58,14 +61,43 @@ public class TestParquet {
@Test
public void testRowGroupSizeConfigurableWithWriter() throws IOException {
- File parquetFile = generateFileWithTwoRowGroups(ParquetAvroWriter::buildWriter);
+ File parquetFile = generateFileWithTwoRowGroups(ParquetAvroWriter::buildWriter).first();
try (ParquetFileReader reader = ParquetFileReader.open(ParquetIO.file(localInput(parquetFile)))) {
Assert.assertEquals(2, reader.getRowGroups().size());
}
}
- private File generateFileWithTwoRowGroups(Function<MessageType, ParquetValueWriter<?>> createWriterFunc)
+ @Test
+ public void testNumberOfBytesWritten() throws IOException {
+ Schema schema = new Schema(
+ optional(1, "intCol", IntegerType.get())
+ );
+
+ // this value was specifically derived to reproduce iss1980
+ // record count grow factor is 10000 (hardcoded)
+ // total 10 checkSize method calls
+ // for the 10th time (the last call of the checkSize method) nextCheckRecordCount == 100100
+ // 100099 + 1 >= 100100
+ int recordCount = 100099;
+ File file = createTempFile(temp);
+
+ List<GenericData.Record> records = new ArrayList<>(recordCount);
+ org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+ for (int i = 1; i <= recordCount; i++) {
+ GenericData.Record record = new GenericData.Record(avroSchema);
+ record.put("intCol", i);
+ records.add(record);
+ }
+
+ long actualSize = write(file, schema, Collections.emptyMap(), ParquetAvroWriter::buildWriter,
+ records.toArray(new GenericData.Record[]{}));
+
+ long expectedSize = ParquetIO.file(localInput(file)).getLength();
+ Assert.assertEquals(expectedSize, actualSize);
+ }
+
+ private Pair<File, Long> generateFileWithTwoRowGroups(Function<MessageType, ParquetValueWriter<?>> createWriterFunc)
throws IOException {
Schema schema = new Schema(
optional(1, "intCol", IntegerType.get())
@@ -85,12 +117,14 @@ public class TestParquet {
// Force multiple row groups by making the byte size very small
// Note there'a also minimumRowGroupRecordCount which cannot be configured so we have to write
// at least that many records for a new row group to occur
- return writeRecords(temp,
+ File file = createTempFile(temp);
+ long size = write(file,
schema,
ImmutableMap.of(
PARQUET_ROW_GROUP_SIZE_BYTES,
Integer.toString(minimumRowGroupRecordCount * Integer.BYTES)),
createWriterFunc,
- records.toArray(new GenericData.Record[] {}));
+ records.toArray(new GenericData.Record[]{}));
+ return Pair.of(file, size);
}
}