You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/11/07 08:45:00 UTC

[jira] [Commented] (PARQUET-1414) Limit page size based on maximum row count

    [ https://issues.apache.org/jira/browse/PARQUET-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16677848#comment-16677848 ] 

ASF GitHub Bot commented on PARQUET-1414:
-----------------------------------------

gszadovszky closed pull request #531: PARQUET-1414: Limit page size based on maximum row count
URL: https://github.com/apache/parquet-mr/pull/531
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index b17323933..41e482cfd 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -48,6 +48,7 @@
   public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
   public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
+  public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
 
   public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
 
@@ -85,10 +86,11 @@ public static WriterVersion fromString(String name) {
   private final ByteBufferAllocator allocator;
   private final ValuesWriterFactory valuesWriterFactory;
   private final int columnIndexTruncateLength;
+  private final int pageRowCountLimit;
 
   private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
                             int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
-                            ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength) {
+                            ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit) {
     this.pageSizeThreshold = pageSize;
     this.initialSlabSize = CapacityByteArrayOutputStream
       .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -102,6 +104,7 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
 
     this.valuesWriterFactory = writerFactory;
     this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
+    this.pageRowCountLimit = pageRowCountLimit;
   }
 
   public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -194,6 +197,10 @@ public boolean estimateNextSizeCheck() {
     return estimateNextSizeCheck;
   }
 
+  public int getPageRowCountLimit() {
+    return pageRowCountLimit;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -213,18 +220,22 @@ public static Builder copy(ParquetProperties toCopy) {
     private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
     private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
     private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+    private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
 
     private Builder() {
     }
 
     private Builder(ParquetProperties toCopy) {
+      this.pageSize = toCopy.pageSizeThreshold;
       this.enableDict = toCopy.enableDictionary;
       this.dictPageSize = toCopy.dictionaryPageSizeThreshold;
       this.writerVersion = toCopy.writerVersion;
       this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck;
       this.maxRowCountForPageSizeCheck = toCopy.maxRowCountForPageSizeCheck;
       this.estimateNextSizeCheck = toCopy.estimateNextSizeCheck;
+      this.valuesWriterFactory = toCopy.valuesWriterFactory;
       this.allocator = toCopy.allocator;
+      this.pageRowCountLimit = toCopy.pageRowCountLimit;
     }
 
     /**
@@ -313,11 +324,17 @@ public Builder withColumnIndexTruncateLength(int length) {
       return this;
     }
 
+    public Builder withPageRowCountLimit(int rowCount) {
+      Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount);
+      pageRowCountLimit = rowCount;
+      return this;
+    }
+
     public ParquetProperties build() {
       ParquetProperties properties =
         new ParquetProperties(writerVersion, pageSize, dictPageSize,
           enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
-          estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength);
+          estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit);
       // we pass a constructed but uninitialized factory to ParquetProperties above as currently
       // creation of ValuesWriters is invoked from within ParquetProperties. In the future
       // we'd like to decouple that and won't need to pass an object to properties and then pass the
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index 5cd7d876e..f79c09de2 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -67,7 +67,7 @@
 
     this.columns = new TreeMap<>();
 
-    this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
+    this.rowCountForNextSizeCheck = min(props.getMinRowCountForPageSizeCheck(), props.getPageRowCountLimit());
 
     columnWriterProvider = new ColumnWriterProvider() {
       @Override
@@ -95,7 +95,7 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
     }
     this.columns = unmodifiableMap(mcolumns);
 
-    this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
+    this.rowCountForNextSizeCheck = min(props.getMinRowCountForPageSizeCheck(), props.getPageRowCountLimit());
 
     columnWriterProvider = new ColumnWriterProvider() {
       @Override
@@ -190,13 +190,17 @@ public void endRecord() {
 
   private void sizeCheck() {
     long minRecordToWait = Long.MAX_VALUE;
+    int pageRowCountLimit = props.getPageRowCountLimit();
+    long rowCountForNextRowCountCheck = rowCount + pageRowCountLimit;
     for (ColumnWriterBase writer : columns.values()) {
       long usedMem = writer.getCurrentPageBufferedSize();
       long rows = rowCount - writer.getRowsWrittenSoFar();
       long remainingMem = props.getPageSizeThreshold() - usedMem;
-      if (remainingMem <= thresholdTolerance) {
+      if (remainingMem <= thresholdTolerance || rows >= pageRowCountLimit) {
         writer.writePage();
         remainingMem = props.getPageSizeThreshold();
+      } else {
+        rowCountForNextRowCountCheck = min(rowCountForNextRowCountCheck, rowCount + (pageRowCountLimit - rows));
       }
       long rowsToFillPage =
           usedMem == 0 ?
@@ -219,5 +223,10 @@ private void sizeCheck() {
     } else {
       rowCountForNextSizeCheck = rowCount + props.getMinRowCountForPageSizeCheck();
     }
+
+    // Do the check earlier if required to keep the row count limit
+    if (rowCountForNextRowCountCheck < rowCountForNextSizeCheck) {
+      rowCountForNextSizeCheck = rowCountForNextRowCountCheck;
+    }
   }
 }
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
index e5db38c94..f89d0cbf7 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -18,19 +18,28 @@
  */
 package org.apache.parquet.column.mem;
 
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.impl.ColumnReadStoreImpl;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.impl.ColumnWriteStoreV2;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.page.mem.MemPageStore;
 import org.apache.parquet.example.DummyRecordConverter;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -166,6 +175,68 @@ public void testMemColumnSeveralPagesRepeated() throws Exception {
     }
   }
 
+  @Test
+  public void testPageSize() {
+    MessageType schema = Types.buildMessage()
+        .requiredList().requiredElement(BINARY).named("binary_col")
+        .requiredList().requiredElement(INT32).named("int32_col")
+        .named("msg");
+    System.out.println(schema);
+    MemPageStore memPageStore = new MemPageStore(123);
+
+    // Using V2 pages so we have rowCount info
+    ColumnWriteStore writeStore = new ColumnWriteStoreV2(schema, memPageStore, ParquetProperties.builder()
+        .withPageSize(1024) // Less than 10 records for binary_col
+        .withMinRowCountForPageSizeCheck(1) // Enforce having precise page sizing
+        .withPageRowCountLimit(10)
+        .withDictionaryEncoding(false) // Enforce having large binary_col pages
+        .build());
+    ColumnDescriptor binaryCol = schema.getColumnDescription(new String[] { "binary_col", "list", "element" });
+    ColumnWriter binaryColWriter = writeStore.getColumnWriter(binaryCol);
+    ColumnDescriptor int32Col = schema.getColumnDescription(new String[] { "int32_col", "list", "element" });
+    ColumnWriter int32ColWriter = writeStore.getColumnWriter(int32Col);
+    // Writing 123 records
+    for (int i = 0; i < 123; ++i) {
+      // Writing 10 values per record
+      for (int j = 0; j < 10; ++j) {
+        binaryColWriter.write(Binary.fromString("aaaaaaaaaaaa"), j == 0 ? 0 : 2, 2);
+        int32ColWriter.write(42, j == 0 ? 0 : 2, 2);
+      }
+      writeStore.endRecord();
+    }
+    writeStore.flush();
+
+    // Check that all the binary_col pages are <= 1024 bytes
+    {
+      PageReader binaryColPageReader = memPageStore.getPageReader(binaryCol);
+      assertEquals(1230, binaryColPageReader.getTotalValueCount());
+      int pageCnt = 0;
+      int valueCnt = 0;
+      while (valueCnt < binaryColPageReader.getTotalValueCount()) {
+        DataPage page = binaryColPageReader.readPage();
+        ++pageCnt;
+        valueCnt += page.getValueCount();
+        LOG.info("binary_col page-{}: {} bytes, {} rows", pageCnt, page.getCompressedSize(), page.getIndexRowCount().get());
+        assertTrue("Compressed size should be less than 1024", page.getCompressedSize() <= 1024);
+      }
+    }
+
+    // Check that all the int32_col pages contain <= 10 rows
+    {
+      PageReader int32ColPageReader = memPageStore.getPageReader(int32Col);
+      assertEquals(1230, int32ColPageReader.getTotalValueCount());
+      int pageCnt = 0;
+      int valueCnt = 0;
+      while (valueCnt < int32ColPageReader.getTotalValueCount()) {
+        DataPage page = int32ColPageReader.readPage();
+        ++pageCnt;
+        valueCnt += page.getValueCount();
+        LOG.info("int32_col page-{}: {} bytes, {} rows", pageCnt, page.getCompressedSize(), page.getIndexRowCount().get());
+        assertTrue("Row count should be less than 10", page.getIndexRowCount().get() <= 10);
+      }
+    }
+  }
+
   private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
     return new ColumnWriteStoreV1(memPageStore,
         ParquetProperties.builder()
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 0789bf50d..04cbd15c0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -144,6 +144,7 @@
   public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
   public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
   public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
+  public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
 
   public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
     String level = conf.get(JOB_SUMMARY_LEVEL);
@@ -325,6 +326,18 @@ private static int getColumnIndexTruncateLength(Configuration conf) {
     return conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
   }
 
+  public static void setPageRowCountLimit(JobContext jobContext, int rowCount) {
+    setPageRowCountLimit(getConfiguration(jobContext), rowCount);
+  }
+
+  public static void setPageRowCountLimit(Configuration conf, int rowCount) {
+    conf.setInt(PAGE_ROW_COUNT_LIMIT, rowCount);
+  }
+
+  private static int getPageRowCountLimit(Configuration conf) {
+    return conf.getInt(PAGE_ROW_COUNT_LIMIT, ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT);
+  }
+
   private WriteSupport<T> writeSupport;
   private ParquetOutputCommitter committer;
 
@@ -380,6 +393,7 @@ private static int getColumnIndexTruncateLength(Configuration conf) {
         .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
         .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
         .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
+        .withPageRowCountLimit(getPageRowCountLimit(conf))
         .build();
 
     long blockSize = getLongBlockSize(conf);
@@ -398,6 +412,7 @@ private static int getColumnIndexTruncateLength(Configuration conf) {
       LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
       LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
       LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
+      LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
     }
 
     WriteContext init = writeSupport.init(conf);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 5b0e4f82d..1ed5e32ca 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -425,6 +425,17 @@ public SELF withPageSize(int pageSize) {
       return self();
     }
 
+    /**
+     * Sets the Parquet format page row count limit used by the constructed writer.
+     *
+     * @param rowCount limit for the number of rows stored in a page
+     * @return this builder for method chaining
+     */
+    public SELF withPageRowCountLimit(int rowCount) {
+      encodingPropsBuilder.withPageRowCountLimit(rowCount);
+      return self();
+    }
+
     /**
      * Set the Parquet format dictionary page size used by the constructed
      * writer.


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Limit page size based on maximum row count
> ------------------------------------------
>
>                 Key: PARQUET-1414
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1414
>             Project: Parquet
>          Issue Type: Improvement
>            Reporter: Gabor Szadovszky
>            Assignee: Gabor Szadovszky
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>
> For column index based filtering it is important to have enough pages for a column. In case of a perfectly matching encoding for the suitable data it can happen that all of the values can be encoded in one page (e.g. a column of an ascending counter).
> With this improvement we would be able to limit the pages by the maximum number of rows to be written in it so we would have enough pages for every column.
> Based on the benchmarks listed [here|https://docs.google.com/spreadsheets/d/1hfQPy8NkGbgGugnHWvIHSzZ-3Q5M7f3Dtf_oD9ACFRg] 20k seems to be a good choice for the default value.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)