You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by GitBox <gi...@apache.org> on 2022/11/23 15:57:10 UTC

[GitHub] [parquet-mr] wgtmac commented on a diff in pull request #968: PARQUET-2149: Async IO implementation for ParquetFileReader

wgtmac commented on code in PR #968:
URL: https://github.com/apache/parquet-mr/pull/968#discussion_r1030605835


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java:
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.BlockCipher.Decryptor;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.hadoop.ParquetFileReader.Chunk;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the reading of a single page.
+ */
+public class FilePageReader implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FilePageReader.class);
+
+  private final ParquetFileReader parquetFileReader;
+  private final Chunk chunk;
+  private final int currentBlock;
+  private final BlockCipher.Decryptor headerBlockDecryptor;
+  private final BlockCipher.Decryptor pageBlockDecryptor;
+  private final byte[] aadPrefix;
+  private final int rowGroupOrdinal;
+  private final int columnOrdinal;
+
+  //state
+  private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>();
+  private DictionaryPage dictionaryPage = null;
+  private int pageIndex = 0;
+  private long valuesCountReadSoFar = 0;
+  private int dataPageCountReadSoFar = 0;
+
+  // derived
+  private final PrimitiveType type;
+  private final byte[] dataPageAAD;
+  private byte[] dataPageHeaderAAD = null;
+
+  private final BytesInputDecompressor decompressor;
+
+  private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>();
+
+  private final LongAdder totalTimeReadOnePage = new LongAdder();
+  private final LongAdder totalCountReadOnePage = new LongAdder();
+  private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L);
+  private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+  private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+  private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L);
+
+  public FilePageReader(ParquetFileReader parquetFileReader, Chunk chunk, int currentBlock,
+    Decryptor headerBlockDecryptor,
+    Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal,
+    BytesInputDecompressor decompressor
+  ) {
+    this.parquetFileReader = parquetFileReader;
+    this.chunk = chunk;
+    this.currentBlock = currentBlock;
+    this.headerBlockDecryptor = headerBlockDecryptor;
+    this.pageBlockDecryptor = pageBlockDecryptor;
+    this.aadPrefix = aadPrefix;
+    this.rowGroupOrdinal = rowGroupOrdinal;
+    this.columnOrdinal = columnOrdinal;
+    this.decompressor = decompressor;
+
+    this.type = parquetFileReader.getFileMetaData().getSchema()
+      .getType(chunk.getDescriptor().getCol().getPath()).asPrimitiveType();
+
+    if (null != headerBlockDecryptor) {
+      dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader,
+        rowGroupOrdinal,
+        columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+    }
+    if (null != pageBlockDecryptor) {
+      dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal,
+        columnOrdinal, 0);
+    } else {
+      dataPageAAD = null;
+    }
+  }
+
+  public DictionaryPage getDictionaryPage() {
+    return this.dictionaryPage;
+  }
+
+  public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk() {
+    return this.pagesInChunk;
+  }
+
+  void readAllRemainingPagesAsync() {
+    readFutures.offer(ParquetFileReader.processThreadPool.submit(new FilePageReaderTask(this)));
+  }
+
+  void readAllRemainingPages() throws IOException {
+    while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+      readOnePage();
+    }
+    if (chunk.offsetIndex == null
+      && valuesCountReadSoFar != chunk.getDescriptor().getMetadata().getValueCount()) {
+      // Would be nice to have a CorruptParquetFileException or something as a subclass?
+      throw new IOException(
+        "Expected " + chunk.getDescriptor().getMetadata().getValueCount()
+          + " values in column chunk at " +
+          parquetFileReader.getPath() + " offset "
+          + chunk.descriptor.getMetadata().getFirstDataPageOffset() +
+          " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
+          + " pages ending at file offset " + (chunk.getDescriptor().getFileOffset()
+          + chunk.stream.position()));
+    }
+    try {
+      pagesInChunk.put(Optional.empty()); // add a marker for end of data
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while reading page data", e);
+    }
+  }
+
+  void readOnePage() throws IOException {
+    long startRead = System.nanoTime();
+    try {
+      byte[] pageHeaderAAD = dataPageHeaderAAD;
+      if (null != headerBlockDecryptor) {
+        // Important: this verifies file integrity (makes sure dictionary page had not been removed)
+        if (null == dictionaryPage && chunk.getDescriptor().getMetadata().hasDictionaryPage()) {
+          pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPageHeader,
+            rowGroupOrdinal, columnOrdinal, -1);
+        } else {
+          int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar);
+          AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+        }
+      }
+      PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, pageHeaderAAD);
+      int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+      int compressedPageSize = pageHeader.getCompressed_page_size();
+      final BytesInput pageBytes;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          // there is only one dictionary page per column chunk
+          if (dictionaryPage != null) {
+            throw new ParquetDecodingException(
+              "more than one dictionary page in column " + chunk.getDescriptor().getCol());
+          }
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);
+          if (parquetFileReader.options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
+            chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+              "could not verify dictionary page integrity, CRC checksum verification failed");
+          }
+          DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
+          DictionaryPage compressedDictionaryPage =
+            new DictionaryPage(
+              pageBytes,
+              uncompressedPageSize,
+              dicHeader.getNum_values(),
+              parquetFileReader.converter.getEncoding(dicHeader.getEncoding())
+            );
+          // Copy crc to new page, used for testing
+          if (pageHeader.isSetCrc()) {
+            compressedDictionaryPage.setCrc(pageHeader.getCrc());
+          }
+          dictionaryPage = compressedDictionaryPage;
+          break;
+        case DATA_PAGE:
+          DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);

