You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/10/18 10:13:10 UTC

[GitHub] [drill] dzamo opened a new pull request #2338: DRILL-1282 Move parquet to use v2 format as default

dzamo opened a new pull request #2338:
URL: https://github.com/apache/drill/pull/2338


   # [DRILL-1282](https://issues.apache.org/jira/browse/DRILL-1282): Move parquet to use v2 format as default
   
   ## Description
   
   Adds support for reading Parquet v2 to both the old and new reader implementations.
   
   ## Documentation
   List newly supported versions and compression codecs for Parquet.
   
   ## Testing
   New unit tests in TestParquetLogicalTypes.java.  Generate Parquet v2 files with ParquetSimpleTestFileGenerator and query them.  Advice is welcome on the unit tests, the tests in TestParquetLogicalTypes do not exercise every data type.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] lgtm-com[bot] commented on pull request #2338: DRILL-1282 Move parquet to use v2 format as default

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #2338:
URL: https://github.com/apache/drill/pull/2338#issuecomment-948680952


   This pull request **fixes 2 alerts** when merging 0e857cd4e73625359415065a193a7ff24e9ea098 into f4ea90ce3e70065c5db364c5f06452c079c151ac - [view on LGTM.com](https://lgtm.com/projects/g/apache/drill/rev/pr-ae7d551f72560d0834a67ef93fdd4caf491bc3ae)
   
   **fixed alerts:**
   
   * 2 for Type mismatch on container access


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] vdiravka commented on pull request #2338: DRILL-1282 Move parquet to use v2 format as default

Posted by GitBox <gi...@apache.org>.
vdiravka commented on pull request #2338:
URL: https://github.com/apache/drill/pull/2338#issuecomment-945632379


   `Some files do not have the expected license header`
   ```
   Warning:  Unknown file extension: /home/runner/work/drill/drill/.dockerignore
   Warning:  Unknown file extension: /home/runner/work/drill/drill/hooks/build
   Warning:  Unknown file extension: /home/runner/work/drill/drill/hooks/push
   Warning:  Unknown file extension: /home/runner/work/drill/drill/lombok.config
   Warning:  Unknown file extension: /home/runner/work/drill/drill/hooks/README
   Warning:  Missing header in: /home/runner/work/drill/drill/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DataPageHeaderInfoProvider.java
   Warning:  Unknown file extension: /home/runner/work/drill/drill/contrib/storage-cassandra/src/test/resources/queries.cql
   Warning:  Unknown file extension: /home/runner/work/drill/drill/contrib/storage-splunk/src/test/resources/logback-test.xml.bak
   Warning:  Unknown file extension: /home/runner/work/drill/drill/contrib/storage-druid/src/test/resources/druid/environment
   Warning:  Unknown file extension: /home/runner/work/drill/drill/contrib/format-httpd/src/test/resources/httpd/multiformat.access_log
   Warning:  Unknown file extension: /home/runner/work/drill/drill/contrib/storage-jdbc/src/test/resources/mysql_config_override/mysql_override.cnf
   ```
   https://github.com/apache/drill/pull/2338/checks?check_run_id=3925211717


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] vdiravka commented on pull request #2338: DRILL-1282 Move parquet to use v2 format as default

Posted by GitBox <gi...@apache.org>.
vdiravka commented on pull request #2338:
URL: https://github.com/apache/drill/pull/2338#issuecomment-948523143


   _Note:_ you can rebase this PR onto the latest master to avoid random CI failures.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] dzamo commented on a change in pull request #2338: DRILL-1282 Move parquet to use v2 format as default

Posted by GitBox <gi...@apache.org>.
dzamo commented on a change in pull request #2338:
URL: https://github.com/apache/drill/pull/2338#discussion_r733582283



##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
##########
@@ -78,209 +79,237 @@
  *
  */
 class AsyncPageReader extends PageReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+  static final Logger logger = LoggerFactory.getLogger(AsyncPageReader.class);
 
   private ExecutorService threadPool;
   private long queueSize;
   private LinkedBlockingQueue<ReadStatus> pageQueue;
   private ConcurrentLinkedQueue<Future<Void>> asyncPageRead;
   private long totalPageValuesRead = 0;
-  private Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
+  private final Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
                                                      // FindBugs complains if we synchronize on a Concurrent Queue
 
-  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path,
-      ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
-    super(parentStatus, fs, path, columnChunkMetaData);
+  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path) throws ExecutionSetupException {
+    super(parentStatus, fs, path);
     threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
     queueSize = parentColumnReader.parentReader.readQueueSize;
     pageQueue = new LinkedBlockingQueue<>((int) queueSize);
     asyncPageRead = new ConcurrentLinkedQueue<>();
   }
 
   @Override
-  protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
-      final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException {
-    if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
-      try {
-        assert(columnChunkMetaData.getDictionaryPageOffset() >= dataReader.getPos() );
-        long bytesToSkip = columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos();
-        while (bytesToSkip > 0) {
-          long skipped = dataReader.skip(bytesToSkip);
-          if (skipped > 0) {
-            bytesToSkip -= skipped;
-          } else {
-            // no good way to handle this. Guava uses InputStream.available to check
-            // if EOF is reached and because available is not reliable,
-            // tries to read the rest of the data.
-            DrillBuf skipBuf = dataReader.getNext((int) bytesToSkip);
-            if (skipBuf != null) {
-              skipBuf.release();
-            } else {
-              throw new EOFException("End of File reached.");
-            }
-          }
-        }
-      } catch (IOException e) {
-        handleAndThrowException(e, "Error Reading dictionary page.");
-      }
-    }
-  }
-
-  @Override protected void init() throws IOException {
+  protected void init() throws IOException {
     super.init();
     //Avoid Init if a shutdown is already in progress even if init() is called once
     if (!parentColumnReader.isShuttingDown) {
       asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
     }
   }
 
-  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
-    DrillBuf data;
-    boolean isDictionary = false;
-    synchronized (this) {
-      data = readStatus.getPageData();
-      readStatus.setPageData(null);
-      isDictionary = readStatus.isDictionaryPage;
-    }
-    if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) {
-      DrillBuf compressedData = data;
-      data = decompress(readStatus.getPageHeader(), compressedData);
-      synchronized (this) {
-        readStatus.setPageData(null);
-      }
-      compressedData.release();
-    } else {
-      if (isDictionary) {
-        stats.totalDictPageReadBytes.addAndGet(readStatus.bytesRead);
-      } else {
-        stats.totalDataPageReadBytes.addAndGet(readStatus.bytesRead);
-      }
-    }
-    return data;
+  /**
+   * Reads and stores this column chunk's dictionary page.
+   * @throws IOException
+   */
+  protected void loadDictionary(ReadStatus readStatus) throws IOException {
+    assert readStatus.isDictionaryPage();
+    assert this.dictionary == null;
+
+    // dictData is not a local because we need to release it later.
+    this.dictData = codecName == CompressionCodecName.UNCOMPRESSED
+      ? readStatus.getPageData()
+      : decompressPageV1(readStatus);
+
+    DictionaryPage page = new DictionaryPage(
+      asBytesInput(dictData, 0, pageHeader.uncompressed_page_size),
+      pageHeader.uncompressed_page_size,
+      pageHeader.dictionary_page_header.num_values,
+      valueOf(pageHeader.dictionary_page_header.encoding.name())
+    );
+
+    this.dictionary = page.getEncoding().initDictionary(columnDescriptor, page);
   }
 
-  // Read and decode the dictionary data
-  private void readDictionaryPageData(final ReadStatus readStatus, final ColumnReader<?> parentStatus)
-      throws UserException {
+  /**
+   * Reads a compressed v1 data page or a dictionary page, both of which are compressed
+   * in their entirety.
+   * @return decompressed Parquet page data
+   * @throws IOException
+   */
+  protected DrillBuf decompressPageV1(ReadStatus readStatus) throws IOException {
+    Stopwatch timer = Stopwatch.createUnstarted();
+
+    PageHeader pageHeader = readStatus.getPageHeader();
+    int inputSize = pageHeader.getCompressed_page_size();
+    int outputSize = pageHeader.getUncompressed_page_size();
+    // TODO: does reporting this number have the same meaning in an async context?
+    long start = dataReader.getPos();
+    long timeToRead;
+
+    DrillBuf inputPageData = readStatus.getPageData();
+    DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
     try {
-      pageHeader = readStatus.getPageHeader();
-      int uncompressedSize = pageHeader.getUncompressed_page_size();
-      final DrillBuf dictionaryData = getDecompressedPageData(readStatus);
-      Stopwatch timer = Stopwatch.createStarted();
-      allocatedDictionaryBuffers.add(dictionaryData);
-      DictionaryPage page = new DictionaryPage(asBytesInput(dictionaryData, 0, uncompressedSize),
-          pageHeader.uncompressed_page_size, pageHeader.dictionary_page_header.num_values,
-          valueOf(pageHeader.dictionary_page_header.encoding.name()));
-      this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
-      long timeToDecode = timer.elapsed(TimeUnit.NANOSECONDS);
-      stats.timeDictPageDecode.addAndGet(timeToDecode);
-    } catch (Exception e) {
-      handleAndThrowException(e, "Error decoding dictionary page.");
+      timer.start();
+      CompressionCodecName codecName = columnChunkMetaData.getCodec();
+      CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+      ByteBuffer input = inputPageData.nioBuffer(0, inputSize);
+      ByteBuffer output = outputPageData.nioBuffer(0, outputSize);
+
+      decomp.decompress(input, inputSize, output, outputSize);
+      outputPageData.writerIndex(outputSize);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
+
+      logger.trace(
+        "Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+        columnChunkMetaData.toString(),
+        dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+        outputSize,
+        ByteBufUtil.hexDump(outputPageData)
+      );
+
+      this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize, outputSize);
+    } finally {
+      readStatus.setPageData(null);
+      if (inputPageData != null) {
+        inputPageData.release();
+      }
     }
-  }
 
