Posted to commits@parquet.apache.org by ga...@apache.org on 2020/02/12 11:55:18 UTC
[parquet-mr] branch master updated: PARQUET-1790: Add Api for writing DataPageV2 to ParquetFileWriter class (#756)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 57f6b46 PARQUET-1790: Add Api for writing DataPageV2 to ParquetFileWriter class (#756)
57f6b46 is described below
commit 57f6b46dde5926cc602c583940dd6424f10b9a17
Author: Brian Mwambazi <br...@users.noreply.github.com>
AuthorDate: Wed Feb 12 12:55:07 2020 +0100
PARQUET-1790: Add Api for writing DataPageV2 to ParquetFileWriter class (#756)
---
.../apache/parquet/hadoop/ParquetFileWriter.java | 74 ++++++++++++++++++-
.../parquet/hadoop/TestParquetFileWriter.java | 83 ++++++++++++++++++++++
2 files changed, 156 insertions(+), 1 deletion(-)
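For context, here is a minimal sketch of how the new entry point might be driven, modeled on the test added below. The class name, schema string, output path, and page bytes are placeholders invented for illustration; as in the test, the level and data bytes are toy values rather than a decodable page, since the writer stores them as-is.

    import java.util.HashMap;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.column.Encoding;
    import org.apache.parquet.column.statistics.Statistics;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.io.api.Binary;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class WriteDataPageV2Sketch {
      public static void main(String[] args) throws Exception {
        MessageType schema = MessageTypeParser.parseMessageType(
            "message m { required binary b; }");      // placeholder schema
        ColumnDescriptor col = schema.getColumns().get(0);

        ParquetFileWriter w = new ParquetFileWriter(
            new Configuration(), schema, new Path("/tmp/v2-sketch.parquet"));
        w.start();
        w.startBlock(1);                              // one row in this block
        w.startColumn(col, 1, CompressionCodecName.UNCOMPRESSED);

        // Toy level/data bytes, as in the test below; the writer does not
        // decode or compress them, it just lays them out in the page.
        BytesInput repLevels = BytesInput.fromInt(0);
        BytesInput defLevels = BytesInput.fromInt(0);
        BytesInput data = BytesInput.fromInt(42);
        Statistics<?> stats = Statistics.getBuilderForReading(col.getPrimitiveType())
            .withMin(Binary.fromString("a").getBytes())
            .withMax(Binary.fromString("z").getBytes())
            .withNumNulls(0)
            .build();

        // The new API: rowCount, nullCount, valueCount, levels, encoding,
        // already-compressed data, uncompressed data size, page statistics.
        w.writeDataPageV2(1, 0, 1, repLevels, defLevels,
            Encoding.PLAIN, data, 4, stats);

        w.endColumn();
        w.endBlock();
        w.end(new HashMap<>());
      }
    }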
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 39a75bc..d1cdee7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -575,6 +575,71 @@ public class ParquetFileWriter {
}
/**
+ * Writes a single v2 data page
+ * @param rowCount count of rows
+ * @param nullCount count of nulls
+ * @param valueCount count of values
+ * @param repetitionLevels repetition level bytes
+ * @param definitionLevels definition level bytes
+ * @param dataEncoding encoding for data
+ * @param compressedData compressed data bytes
+ * @param uncompressedDataSize the size of uncompressed data
+ * @param statistics the statistics of the page
+ * @throws IOException if any I/O error occurs during writing the file
+ */
+ public void writeDataPageV2(int rowCount, int nullCount, int valueCount,
+ BytesInput repetitionLevels,
+ BytesInput definitionLevels,
+ Encoding dataEncoding,
+ BytesInput compressedData,
+ int uncompressedDataSize,
+ Statistics<?> statistics) throws IOException {
+ state = state.write();
+ int rlByteLength = toIntWithCheck(repetitionLevels.size());
+ int dlByteLength = toIntWithCheck(definitionLevels.size());
+
+ int compressedSize = toIntWithCheck(
+ compressedData.size() + repetitionLevels.size() + definitionLevels.size()
+ );
+
+ int uncompressedSize = toIntWithCheck(
+ uncompressedDataSize + repetitionLevels.size() + definitionLevels.size()
+ );
+
+ long beforeHeader = out.getPos();
+ if (firstPageOffset == -1) {
+ firstPageOffset = beforeHeader;
+ }
+
+ metadataConverter.writeDataPageV2Header(
+ uncompressedSize, compressedSize,
+ valueCount, nullCount, rowCount,
+ dataEncoding,
+ rlByteLength,
+ dlByteLength,
+ out);
+
+ long headersSize = out.getPos() - beforeHeader;
+ this.uncompressedLength += uncompressedSize + headersSize;
+ this.compressedLength += compressedSize + headersSize;
+
+ if (currentStatistics == null) {
+ currentStatistics = statistics.copy();
+ } else {
+ currentStatistics.mergeStatistics(statistics);
+ }
+
+ columnIndexBuilder.add(statistics);
+ currentEncodings.add(dataEncoding);
+ encodingStatsBuilder.addDataEncoding(dataEncoding);
+
+ BytesInput.concat(repetitionLevels, definitionLevels, compressedData)
+ .writeAllTo(out);
+
+ offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount);
+ }
+
+ /**
* Writes a column chunk at once
* @param descriptor the descriptor of the column
* @param valueCount the value count in this column
@@ -886,6 +951,13 @@ public class ParquetFileWriter {
}
}
+ private int toIntWithCheck(long size) {
+ if ((int)size != size) {
+ throw new ParquetEncodingException("Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " + size);
+ }
+ return (int)size;
+ }
+
private static void serializeOffsetIndexes(
List<List<OffsetIndex>> offsetIndexes,
List<BlockMetaData> blocks,
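Before the test diff, it is worth spelling out the size accounting the new method performs: both compressedSize and uncompressedSize include the repetition and definition level bytes, because v2 data pages store the levels uncompressed ahead of the (possibly compressed) data section. For the pages written in the test below, where BytesInput.fromInt always yields 4 bytes and the data is passed through as-is, the arithmetic works out as:

    // Size accounting for the test pages below:
    //   rlByteLength   = 4   (repetition levels)
    //   dlByteLength   = 4   (definition levels)
    //   data section   = 4
    //   compressedSize = uncompressedSize = 4 + 4 + 4 = 12
    // which is the uncompressedSize of 12 asserted via validateV2Page.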
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 8763cac..6539719 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Version;
import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
import org.junit.Assume;
import org.junit.Rule;
@@ -220,6 +221,69 @@ public class TestParquetFileWriter {
}
@Test
+ public void testWriteReadDataPageV2() throws Exception {
+ File testFile = temp.newFile();
+ testFile.delete();
+
+ Path path = new Path(testFile.toURI());
+ Configuration configuration = new Configuration();
+
+ ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
+ w.start();
+ w.startBlock(14);
+
+ BytesInput repLevels = BytesInput.fromInt(2);
+ BytesInput defLevels = BytesInput.fromInt(1);
+ BytesInput data = BytesInput.fromInt(3);
+ BytesInput data2 = BytesInput.fromInt(10);
+
+ org.apache.parquet.column.statistics.Statistics<?> statsC1P1 = createStatistics("s", "z", C1);
+ org.apache.parquet.column.statistics.Statistics<?> statsC1P2 = createStatistics("b", "d", C1);
+
+ w.startColumn(C1, 6, CODEC);
+ long c1Starts = w.getPos();
+ w.writeDataPageV2(4, 1, 3, repLevels, defLevels, PLAIN, data, 4, statsC1P1);
+ w.writeDataPageV2(3, 0, 3, repLevels, defLevels, PLAIN, data, 4, statsC1P2);
+ w.endColumn();
+ long c1Ends = w.getPos();
+
+ w.startColumn(C2, 5, CODEC);
+ long c2Starts = w.getPos();
+ w.writeDataPageV2(5, 2, 3, repLevels, defLevels, PLAIN, data2, 4, EMPTY_STATS);
+ w.writeDataPageV2(2, 0, 2, repLevels, defLevels, PLAIN, data2, 4, EMPTY_STATS);
+ w.endColumn();
+ long c2Ends = w.getPos();
+
+ w.endBlock();
+ w.end(new HashMap<>());
+
+ ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
+ assertEquals("footer: "+ readFooter, 1, readFooter.getBlocks().size());
+ assertEquals(c1Ends - c1Starts, readFooter.getBlocks().get(0).getColumns().get(0).getTotalSize());
+ assertEquals(c2Ends - c2Starts, readFooter.getBlocks().get(0).getColumns().get(1).getTotalSize());
+ assertEquals(c2Ends - c1Starts, readFooter.getBlocks().get(0).getTotalByteSize());
+
+ //check for stats
+ org.apache.parquet.column.statistics.Statistics<?> expectedStats = createStatistics("b", "z", C1);
+ TestUtils.assertStatsValuesEqual(expectedStats, readFooter.getBlocks().get(0).getColumns().get(0).getStatistics());
+
+ HashSet<Encoding> expectedEncoding = new HashSet<Encoding>();
+ expectedEncoding.add(PLAIN);
+ assertEquals(expectedEncoding, readFooter.getBlocks().get(0).getColumns().get(0).getEncodings());
+
+ ParquetFileReader reader = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
+ readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
+
+ PageReadStore pages = reader.readNextRowGroup();
+ assertEquals(14, pages.getRowCount());
+ validateV2Page(SCHEMA, pages, PATH1, 3, 4, 1, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12);
+ validateV2Page(SCHEMA, pages, PATH1, 3, 3, 0, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12);
+ validateV2Page(SCHEMA, pages, PATH2, 3, 5, 2, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12);
+ validateV2Page(SCHEMA, pages, PATH2, 2, 2, 0, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12);
+ assertNull(reader.readNextRowGroup());
+ }
+
+ @Test
public void testAlignmentWithPadding() throws Exception {
File testFile = temp.newFile();
@@ -680,6 +744,25 @@ public class TestParquetFileWriter {
w.end(extraMetaData);
}
+ private void validateV2Page(MessageType schema, PageReadStore pages, String[] path, int values, int rows, int nullCount,
+ byte[] repetition, byte[] definition, byte[] data, int uncompressedSize) throws IOException {
+ PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path));
+ DataPageV2 page = (DataPageV2)pageReader.readPage();
+ assertEquals(values, page.getValueCount());
+ assertEquals(rows, page.getRowCount());
+ assertEquals(nullCount, page.getNullCount());
+ assertEquals(uncompressedSize, page.getUncompressedSize());
+ assertArrayEquals(repetition, page.getRepetitionLevels().toByteArray());
+ assertArrayEquals(definition, page.getDefinitionLevels().toByteArray());
+ assertArrayEquals(data, page.getData().toByteArray());
+ }
+
+ private org.apache.parquet.column.statistics.Statistics<?> createStatistics(String min, String max, ColumnDescriptor col) {
+ return org.apache.parquet.column.statistics.Statistics.getBuilderForReading(col.getPrimitiveType())
+ .withMin(Binary.fromString(min).getBytes()).withMax(Binary.fromString(max).getBytes()).withNumNulls(0)
+ .build();
+ }
+
private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes) throws IOException {
PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path));
DataPage page = pageReader.readPage();