Posted to commits@drill.apache.org by pa...@apache.org on 2017/06/12 01:27:42 UTC

[4/4] drill git commit: DRILL-5545: Fix issues reported by findbugs in Async Parquet Reader. This closes #847

DRILL-5545: Fix issues reported by findbugs in Async Parquet Reader.
This closes #847


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/a7e29876
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/a7e29876
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/a7e29876

Branch: refs/heads/master
Commit: a7e298760f9c9efa281d61a1d2ec776139f69225
Parents: 610602e
Author: Parth Chandra <pc...@maprtech.com>
Authored: Wed May 17 16:56:13 2017 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Sun Jun 11 17:28:53 2017 -0700

----------------------------------------------------------------------
 .../parquet/columnreaders/AsyncPageReader.java  | 118 +++++++------------
 .../parquet/columnreaders/ColumnReader.java     |  17 ++-
 .../store/parquet/columnreaders/PageReader.java |  31 ++++-
 3 files changed, 83 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
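
The hunks below address the FindBugs findings in three ways: InputStream.skip() is no
longer assumed to skip the requested number of bytes in one call, the page queue is no
longer used as its own monitor, and reader initialization that was done in the
constructor is moved into an explicit init() step. As a rough illustration of the first
point, here is a minimal, self-contained sketch of the skip-with-read-fallback loop on a
plain java.io.InputStream (the helper name skipFully and the scratch buffer size are
assumptions for illustration, not part of the patch):

    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;

    final class SkipFully {
      // Skip exactly bytesToSkip bytes. InputStream.skip() may legally return 0
      // without reaching EOF, so fall back to read() to make progress or detect EOF.
      static void skipFully(InputStream in, long bytesToSkip) throws IOException {
        byte[] scratch = new byte[8192];
        while (bytesToSkip > 0) {
          long skipped = in.skip(bytesToSkip);
          if (skipped > 0) {
            bytesToSkip -= skipped;
          } else {
            int read = in.read(scratch, 0, (int) Math.min(scratch.length, bytesToSkip));
            if (read < 0) {
              throw new EOFException("End of file reached while skipping.");
            }
            bytesToSkip -= read;
          }
        }
      }
    }

The patch does the equivalent with DirectBufInputStream.getNext() and releases the
returned DrillBuf, since the skipped bytes themselves are not needed.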


http://git-wip-us.apache.org/repos/asf/drill/blob/a7e29876/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index 2e94f56..926436c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -39,6 +39,7 @@ import org.apache.parquet.format.Util;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.xerial.snappy.Snappy;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.Callable;
@@ -86,17 +87,16 @@ class AsyncPageReader extends PageReader {
   private LinkedBlockingQueue<ReadStatus> pageQueue;
   private ConcurrentLinkedQueue<Future<Void>> asyncPageRead;
   private long totalPageValuesRead = 0;
+  private Object pageQueueSyncronize = new Object(); // Object to use to synchronize access to the page Queue.
+                                                     // FindBugs complains if we synchronize on a Concurrent Queue
 
   AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path,
       ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
     super(parentStatus, fs, path, columnChunkMetaData);
-    if (threadPool == null && asyncPageRead == null) {
-      threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
-      queueSize  = parentColumnReader.parentReader.readQueueSize;
-      pageQueue = new LinkedBlockingQueue<>((int)queueSize);
-      asyncPageRead = new ConcurrentLinkedQueue<>();
-      asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
-    }
+    threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+    queueSize = parentColumnReader.parentReader.readQueueSize;
+    pageQueue = new LinkedBlockingQueue<>((int) queueSize);
+    asyncPageRead = new ConcurrentLinkedQueue<>();
   }
 
   @Override
@@ -105,22 +105,34 @@ class AsyncPageReader extends PageReader {
     if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
       try {
         assert(columnChunkMetaData.getDictionaryPageOffset() >= dataReader.getPos() );
-        dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos());
+        long bytesToSkip = columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos();
+        while (bytesToSkip > 0) {
+          long skipped = dataReader.skip(bytesToSkip);
+          if (skipped > 0) {
+            bytesToSkip -= skipped;
+          } else {
+            // There is no good way to handle a zero return from skip(). Guava uses
+            // InputStream.available() to check whether EOF has been reached, but
+            // available() is not reliable, so fall back to reading the remaining bytes.
+            DrillBuf skipBuf = dataReader.getNext((int) bytesToSkip);
+            if (skipBuf != null) {
+              skipBuf.release();
+            } else {
+              throw new EOFException("End of File reached.");
+            }
+          }
+        }
       } catch (IOException e) {
         handleAndThrowException(e, "Error Reading dictionary page.");
       }
-      // parent constructor may call this method before the thread pool is set.
-      if (threadPool == null && asyncPageRead == null) {
-        threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
-        queueSize  = parentColumnReader.parentReader.getFragmentContext().getOptions()
-            .getOption(ExecConstants.PARQUET_PAGEREADER_QUEUE_SIZE).num_val;
-        pageQueue = new LinkedBlockingQueue<ReadStatus>((int)queueSize);
-        asyncPageRead = new ConcurrentLinkedQueue<>();
-        asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
-      }
     }
   }
 
+  @Override protected void init() throws IOException {
+    super.init();
+    asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+  }
+
   private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
     DrillBuf data;
     boolean isDictionary = false;
@@ -146,35 +158,6 @@ class AsyncPageReader extends PageReader {
     return data;
   }
 
-  // Read and decode the dictionary and the header
-  private void readDictionaryPage( final ColumnReader<?> parentStatus) throws UserException {
-    try {
-      Stopwatch timer = Stopwatch.createStarted();
-      ReadStatus readStatus = null;
-      synchronized(pageQueue) {
-        boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
-        asyncPageRead.poll().get(); // get the result of execution
-        readStatus = pageQueue.take(); // get the data if no exception has been thrown
-        assert (readStatus.pageData != null);
-        //if the queue was full before we took a page out, then there would
-        // have been no new read tasks scheduled. In that case, schedule a new read.
-        if (pageQueueFull) {
-          asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
-        }
-      }
-      long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
-      stats.timeDiskScanWait.addAndGet(timeBlocked);
-      stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
-      stats.numDictPageLoads.incrementAndGet();
-      stats.timeDictPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
-      readDictionaryPageData(readStatus, parentStatus);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    } catch (Exception e) {
-      handleAndThrowException(e, "Error reading dictionary page.");
-    }
-  }
-
   // Read and decode the dictionary data
   private void readDictionaryPageData(final ReadStatus readStatus, final ColumnReader<?> parentStatus)
       throws UserException {
@@ -229,12 +212,11 @@ class AsyncPageReader extends PageReader {
   @Override
   protected void nextInternal() throws IOException {
     ReadStatus readStatus = null;
-    String name = parentColumnReader.columnChunkMetaData.toString();
     try {
       Stopwatch timer = Stopwatch.createStarted();
       parentColumnReader.parentReader.getOperatorContext().getStats().startWait();
       asyncPageRead.poll().get(); // get the result of execution
-      synchronized(pageQueue) {
+      synchronized (pageQueueSyncronize) {
         boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
         readStatus = pageQueue.take(); // get the data if no exception has been thrown
         if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
@@ -259,14 +241,14 @@ class AsyncPageReader extends PageReader {
       }
       pageHeader = readStatus.getPageHeader();
 
-    // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
-    // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
+      // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
+      // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
 
       do {
         if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
           readDictionaryPageData(readStatus, parentColumnReader);
           asyncPageRead.poll().get(); // get the result of execution
-          synchronized (pageQueue) {
+          synchronized (pageQueueSyncronize) {
             boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
             readStatus = pageQueue.take(); // get the data if no exception has been thrown
             if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
@@ -278,17 +260,18 @@ class AsyncPageReader extends PageReader {
               asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
             }
           }
-          assert (readStatus.pageData != null);
           pageHeader = readStatus.getPageHeader();
         }
       } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
 
-    pageHeader = readStatus.getPageHeader();
-    pageData = getDecompressedPageData(readStatus);
-    assert(pageData != null);
+      pageHeader = readStatus.getPageHeader();
+      pageData = getDecompressedPageData(readStatus);
+      assert (pageData != null);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-    } catch (Exception e){
+    } catch (RuntimeException e) { // Catch this explicitly to satisfy findbugs
+      handleAndThrowException(e, "Error reading page data");
+    } catch (Exception e) {
       handleAndThrowException(e, "Error reading page data");
     }
 
@@ -303,13 +286,14 @@ class AsyncPageReader extends PageReader {
         } else {
           f.get(1, TimeUnit.MILLISECONDS);
         }
+      } catch (RuntimeException e) {
+        // Do Nothing
       } catch (Exception e) {
         // Do nothing.
       }
     }
 
     //Empty the page queue
-    String name = parentColumnReader.columnChunkMetaData.toString();
     ReadStatus r;
     while (!pageQueue.isEmpty()) {
       r = null;
@@ -426,25 +410,10 @@ class AsyncPageReader extends PageReader {
       DrillBuf pageData = null;
       timer.reset();
       try {
-        long s = parent.dataReader.getPos();
         PageHeader pageHeader = Util.readPageHeader(parent.dataReader);
-        //long e = parent.dataReader.getPos();
-        //if (logger.isTraceEnabled()) {
-        //  logger.trace("[{}]: Read Page Header : ReadPos = {} : Bytes Read = {} ", name, s, e - s);
-        //}
         int compressedSize = pageHeader.getCompressed_page_size();
-        s = parent.dataReader.getPos();
         pageData = parent.dataReader.getNext(compressedSize);
         bytesRead = compressedSize;
-        //e = parent.dataReader.getPos();
-        //if (logger.isTraceEnabled()) {
-        //  DrillBuf bufStart = pageData.slice(0, compressedSize>100?100:compressedSize);
-        //  int endOffset = compressedSize>100?compressedSize-100:0;
-        //  DrillBuf bufEnd = pageData.slice(endOffset, compressedSize-endOffset);
-        //  logger
-        //      .trace("[{}]: Read Page Data : ReadPos = {} : Bytes Read = {} : Buf Start = {} : Buf End = {} ",
-        //          name, s, e - s, ByteBufUtil.hexDump(bufStart), ByteBufUtil.hexDump(bufEnd));
-        //}
 
         synchronized (parent) {
           if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
@@ -462,7 +431,10 @@ class AsyncPageReader extends PageReader {
           readStatus.setDiskScanTime(timeToRead);
           assert (totalValuesRead <= totalValuesCount);
         }
-        synchronized (queue) {
+        // The synchronized block is required so that checking the queue's remaining
+        // capacity and enqueueing the page happen atomically with respect to the consumer.
+        synchronized (parent.pageQueueSyncronize) {
           queue.put(readStatus);
           // if the queue is not full, schedule another read task immediately. If it is then the consumer
           // will schedule a new read task as soon as it removes a page from the queue.
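
FindBugs flags synchronizing on a java.util.concurrent collection (most likely the
JLM_JSR166_UTILCONCURRENT_MONITORENTER pattern), because the collection's internal
locking has nothing to do with the monitor a caller acquires, which is why the commit
introduces pageQueueSyncronize as a dedicated lock object. A minimal sketch of the
pattern, with hypothetical names (PageQueueConsumer and takePage are illustrations,
not part of the patch):

    import java.util.concurrent.LinkedBlockingQueue;

    final class PageQueueConsumer {
      private final LinkedBlockingQueue<Object> pageQueue = new LinkedBlockingQueue<>(4);
      // Dedicated monitor: synchronizing on pageQueue itself trips FindBugs and wrongly
      // suggests an interaction with the queue's own internal lock.
      private final Object pageQueueLock = new Object();

      Object takePage() throws InterruptedException {
        synchronized (pageQueueLock) {
          boolean wasFull = pageQueue.remainingCapacity() == 0;
          Object page = pageQueue.take();
          if (wasFull) {
            // The producer stops scheduling reads while the queue is full, so the
            // consumer would reschedule the next read here (omitted in this sketch).
          }
          return page;
        }
      }
    }

In the patch the producer (AsyncPageReaderTask) acquires the same lock around its
capacity check and put, which keeps the check-then-act sequences on both sides consistent.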

http://git-wip-us.apache.org/repos/asf/drill/blob/a7e29876/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index 98e1d78..2d8f556 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -99,6 +99,17 @@ public abstract class ColumnReader<V extends ValueVector> {
           new PageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(),
               columnChunkMetaData);
     }
+    try {
+      pageReader.init();
+    } catch (IOException e) {
+      UserException ex = UserException.dataReadError(e)
+          .message("Error initializing page reader for Parquet file")
+          .pushContext("Row Group Start: ", this.columnChunkMetaData.getStartingPos())
+          .pushContext("Column: ", this.schemaElement.getName())
+          .pushContext("File: ", this.parentReader.getHadoopPath().toString() )
+          .build(logger);
+      throw ex;
+    }
     if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
       if (columnDescriptor.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
         dataTypeLengthInBits = columnDescriptor.getTypeLength() * 8;
@@ -106,9 +117,7 @@ public abstract class ColumnReader<V extends ValueVector> {
         dataTypeLengthInBits = ParquetColumnMetadata.getTypeLengthInBits(columnDescriptor.getType());
       }
     }
-    if(threadPool == null) {
-      threadPool = parentReader.getOperatorContext().getScanDecodeExecutor();
-    }
+    threadPool = parentReader.getOperatorContext().getScanDecodeExecutor();
   }
 
   public int getRecordsReadInCurrentPass() {
@@ -218,7 +227,7 @@ public abstract class ColumnReader<V extends ValueVector> {
   public Future<Boolean> readPageAsync() {
     Future<Boolean> f = threadPool.submit(new Callable<Boolean>() {
       @Override public Boolean call() throws Exception {
-        return new Boolean(readPage());
+        return Boolean.valueOf(readPage());
       }
     });
     return f;
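
The new Boolean(...) change above addresses the FindBugs DM_BOOLEAN_CTOR warning: the
Boolean constructor always allocates a fresh object, while Boolean.valueOf() returns the
cached Boolean.TRUE / Boolean.FALSE instances (autoboxing would do the same). A tiny
sketch with a stand-in readPage(), purely for illustration:

    final class BoxingExample {
      static boolean readPage() { return true; }  // stand-in for the real readPage()

      public static void main(String[] args) {
        Boolean viaCtor = new Boolean(readPage());        // flagged: always allocates
        Boolean viaValueOf = Boolean.valueOf(readPage()); // preferred: cached instances
        System.out.println(viaCtor + " " + viaValueOf);
      }
    }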

http://git-wip-us.apache.org/repos/asf/drill/blob/a7e29876/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 8a783c9..35cdb14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -46,6 +46,7 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.PrimitiveType;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -143,10 +144,6 @@ class PageReader {
             columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), enforceTotalSize,
             useFadvise);
       }
-      dataReader.init();
-
-      loadDictionaryIfExists(parentStatus, columnChunkMetaData, dataReader);
-
     } catch (IOException e) {
       throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: "
           + path.getName(), e);
@@ -154,11 +151,33 @@ class PageReader {
 
   }
 
+  protected void init() throws IOException {
+    dataReader.init();
+    loadDictionaryIfExists(parentColumnReader, parentColumnReader.columnChunkMetaData, dataReader);
+  }
+
   protected void loadDictionaryIfExists(final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentStatus,
       final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws IOException {
     Stopwatch timer = Stopwatch.createUnstarted();
     if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
-      dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos());
+      long bytesToSkip = columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos();
+      while (bytesToSkip > 0) {
+        long skipped = dataReader.skip(bytesToSkip);
+        if (skipped > 0) {
+          bytesToSkip -= skipped;
+        } else {
+          // There is no good way to handle a zero return from skip(). Guava uses
+          // InputStream.available() to check whether EOF has been reached, but
+          // available() is not reliable, so fall back to reading the remaining bytes.
+          DrillBuf skipBuf = dataReader.getNext((int) bytesToSkip);
+          if (skipBuf != null) {
+            skipBuf.release();
+          } else {
+            throw new EOFException("End of File reached.");
+          }
+        }
+      }
+
       long start=dataReader.getPos();
       timer.start();
       final PageHeader pageHeader = Util.readPageHeader(f);
@@ -366,7 +385,7 @@ class PageReader {
     if (pageHeader.type == PageType.DICTIONARY_PAGE) {
       pageType = "Dictionary Page";
     }
-    logger.trace("ParquetTrace,{},{},{},{},{},{},{},{}", op, pageType.toString(),
+    logger.trace("ParquetTrace,{},{},{},{},{},{},{},{}", op, pageType,
         this.parentColumnReader.parentReader.hadoopPath,
         this.parentColumnReader.columnDescriptor.toString(), start, bytesin, bytesout, time);
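
Moving dataReader.init() and the dictionary load out of the PageReader constructor into
init(), with ColumnReader calling init() after construction, keeps work that depends on
subclass state (the AsyncPageReader page queue and thread pool) from running before the
subclass constructor has finished. A minimal sketch of that deferred-initialization
shape, with hypothetical class names (BaseReader, AsyncReader, and ReaderOwner are
illustrations, not the Drill classes):

    import java.io.IOException;

    class BaseReader {
      BaseReader() {
        // Constructor only assigns fields; nothing here reaches into subclass state.
      }

      protected void init() throws IOException {
        // Open the stream, position it, load the dictionary page, etc.
      }
    }

    class AsyncReader extends BaseReader {
      @Override
      protected void init() throws IOException {
        super.init();
        // Safe to submit the first asynchronous read here: the object, including state
        // set up in the subclass constructor, is fully initialized.
      }
    }

    class ReaderOwner {
      void open() throws IOException {
        BaseReader reader = new AsyncReader();
        reader.init();  // second phase, invoked explicitly (ColumnReader does this in the diff)
      }
    }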