-  private void handleAndThrowException(Exception e, String msg) throws UserException {
-    UserException ex = UserException.dataReadError(e).message(msg)
-        .pushContext("Row Group Start: ", this.parentColumnReader.columnChunkMetaData.getStartingPos())
-        .pushContext("Column: ", this.parentColumnReader.schemaElement.getName())
-        .pushContext("File: ", this.fileName).build(logger);
-    throw ex;
+    return outputPageData;
   }
 
-  private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) {
-    DrillBuf pageDataBuf = null;
+  /**
+   * Reads a compressed v2 data page which excluded the repetition and definition level
+   * sections from compression.
+   * @return decompressed Parquet page data
+   * @throws IOException
+   */
+  protected DrillBuf decompressPageV2(ReadStatus readStatus) throws IOException {
     Stopwatch timer = Stopwatch.createUnstarted();
+
+    PageHeader pageHeader = readStatus.getPageHeader();
+    int inputSize = pageHeader.getCompressed_page_size();
+    int repLevelSize = pageHeader.data_page_header_v2.getRepetition_levels_byte_length();
+    int defLevelSize = pageHeader.data_page_header_v2.getDefinition_levels_byte_length();
+    int compDataOffset = repLevelSize + defLevelSize;
+    int outputSize = pageHeader.uncompressed_page_size;
+    // TODO: does reporting this number have the same meaning in an async context?
+    long start = dataReader.getPos();
     long timeToRead;
-    int compressedSize = pageHeader.getCompressed_page_size();
-    int uncompressedSize = pageHeader.getUncompressed_page_size();
-    pageDataBuf = allocateTemporaryBuffer(uncompressedSize);
+
+    DrillBuf inputPageData = readStatus.getPageData();
+    DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
     try {
       timer.start();
+      // Write out the uncompressed section
+      // Note that the following setBytes call to read the repetition and definition level sections
+      // advances readerIndex in inputPageData but not writerIndex in outputPageData.
+      outputPageData.setBytes(0, inputPageData, compDataOffset);
+
+      // decompress from the start of compressed data to the end of the input buffer
+      CompressionCodecName codecName = columnChunkMetaData.getCodec();
+      CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+      ByteBuffer input = inputPageData.nioBuffer(compDataOffset, inputSize - compDataOffset);
+      ByteBuffer output = outputPageData.nioBuffer(compDataOffset, outputSize - compDataOffset);
+      decomp.decompress(
+        input,
+        inputSize - compDataOffset,
+        output,
+        outputSize - compDataOffset
+      );
+      outputPageData.writerIndex(outputSize);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
 
-      CompressionCodecName codecName = parentColumnReader.columnChunkMetaData.getCodec();
-      BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
-      ByteBuffer input = compressedData.nioBuffer(0, compressedSize);
-      ByteBuffer output = pageDataBuf.nioBuffer(0, uncompressedSize);
+      logger.trace(
+        "Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+        columnChunkMetaData.toString(),
+        dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+        outputSize,
+        ByteBufUtil.hexDump(outputPageData)
+      );
 
-      decomp.decompress(input, compressedSize, output, uncompressedSize);
-      pageDataBuf.writerIndex(uncompressedSize);
-      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
-      this.updateStats(pageHeader, "Decompress", 0, timeToRead, compressedSize, uncompressedSize);
-    } catch (IOException e) {
-      handleAndThrowException(e, "Error decompressing data.");
+      this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize, outputSize);
+    } finally {
+      readStatus.setPageData(null);
+      if (inputPageData != null) {
+        inputPageData.release();
+      }
     }
-    return pageDataBuf;
+
+    return outputPageData;
   }
 
-  @Override
-  protected void nextInternal() throws IOException {
-    ReadStatus readStatus = null;
+  private ReadStatus nextPageFromQueue() throws InterruptedException, ExecutionException {
+    ReadStatus readStatus;
+    Stopwatch timer = Stopwatch.createStarted();
+    parentColumnReader.parentReader.getOperatorContext().getStats().startWait();
     try {
-      Stopwatch timer = Stopwatch.createStarted();
-      parentColumnReader.parentReader.getOperatorContext().getStats().startWait();
-      try {
-        waitForExecutionResult(); // get the result of execution
-        synchronized (pageQueueSyncronize) {
-          boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
-          readStatus = pageQueue.take(); // get the data if no exception has been thrown
-          if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
-            throw new DrillRuntimeException("Unexpected end of data");
-          }
-          //if the queue was full before we took a page out, then there would
-          // have been no new read tasks scheduled. In that case, schedule a new read.
-          if (!parentColumnReader.isShuttingDown && pageQueueFull) {
-            asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
-          }
+      waitForExecutionResult(); // get the result of execution
+      synchronized (pageQueueSyncronize) {
+        boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
+        readStatus = pageQueue.take(); // get the data if no exception has been thrown
+        if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
+          throw new DrillRuntimeException("Unexpected end of data");
+        }
+        //if the queue was full before we took a page out, then there would
+        // have been no new read tasks scheduled. In that case, schedule a new read.
+        if (!parentColumnReader.isShuttingDown && pageQueueFull) {
+          asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
         }
-      } finally {
-        parentColumnReader.parentReader.getOperatorContext().getStats().stopWait();
-      }
-      long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
-      stats.timeDiskScanWait.addAndGet(timeBlocked);
-      stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
-      if (readStatus.isDictionaryPage) {
-        stats.numDictPageLoads.incrementAndGet();
-        stats.timeDictPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
-      } else {
-        stats.numDataPageLoads.incrementAndGet();
-        stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
       }
-      pageHeader = readStatus.getPageHeader();
+    } finally {
+      parentColumnReader.parentReader.getOperatorContext().getStats().stopWait();
+    }
 
-      // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
-      // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
-
-      do {
-        if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
-          readDictionaryPageData(readStatus, parentColumnReader);
-          waitForExecutionResult(); // get the result of execution
-          synchronized (pageQueueSyncronize) {
-            boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
-            readStatus = pageQueue.take(); // get the data if no exception has been thrown
-            if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
-              break;
-            }
-            //if the queue was full before we took a page out, then there would
-            // have been no new read tasks scheduled. In that case, schedule a new read.
-            if (!parentColumnReader.isShuttingDown && pageQueueFull) {
-              asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
-            }
-          }
-          pageHeader = readStatus.getPageHeader();
-        }
-      } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
+    long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
+    stats.timeDiskScanWait.addAndGet(timeBlocked);
+    stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
+    if (readStatus.isDictionaryPage) {
+      stats.numDictPageLoads.incrementAndGet();
+      stats.timeDictPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
+    } else {
+      stats.numDataPageLoads.incrementAndGet();
+      stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
+    }
+
+    return readStatus;
+  }
 