Review Comment:
   `chunk.readAsBytesInput(compressedPageSize)` runs inside `processThreadPool`, and this is what it does:
   ```
       public BytesInput readAsBytesInput(int size) throws IOException {
         if (LOG.isDebugEnabled()) {
           String mode = (isAsyncIOReaderEnabled()) ? "ASYNC" : "SYNC";
           LOG.debug("{} READ BYTES INPUT: stream {}", mode, stream);
         }
         return BytesInput.from(stream.sliceBuffers(size));
       }
   ```
   
   `stream.sliceBuffers(size)` is a blocking call to `SequenceByteBufferInputStream.sliceBuffers()`, which internally calls `AsyncMultiBufferInputStream.sliceBuffers()` and waits for `AsyncMultiBufferInputStream.nextBuffer()` to return. Please note that `AsyncMultiBufferInputStream.nextBuffer()` runs in the `ioThreadPool`.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java:
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.BlockCipher.Decryptor;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.hadoop.ParquetFileReader.Chunk;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the reading of a single page.
+ */
+public class FilePageReader implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FilePageReader.class);
+
+  private final ParquetFileReader parquetFileReader;
+  private final Chunk chunk;
+  private final int currentBlock;
+  private final BlockCipher.Decryptor headerBlockDecryptor;
+  private final BlockCipher.Decryptor pageBlockDecryptor;
+  private final byte[] aadPrefix;
+  private final int rowGroupOrdinal;
+  private final int columnOrdinal;
+
+  //state
+  private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>();
+  private DictionaryPage dictionaryPage = null;
+  private int pageIndex = 0;
+  private long valuesCountReadSoFar = 0;
+  private int dataPageCountReadSoFar = 0;
+
+  // derived
+  private final PrimitiveType type;
+  private final byte[] dataPageAAD;
+  private byte[] dataPageHeaderAAD = null;
+
+  private final BytesInputDecompressor decompressor;
+
+  private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>();
+
+  private final LongAdder totalTimeReadOnePage = new LongAdder();
+  private final LongAdder totalCountReadOnePage = new LongAdder();
+  private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L);
+  private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+  private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+  private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L);
+
+  public FilePageReader(ParquetFileReader parquetFileReader, Chunk chunk, int currentBlock,
+    Decryptor headerBlockDecryptor,
+    Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal,
+    BytesInputDecompressor decompressor
+  ) {
+    this.parquetFileReader = parquetFileReader;
+    this.chunk = chunk;
+    this.currentBlock = currentBlock;
+    this.headerBlockDecryptor = headerBlockDecryptor;
+    this.pageBlockDecryptor = pageBlockDecryptor;
+    this.aadPrefix = aadPrefix;
+    this.rowGroupOrdinal = rowGroupOrdinal;
+    this.columnOrdinal = columnOrdinal;
+    this.decompressor = decompressor;
+
+    this.type = parquetFileReader.getFileMetaData().getSchema()
+      .getType(chunk.getDescriptor().getCol().getPath()).asPrimitiveType();
+
+    if (null != headerBlockDecryptor) {
+      dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader,
+        rowGroupOrdinal,
+        columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+    }
+    if (null != pageBlockDecryptor) {
+      dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal,
+        columnOrdinal, 0);
+    } else {
+      dataPageAAD = null;
+    }
+  }
+
+  public DictionaryPage getDictionaryPage() {
+    return this.dictionaryPage;
+  }
+
+  public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk() {
+    return this.pagesInChunk;
+  }
+
+  void readAllRemainingPagesAsync() {
+    readFutures.offer(ParquetFileReader.processThreadPool.submit(new FilePageReaderTask(this)));
+  }
+
+  void readAllRemainingPages() throws IOException {
+    while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+      readOnePage();
+    }
+    if (chunk.offsetIndex == null
+      && valuesCountReadSoFar != chunk.getDescriptor().getMetadata().getValueCount()) {
+      // Would be nice to have a CorruptParquetFileException or something as a subclass?
+      throw new IOException(
+        "Expected " + chunk.getDescriptor().getMetadata().getValueCount()
+          + " values in column chunk at " +
+          parquetFileReader.getPath() + " offset "
+          + chunk.descriptor.getMetadata().getFirstDataPageOffset() +
+          " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
+          + " pages ending at file offset " + (chunk.getDescriptor().getFileOffset()
+          + chunk.stream.position()));
+    }
+    try {
+      pagesInChunk.put(Optional.empty()); // add a marker for end of data
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while reading page data", e);
+    }
+  }
+
+  void readOnePage() throws IOException {
+    long startRead = System.nanoTime();
+    try {
+      byte[] pageHeaderAAD = dataPageHeaderAAD;
+      if (null != headerBlockDecryptor) {
+        // Important: this verifies file integrity (makes sure dictionary page had not been removed)
+        if (null == dictionaryPage && chunk.getDescriptor().getMetadata().hasDictionaryPage()) {
+          pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPageHeader,
+            rowGroupOrdinal, columnOrdinal, -1);
+        } else {
+          int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar);
+          AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+        }
+      }
+      PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, pageHeaderAAD);
+      int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+      int compressedPageSize = pageHeader.getCompressed_page_size();
+      final BytesInput pageBytes;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          // there is only one dictionary page per column chunk
+          if (dictionaryPage != null) {
+            throw new ParquetDecodingException(
+              "more than one dictionary page in column " + chunk.getDescriptor().getCol());
+          }
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);
+          if (parquetFileReader.options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
+            chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+              "could not verify dictionary page integrity, CRC checksum verification failed");
+          }
+          DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
+          DictionaryPage compressedDictionaryPage =
+            new DictionaryPage(
+              pageBytes,
+              uncompressedPageSize,
+              dicHeader.getNum_values(),
+              parquetFileReader.converter.getEncoding(dicHeader.getEncoding())
+            );
+          // Copy crc to new page, used for testing
+          if (pageHeader.isSetCrc()) {
+            compressedDictionaryPage.setCrc(pageHeader.getCrc());
+          }
+          dictionaryPage = compressedDictionaryPage;
+          break;
+        case DATA_PAGE:
+          DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);

