Posted to commits@parquet.apache.org by ga...@apache.org on 2020/02/12 11:55:18 UTC

[parquet-mr] branch master updated: PARQUET-1790: Add Api for writing DataPageV2 to ParquetFileWriter class (#756)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 57f6b46  PARQUET-1790: Add Api for writing DataPageV2 to ParquetFileWriter class (#756)
57f6b46 is described below

commit 57f6b46dde5926cc602c583940dd6424f10b9a17
Author: Brian Mwambazi <br...@users.noreply.github.com>
AuthorDate: Wed Feb 12 12:55:07 2020 +0100

    PARQUET-1790: Add Api for writing DataPageV2 to ParquetFileWriter class (#756)
---
 .../apache/parquet/hadoop/ParquetFileWriter.java   | 74 ++++++++++++++++++-
 .../parquet/hadoop/TestParquetFileWriter.java      | 83 ++++++++++++++++++++++
 2 files changed, 156 insertions(+), 1 deletion(-)
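
For context, a minimal sketch of how the new writeDataPageV2 call fits into the
existing writer lifecycle, mirroring the test added below; schema, path, codec,
the column descriptor and the page inputs are placeholders, and the data bytes
are assumed to be already compressed with the codec passed to startColumn:

    Configuration conf = new Configuration();
    ParquetFileWriter w = new ParquetFileWriter(conf, schema, path);
    w.start();
    w.startBlock(rowCount);                        // rows in this row group
    w.startColumn(columnDescriptor, valueCount, codec);
    // levels and data are BytesInput; statistics describe just this page
    w.writeDataPageV2(pageRowCount, nullCount, pageValueCount,
                      repetitionLevels, definitionLevels,
                      Encoding.PLAIN, compressedData,
                      uncompressedDataSize, statistics);
    w.endColumn();
    w.endBlock();
    w.end(new HashMap<>());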

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 39a75bc..d1cdee7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -575,6 +575,71 @@ public class ParquetFileWriter {
   }
 
   /**
+   * Writes a single v2 data page
+   * @param rowCount count of rows
+   * @param nullCount count of nulls
+   * @param valueCount count of values
+   * @param repetitionLevels repetition level bytes
+   * @param definitionLevels definition level bytes
+   * @param dataEncoding encoding for data
+   * @param compressedData compressed data bytes
+   * @param uncompressedDataSize the size of uncompressed data
+   * @param statistics the statistics of the page
+   * @throws IOException if any I/O error occurs while writing the file
+   */
+  public void writeDataPageV2(int rowCount, int nullCount, int valueCount,
+                              BytesInput repetitionLevels,
+                              BytesInput definitionLevels,
+                              Encoding dataEncoding,
+                              BytesInput compressedData,
+                              int uncompressedDataSize,
+                              Statistics<?> statistics) throws IOException {
+    state = state.write();
+    int rlByteLength = toIntWithCheck(repetitionLevels.size());
+    int dlByteLength = toIntWithCheck(definitionLevels.size());
+
+    int compressedSize = toIntWithCheck(
+      compressedData.size() + repetitionLevels.size() + definitionLevels.size()
+    );
+
+    int uncompressedSize = toIntWithCheck(
+      uncompressedDataSize + repetitionLevels.size() + definitionLevels.size()
+    );
+
+    long beforeHeader = out.getPos();
+    if (firstPageOffset == -1) {
+      firstPageOffset = beforeHeader;
+    }
+
+    metadataConverter.writeDataPageV2Header(
+      uncompressedSize, compressedSize,
+      valueCount, nullCount, rowCount,
+      dataEncoding,
+      rlByteLength,
+      dlByteLength,
+      out);
+
+    long headersSize = out.getPos() - beforeHeader;
+    this.uncompressedLength += uncompressedSize + headersSize;
+    this.compressedLength += compressedSize + headersSize;
+
+    if (currentStatistics == null) {
+      currentStatistics = statistics.copy();
+    } else {
+      currentStatistics.mergeStatistics(statistics);
+    }
+
+    columnIndexBuilder.add(statistics);
+    currentEncodings.add(dataEncoding);
+    encodingStatsBuilder.addDataEncoding(dataEncoding);
+
+    BytesInput.concat(repetitionLevels, definitionLevels, compressedData)
+      .writeAllTo(out);
+
+    offsetIndexBuilder.add((int) (out.getPos() - beforeHeader), rowCount);
+  }
+
+  /**
    * Writes a column chunk at once
    * @param descriptor the descriptor of the column
    * @param valueCount the value count in this column
@@ -886,6 +951,13 @@ public class ParquetFileWriter {
     }
   }
 
+  private int toIntWithCheck(long size) {
+    if ((int)size != size) {
+      throw new ParquetEncodingException("Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " + size);
+    }
+    return (int)size;
+  }
+
   private static void serializeOffsetIndexes(
       List<List<OffsetIndex>> offsetIndexes,
       List<BlockMetaData> blocks,
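
On the read side, a page written through the new method comes back as a
DataPageV2, which is what the test added below asserts; a condensed sketch,
with the reader set up as in that test:

    PageReadStore pages = reader.readNextRowGroup();
    PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path));
    DataPageV2 page = (DataPageV2) pageReader.readPage();
    // row count, null count, levels and data all round-trip per page
    assertEquals(expectedRowCount, page.getRowCount());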
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 8763cac..6539719 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Version;
 import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.page.DataPageV2;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.junit.Assume;
 import org.junit.Rule;
@@ -220,6 +221,69 @@ public class TestParquetFileWriter {
   }
 
   @Test
+  public void testWriteReadDataPageV2() throws Exception {
+    File testFile = temp.newFile();
+    testFile.delete();
+
+    Path path = new Path(testFile.toURI());
+    Configuration configuration = new Configuration();
+
+    ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
+    w.start();
+    w.startBlock(14);
+
+    BytesInput repLevels = BytesInput.fromInt(2);
+    BytesInput defLevels = BytesInput.fromInt(1);
+    BytesInput data = BytesInput.fromInt(3);
+    BytesInput data2 = BytesInput.fromInt(10);
+
+    org.apache.parquet.column.statistics.Statistics<?> statsC1P1 = createStatistics("s", "z", C1);
+    org.apache.parquet.column.statistics.Statistics<?> statsC1P2 = createStatistics("b", "d", C1);
+
+    w.startColumn(C1, 6, CODEC);
+    long c1Starts = w.getPos();
+    w.writeDataPageV2(4, 1, 3, repLevels, defLevels, PLAIN, data, 4, statsC1P1);
+    w.writeDataPageV2(3, 0, 3, repLevels, defLevels, PLAIN, data, 4, statsC1P2);
+    w.endColumn();
+    long c1Ends = w.getPos();
+
+    w.startColumn(C2, 5, CODEC);
+    long c2Starts = w.getPos();
+    w.writeDataPageV2(5, 2, 3, repLevels, defLevels, PLAIN, data2, 4, EMPTY_STATS);
+    w.writeDataPageV2(2, 0, 2, repLevels, defLevels, PLAIN, data2, 4, EMPTY_STATS);
+    w.endColumn();
+    long c2Ends = w.getPos();
+
+    w.endBlock();
+    w.end(new HashMap<>());
+
+    ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
+    assertEquals("footer: "+ readFooter, 1, readFooter.getBlocks().size());
+    assertEquals(c1Ends - c1Starts, readFooter.getBlocks().get(0).getColumns().get(0).getTotalSize());
+    assertEquals(c2Ends - c2Starts, readFooter.getBlocks().get(0).getColumns().get(1).getTotalSize());
+    assertEquals(c2Ends - c1Starts, readFooter.getBlocks().get(0).getTotalByteSize());
+
+    // check for stats
+    org.apache.parquet.column.statistics.Statistics<?> expectedStats = createStatistics("b", "z", C1);
+    TestUtils.assertStatsValuesEqual(expectedStats, readFooter.getBlocks().get(0).getColumns().get(0).getStatistics());
+
+    HashSet<Encoding> expectedEncoding = new HashSet<Encoding>();
+    expectedEncoding.add(PLAIN);
+    assertEquals(expectedEncoding, readFooter.getBlocks().get(0).getColumns().get(0).getEncodings());
+
+    ParquetFileReader reader = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
+      readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
+
+    PageReadStore pages = reader.readNextRowGroup();
+    assertEquals(14, pages.getRowCount());
+    validateV2Page(SCHEMA, pages, PATH1, 3, 4, 1, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12);
+    validateV2Page(SCHEMA, pages, PATH1, 3, 3, 0, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12);
+    validateV2Page(SCHEMA, pages, PATH2, 3, 5, 2, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12);
+    validateV2Page(SCHEMA, pages, PATH2, 2, 2, 0, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12);
+    assertNull(reader.readNextRowGroup());
+  }
+
+  @Test
   public void testAlignmentWithPadding() throws Exception {
     File testFile = temp.newFile();
 
@@ -680,6 +744,25 @@ public class TestParquetFileWriter {
     w.end(extraMetaData);
   }
 
+  private void validateV2Page(MessageType schema, PageReadStore pages, String[] path, int values, int rows, int nullCount,
+                              byte[] repetition, byte[] definition, byte[] data, int uncompressedSize) throws IOException {
+    PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path));
+    DataPageV2 page = (DataPageV2)pageReader.readPage();
+    assertEquals(values, page.getValueCount());
+    assertEquals(rows, page.getRowCount());
+    assertEquals(nullCount, page.getNullCount());
+    assertEquals(uncompressedSize, page.getUncompressedSize());
+    assertArrayEquals(repetition, page.getRepetitionLevels().toByteArray());
+    assertArrayEquals(definition, page.getDefinitionLevels().toByteArray());
+    assertArrayEquals(data, page.getData().toByteArray());
+  }
+
+  private org.apache.parquet.column.statistics.Statistics<?> createStatistics(String min, String max, ColumnDescriptor col) {
+    return org.apache.parquet.column.statistics.Statistics.getBuilderForReading(col.getPrimitiveType())
+      .withMin(Binary.fromString(min).getBytes()).withMax(Binary.fromString(max).getBytes()).withNumNulls(0)
+      .build();
+  }
+
   private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes) throws IOException {
     PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path));
     DataPage page = pageReader.readPage();
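
(A note on the expected sizes in the test above: each BytesInput.fromInt(...)
holds 4 bytes, and writeDataPageV2 counts the repetition and definition level
bytes as part of the page, so every page's uncompressed size works out as
data + repetition levels + definition levels = 4 + 4 + 4 = 12, the last
argument of the validateV2Page calls.)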