+  @Override
+  protected void nextInternal() throws IOException {
+    try {
+      ReadStatus readStatus = nextPageFromQueue();
       pageHeader = readStatus.getPageHeader();
-      pageData = getDecompressedPageData(readStatus);
-      assert (pageData != null);
+
+      if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {

Review comment:
       @vvysotskyi I thought the same thing in an earlier version, but I now don't think it would work.  The switch expression evaluates `pageHeader.getType()`, and the DICTIONARY_PAGE case *modifies* pageHeader because after loding the dictionary it loads another page.  So if we fell through from DICTIONARY_PAGE we'd need the switch expression to reevaluate `pageHeader.getType()` and I don't think it will do that?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] dzamo commented on a change in pull request #2338: DRILL-1282 Move parquet to use v2 format as default

Posted by GitBox <gi...@apache.org>.
dzamo commented on a change in pull request #2338:
URL: https://github.com/apache/drill/pull/2338#discussion_r733575308



##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DataPageHeaderInfoProvider.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.Encoding;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Statistics;
+
+public interface DataPageHeaderInfoProvider {
+  int getNumValues();
+
+  Encoding getEncoding();
+
+  Encoding getDefinitionLevelEncoding();

Review comment:
       @vvysotskyi these two methods were already in the interface, I didn't add them.  But I agree that, unfortunately, they are not helpful because v2 rep and def level decoding requires special treatment anyway.  So should I remove them from the interface?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] dzamo merged pull request #2338: DRILL-1282 Move parquet to use v2 format as default

Posted by GitBox <gi...@apache.org>.
dzamo merged pull request #2338:
URL: https://github.com/apache/drill/pull/2338


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] lgtm-com[bot] commented on pull request #2338: DRILL-1282 Move parquet to use v2 format as default

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #2338:
URL: https://github.com/apache/drill/pull/2338#issuecomment-946045312


   This pull request **fixes 2 alerts** when merging 168490ae4dbdca5394e75fe78b50197a3e9a22ec into 0c9451e6720e5028e1187067cc6d1957ff998bef - [view on LGTM.com](https://lgtm.com/projects/g/apache/drill/rev/pr-71100031f326e783f721df2c35abaf9257204f6c)
   
   **fixed alerts:**
   
   * 2 for Type mismatch on container access


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] dzamo commented on pull request #2338: DRILL-1282 Move parquet to use v2 format as default

Posted by GitBox <gi...@apache.org>.
dzamo commented on pull request #2338:
URL: https://github.com/apache/drill/pull/2338#issuecomment-946411852


   @vvysotskyi I believe the remaining build failures are due to our CI build problems, not this PR.
   
   PS I plan to add more inline comments to the new code here when I address review comments. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] vvysotskyi commented on a change in pull request #2338: DRILL-1282 Move parquet to use v2 format as default

Posted by GitBox <gi...@apache.org>.
vvysotskyi commented on a change in pull request #2338:
URL: https://github.com/apache/drill/pull/2338#discussion_r732979821



##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
##########
@@ -78,209 +79,237 @@
  *
  */
 class AsyncPageReader extends PageReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+  static final Logger logger = LoggerFactory.getLogger(AsyncPageReader.class);
 
   private ExecutorService threadPool;
   private long queueSize;
   private LinkedBlockingQueue<ReadStatus> pageQueue;
   private ConcurrentLinkedQueue<Future<Void>> asyncPageRead;
   private long totalPageValuesRead = 0;
-  private Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
+  private final Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
                                                      // FindBugs complains if we synchronize on a Concurrent Queue
 
-  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path,
-      ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
-    super(parentStatus, fs, path, columnChunkMetaData);
+  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path) throws ExecutionSetupException {
+    super(parentStatus, fs, path);
     threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
     queueSize = parentColumnReader.parentReader.readQueueSize;
     pageQueue = new LinkedBlockingQueue<>((int) queueSize);
     asyncPageRead = new ConcurrentLinkedQueue<>();
   }
 
   @Override
-  protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
-      final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException {
-    if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
-      try {
-        assert(columnChunkMetaData.getDictionaryPageOffset() >= dataReader.getPos() );
-        long bytesToSkip = columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos();
-        while (bytesToSkip > 0) {
-          long skipped = dataReader.skip(bytesToSkip);
-          if (skipped > 0) {
-            bytesToSkip -= skipped;
-          } else {
-            // no good way to handle this. Guava uses InputStream.available to check
-            // if EOF is reached and because available is not reliable,
-            // tries to read the rest of the data.
-            DrillBuf skipBuf = dataReader.getNext((int) bytesToSkip);
-            if (skipBuf != null) {
-              skipBuf.release();
-            } else {
-              throw new EOFException("End of File reached.");
-            }
-          }
-        }
-      } catch (IOException e) {
-        handleAndThrowException(e, "Error Reading dictionary page.");
-      }
-    }
-  }
-
-  @Override protected void init() throws IOException {
+  protected void init() throws IOException {
     super.init();
     //Avoid Init if a shutdown is already in progress even if init() is called once
     if (!parentColumnReader.isShuttingDown) {
       asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
     }
   }
 
-  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
-    DrillBuf data;
-    boolean isDictionary = false;
-    synchronized (this) {
-      data = readStatus.getPageData();
-      readStatus.setPageData(null);
-      isDictionary = readStatus.isDictionaryPage;
-    }
-    if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) {
-      DrillBuf compressedData = data;
-      data = decompress(readStatus.getPageHeader(), compressedData);
-      synchronized (this) {
-        readStatus.setPageData(null);
-      }
-      compressedData.release();
-    } else {
-      if (isDictionary) {
-        stats.totalDictPageReadBytes.addAndGet(readStatus.bytesRead);
-      } else {
-        stats.totalDataPageReadBytes.addAndGet(readStatus.bytesRead);
-      }
-    }
-    return data;
+  /**
+   * Reads and stores this column chunk's dictionary page.
+   * @throws IOException
+   */
+  protected void loadDictionary(ReadStatus readStatus) throws IOException {
+    assert readStatus.isDictionaryPage();
+    assert this.dictionary == null;
+
+    // dictData is not a local because we need to release it later.
+    this.dictData = codecName == CompressionCodecName.UNCOMPRESSED
+      ? readStatus.getPageData()
+      : decompressPageV1(readStatus);
+
+    DictionaryPage page = new DictionaryPage(
+      asBytesInput(dictData, 0, pageHeader.uncompressed_page_size),
+      pageHeader.uncompressed_page_size,
+      pageHeader.dictionary_page_header.num_values,
+      valueOf(pageHeader.dictionary_page_header.encoding.name())
+    );
+
+    this.dictionary = page.getEncoding().initDictionary(columnDescriptor, page);
   }
 
-  // Read and decode the dictionary data
-  private void readDictionaryPageData(final ReadStatus readStatus, final ColumnReader<?> parentStatus)
-      throws UserException {
+  /**
+   * Reads a compressed v1 data page or a dictionary page, both of which are compressed
+   * in their entirety.
+   * @return decompressed Parquet page data
+   * @throws IOException
+   */
+  protected DrillBuf decompressPageV1(ReadStatus readStatus) throws IOException {
+    Stopwatch timer = Stopwatch.createUnstarted();
+
+    PageHeader pageHeader = readStatus.getPageHeader();
+    int inputSize = pageHeader.getCompressed_page_size();
+    int outputSize = pageHeader.getUncompressed_page_size();
+    // TODO: does reporting this number have the same meaning in an async context?
+    long start = dataReader.getPos();
+    long timeToRead;
+
+    DrillBuf inputPageData = readStatus.getPageData();
+    DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
     try {
-      pageHeader = readStatus.getPageHeader();
-      int uncompressedSize = pageHeader.getUncompressed_page_size();
-      final DrillBuf dictionaryData = getDecompressedPageData(readStatus);
-      Stopwatch timer = Stopwatch.createStarted();
-      allocatedDictionaryBuffers.add(dictionaryData);
-      DictionaryPage page = new DictionaryPage(asBytesInput(dictionaryData, 0, uncompressedSize),
-          pageHeader.uncompressed_page_size, pageHeader.dictionary_page_header.num_values,
-          valueOf(pageHeader.dictionary_page_header.encoding.name()));
-      this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
-      long timeToDecode = timer.elapsed(TimeUnit.NANOSECONDS);
-      stats.timeDictPageDecode.addAndGet(timeToDecode);
-    } catch (Exception e) {
-      handleAndThrowException(e, "Error decoding dictionary page.");
+      timer.start();
+      CompressionCodecName codecName = columnChunkMetaData.getCodec();
+      CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+      ByteBuffer input = inputPageData.nioBuffer(0, inputSize);
+      ByteBuffer output = outputPageData.nioBuffer(0, outputSize);
+
+      decomp.decompress(input, inputSize, output, outputSize);
+      outputPageData.writerIndex(outputSize);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
+
+      logger.trace(
+        "Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+        columnChunkMetaData.toString(),
+        dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+        outputSize,
+        ByteBufUtil.hexDump(outputPageData)
+      );
+
+      this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize, outputSize);
+    } finally {
+      readStatus.setPageData(null);
+      if (inputPageData != null) {
+        inputPageData.release();
+      }
     }
-  }
 
-  private void handleAndThrowException(Exception e, String msg) throws UserException {
-    UserException ex = UserException.dataReadError(e).message(msg)
-        .pushContext("Row Group Start: ", this.parentColumnReader.columnChunkMetaData.getStartingPos())
-        .pushContext("Column: ", this.parentColumnReader.schemaElement.getName())
-        .pushContext("File: ", this.fileName).build(logger);
-    throw ex;
+    return outputPageData;
   }
 