Review Comment:
   In the current implementation, the tasks for `ioThreadPool` are submitted first, and the tasks for `processThreadPool` are submitted afterwards. If we merge these two thread pools into a single one, a problem can arise because the ExecutorService does not guarantee the order in which tasks are executed. If the tasks that were previously submitted to `processThreadPool` run first and occupy all the threads in the pool, they will be unable to proceed: they block waiting for the I/O tasks, and none of the I/O tasks can get an available thread. Please correct me if I am wrong. Thanks! @parthchandra



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/FilePageReader.java:
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.BlockCipher.Decryptor;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.hadoop.ParquetFileReader.Chunk;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the reading of a single page.
+ */
+public class FilePageReader implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FilePageReader.class);
+
+  private final ParquetFileReader parquetFileReader;
+  private final Chunk chunk;
+  private final int currentBlock;
+  private final BlockCipher.Decryptor headerBlockDecryptor;
+  private final BlockCipher.Decryptor pageBlockDecryptor;
+  private final byte[] aadPrefix;
+  private final int rowGroupOrdinal;
+  private final int columnOrdinal;
+
+  //state
+  private final LinkedBlockingDeque<Optional<DataPage>> pagesInChunk = new LinkedBlockingDeque<>();
+  private DictionaryPage dictionaryPage = null;
+  private int pageIndex = 0;
+  private long valuesCountReadSoFar = 0;
+  private int dataPageCountReadSoFar = 0;
+
+  // derived
+  private final PrimitiveType type;
+  private final byte[] dataPageAAD;
+  private byte[] dataPageHeaderAAD = null;
+
+  private final BytesInputDecompressor decompressor;
+
+  private final ConcurrentLinkedQueue<Future<Void>> readFutures = new ConcurrentLinkedQueue<>();
+
+  private final LongAdder totalTimeReadOnePage = new LongAdder();
+  private final LongAdder totalCountReadOnePage = new LongAdder();
+  private final LongAccumulator maxTimeReadOnePage = new LongAccumulator(Long::max, 0L);
+  private final LongAdder totalTimeBlockedPagesInChunk = new LongAdder();
+  private final LongAdder totalCountBlockedPagesInChunk = new LongAdder();
+  private final LongAccumulator maxTimeBlockedPagesInChunk = new LongAccumulator(Long::max, 0L);
+
+  public FilePageReader(ParquetFileReader parquetFileReader, Chunk chunk, int currentBlock,
+    Decryptor headerBlockDecryptor,
+    Decryptor pageBlockDecryptor, byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal,
+    BytesInputDecompressor decompressor
+  ) {
+    this.parquetFileReader = parquetFileReader;
+    this.chunk = chunk;
+    this.currentBlock = currentBlock;
+    this.headerBlockDecryptor = headerBlockDecryptor;
+    this.pageBlockDecryptor = pageBlockDecryptor;
+    this.aadPrefix = aadPrefix;
+    this.rowGroupOrdinal = rowGroupOrdinal;
+    this.columnOrdinal = columnOrdinal;
+    this.decompressor = decompressor;
+
+    this.type = parquetFileReader.getFileMetaData().getSchema()
+      .getType(chunk.getDescriptor().getCol().getPath()).asPrimitiveType();
+
+    if (null != headerBlockDecryptor) {
+      dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader,
+        rowGroupOrdinal,
+        columnOrdinal, chunk.getPageOrdinal(dataPageCountReadSoFar));
+    }
+    if (null != pageBlockDecryptor) {
+      dataPageAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPage, rowGroupOrdinal,
+        columnOrdinal, 0);
+    } else {
+      dataPageAAD = null;
+    }
+  }
+
+  public DictionaryPage getDictionaryPage() {
+    return this.dictionaryPage;
+  }
+
+  public LinkedBlockingDeque<Optional<DataPage>> getPagesInChunk() {
+    return this.pagesInChunk;
+  }
+
+  void readAllRemainingPagesAsync() {
+    readFutures.offer(ParquetFileReader.processThreadPool.submit(new FilePageReaderTask(this)));
+  }
+
+  void readAllRemainingPages() throws IOException {
+    while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
+      readOnePage();
+    }
+    if (chunk.offsetIndex == null
+      && valuesCountReadSoFar != chunk.getDescriptor().getMetadata().getValueCount()) {
+      // Would be nice to have a CorruptParquetFileException or something as a subclass?
+      throw new IOException(
+        "Expected " + chunk.getDescriptor().getMetadata().getValueCount()
+          + " values in column chunk at " +
+          parquetFileReader.getPath() + " offset "
+          + chunk.descriptor.getMetadata().getFirstDataPageOffset() +
+          " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
+          + " pages ending at file offset " + (chunk.getDescriptor().getFileOffset()
+          + chunk.stream.position()));
+    }
+    try {
+      pagesInChunk.put(Optional.empty()); // add a marker for end of data
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while reading page data", e);
+    }
+  }
+
+  void readOnePage() throws IOException {
+    long startRead = System.nanoTime();
+    try {
+      byte[] pageHeaderAAD = dataPageHeaderAAD;
+      if (null != headerBlockDecryptor) {
+        // Important: this verifies file integrity (makes sure dictionary page had not been removed)
+        if (null == dictionaryPage && chunk.getDescriptor().getMetadata().hasDictionaryPage()) {
+          pageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DictionaryPageHeader,
+            rowGroupOrdinal, columnOrdinal, -1);
+        } else {
+          int pageOrdinal = chunk.getPageOrdinal(dataPageCountReadSoFar);
+          AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal);
+        }
+      }
+      PageHeader pageHeader = chunk.readPageHeader(headerBlockDecryptor, pageHeaderAAD);
+      int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+      int compressedPageSize = pageHeader.getCompressed_page_size();
+      final BytesInput pageBytes;
+      switch (pageHeader.type) {
+        case DICTIONARY_PAGE:
+          // there is only one dictionary page per column chunk
+          if (dictionaryPage != null) {
+            throw new ParquetDecodingException(
+              "more than one dictionary page in column " + chunk.getDescriptor().getCol());
+          }
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);
+          if (parquetFileReader.options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
+            chunk.verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+              "could not verify dictionary page integrity, CRC checksum verification failed");
+          }
+          DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
+          DictionaryPage compressedDictionaryPage =
+            new DictionaryPage(
+              pageBytes,
+              uncompressedPageSize,
+              dicHeader.getNum_values(),
+              parquetFileReader.converter.getEncoding(dicHeader.getEncoding())
+            );
+          // Copy crc to new page, used for testing
+          if (pageHeader.isSetCrc()) {
+            compressedDictionaryPage.setCrc(pageHeader.getCrc());
+          }
+          dictionaryPage = compressedDictionaryPage;
+          break;
+        case DATA_PAGE:
+          DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
+          pageBytes = chunk.readAsBytesInput(compressedPageSize);

Review Comment:
   IIUC, the task in the `processThreadPool` blocks here, waiting for the task in the `ioThreadPool` to complete.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@parquet.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org