Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/10/20 18:05:12 UTC

[GitHub] [drill] vvysotskyi commented on a change in pull request #2338: DRILL-1282 Move parquet to use v2 format as default

vvysotskyi commented on a change in pull request #2338:
URL: https://github.com/apache/drill/pull/2338#discussion_r732979821



##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
##########
@@ -78,209 +79,237 @@
  *
  */
 class AsyncPageReader extends PageReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+  static final Logger logger = LoggerFactory.getLogger(AsyncPageReader.class);
 
   private ExecutorService threadPool;
   private long queueSize;
   private LinkedBlockingQueue<ReadStatus> pageQueue;
   private ConcurrentLinkedQueue<Future<Void>> asyncPageRead;
   private long totalPageValuesRead = 0;
-  private Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
+  private final Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
                                                      // FindBugs complains if we synchronize on a Concurrent Queue
 
-  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path,
-      ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
-    super(parentStatus, fs, path, columnChunkMetaData);
+  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path) throws ExecutionSetupException {
+    super(parentStatus, fs, path);
     threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
     queueSize = parentColumnReader.parentReader.readQueueSize;
     pageQueue = new LinkedBlockingQueue<>((int) queueSize);
     asyncPageRead = new ConcurrentLinkedQueue<>();
   }
 
   @Override
-  protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
-      final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException {
-    if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
-      try {
-        assert(columnChunkMetaData.getDictionaryPageOffset() >= dataReader.getPos() );
-        long bytesToSkip = columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos();
-        while (bytesToSkip > 0) {
-          long skipped = dataReader.skip(bytesToSkip);
-          if (skipped > 0) {
-            bytesToSkip -= skipped;
-          } else {
-            // no good way to handle this. Guava uses InputStream.available to check
-            // if EOF is reached and because available is not reliable,
-            // tries to read the rest of the data.
-            DrillBuf skipBuf = dataReader.getNext((int) bytesToSkip);
-            if (skipBuf != null) {
-              skipBuf.release();
-            } else {
-              throw new EOFException("End of File reached.");
-            }
-          }
-        }
-      } catch (IOException e) {
-        handleAndThrowException(e, "Error Reading dictionary page.");
-      }
-    }
-  }
-
-  @Override protected void init() throws IOException {
+  protected void init() throws IOException {
     super.init();
     //Avoid Init if a shutdown is already in progress even if init() is called once
     if (!parentColumnReader.isShuttingDown) {
       asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
     }
   }
 
-  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
-    DrillBuf data;
-    boolean isDictionary = false;
-    synchronized (this) {
-      data = readStatus.getPageData();
-      readStatus.setPageData(null);
-      isDictionary = readStatus.isDictionaryPage;
-    }
-    if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) {
-      DrillBuf compressedData = data;
-      data = decompress(readStatus.getPageHeader(), compressedData);
-      synchronized (this) {
-        readStatus.setPageData(null);
-      }
-      compressedData.release();
-    } else {
-      if (isDictionary) {
-        stats.totalDictPageReadBytes.addAndGet(readStatus.bytesRead);
-      } else {
-        stats.totalDataPageReadBytes.addAndGet(readStatus.bytesRead);
-      }
-    }
-    return data;
+  /**
+   * Reads and stores this column chunk's dictionary page.
+   * @throws IOException
+   */
+  protected void loadDictionary(ReadStatus readStatus) throws IOException {
+    assert readStatus.isDictionaryPage();
+    assert this.dictionary == null;
+
+    // dictData is not a local because we need to release it later.
+    this.dictData = codecName == CompressionCodecName.UNCOMPRESSED
+      ? readStatus.getPageData()
+      : decompressPageV1(readStatus);
+
+    DictionaryPage page = new DictionaryPage(
+      asBytesInput(dictData, 0, pageHeader.uncompressed_page_size),
+      pageHeader.uncompressed_page_size,
+      pageHeader.dictionary_page_header.num_values,
+      valueOf(pageHeader.dictionary_page_header.encoding.name())
+    );
+
+    this.dictionary = page.getEncoding().initDictionary(columnDescriptor, page);
   }
 
-  // Read and decode the dictionary data
-  private void readDictionaryPageData(final ReadStatus readStatus, final ColumnReader<?> parentStatus)
-      throws UserException {
+  /**
+   * Reads a compressed v1 data page or a dictionary page, both of which are compressed
+   * in their entirety.
+   * @return decompressed Parquet page data
+   * @throws IOException
+   */
+  protected DrillBuf decompressPageV1(ReadStatus readStatus) throws IOException {
+    Stopwatch timer = Stopwatch.createUnstarted();
+
+    PageHeader pageHeader = readStatus.getPageHeader();
+    int inputSize = pageHeader.getCompressed_page_size();
+    int outputSize = pageHeader.getUncompressed_page_size();
+    // TODO: does reporting this number have the same meaning in an async context?
+    long start = dataReader.getPos();
+    long timeToRead;
+
+    DrillBuf inputPageData = readStatus.getPageData();
+    DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
     try {
-      pageHeader = readStatus.getPageHeader();
-      int uncompressedSize = pageHeader.getUncompressed_page_size();
-      final DrillBuf dictionaryData = getDecompressedPageData(readStatus);
-      Stopwatch timer = Stopwatch.createStarted();
-      allocatedDictionaryBuffers.add(dictionaryData);
-      DictionaryPage page = new DictionaryPage(asBytesInput(dictionaryData, 0, uncompressedSize),
-          pageHeader.uncompressed_page_size, pageHeader.dictionary_page_header.num_values,
-          valueOf(pageHeader.dictionary_page_header.encoding.name()));
-      this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
-      long timeToDecode = timer.elapsed(TimeUnit.NANOSECONDS);
-      stats.timeDictPageDecode.addAndGet(timeToDecode);
-    } catch (Exception e) {
-      handleAndThrowException(e, "Error decoding dictionary page.");
+      timer.start();
+      CompressionCodecName codecName = columnChunkMetaData.getCodec();
+      CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+      ByteBuffer input = inputPageData.nioBuffer(0, inputSize);
+      ByteBuffer output = outputPageData.nioBuffer(0, outputSize);
+
+      decomp.decompress(input, inputSize, output, outputSize);
+      outputPageData.writerIndex(outputSize);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
+
+      logger.trace(
+        "Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+        columnChunkMetaData.toString(),
+        dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+        outputSize,
+        ByteBufUtil.hexDump(outputPageData)
+      );
+
+      this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize, outputSize);
+    } finally {
+      readStatus.setPageData(null);
+      if (inputPageData != null) {
+        inputPageData.release();
+      }
     }
-  }
 
-  private void handleAndThrowException(Exception e, String msg) throws UserException {
-    UserException ex = UserException.dataReadError(e).message(msg)
-        .pushContext("Row Group Start: ", this.parentColumnReader.columnChunkMetaData.getStartingPos())
-        .pushContext("Column: ", this.parentColumnReader.schemaElement.getName())
-        .pushContext("File: ", this.fileName).build(logger);
-    throw ex;
+    return outputPageData;
   }
 
-  private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) {
-    DrillBuf pageDataBuf = null;
+  /**
+   * Reads a compressed v2 data page which excluded the repetition and definition level
+   * sections from compression.
+   * @return decompressed Parquet page data
+   * @throws IOException
+   */
+  protected DrillBuf decompressPageV2(ReadStatus readStatus) throws IOException {
     Stopwatch timer = Stopwatch.createUnstarted();
+
+    PageHeader pageHeader = readStatus.getPageHeader();
+    int inputSize = pageHeader.getCompressed_page_size();
+    int repLevelSize = pageHeader.data_page_header_v2.getRepetition_levels_byte_length();
+    int defLevelSize = pageHeader.data_page_header_v2.getDefinition_levels_byte_length();
+    int compDataOffset = repLevelSize + defLevelSize;
+    int outputSize = pageHeader.uncompressed_page_size;
+    // TODO: does reporting this number have the same meaning in an async context?
+    long start = dataReader.getPos();
     long timeToRead;
-    int compressedSize = pageHeader.getCompressed_page_size();
-    int uncompressedSize = pageHeader.getUncompressed_page_size();
-    pageDataBuf = allocateTemporaryBuffer(uncompressedSize);
+
+    DrillBuf inputPageData = readStatus.getPageData();
+    DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
     try {
       timer.start();
+      // Write out the uncompressed section
+      // Note that the following setBytes call to read the repetition and definition level sections
+      // advances readerIndex in inputPageData but not writerIndex in outputPageData.
+      outputPageData.setBytes(0, inputPageData, compDataOffset);
+
+      // decompress from the start of compressed data to the end of the input buffer
+      CompressionCodecName codecName = columnChunkMetaData.getCodec();
+      CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+      ByteBuffer input = inputPageData.nioBuffer(compDataOffset, inputSize - compDataOffset);
+      ByteBuffer output = outputPageData.nioBuffer(compDataOffset, outputSize - compDataOffset);
+      decomp.decompress(
+        input,
+        inputSize - compDataOffset,
+        output,
+        outputSize - compDataOffset
+      );
+      outputPageData.writerIndex(outputSize);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
 
-      CompressionCodecName codecName = parentColumnReader.columnChunkMetaData.getCodec();
-      BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
-      ByteBuffer input = compressedData.nioBuffer(0, compressedSize);
-      ByteBuffer output = pageDataBuf.nioBuffer(0, uncompressedSize);
+      logger.trace(
+        "Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+        columnChunkMetaData.toString(),
+        dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+        outputSize,
+        ByteBufUtil.hexDump(outputPageData)

Review comment:
       This method will be called regardless of the logging level, so the `ByteBufUtil.hexDump(...)` argument is evaluated eagerly even when TRACE logging is disabled; consider guarding the call with `logger.isTraceEnabled()`.
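
       A minimal, self-contained sketch of that kind of guard, written against plain Netty and SLF4J rather than Drill's `DrillBuf` (the class and method names below are illustrative only, not part of the PR):

   ```java
   import io.netty.buffer.ByteBuf;
   import io.netty.buffer.ByteBufUtil;
   import io.netty.buffer.Unpooled;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   public class GuardedTraceExample {
     private static final Logger logger = LoggerFactory.getLogger(GuardedTraceExample.class);

     // The hexDump argument is computed before trace() is entered, even when
     // TRACE is off, so the whole call site is wrapped in an isTraceEnabled() check.
     static void logDecompressedPage(String column, long readPos, int outputSize, ByteBuf pageData) {
       if (logger.isTraceEnabled()) {
         logger.trace("Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
             column, readPos, outputSize, ByteBufUtil.hexDump(pageData));
       }
     }

     public static void main(String[] args) {
       ByteBuf page = Unpooled.wrappedBuffer(new byte[] {1, 2, 3, 4});
       logDecompressedPage("example_col", 0L, page.readableBytes(), page);
     }
   }
   ```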

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
##########
@@ -29,14 +33,27 @@
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
 import org.apache.parquet.format.SchemaElement;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 public abstract class ColumnReader<V extends ValueVector> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnReader.class);
+  public static final Set<Encoding> DICTIONARY_ENCODINGS = new HashSet<>(Arrays.asList(
+    Encoding.PLAIN_DICTIONARY,
+    Encoding.RLE_DICTIONARY
+  ));

Review comment:
       You can use `ImmutableSet` here, since the field is public:
   ```suggestion
     public static final Set<Encoding> DICTIONARY_ENCODINGS = ImmutableSet.of(
       Encoding.PLAIN_DICTIONARY,
       Encoding.RLE_DICTIONARY
     );
   ```
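
       (Because the field is `public static`, a plain `HashSet` could be mutated by any caller; an immutable set removes that risk, and a shaded Guava `ImmutableSet` is already available under `org.apache.drill.shaded.guava`, as the existing `Sets` import shows.)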

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
##########
@@ -29,14 +33,27 @@
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
 import org.apache.parquet.format.SchemaElement;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 public abstract class ColumnReader<V extends ValueVector> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnReader.class);
+  public static final Set<Encoding> DICTIONARY_ENCODINGS = new HashSet<>(Arrays.asList(
+    Encoding.PLAIN_DICTIONARY,
+    Encoding.RLE_DICTIONARY
+  ));
+  public static final Set<Encoding> VALUE_ENCODINGS = Sets.union(
+    DICTIONARY_ENCODINGS,
+    new HashSet<>(Arrays.asList(
+      Encoding.DELTA_BINARY_PACKED,
+      Encoding.DELTA_BYTE_ARRAY,
+      Encoding.DELTA_LENGTH_BYTE_ARRAY
+  )));

Review comment:
       And the same applies here:
   ```suggestion
     public static final Set<Encoding> VALUE_ENCODINGS = ImmutableSet.<Encoding>builder()
       .addAll(DICTIONARY_ENCODINGS)
       .add(Encoding.DELTA_BINARY_PACKED)
       .add(Encoding.DELTA_BYTE_ARRAY)
       .add(Encoding.DELTA_LENGTH_BYTE_ARRAY)
       .build();
   ```

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
##########
@@ -78,209 +79,237 @@
  *
  */
 class AsyncPageReader extends PageReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+  static final Logger logger = LoggerFactory.getLogger(AsyncPageReader.class);
 
   private ExecutorService threadPool;
   private long queueSize;
   private LinkedBlockingQueue<ReadStatus> pageQueue;
   private ConcurrentLinkedQueue<Future<Void>> asyncPageRead;
   private long totalPageValuesRead = 0;
-  private Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
+  private final Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
                                                      // FindBugs complains if we synchronize on a Concurrent Queue
 
-  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path,
-      ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
-    super(parentStatus, fs, path, columnChunkMetaData);
+  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path) throws ExecutionSetupException {
+    super(parentStatus, fs, path);
     threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
     queueSize = parentColumnReader.parentReader.readQueueSize;
     pageQueue = new LinkedBlockingQueue<>((int) queueSize);
     asyncPageRead = new ConcurrentLinkedQueue<>();
   }
 
   @Override
-  protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
-      final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException {
-    if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
-      try {
-        assert(columnChunkMetaData.getDictionaryPageOffset() >= dataReader.getPos() );
-        long bytesToSkip = columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos();
-        while (bytesToSkip > 0) {
-          long skipped = dataReader.skip(bytesToSkip);
-          if (skipped > 0) {
-            bytesToSkip -= skipped;
-          } else {
-            // no good way to handle this. Guava uses InputStream.available to check
-            // if EOF is reached and because available is not reliable,
-            // tries to read the rest of the data.
-            DrillBuf skipBuf = dataReader.getNext((int) bytesToSkip);
-            if (skipBuf != null) {
-              skipBuf.release();
-            } else {
-              throw new EOFException("End of File reached.");
-            }
-          }
-        }
-      } catch (IOException e) {
-        handleAndThrowException(e, "Error Reading dictionary page.");
-      }
-    }
-  }
-
-  @Override protected void init() throws IOException {
+  protected void init() throws IOException {
     super.init();
     //Avoid Init if a shutdown is already in progress even if init() is called once
     if (!parentColumnReader.isShuttingDown) {
       asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
     }
   }
 
-  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
-    DrillBuf data;
-    boolean isDictionary = false;
-    synchronized (this) {
-      data = readStatus.getPageData();
-      readStatus.setPageData(null);
-      isDictionary = readStatus.isDictionaryPage;
-    }
-    if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) {
-      DrillBuf compressedData = data;
-      data = decompress(readStatus.getPageHeader(), compressedData);
-      synchronized (this) {
-        readStatus.setPageData(null);
-      }
-      compressedData.release();
-    } else {
-      if (isDictionary) {
-        stats.totalDictPageReadBytes.addAndGet(readStatus.bytesRead);
-      } else {
-        stats.totalDataPageReadBytes.addAndGet(readStatus.bytesRead);
-      }
-    }
-    return data;
+  /**
+   * Reads and stores this column chunk's dictionary page.
+   * @throws IOException
+   */
+  protected void loadDictionary(ReadStatus readStatus) throws IOException {
+    assert readStatus.isDictionaryPage();
+    assert this.dictionary == null;
+
+    // dictData is not a local because we need to release it later.
+    this.dictData = codecName == CompressionCodecName.UNCOMPRESSED
+      ? readStatus.getPageData()
+      : decompressPageV1(readStatus);
+
+    DictionaryPage page = new DictionaryPage(
+      asBytesInput(dictData, 0, pageHeader.uncompressed_page_size),
+      pageHeader.uncompressed_page_size,
+      pageHeader.dictionary_page_header.num_values,
+      valueOf(pageHeader.dictionary_page_header.encoding.name())
+    );
+
+    this.dictionary = page.getEncoding().initDictionary(columnDescriptor, page);
   }
 
-  // Read and decode the dictionary data
-  private void readDictionaryPageData(final ReadStatus readStatus, final ColumnReader<?> parentStatus)
-      throws UserException {
+  /**
+   * Reads a compressed v1 data page or a dictionary page, both of which are compressed
+   * in their entirety.
+   * @return decompressed Parquet page data
+   * @throws IOException
+   */
+  protected DrillBuf decompressPageV1(ReadStatus readStatus) throws IOException {
+    Stopwatch timer = Stopwatch.createUnstarted();
+
+    PageHeader pageHeader = readStatus.getPageHeader();
+    int inputSize = pageHeader.getCompressed_page_size();
+    int outputSize = pageHeader.getUncompressed_page_size();
+    // TODO: does reporting this number have the same meaning in an async context?
+    long start = dataReader.getPos();
+    long timeToRead;
+
+    DrillBuf inputPageData = readStatus.getPageData();
+    DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
     try {
-      pageHeader = readStatus.getPageHeader();
-      int uncompressedSize = pageHeader.getUncompressed_page_size();
-      final DrillBuf dictionaryData = getDecompressedPageData(readStatus);
-      Stopwatch timer = Stopwatch.createStarted();
-      allocatedDictionaryBuffers.add(dictionaryData);
-      DictionaryPage page = new DictionaryPage(asBytesInput(dictionaryData, 0, uncompressedSize),
-          pageHeader.uncompressed_page_size, pageHeader.dictionary_page_header.num_values,
-          valueOf(pageHeader.dictionary_page_header.encoding.name()));
-      this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
-      long timeToDecode = timer.elapsed(TimeUnit.NANOSECONDS);
-      stats.timeDictPageDecode.addAndGet(timeToDecode);
-    } catch (Exception e) {
-      handleAndThrowException(e, "Error decoding dictionary page.");
+      timer.start();
+      CompressionCodecName codecName = columnChunkMetaData.getCodec();
+      CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+      ByteBuffer input = inputPageData.nioBuffer(0, inputSize);
+      ByteBuffer output = outputPageData.nioBuffer(0, outputSize);
+
+      decomp.decompress(input, inputSize, output, outputSize);
+      outputPageData.writerIndex(outputSize);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
+
+      logger.trace(
+        "Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+        columnChunkMetaData.toString(),
+        dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+        outputSize,
+        ByteBufUtil.hexDump(outputPageData)

Review comment:
       And this call too; the same `isTraceEnabled()` guard would apply.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
##########
@@ -78,209 +79,237 @@
  *
  */
 class AsyncPageReader extends PageReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+  static final Logger logger = LoggerFactory.getLogger(AsyncPageReader.class);
 
   private ExecutorService threadPool;
   private long queueSize;
   private LinkedBlockingQueue<ReadStatus> pageQueue;
   private ConcurrentLinkedQueue<Future<Void>> asyncPageRead;
   private long totalPageValuesRead = 0;
-  private Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
+  private final Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
                                                      // FindBugs complains if we synchronize on a Concurrent Queue
 
-  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path,
-      ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
-    super(parentStatus, fs, path, columnChunkMetaData);
+  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path) throws ExecutionSetupException {
+    super(parentStatus, fs, path);
     threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
     queueSize = parentColumnReader.parentReader.readQueueSize;
     pageQueue = new LinkedBlockingQueue<>((int) queueSize);
     asyncPageRead = new ConcurrentLinkedQueue<>();
   }
 
   @Override
-  protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
-      final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException {
-    if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
-      try {
-        assert(columnChunkMetaData.getDictionaryPageOffset() >= dataReader.getPos() );
-        long bytesToSkip = columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos();
-        while (bytesToSkip > 0) {
-          long skipped = dataReader.skip(bytesToSkip);
-          if (skipped > 0) {
-            bytesToSkip -= skipped;
-          } else {
-            // no good way to handle this. Guava uses InputStream.available to check
-            // if EOF is reached and because available is not reliable,
-            // tries to read the rest of the data.
-            DrillBuf skipBuf = dataReader.getNext((int) bytesToSkip);
-            if (skipBuf != null) {
-              skipBuf.release();
-            } else {
-              throw new EOFException("End of File reached.");
-            }
-          }
-        }
-      } catch (IOException e) {
-        handleAndThrowException(e, "Error Reading dictionary page.");
-      }
-    }
-  }
-
-  @Override protected void init() throws IOException {
+  protected void init() throws IOException {
     super.init();
     //Avoid Init if a shutdown is already in progress even if init() is called once
     if (!parentColumnReader.isShuttingDown) {
       asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
     }
   }
 
-  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
-    DrillBuf data;
-    boolean isDictionary = false;
-    synchronized (this) {
-      data = readStatus.getPageData();
-      readStatus.setPageData(null);
-      isDictionary = readStatus.isDictionaryPage;
-    }
-    if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) {
-      DrillBuf compressedData = data;
-      data = decompress(readStatus.getPageHeader(), compressedData);
-      synchronized (this) {
-        readStatus.setPageData(null);
-      }
-      compressedData.release();
-    } else {
-      if (isDictionary) {
-        stats.totalDictPageReadBytes.addAndGet(readStatus.bytesRead);
-      } else {
-        stats.totalDataPageReadBytes.addAndGet(readStatus.bytesRead);
-      }
-    }
-    return data;
+  /**
+   * Reads and stores this column chunk's dictionary page.
+   * @throws IOException
+   */
+  protected void loadDictionary(ReadStatus readStatus) throws IOException {
+    assert readStatus.isDictionaryPage();
+    assert this.dictionary == null;
+
+    // dictData is not a local because we need to release it later.
+    this.dictData = codecName == CompressionCodecName.UNCOMPRESSED
+      ? readStatus.getPageData()
+      : decompressPageV1(readStatus);
+
+    DictionaryPage page = new DictionaryPage(
+      asBytesInput(dictData, 0, pageHeader.uncompressed_page_size),
+      pageHeader.uncompressed_page_size,
+      pageHeader.dictionary_page_header.num_values,
+      valueOf(pageHeader.dictionary_page_header.encoding.name())
+    );
+
+    this.dictionary = page.getEncoding().initDictionary(columnDescriptor, page);
   }
 
-  // Read and decode the dictionary data
-  private void readDictionaryPageData(final ReadStatus readStatus, final ColumnReader<?> parentStatus)
-      throws UserException {
+  /**
+   * Reads a compressed v1 data page or a dictionary page, both of which are compressed
+   * in their entirety.
+   * @return decompressed Parquet page data
+   * @throws IOException
+   */
+  protected DrillBuf decompressPageV1(ReadStatus readStatus) throws IOException {
+    Stopwatch timer = Stopwatch.createUnstarted();
+
+    PageHeader pageHeader = readStatus.getPageHeader();
+    int inputSize = pageHeader.getCompressed_page_size();
+    int outputSize = pageHeader.getUncompressed_page_size();
+    // TODO: does reporting this number have the same meaning in an async context?
+    long start = dataReader.getPos();
+    long timeToRead;
+
+    DrillBuf inputPageData = readStatus.getPageData();
+    DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
     try {
-      pageHeader = readStatus.getPageHeader();
-      int uncompressedSize = pageHeader.getUncompressed_page_size();
-      final DrillBuf dictionaryData = getDecompressedPageData(readStatus);
-      Stopwatch timer = Stopwatch.createStarted();
-      allocatedDictionaryBuffers.add(dictionaryData);
-      DictionaryPage page = new DictionaryPage(asBytesInput(dictionaryData, 0, uncompressedSize),
-          pageHeader.uncompressed_page_size, pageHeader.dictionary_page_header.num_values,
-          valueOf(pageHeader.dictionary_page_header.encoding.name()));
-      this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
-      long timeToDecode = timer.elapsed(TimeUnit.NANOSECONDS);
-      stats.timeDictPageDecode.addAndGet(timeToDecode);
-    } catch (Exception e) {
-      handleAndThrowException(e, "Error decoding dictionary page.");
+      timer.start();
+      CompressionCodecName codecName = columnChunkMetaData.getCodec();
+      CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+      ByteBuffer input = inputPageData.nioBuffer(0, inputSize);
+      ByteBuffer output = outputPageData.nioBuffer(0, outputSize);
+
+      decomp.decompress(input, inputSize, output, outputSize);
+      outputPageData.writerIndex(outputSize);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
+
+      logger.trace(
+        "Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+        columnChunkMetaData.toString(),
+        dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+        outputSize,
+        ByteBufUtil.hexDump(outputPageData)
+      );
+
+      this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize, outputSize);
+    } finally {
+      readStatus.setPageData(null);
+      if (inputPageData != null) {
+        inputPageData.release();
+      }
     }
-  }
 
-  private void handleAndThrowException(Exception e, String msg) throws UserException {
-    UserException ex = UserException.dataReadError(e).message(msg)
-        .pushContext("Row Group Start: ", this.parentColumnReader.columnChunkMetaData.getStartingPos())
-        .pushContext("Column: ", this.parentColumnReader.schemaElement.getName())
-        .pushContext("File: ", this.fileName).build(logger);
-    throw ex;
+    return outputPageData;
   }
 
-  private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) {
-    DrillBuf pageDataBuf = null;
+  /**
+   * Reads a compressed v2 data page which excluded the repetition and definition level
+   * sections from compression.
+   * @return decompressed Parquet page data
+   * @throws IOException
+   */
+  protected DrillBuf decompressPageV2(ReadStatus readStatus) throws IOException {
     Stopwatch timer = Stopwatch.createUnstarted();
+
+    PageHeader pageHeader = readStatus.getPageHeader();
+    int inputSize = pageHeader.getCompressed_page_size();
+    int repLevelSize = pageHeader.data_page_header_v2.getRepetition_levels_byte_length();
+    int defLevelSize = pageHeader.data_page_header_v2.getDefinition_levels_byte_length();
+    int compDataOffset = repLevelSize + defLevelSize;
+    int outputSize = pageHeader.uncompressed_page_size;
+    // TODO: does reporting this number have the same meaning in an async context?
+    long start = dataReader.getPos();
     long timeToRead;
-    int compressedSize = pageHeader.getCompressed_page_size();
-    int uncompressedSize = pageHeader.getUncompressed_page_size();
-    pageDataBuf = allocateTemporaryBuffer(uncompressedSize);
+
+    DrillBuf inputPageData = readStatus.getPageData();
+    DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
     try {
       timer.start();
+      // Write out the uncompressed section
+      // Note that the following setBytes call to read the repetition and definition level sections
+      // advances readerIndex in inputPageData but not writerIndex in outputPageData.
+      outputPageData.setBytes(0, inputPageData, compDataOffset);
+
+      // decompress from the start of compressed data to the end of the input buffer
+      CompressionCodecName codecName = columnChunkMetaData.getCodec();
+      CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+      ByteBuffer input = inputPageData.nioBuffer(compDataOffset, inputSize - compDataOffset);
+      ByteBuffer output = outputPageData.nioBuffer(compDataOffset, outputSize - compDataOffset);
+      decomp.decompress(
+        input,
+        inputSize - compDataOffset,
+        output,
+        outputSize - compDataOffset
+      );
+      outputPageData.writerIndex(outputSize);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
 
-      CompressionCodecName codecName = parentColumnReader.columnChunkMetaData.getCodec();
-      BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
-      ByteBuffer input = compressedData.nioBuffer(0, compressedSize);
-      ByteBuffer output = pageDataBuf.nioBuffer(0, uncompressedSize);
+      logger.trace(
+        "Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+        columnChunkMetaData.toString(),
+        dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+        outputSize,
+        ByteBufUtil.hexDump(outputPageData)
+      );
 
-      decomp.decompress(input, compressedSize, output, uncompressedSize);
-      pageDataBuf.writerIndex(uncompressedSize);
-      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
-      this.updateStats(pageHeader, "Decompress", 0, timeToRead, compressedSize, uncompressedSize);
-    } catch (IOException e) {
-      handleAndThrowException(e, "Error decompressing data.");
+      this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize, outputSize);
+    } finally {
+      readStatus.setPageData(null);
+      if (inputPageData != null) {
+        inputPageData.release();
+      }
     }
-    return pageDataBuf;
+
+    return outputPageData;
   }
 
-  @Override
-  protected void nextInternal() throws IOException {
-    ReadStatus readStatus = null;
+  private ReadStatus nextPageFromQueue() throws InterruptedException, ExecutionException {
+    ReadStatus readStatus;
+    Stopwatch timer = Stopwatch.createStarted();
+    parentColumnReader.parentReader.getOperatorContext().getStats().startWait();
     try {
-      Stopwatch timer = Stopwatch.createStarted();
-      parentColumnReader.parentReader.getOperatorContext().getStats().startWait();
-      try {
-        waitForExecutionResult(); // get the result of execution
-        synchronized (pageQueueSyncronize) {
-          boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
-          readStatus = pageQueue.take(); // get the data if no exception has been thrown
-          if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
-            throw new DrillRuntimeException("Unexpected end of data");
-          }
-          //if the queue was full before we took a page out, then there would
-          // have been no new read tasks scheduled. In that case, schedule a new read.
-          if (!parentColumnReader.isShuttingDown && pageQueueFull) {
-            asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
-          }
+      waitForExecutionResult(); // get the result of execution
+      synchronized (pageQueueSyncronize) {
+        boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
+        readStatus = pageQueue.take(); // get the data if no exception has been thrown
+        if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
+          throw new DrillRuntimeException("Unexpected end of data");
+        }
+        //if the queue was full before we took a page out, then there would
+        // have been no new read tasks scheduled. In that case, schedule a new read.
+        if (!parentColumnReader.isShuttingDown && pageQueueFull) {
+          asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
         }
-      } finally {
-        parentColumnReader.parentReader.getOperatorContext().getStats().stopWait();
-      }
-      long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
-      stats.timeDiskScanWait.addAndGet(timeBlocked);
-      stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
-      if (readStatus.isDictionaryPage) {
-        stats.numDictPageLoads.incrementAndGet();
-        stats.timeDictPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
-      } else {
-        stats.numDataPageLoads.incrementAndGet();
-        stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
       }
-      pageHeader = readStatus.getPageHeader();
+    } finally {
+      parentColumnReader.parentReader.getOperatorContext().getStats().stopWait();
+    }
 
-      // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
-      // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
-
-      do {
-        if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
-          readDictionaryPageData(readStatus, parentColumnReader);
-          waitForExecutionResult(); // get the result of execution
-          synchronized (pageQueueSyncronize) {
-            boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
-            readStatus = pageQueue.take(); // get the data if no exception has been thrown
-            if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
-              break;
-            }
-            //if the queue was full before we took a page out, then there would
-            // have been no new read tasks scheduled. In that case, schedule a new read.
-            if (!parentColumnReader.isShuttingDown && pageQueueFull) {
-              asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
-            }
-          }
-          pageHeader = readStatus.getPageHeader();
-        }
-      } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
+    long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
+    stats.timeDiskScanWait.addAndGet(timeBlocked);
+    stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
+    if (readStatus.isDictionaryPage) {
+      stats.numDictPageLoads.incrementAndGet();
+      stats.timeDictPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
+    } else {
+      stats.numDataPageLoads.incrementAndGet();
+      stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
+    }
+
+    return readStatus;
+  }
 
+  @Override
+  protected void nextInternal() throws IOException {
+    try {
+      ReadStatus readStatus = nextPageFromQueue();
       pageHeader = readStatus.getPageHeader();
-      pageData = getDecompressedPageData(readStatus);
-      assert (pageData != null);
+
+      if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {

Review comment:
       We can combine this `if` with the `switch` statement below.
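
       One possible shape for that, sketched here because the `switch` itself falls outside this hunk; the commented handler calls are placeholders that mirror the PR's method names rather than verified code:

   ```java
   import org.apache.parquet.format.PageHeader;

   // Illustrative only: fold the dictionary-page special case into the same
   // switch on the page type instead of testing it in a separate if.
   class PageDispatchSketch {
     void dispatch(PageHeader pageHeader) {
       switch (pageHeader.getType()) {
         case DICTIONARY_PAGE:
           // loadDictionary(readStatus);               // hypothetical, as in the PR
           break;
         case DATA_PAGE:
           // pageData = decompressPageV1(readStatus);  // hypothetical
           break;
         case DATA_PAGE_V2:
           // pageData = decompressPageV2(readStatus);  // hypothetical
           break;
         default:
           throw new UnsupportedOperationException(
               "Unsupported page type: " + pageHeader.getType());
       }
     }
   }
   ```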

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DataPageHeaderInfoProvider.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.Encoding;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Statistics;
+
+public interface DataPageHeaderInfoProvider {
+  int getNumValues();
+
+  Encoding getEncoding();
+
+  Encoding getDefinitionLevelEncoding();

Review comment:
       Looks like this and the `getRepetitionLevelEncoding` methods are now only used for the v1 version, so there is no need to introduce them in this interface.
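
       A rough sketch of the trimmed interface under that suggestion; only the members visible in this hunk are kept, anything beyond them is an assumption, and the level-encoding getters would stay with the v1-specific implementation:

   ```java
   package org.apache.drill.exec.store.parquet;

   import org.apache.parquet.format.Encoding;

   // Illustrative sketch: keep only the accessors shared by v1 and v2 page
   // headers; repetition/definition level encodings stay with the v1 code.
   public interface DataPageHeaderInfoProvider {
     int getNumValues();

     Encoding getEncoding();

     // ... any further accessors common to v1 and v2 would remain here
   }
   ```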