-  private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) {
-    DrillBuf pageDataBuf = null;
+  /**
+   * Reads a compressed v2 data page which excluded the repetition and definition level
+   * sections from compression.
+   * @return decompressed Parquet page data
+   * @throws IOException
+   */
+  protected DrillBuf decompressPageV2(ReadStatus readStatus) throws IOException {
     Stopwatch timer = Stopwatch.createUnstarted();
+
+    PageHeader pageHeader = readStatus.getPageHeader();
+    int inputSize = pageHeader.getCompressed_page_size();
+    int repLevelSize = pageHeader.data_page_header_v2.getRepetition_levels_byte_length();
+    int defLevelSize = pageHeader.data_page_header_v2.getDefinition_levels_byte_length();
+    int compDataOffset = repLevelSize + defLevelSize;
+    int outputSize = pageHeader.uncompressed_page_size;
+    // TODO: does reporting this number have the same meaning in an async context?
+    long start = dataReader.getPos();
     long timeToRead;
-    int compressedSize = pageHeader.getCompressed_page_size();
-    int uncompressedSize = pageHeader.getUncompressed_page_size();
-    pageDataBuf = allocateTemporaryBuffer(uncompressedSize);
+
+    DrillBuf inputPageData = readStatus.getPageData();
+    DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
     try {
       timer.start();
+      // Write out the uncompressed section
+      // Note that the following setBytes call to read the repetition and definition level sections
+      // advances readerIndex in inputPageData but not writerIndex in outputPageData.
+      outputPageData.setBytes(0, inputPageData, compDataOffset);
+
+      // decompress from the start of compressed data to the end of the input buffer
+      CompressionCodecName codecName = columnChunkMetaData.getCodec();
+      CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+      ByteBuffer input = inputPageData.nioBuffer(compDataOffset, inputSize - compDataOffset);
+      ByteBuffer output = outputPageData.nioBuffer(compDataOffset, outputSize - compDataOffset);
+      decomp.decompress(
+        input,
+        inputSize - compDataOffset,
+        output,
+        outputSize - compDataOffset
+      );
+      outputPageData.writerIndex(outputSize);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
 
-      CompressionCodecName codecName = parentColumnReader.columnChunkMetaData.getCodec();
-      BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
-      ByteBuffer input = compressedData.nioBuffer(0, compressedSize);
-      ByteBuffer output = pageDataBuf.nioBuffer(0, uncompressedSize);
+      logger.trace(
+        "Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+        columnChunkMetaData.toString(),
+        dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+        outputSize,
+        ByteBufUtil.hexDump(outputPageData)

Review comment:
       This method will be called regardless of the logging level...

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
##########
@@ -29,14 +33,27 @@
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
 import org.apache.parquet.format.SchemaElement;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 public abstract class ColumnReader<V extends ValueVector> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnReader.class);
+  public static final Set<Encoding> DICTIONARY_ENCODINGS = new HashSet<>(Arrays.asList(
+    Encoding.PLAIN_DICTIONARY,
+    Encoding.RLE_DICTIONARY
+  ));

Review comment:
       You can use `ImmutableSet` here, since it is public:
   ```suggestion
     public static final Set<Encoding> DICTIONARY_ENCODINGS = ImmutableSet.of(
       Encoding.PLAIN_DICTIONARY,
       Encoding.RLE_DICTIONARY
     );
   ```

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
##########
@@ -29,14 +33,27 @@
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
 import org.apache.parquet.format.SchemaElement;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 public abstract class ColumnReader<V extends ValueVector> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnReader.class);
+  public static final Set<Encoding> DICTIONARY_ENCODINGS = new HashSet<>(Arrays.asList(
+    Encoding.PLAIN_DICTIONARY,
+    Encoding.RLE_DICTIONARY
+  ));
+  public static final Set<Encoding> VALUE_ENCODINGS = Sets.union(
+    DICTIONARY_ENCODINGS,
+    new HashSet<>(Arrays.asList(
+      Encoding.DELTA_BINARY_PACKED,
+      Encoding.DELTA_BYTE_ARRAY,
+      Encoding.DELTA_LENGTH_BYTE_ARRAY
+  )));

Review comment:
       And here the same:
   ```suggestion
     public static final Set<Encoding> VALUE_ENCODINGS = ImmutableSet.<Encoding>builder()
       .addAll(DICTIONARY_ENCODINGS)
       .add(Encoding.DELTA_BINARY_PACKED)
       .add(Encoding.DELTA_BYTE_ARRAY)
       .add(Encoding.DELTA_LENGTH_BYTE_ARRAY)
       .build();
   ```

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
##########
@@ -78,209 +79,237 @@
  *
  */
 class AsyncPageReader extends PageReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+  static final Logger logger = LoggerFactory.getLogger(AsyncPageReader.class);
 
   private ExecutorService threadPool;
   private long queueSize;
   private LinkedBlockingQueue<ReadStatus> pageQueue;
   private ConcurrentLinkedQueue<Future<Void>> asyncPageRead;
   private long totalPageValuesRead = 0;
-  private Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
+  private final Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
                                                      // FindBugs complains if we synchronize on a Concurrent Queue
 
-  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path,
-      ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
-    super(parentStatus, fs, path, columnChunkMetaData);
+  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path) throws ExecutionSetupException {
+    super(parentStatus, fs, path);
     threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
     queueSize = parentColumnReader.parentReader.readQueueSize;
     pageQueue = new LinkedBlockingQueue<>((int) queueSize);
     asyncPageRead = new ConcurrentLinkedQueue<>();
   }
 
   @Override
-  protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
-      final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException {
-    if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
-      try {
-        assert(columnChunkMetaData.getDictionaryPageOffset() >= dataReader.getPos() );
-        long bytesToSkip = columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos();
-        while (bytesToSkip > 0) {
-          long skipped = dataReader.skip(bytesToSkip);
-          if (skipped > 0) {
-            bytesToSkip -= skipped;
-          } else {
-            // no good way to handle this. Guava uses InputStream.available to check
-            // if EOF is reached and because available is not reliable,
-            // tries to read the rest of the data.
-            DrillBuf skipBuf = dataReader.getNext((int) bytesToSkip);
-            if (skipBuf != null) {
-              skipBuf.release();
-            } else {
-              throw new EOFException("End of File reached.");
-            }
-          }
-        }
-      } catch (IOException e) {
-        handleAndThrowException(e, "Error Reading dictionary page.");
-      }
-    }
-  }
-
-  @Override protected void init() throws IOException {
+  protected void init() throws IOException {
     super.init();
     //Avoid Init if a shutdown is already in progress even if init() is called once
     if (!parentColumnReader.isShuttingDown) {
       asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
     }
   }
 
-  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
-    DrillBuf data;
-    boolean isDictionary = false;
-    synchronized (this) {
-      data = readStatus.getPageData();
-      readStatus.setPageData(null);
-      isDictionary = readStatus.isDictionaryPage;
-    }
-    if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) {
-      DrillBuf compressedData = data;
-      data = decompress(readStatus.getPageHeader(), compressedData);
-      synchronized (this) {
-        readStatus.setPageData(null);
-      }
-      compressedData.release();
-    } else {
-      if (isDictionary) {
-        stats.totalDictPageReadBytes.addAndGet(readStatus.bytesRead);
-      } else {
-        stats.totalDataPageReadBytes.addAndGet(readStatus.bytesRead);
-      }
-    }
-    return data;
+  /**
+   * Reads and stores this column chunk's dictionary page.
+   * @throws IOException
+   */
+  protected void loadDictionary(ReadStatus readStatus) throws IOException {
+    assert readStatus.isDictionaryPage();
+    assert this.dictionary == null;
+
+    // dictData is not a local because we need to release it later.
+    this.dictData = codecName == CompressionCodecName.UNCOMPRESSED
+      ? readStatus.getPageData()
+      : decompressPageV1(readStatus);
+
+    DictionaryPage page = new DictionaryPage(
+      asBytesInput(dictData, 0, pageHeader.uncompressed_page_size),
+      pageHeader.uncompressed_page_size,
+      pageHeader.dictionary_page_header.num_values,
+      valueOf(pageHeader.dictionary_page_header.encoding.name())
+    );
+
+    this.dictionary = page.getEncoding().initDictionary(columnDescriptor, page);
   }
 
-  // Read and decode the dictionary data
-  private void readDictionaryPageData(final ReadStatus readStatus, final ColumnReader<?> parentStatus)
-      throws UserException {
+  /**
+   * Reads a compressed v1 data page or a dictionary page, both of which are compressed
+   * in their entirety.
+   * @return decompressed Parquet page data
+   * @throws IOException
+   */
+  protected DrillBuf decompressPageV1(ReadStatus readStatus) throws IOException {
+    Stopwatch timer = Stopwatch.createUnstarted();
+
+    PageHeader pageHeader = readStatus.getPageHeader();
+    int inputSize = pageHeader.getCompressed_page_size();
+    int outputSize = pageHeader.getUncompressed_page_size();
+    // TODO: does reporting this number have the same meaning in an async context?
+    long start = dataReader.getPos();
+    long timeToRead;
+
+    DrillBuf inputPageData = readStatus.getPageData();
+    DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
     try {
-      pageHeader = readStatus.getPageHeader();
-      int uncompressedSize = pageHeader.getUncompressed_page_size();
-      final DrillBuf dictionaryData = getDecompressedPageData(readStatus);
-      Stopwatch timer = Stopwatch.createStarted();
-      allocatedDictionaryBuffers.add(dictionaryData);
-      DictionaryPage page = new DictionaryPage(asBytesInput(dictionaryData, 0, uncompressedSize),
-          pageHeader.uncompressed_page_size, pageHeader.dictionary_page_header.num_values,
-          valueOf(pageHeader.dictionary_page_header.encoding.name()));
-      this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
-      long timeToDecode = timer.elapsed(TimeUnit.NANOSECONDS);
-      stats.timeDictPageDecode.addAndGet(timeToDecode);
-    } catch (Exception e) {
-      handleAndThrowException(e, "Error decoding dictionary page.");
+      timer.start();
+      CompressionCodecName codecName = columnChunkMetaData.getCodec();
+      CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+      ByteBuffer input = inputPageData.nioBuffer(0, inputSize);
+      ByteBuffer output = outputPageData.nioBuffer(0, outputSize);
+
+      decomp.decompress(input, inputSize, output, outputSize);
+      outputPageData.writerIndex(outputSize);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
+
+      logger.trace(
+        "Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+        columnChunkMetaData.toString(),
+        dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+        outputSize,
+        ByteBufUtil.hexDump(outputPageData)

Review comment:
       And this call too.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
##########
@@ -78,209 +79,237 @@
  *
  */
 class AsyncPageReader extends PageReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+  static final Logger logger = LoggerFactory.getLogger(AsyncPageReader.class);
 
   private ExecutorService threadPool;
   private long queueSize;
   private LinkedBlockingQueue<ReadStatus> pageQueue;
   private ConcurrentLinkedQueue<Future<Void>> asyncPageRead;
   private long totalPageValuesRead = 0;
-  private Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
+  private final Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
                                                      // FindBugs complains if we synchronize on a Concurrent Queue
 
-  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path,
-      ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
-    super(parentStatus, fs, path, columnChunkMetaData);
+  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path) throws ExecutionSetupException {
+    super(parentStatus, fs, path);
     threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
     queueSize = parentColumnReader.parentReader.readQueueSize;
     pageQueue = new LinkedBlockingQueue<>((int) queueSize);
     asyncPageRead = new ConcurrentLinkedQueue<>();
   }
 
   @Override
-  protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
-      final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException {
-    if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
-      try {
-        assert(columnChunkMetaData.getDictionaryPageOffset() >= dataReader.getPos() );
-        long bytesToSkip = columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos();
-        while (bytesToSkip > 0) {
-          long skipped = dataReader.skip(bytesToSkip);
-          if (skipped > 0) {
-            bytesToSkip -= skipped;
-          } else {
-            // no good way to handle this. Guava uses InputStream.available to check
-            // if EOF is reached and because available is not reliable,
-            // tries to read the rest of the data.
-            DrillBuf skipBuf = dataReader.getNext((int) bytesToSkip);
-            if (skipBuf != null) {
-              skipBuf.release();
-            } else {
-              throw new EOFException("End of File reached.");
-            }
-          }
-        }
-      } catch (IOException e) {
-        handleAndThrowException(e, "Error Reading dictionary page.");
-      }
-    }
-  }
-
-  @Override protected void init() throws IOException {
+  protected void init() throws IOException {
     super.init();
     //Avoid Init if a shutdown is already in progress even if init() is called once
     if (!parentColumnReader.isShuttingDown) {
       asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
     }
   }
 
-  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
-    DrillBuf data;
-    boolean isDictionary = false;
-    synchronized (this) {
-      data = readStatus.getPageData();
-      readStatus.setPageData(null);
-      isDictionary = readStatus.isDictionaryPage;
-    }
-    if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) {
-      DrillBuf compressedData = data;
-      data = decompress(readStatus.getPageHeader(), compressedData);
-      synchronized (this) {
-        readStatus.setPageData(null);
-      }
-      compressedData.release();
-    } else {
-      if (isDictionary) {
-        stats.totalDictPageReadBytes.addAndGet(readStatus.bytesRead);
-      } else {
-        stats.totalDataPageReadBytes.addAndGet(readStatus.bytesRead);
-      }
-    }
-    return data;
+  /**
+   * Reads and stores this column chunk's dictionary page.
+   * @throws IOException
+   */
+  protected void loadDictionary(ReadStatus readStatus) throws IOException {
+    assert readStatus.isDictionaryPage();
+    assert this.dictionary == null;
+
+    // dictData is not a local because we need to release it later.
+    this.dictData = codecName == CompressionCodecName.UNCOMPRESSED
+      ? readStatus.getPageData()
+      : decompressPageV1(readStatus);
+
+    DictionaryPage page = new DictionaryPage(
+      asBytesInput(dictData, 0, pageHeader.uncompressed_page_size),
+      pageHeader.uncompressed_page_size,
+      pageHeader.dictionary_page_header.num_values,
+      valueOf(pageHeader.dictionary_page_header.encoding.name())
+    );
+
+    this.dictionary = page.getEncoding().initDictionary(columnDescriptor, page);
   }
 
-  // Read and decode the dictionary data
-  private void readDictionaryPageData(final ReadStatus readStatus, final ColumnReader<?> parentStatus)
-      throws UserException {
+  /**
+   * Reads a compressed v1 data page or a dictionary page, both of which are compressed
+   * in their entirety.
+   * @return decompressed Parquet page data
+   * @throws IOException
+   */
+  protected DrillBuf decompressPageV1(ReadStatus readStatus) throws IOException {
+    Stopwatch timer = Stopwatch.createUnstarted();
+
+    PageHeader pageHeader = readStatus.getPageHeader();
+    int inputSize = pageHeader.getCompressed_page_size();
+    int outputSize = pageHeader.getUncompressed_page_size();
+    // TODO: does reporting this number have the same meaning in an async context?
+    long start = dataReader.getPos();
+    long timeToRead;
+
+    DrillBuf inputPageData = readStatus.getPageData();
+    DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
     try {
-      pageHeader = readStatus.getPageHeader();
-      int uncompressedSize = pageHeader.getUncompressed_page_size();
-      final DrillBuf dictionaryData = getDecompressedPageData(readStatus);
-      Stopwatch timer = Stopwatch.createStarted();
-      allocatedDictionaryBuffers.add(dictionaryData);
-      DictionaryPage page = new DictionaryPage(asBytesInput(dictionaryData, 0, uncompressedSize),
-          pageHeader.uncompressed_page_size, pageHeader.dictionary_page_header.num_values,
-          valueOf(pageHeader.dictionary_page_header.encoding.name()));
-      this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
-      long timeToDecode = timer.elapsed(TimeUnit.NANOSECONDS);
-      stats.timeDictPageDecode.addAndGet(timeToDecode);
-    } catch (Exception e) {
-      handleAndThrowException(e, "Error decoding dictionary page.");
+      timer.start();
+      CompressionCodecName codecName = columnChunkMetaData.getCodec();
+      CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+      ByteBuffer input = inputPageData.nioBuffer(0, inputSize);
+      ByteBuffer output = outputPageData.nioBuffer(0, outputSize);
+
+      decomp.decompress(input, inputSize, output, outputSize);
+      outputPageData.writerIndex(outputSize);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
+
+      logger.trace(
+        "Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+        columnChunkMetaData.toString(),
+        dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+        outputSize,
+        ByteBufUtil.hexDump(outputPageData)
+      );
+
+      this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize, outputSize);
+    } finally {
+      readStatus.setPageData(null);
+      if (inputPageData != null) {
+        inputPageData.release();
+      }
     }
-  }
 
-  private void handleAndThrowException(Exception e, String msg) throws UserException {
-    UserException ex = UserException.dataReadError(e).message(msg)
-        .pushContext("Row Group Start: ", this.parentColumnReader.columnChunkMetaData.getStartingPos())
-        .pushContext("Column: ", this.parentColumnReader.schemaElement.getName())
-        .pushContext("File: ", this.fileName).build(logger);
-    throw ex;
+    return outputPageData;
   }
 
-  private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) {
-    DrillBuf pageDataBuf = null;
+  /**
+   * Reads a compressed v2 data page which excluded the repetition and definition level
+   * sections from compression.
+   * @return decompressed Parquet page data
+   * @throws IOException
+   */
+  protected DrillBuf decompressPageV2(ReadStatus readStatus) throws IOException {
     Stopwatch timer = Stopwatch.createUnstarted();
+
+    PageHeader pageHeader = readStatus.getPageHeader();
+    int inputSize = pageHeader.getCompressed_page_size();
+    int repLevelSize = pageHeader.data_page_header_v2.getRepetition_levels_byte_length();
+    int defLevelSize = pageHeader.data_page_header_v2.getDefinition_levels_byte_length();
+    int compDataOffset = repLevelSize + defLevelSize;
+    int outputSize = pageHeader.uncompressed_page_size;
+    // TODO: does reporting this number have the same meaning in an async context?
+    long start = dataReader.getPos();
     long timeToRead;
-    int compressedSize = pageHeader.getCompressed_page_size();
-    int uncompressedSize = pageHeader.getUncompressed_page_size();
-    pageDataBuf = allocateTemporaryBuffer(uncompressedSize);
+
+    DrillBuf inputPageData = readStatus.getPageData();
+    DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
     try {
       timer.start();
+      // Write out the uncompressed section
+      // Note that the following setBytes call to read the repetition and definition level sections
+      // advances readerIndex in inputPageData but not writerIndex in outputPageData.
+      outputPageData.setBytes(0, inputPageData, compDataOffset);
+
+      // decompress from the start of compressed data to the end of the input buffer
+      CompressionCodecName codecName = columnChunkMetaData.getCodec();
+      CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+      ByteBuffer input = inputPageData.nioBuffer(compDataOffset, inputSize - compDataOffset);
+      ByteBuffer output = outputPageData.nioBuffer(compDataOffset, outputSize - compDataOffset);
+      decomp.decompress(
+        input,
+        inputSize - compDataOffset,
+        output,
+        outputSize - compDataOffset
+      );
+      outputPageData.writerIndex(outputSize);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
 
-      CompressionCodecName codecName = parentColumnReader.columnChunkMetaData.getCodec();
-      BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
-      ByteBuffer input = compressedData.nioBuffer(0, compressedSize);
-      ByteBuffer output = pageDataBuf.nioBuffer(0, uncompressedSize);
+      logger.trace(
+        "Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+        columnChunkMetaData.toString(),
+        dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+        outputSize,
+        ByteBufUtil.hexDump(outputPageData)
+      );
 
-      decomp.decompress(input, compressedSize, output, uncompressedSize);
-      pageDataBuf.writerIndex(uncompressedSize);
-      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
-      this.updateStats(pageHeader, "Decompress", 0, timeToRead, compressedSize, uncompressedSize);
-    } catch (IOException e) {
-      handleAndThrowException(e, "Error decompressing data.");
+      this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize, outputSize);
+    } finally {
+      readStatus.setPageData(null);
+      if (inputPageData != null) {
+        inputPageData.release();
+      }
     }
-    return pageDataBuf;
+
+    return outputPageData;
   }
 
-  @Override
-  protected void nextInternal() throws IOException {
-    ReadStatus readStatus = null;
+  private ReadStatus nextPageFromQueue() throws InterruptedException, ExecutionException {
+    ReadStatus readStatus;
+    Stopwatch timer = Stopwatch.createStarted();
+    parentColumnReader.parentReader.getOperatorContext().getStats().startWait();
     try {
-      Stopwatch timer = Stopwatch.createStarted();
-      parentColumnReader.parentReader.getOperatorContext().getStats().startWait();
-      try {
-        waitForExecutionResult(); // get the result of execution
-        synchronized (pageQueueSyncronize) {
-          boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
-          readStatus = pageQueue.take(); // get the data if no exception has been thrown
-          if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
-            throw new DrillRuntimeException("Unexpected end of data");
-          }
-          //if the queue was full before we took a page out, then there would
-          // have been no new read tasks scheduled. In that case, schedule a new read.
-          if (!parentColumnReader.isShuttingDown && pageQueueFull) {
-            asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
-          }
+      waitForExecutionResult(); // get the result of execution
+      synchronized (pageQueueSyncronize) {
+        boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
+        readStatus = pageQueue.take(); // get the data if no exception has been thrown
+        if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
+          throw new DrillRuntimeException("Unexpected end of data");
+        }
+        //if the queue was full before we took a page out, then there would
+        // have been no new read tasks scheduled. In that case, schedule a new read.
+        if (!parentColumnReader.isShuttingDown && pageQueueFull) {
+          asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
         }
-      } finally {
-        parentColumnReader.parentReader.getOperatorContext().getStats().stopWait();
-      }
-      long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
-      stats.timeDiskScanWait.addAndGet(timeBlocked);
-      stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
-      if (readStatus.isDictionaryPage) {
-        stats.numDictPageLoads.incrementAndGet();
-        stats.timeDictPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
-      } else {
-        stats.numDataPageLoads.incrementAndGet();
-        stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
       }
-      pageHeader = readStatus.getPageHeader();
+    } finally {
+      parentColumnReader.parentReader.getOperatorContext().getStats().stopWait();
+    }
 
-      // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
-      // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
-
-      do {
-        if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
-          readDictionaryPageData(readStatus, parentColumnReader);
-          waitForExecutionResult(); // get the result of execution
-          synchronized (pageQueueSyncronize) {
-            boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
-            readStatus = pageQueue.take(); // get the data if no exception has been thrown
-            if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
-              break;
-            }
-            //if the queue was full before we took a page out, then there would
-            // have been no new read tasks scheduled. In that case, schedule a new read.
-            if (!parentColumnReader.isShuttingDown && pageQueueFull) {
-              asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
-            }
-          }
-          pageHeader = readStatus.getPageHeader();
-        }
-      } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
+    long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
+    stats.timeDiskScanWait.addAndGet(timeBlocked);
+    stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
+    if (readStatus.isDictionaryPage) {
+      stats.numDictPageLoads.incrementAndGet();
+      stats.timeDictPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
+    } else {
+      stats.numDataPageLoads.incrementAndGet();
+      stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
+    }
+
+    return readStatus;
+  }
 
+  @Override
+  protected void nextInternal() throws IOException {
+    try {
+      ReadStatus readStatus = nextPageFromQueue();
       pageHeader = readStatus.getPageHeader();
-      pageData = getDecompressedPageData(readStatus);
-      assert (pageData != null);
+
+      if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {

Review comment:
       We can combine this if with the switch statement below.

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DataPageHeaderInfoProvider.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.Encoding;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Statistics;
+
+public interface DataPageHeaderInfoProvider {
+  int getNumValues();
+
+  Encoding getEncoding();
+
+  Encoding getDefinitionLevelEncoding();

Review comment:
       Looks like this and `getRepetitionLevelEncoding` methods are used now for the v1 version, so no need in introducing them in this interface.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] dzamo commented on a change in pull request #2338: DRILL-1282 Move parquet to use v2 format as default

Posted by GitBox <gi...@apache.org>.
dzamo commented on a change in pull request #2338:
URL: https://github.com/apache/drill/pull/2338#discussion_r733582283



##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
##########
@@ -78,209 +79,237 @@
  *
  */
 class AsyncPageReader extends PageReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+  static final Logger logger = LoggerFactory.getLogger(AsyncPageReader.class);
 
   private ExecutorService threadPool;
   private long queueSize;
   private LinkedBlockingQueue<ReadStatus> pageQueue;
   private ConcurrentLinkedQueue<Future<Void>> asyncPageRead;
   private long totalPageValuesRead = 0;
-  private Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
+  private final Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
                                                      // FindBugs complains if we synchronize on a Concurrent Queue
 
-  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path,
-      ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
-    super(parentStatus, fs, path, columnChunkMetaData);
+  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path) throws ExecutionSetupException {
+    super(parentStatus, fs, path);
     threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
     queueSize = parentColumnReader.parentReader.readQueueSize;
     pageQueue = new LinkedBlockingQueue<>((int) queueSize);
     asyncPageRead = new ConcurrentLinkedQueue<>();
   }
 
   @Override
-  protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
-      final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException {
-    if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
-      try {
-        assert(columnChunkMetaData.getDictionaryPageOffset() >= dataReader.getPos() );
-        long bytesToSkip = columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos();
-        while (bytesToSkip > 0) {
-          long skipped = dataReader.skip(bytesToSkip);
-          if (skipped > 0) {
-            bytesToSkip -= skipped;
-          } else {
-            // no good way to handle this. Guava uses InputStream.available to check
-            // if EOF is reached and because available is not reliable,
-            // tries to read the rest of the data.
-            DrillBuf skipBuf = dataReader.getNext((int) bytesToSkip);
-            if (skipBuf != null) {
-              skipBuf.release();
-            } else {
-              throw new EOFException("End of File reached.");
-            }
-          }
-        }
-      } catch (IOException e) {
-        handleAndThrowException(e, "Error Reading dictionary page.");
-      }
-    }
-  }
-
-  @Override protected void init() throws IOException {
+  protected void init() throws IOException {
     super.init();
     //Avoid Init if a shutdown is already in progress even if init() is called once
     if (!parentColumnReader.isShuttingDown) {
       asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
     }
   }
 
-  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
-    DrillBuf data;
-    boolean isDictionary = false;
-    synchronized (this) {
-      data = readStatus.getPageData();
-      readStatus.setPageData(null);
-      isDictionary = readStatus.isDictionaryPage;
-    }
-    if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) {
-      DrillBuf compressedData = data;
-      data = decompress(readStatus.getPageHeader(), compressedData);
-      synchronized (this) {
-        readStatus.setPageData(null);
-      }
-      compressedData.release();
-    } else {
-      if (isDictionary) {
-        stats.totalDictPageReadBytes.addAndGet(readStatus.bytesRead);
-      } else {
-        stats.totalDataPageReadBytes.addAndGet(readStatus.bytesRead);
-      }
-    }
-    return data;
+  /**
+   * Reads and stores this column chunk's dictionary page.
+   * @throws IOException
+   */
+  protected void loadDictionary(ReadStatus readStatus) throws IOException {
+    assert readStatus.isDictionaryPage();
+    assert this.dictionary == null;
+
+    // dictData is not a local because we need to release it later.
+    this.dictData = codecName == CompressionCodecName.UNCOMPRESSED
+      ? readStatus.getPageData()
+      : decompressPageV1(readStatus);
+
+    DictionaryPage page = new DictionaryPage(
+      asBytesInput(dictData, 0, pageHeader.uncompressed_page_size),
+      pageHeader.uncompressed_page_size,
+      pageHeader.dictionary_page_header.num_values,
+      valueOf(pageHeader.dictionary_page_header.encoding.name())
+    );
+
+    this.dictionary = page.getEncoding().initDictionary(columnDescriptor, page);
   }
 
-  // Read and decode the dictionary data
-  private void readDictionaryPageData(final ReadStatus readStatus, final ColumnReader<?> parentStatus)
-      throws UserException {
+  /**
+   * Reads a compressed v1 data page or a dictionary page, both of which are compressed
+   * in their entirety.
+   * @return decompressed Parquet page data
+   * @throws IOException
+   */
+  protected DrillBuf decompressPageV1(ReadStatus readStatus) throws IOException {
+    Stopwatch timer = Stopwatch.createUnstarted();
+
+    PageHeader pageHeader = readStatus.getPageHeader();
+    int inputSize = pageHeader.getCompressed_page_size();
+    int outputSize = pageHeader.getUncompressed_page_size();
+    // TODO: does reporting this number have the same meaning in an async context?
+    long start = dataReader.getPos();
+    long timeToRead;
+
+    DrillBuf inputPageData = readStatus.getPageData();
+    DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
     try {
-      pageHeader = readStatus.getPageHeader();
-      int uncompressedSize = pageHeader.getUncompressed_page_size();
-      final DrillBuf dictionaryData = getDecompressedPageData(readStatus);
-      Stopwatch timer = Stopwatch.createStarted();
-      allocatedDictionaryBuffers.add(dictionaryData);
-      DictionaryPage page = new DictionaryPage(asBytesInput(dictionaryData, 0, uncompressedSize),
-          pageHeader.uncompressed_page_size, pageHeader.dictionary_page_header.num_values,
-          valueOf(pageHeader.dictionary_page_header.encoding.name()));
-      this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
-      long timeToDecode = timer.elapsed(TimeUnit.NANOSECONDS);
-      stats.timeDictPageDecode.addAndGet(timeToDecode);
-    } catch (Exception e) {
-      handleAndThrowException(e, "Error decoding dictionary page.");
+      timer.start();
+      CompressionCodecName codecName = columnChunkMetaData.getCodec();
+      CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+      ByteBuffer input = inputPageData.nioBuffer(0, inputSize);
+      ByteBuffer output = outputPageData.nioBuffer(0, outputSize);
+
+      decomp.decompress(input, inputSize, output, outputSize);
+      outputPageData.writerIndex(outputSize);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
+
+      logger.trace(
+        "Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+        columnChunkMetaData.toString(),
+        dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+        outputSize,
+        ByteBufUtil.hexDump(outputPageData)
+      );
+
+      this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize, outputSize);
+    } finally {
+      readStatus.setPageData(null);
+      if (inputPageData != null) {
+        inputPageData.release();
+      }
     }
-  }
 
-  private void handleAndThrowException(Exception e, String msg) throws UserException {
-    UserException ex = UserException.dataReadError(e).message(msg)
-        .pushContext("Row Group Start: ", this.parentColumnReader.columnChunkMetaData.getStartingPos())
-        .pushContext("Column: ", this.parentColumnReader.schemaElement.getName())
-        .pushContext("File: ", this.fileName).build(logger);
-    throw ex;
+    return outputPageData;
   }
 
-  private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) {
-    DrillBuf pageDataBuf = null;
+  /**
+   * Reads a compressed v2 data page which excluded the repetition and definition level
+   * sections from compression.
+   * @return decompressed Parquet page data
+   * @throws IOException
+   */
+  protected DrillBuf decompressPageV2(ReadStatus readStatus) throws IOException {
     Stopwatch timer = Stopwatch.createUnstarted();
+
+    PageHeader pageHeader = readStatus.getPageHeader();
+    int inputSize = pageHeader.getCompressed_page_size();
+    int repLevelSize = pageHeader.data_page_header_v2.getRepetition_levels_byte_length();
+    int defLevelSize = pageHeader.data_page_header_v2.getDefinition_levels_byte_length();
+    int compDataOffset = repLevelSize + defLevelSize;
+    int outputSize = pageHeader.uncompressed_page_size;
+    // TODO: does reporting this number have the same meaning in an async context?
+    long start = dataReader.getPos();
     long timeToRead;
-    int compressedSize = pageHeader.getCompressed_page_size();
-    int uncompressedSize = pageHeader.getUncompressed_page_size();
-    pageDataBuf = allocateTemporaryBuffer(uncompressedSize);
+
+    DrillBuf inputPageData = readStatus.getPageData();
+    DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
     try {
       timer.start();
+      // Write out the uncompressed section
+      // Note that the following setBytes call to read the repetition and definition level sections
+      // advances readerIndex in inputPageData but not writerIndex in outputPageData.
+      outputPageData.setBytes(0, inputPageData, compDataOffset);
+
+      // decompress from the start of compressed data to the end of the input buffer
+      CompressionCodecName codecName = columnChunkMetaData.getCodec();
+      CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+      ByteBuffer input = inputPageData.nioBuffer(compDataOffset, inputSize - compDataOffset);
+      ByteBuffer output = outputPageData.nioBuffer(compDataOffset, outputSize - compDataOffset);
+      decomp.decompress(
+        input,
+        inputSize - compDataOffset,
+        output,
+        outputSize - compDataOffset
+      );
+      outputPageData.writerIndex(outputSize);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
 
-      CompressionCodecName codecName = parentColumnReader.columnChunkMetaData.getCodec();
-      BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
-      ByteBuffer input = compressedData.nioBuffer(0, compressedSize);
-      ByteBuffer output = pageDataBuf.nioBuffer(0, uncompressedSize);
+      logger.trace(
+        "Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+        columnChunkMetaData.toString(),
+        dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+        outputSize,
+        ByteBufUtil.hexDump(outputPageData)
+      );
 
-      decomp.decompress(input, compressedSize, output, uncompressedSize);
-      pageDataBuf.writerIndex(uncompressedSize);
-      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
-      this.updateStats(pageHeader, "Decompress", 0, timeToRead, compressedSize, uncompressedSize);
-    } catch (IOException e) {
-      handleAndThrowException(e, "Error decompressing data.");
+      this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize, outputSize);
+    } finally {
+      readStatus.setPageData(null);
+      if (inputPageData != null) {
+        inputPageData.release();
+      }
     }
-    return pageDataBuf;
+
+    return outputPageData;
   }
 
-  @Override
-  protected void nextInternal() throws IOException {
-    ReadStatus readStatus = null;
+  private ReadStatus nextPageFromQueue() throws InterruptedException, ExecutionException {
+    ReadStatus readStatus;
+    Stopwatch timer = Stopwatch.createStarted();
+    parentColumnReader.parentReader.getOperatorContext().getStats().startWait();
     try {
-      Stopwatch timer = Stopwatch.createStarted();
-      parentColumnReader.parentReader.getOperatorContext().getStats().startWait();
-      try {
-        waitForExecutionResult(); // get the result of execution
-        synchronized (pageQueueSyncronize) {
-          boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
-          readStatus = pageQueue.take(); // get the data if no exception has been thrown
-          if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
-            throw new DrillRuntimeException("Unexpected end of data");
-          }
-          //if the queue was full before we took a page out, then there would
-          // have been no new read tasks scheduled. In that case, schedule a new read.
-          if (!parentColumnReader.isShuttingDown && pageQueueFull) {
-            asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
-          }
+      waitForExecutionResult(); // get the result of execution
+      synchronized (pageQueueSyncronize) {
+        boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
+        readStatus = pageQueue.take(); // get the data if no exception has been thrown
+        if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
+          throw new DrillRuntimeException("Unexpected end of data");
+        }
+        //if the queue was full before we took a page out, then there would
+        // have been no new read tasks scheduled. In that case, schedule a new read.
+        if (!parentColumnReader.isShuttingDown && pageQueueFull) {
+          asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
         }
-      } finally {
-        parentColumnReader.parentReader.getOperatorContext().getStats().stopWait();
-      }
-      long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
-      stats.timeDiskScanWait.addAndGet(timeBlocked);
-      stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
-      if (readStatus.isDictionaryPage) {
-        stats.numDictPageLoads.incrementAndGet();
-        stats.timeDictPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
-      } else {
-        stats.numDataPageLoads.incrementAndGet();
-        stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
       }
-      pageHeader = readStatus.getPageHeader();
+    } finally {
+      parentColumnReader.parentReader.getOperatorContext().getStats().stopWait();
+    }
 
-      // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
-      // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
-
-      do {
-        if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
-          readDictionaryPageData(readStatus, parentColumnReader);
-          waitForExecutionResult(); // get the result of execution
-          synchronized (pageQueueSyncronize) {
-            boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
-            readStatus = pageQueue.take(); // get the data if no exception has been thrown
-            if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
-              break;
-            }
-            //if the queue was full before we took a page out, then there would
-            // have been no new read tasks scheduled. In that case, schedule a new read.
-            if (!parentColumnReader.isShuttingDown && pageQueueFull) {
-              asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
-            }
-          }
-          pageHeader = readStatus.getPageHeader();
-        }
-      } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
+    long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
+    stats.timeDiskScanWait.addAndGet(timeBlocked);
+    stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
+    if (readStatus.isDictionaryPage) {
+      stats.numDictPageLoads.incrementAndGet();
+      stats.timeDictPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
+    } else {
+      stats.numDataPageLoads.incrementAndGet();
+      stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
+    }
+
+    return readStatus;
+  }
 
+  @Override
+  protected void nextInternal() throws IOException {
+    try {
+      ReadStatus readStatus = nextPageFromQueue();
       pageHeader = readStatus.getPageHeader();
-      pageData = getDecompressedPageData(readStatus);
-      assert (pageData != null);
+
+      if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {

Review comment:
       @vvysotskyi I thought the same thing in an earlier version, but I now don't think it would work.  The switch expression evaluates `pageHeader.getType()`, and the DICTIONARY_PAGE case *modifies* pageHeader because after loading the dictionary it loads another page.  So if we fell through from DICTIONARY_PAGE we'd need the switch expression to reevaluate `pageHeader.getType()` and I don't think it will do that?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] lgtm-com[bot] commented on pull request #2338: DRILL-1282 Move parquet to use v2 format as default

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #2338:
URL: https://github.com/apache/drill/pull/2338#issuecomment-945935936


   This pull request **fixes 2 alerts** when merging 21db1f0d7de6dcadc737bbc85c768b79725972d0 into 0c9451e6720e5028e1187067cc6d1957ff998bef - [view on LGTM.com](https://lgtm.com/projects/g/apache/drill/rev/pr-81a6ae711781854fd87fc20d072285cc8bdd9e70)
   
   **fixed alerts:**
   
   * 2 for Type mismatch on container access


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] lgtm-com[bot] commented on pull request #2338: DRILL-1282 Move parquet to use v2 format as default

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #2338:
URL: https://github.com/apache/drill/pull/2338#issuecomment-945802536


   This pull request **fixes 2 alerts** when merging 83246e18cccf9d5301cc50fe8e94074b3c521a64 into 0c9451e6720e5028e1187067cc6d1957ff998bef - [view on LGTM.com](https://lgtm.com/projects/g/apache/drill/rev/pr-3054282a0c4d4fa1d186f269d43d884f89494baa)
   
   **fixed alerts:**
   
   * 2 for Type mismatch on container access


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] dzamo commented on a change in pull request #2338: DRILL-1282 Move parquet to use v2 format as default

Posted by GitBox <gi...@apache.org>.
dzamo commented on a change in pull request #2338:
URL: https://github.com/apache/drill/pull/2338#discussion_r733582283



##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
##########
@@ -78,209 +79,237 @@
  *
  */
 class AsyncPageReader extends PageReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+  static final Logger logger = LoggerFactory.getLogger(AsyncPageReader.class);
 
   private ExecutorService threadPool;
   private long queueSize;
   private LinkedBlockingQueue<ReadStatus> pageQueue;
   private ConcurrentLinkedQueue<Future<Void>> asyncPageRead;
   private long totalPageValuesRead = 0;
-  private Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
+  private final Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
                                                      // FindBugs complains if we synchronize on a Concurrent Queue
 
-  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path,
-      ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
-    super(parentStatus, fs, path, columnChunkMetaData);
+  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path) throws ExecutionSetupException {
+    super(parentStatus, fs, path);
     threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
     queueSize = parentColumnReader.parentReader.readQueueSize;
     pageQueue = new LinkedBlockingQueue<>((int) queueSize);
     asyncPageRead = new ConcurrentLinkedQueue<>();
   }
 
   @Override
-  protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
-      final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException {
-    if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
-      try {
-        assert(columnChunkMetaData.getDictionaryPageOffset() >= dataReader.getPos() );
-        long bytesToSkip = columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos();
-        while (bytesToSkip > 0) {
-          long skipped = dataReader.skip(bytesToSkip);
-          if (skipped > 0) {
-            bytesToSkip -= skipped;
-          } else {
-            // no good way to handle this. Guava uses InputStream.available to check
-            // if EOF is reached and because available is not reliable,
-            // tries to read the rest of the data.
-            DrillBuf skipBuf = dataReader.getNext((int) bytesToSkip);
-            if (skipBuf != null) {
-              skipBuf.release();
-            } else {
-              throw new EOFException("End of File reached.");
-            }
-          }
-        }
-      } catch (IOException e) {
-        handleAndThrowException(e, "Error Reading dictionary page.");
-      }
-    }
-  }
-
-  @Override protected void init() throws IOException {
+  protected void init() throws IOException {
     super.init();
     //Avoid Init if a shutdown is already in progress even if init() is called once
     if (!parentColumnReader.isShuttingDown) {
       asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
     }
   }
 
-  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
-    DrillBuf data;
-    boolean isDictionary = false;
-    synchronized (this) {
-      data = readStatus.getPageData();
-      readStatus.setPageData(null);
-      isDictionary = readStatus.isDictionaryPage;
-    }
-    if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) {
-      DrillBuf compressedData = data;
-      data = decompress(readStatus.getPageHeader(), compressedData);
-      synchronized (this) {
-        readStatus.setPageData(null);
-      }
-      compressedData.release();
-    } else {
-      if (isDictionary) {
-        stats.totalDictPageReadBytes.addAndGet(readStatus.bytesRead);
-      } else {
-        stats.totalDataPageReadBytes.addAndGet(readStatus.bytesRead);
-      }
-    }
-    return data;
+  /**
+   * Reads and stores this column chunk's dictionary page.
+   * @throws IOException
+   */
+  protected void loadDictionary(ReadStatus readStatus) throws IOException {
+    assert readStatus.isDictionaryPage();
+    assert this.dictionary == null;
+
+    // dictData is not a local because we need to release it later.
+    this.dictData = codecName == CompressionCodecName.UNCOMPRESSED
+      ? readStatus.getPageData()
+      : decompressPageV1(readStatus);
+
+    DictionaryPage page = new DictionaryPage(
+      asBytesInput(dictData, 0, pageHeader.uncompressed_page_size),
+      pageHeader.uncompressed_page_size,
+      pageHeader.dictionary_page_header.num_values,
+      valueOf(pageHeader.dictionary_page_header.encoding.name())
+    );
+
+    this.dictionary = page.getEncoding().initDictionary(columnDescriptor, page);
   }
 
-  // Read and decode the dictionary data
-  private void readDictionaryPageData(final ReadStatus readStatus, final ColumnReader<?> parentStatus)
-      throws UserException {
+  /**
+   * Reads a compressed v1 data page or a dictionary page, both of which are compressed
+   * in their entirety.
+   * @return decompressed Parquet page data
+   * @throws IOException
+   */
+  protected DrillBuf decompressPageV1(ReadStatus readStatus) throws IOException {
+    Stopwatch timer = Stopwatch.createUnstarted();
+
+    PageHeader pageHeader = readStatus.getPageHeader();
+    int inputSize = pageHeader.getCompressed_page_size();
+    int outputSize = pageHeader.getUncompressed_page_size();
+    // TODO: does reporting this number have the same meaning in an async context?
+    long start = dataReader.getPos();
+    long timeToRead;
+
+    DrillBuf inputPageData = readStatus.getPageData();
+    DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
     try {
-      pageHeader = readStatus.getPageHeader();
-      int uncompressedSize = pageHeader.getUncompressed_page_size();
-      final DrillBuf dictionaryData = getDecompressedPageData(readStatus);
-      Stopwatch timer = Stopwatch.createStarted();
-      allocatedDictionaryBuffers.add(dictionaryData);
-      DictionaryPage page = new DictionaryPage(asBytesInput(dictionaryData, 0, uncompressedSize),
-          pageHeader.uncompressed_page_size, pageHeader.dictionary_page_header.num_values,
-          valueOf(pageHeader.dictionary_page_header.encoding.name()));
-      this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
-      long timeToDecode = timer.elapsed(TimeUnit.NANOSECONDS);
-      stats.timeDictPageDecode.addAndGet(timeToDecode);
-    } catch (Exception e) {
-      handleAndThrowException(e, "Error decoding dictionary page.");
+      timer.start();
+      CompressionCodecName codecName = columnChunkMetaData.getCodec();
+      CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+      ByteBuffer input = inputPageData.nioBuffer(0, inputSize);
+      ByteBuffer output = outputPageData.nioBuffer(0, outputSize);
+
+      decomp.decompress(input, inputSize, output, outputSize);
+      outputPageData.writerIndex(outputSize);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
+
+      logger.trace(
+        "Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+        columnChunkMetaData.toString(),
+        dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+        outputSize,
+        ByteBufUtil.hexDump(outputPageData)
+      );
+
+      this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize, outputSize);
+    } finally {
+      readStatus.setPageData(null);
+      if (inputPageData != null) {
+        inputPageData.release();
+      }
     }
-  }
 
-  private void handleAndThrowException(Exception e, String msg) throws UserException {
-    UserException ex = UserException.dataReadError(e).message(msg)
-        .pushContext("Row Group Start: ", this.parentColumnReader.columnChunkMetaData.getStartingPos())
-        .pushContext("Column: ", this.parentColumnReader.schemaElement.getName())
-        .pushContext("File: ", this.fileName).build(logger);
-    throw ex;
+    return outputPageData;
   }
 
-  private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) {
-    DrillBuf pageDataBuf = null;
+  /**
+   * Reads a compressed v2 data page which excluded the repetition and definition level
+   * sections from compression.
+   * @return decompressed Parquet page data
+   * @throws IOException
+   */
+  protected DrillBuf decompressPageV2(ReadStatus readStatus) throws IOException {
     Stopwatch timer = Stopwatch.createUnstarted();
+
+    PageHeader pageHeader = readStatus.getPageHeader();
+    int inputSize = pageHeader.getCompressed_page_size();
+    int repLevelSize = pageHeader.data_page_header_v2.getRepetition_levels_byte_length();
+    int defLevelSize = pageHeader.data_page_header_v2.getDefinition_levels_byte_length();
+    int compDataOffset = repLevelSize + defLevelSize;
+    int outputSize = pageHeader.uncompressed_page_size;
+    // TODO: does reporting this number have the same meaning in an async context?
+    long start = dataReader.getPos();
     long timeToRead;
-    int compressedSize = pageHeader.getCompressed_page_size();
-    int uncompressedSize = pageHeader.getUncompressed_page_size();
-    pageDataBuf = allocateTemporaryBuffer(uncompressedSize);
+
+    DrillBuf inputPageData = readStatus.getPageData();
+    DrillBuf outputPageData = this.allocator.buffer(outputSize);
+
     try {
       timer.start();
+      // Write out the uncompressed section
+      // Note that the following setBytes call to read the repetition and definition level sections
+      // advances readerIndex in inputPageData but not writerIndex in outputPageData.
+      outputPageData.setBytes(0, inputPageData, compDataOffset);
+
+      // decompress from the start of compressed data to the end of the input buffer
+      CompressionCodecName codecName = columnChunkMetaData.getCodec();
+      CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
+      ByteBuffer input = inputPageData.nioBuffer(compDataOffset, inputSize - compDataOffset);
+      ByteBuffer output = outputPageData.nioBuffer(compDataOffset, outputSize - compDataOffset);
+      decomp.decompress(
+        input,
+        inputSize - compDataOffset,
+        output,
+        outputSize - compDataOffset
+      );
+      outputPageData.writerIndex(outputSize);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
 
-      CompressionCodecName codecName = parentColumnReader.columnChunkMetaData.getCodec();
-      BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
-      ByteBuffer input = compressedData.nioBuffer(0, compressedSize);
-      ByteBuffer output = pageDataBuf.nioBuffer(0, uncompressedSize);
+      logger.trace(
+        "Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+        columnChunkMetaData.toString(),
+        dataReader.getPos(), // TODO: see comment on earlier call to getPos()
+        outputSize,
+        ByteBufUtil.hexDump(outputPageData)
+      );
 
-      decomp.decompress(input, compressedSize, output, uncompressedSize);
-      pageDataBuf.writerIndex(uncompressedSize);
-      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
-      this.updateStats(pageHeader, "Decompress", 0, timeToRead, compressedSize, uncompressedSize);
-    } catch (IOException e) {
-      handleAndThrowException(e, "Error decompressing data.");
+      this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize, outputSize);
+    } finally {
+      readStatus.setPageData(null);
+      if (inputPageData != null) {
+        inputPageData.release();
+      }
     }
-    return pageDataBuf;
+
+    return outputPageData;
   }
 
-  @Override
-  protected void nextInternal() throws IOException {
-    ReadStatus readStatus = null;
+  private ReadStatus nextPageFromQueue() throws InterruptedException, ExecutionException {
+    ReadStatus readStatus;
+    Stopwatch timer = Stopwatch.createStarted();
+    parentColumnReader.parentReader.getOperatorContext().getStats().startWait();
     try {
-      Stopwatch timer = Stopwatch.createStarted();
-      parentColumnReader.parentReader.getOperatorContext().getStats().startWait();
-      try {
-        waitForExecutionResult(); // get the result of execution
-        synchronized (pageQueueSyncronize) {
-          boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
-          readStatus = pageQueue.take(); // get the data if no exception has been thrown
-          if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
-            throw new DrillRuntimeException("Unexpected end of data");
-          }
-          //if the queue was full before we took a page out, then there would
-          // have been no new read tasks scheduled. In that case, schedule a new read.
-          if (!parentColumnReader.isShuttingDown && pageQueueFull) {
-            asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
-          }
+      waitForExecutionResult(); // get the result of execution
+      synchronized (pageQueueSyncronize) {
+        boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
+        readStatus = pageQueue.take(); // get the data if no exception has been thrown
+        if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
+          throw new DrillRuntimeException("Unexpected end of data");
+        }
+        //if the queue was full before we took a page out, then there would
+        // have been no new read tasks scheduled. In that case, schedule a new read.
+        if (!parentColumnReader.isShuttingDown && pageQueueFull) {
+          asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
         }
-      } finally {
-        parentColumnReader.parentReader.getOperatorContext().getStats().stopWait();
-      }
-      long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
-      stats.timeDiskScanWait.addAndGet(timeBlocked);
-      stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
-      if (readStatus.isDictionaryPage) {
-        stats.numDictPageLoads.incrementAndGet();
-        stats.timeDictPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
-      } else {
-        stats.numDataPageLoads.incrementAndGet();
-        stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
       }
-      pageHeader = readStatus.getPageHeader();
+    } finally {
+      parentColumnReader.parentReader.getOperatorContext().getStats().stopWait();
+    }
 
-      // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
-      // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
-
-      do {
-        if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
-          readDictionaryPageData(readStatus, parentColumnReader);
-          waitForExecutionResult(); // get the result of execution
-          synchronized (pageQueueSyncronize) {
-            boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
-            readStatus = pageQueue.take(); // get the data if no exception has been thrown
-            if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
-              break;
-            }
-            //if the queue was full before we took a page out, then there would
-            // have been no new read tasks scheduled. In that case, schedule a new read.
-            if (!parentColumnReader.isShuttingDown && pageQueueFull) {
-              asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
-            }
-          }
-          pageHeader = readStatus.getPageHeader();
-        }
-      } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
+    long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
+    stats.timeDiskScanWait.addAndGet(timeBlocked);
+    stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
+    if (readStatus.isDictionaryPage) {
+      stats.numDictPageLoads.incrementAndGet();
+      stats.timeDictPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
+    } else {
+      stats.numDataPageLoads.incrementAndGet();
+      stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
+    }
+
+    return readStatus;
+  }
 
+  @Override
+  protected void nextInternal() throws IOException {
+    try {
+      ReadStatus readStatus = nextPageFromQueue();
       pageHeader = readStatus.getPageHeader();
-      pageData = getDecompressedPageData(readStatus);
-      assert (pageData != null);
+
+      if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {

Review comment:
       @vvysotskyi I thought the same thing in an earlier version, but I now don't think it would work.  The switch expression evaluates `pageHeader.getType()`, and the DICTIONARY_PAGE case *modifies* pageHeader because after loading the dictionary it loads another page.  So if we fell through from DICTIONARY_PAGE we'd need the switch expression to reevaluate `pageHeader.getType()` and I don't think it will do that?  I.e. I'd think switches only evaluate their expression once...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@drill.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org