You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2017/02/04 03:58:52 UTC

[2/8] drill git commit: DRILL-5207: Improve Parquet Scan pipelining. Add a configurable AsyncPageReader Queue. Enforce total size of parquet row group. Do not initialize BufferedDirectBufInputStream buffer in init. Wait for first read. Change default siz

DRILL-5207: Improve Parquet Scan pipelining. Add a configurable AsyncPageReader Queue. Enforce total size of parquet row group. Do not initialize BufferedDirectBufInputStream buffer in init. Wait for first read. Change default size of BufferedDirectBufInputStream. Do not invoke getOptions too many times in Parquet reader. Add metrics for processing time, and decoding time for varlen and fixedlen columns.

This closes #723


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/05201010
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/05201010
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/05201010

Branch: refs/heads/master
Commit: 052010108a47856f9b1a3c0c470b6572948dc749
Parents: 31b5282
Author: Parth Chandra <pa...@apache.org>
Authored: Wed Dec 14 12:08:20 2016 -0800
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Fri Feb 3 17:40:17 2017 -0800

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |  11 +-
 .../apache/drill/exec/ops/FragmentContext.java  |   2 +-
 .../apache/drill/exec/ops/OperatorStats.java    |  12 +
 .../server/options/SystemOptionManager.java     |   2 +
 .../exec/store/parquet/ParquetReaderStats.java  |   3 +
 .../parquet/columnreaders/AsyncPageReader.java  | 267 ++++++++++++++-----
 .../parquet/columnreaders/ColumnReader.java     |   5 +-
 .../store/parquet/columnreaders/PageReader.java |  59 ++--
 .../columnreaders/ParquetRecordReader.java      | 102 ++++---
 .../columnreaders/VarLenBinaryReader.java       |  10 +-
 .../BufferedDirectBufInputStream.java           |  49 ++--
 .../util/filereader/DirectBufInputStream.java   |  21 +-
 12 files changed, 388 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/05201010/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index e8cc75c..07cc3a7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -160,6 +160,13 @@ public interface ExecConstants {
   String PARQUET_PAGEREADER_ASYNC = "store.parquet.reader.pagereader.async";
   OptionValidator PARQUET_PAGEREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_ASYNC, true);
 
+  // Number of pages the Async Parquet page reader will read before blocking
+  String PARQUET_PAGEREADER_QUEUE_SIZE = "store.parquet.reader.pagereader.queuesize";
+  OptionValidator PARQUET_PAGEREADER_QUEUE_SIZE_VALIDATOR = new  PositiveLongValidator(PARQUET_PAGEREADER_QUEUE_SIZE, Integer.MAX_VALUE, 2);
+
+  String PARQUET_PAGEREADER_ENFORCETOTALSIZE = "store.parquet.reader.pagereader.enforceTotalSize";
+  OptionValidator PARQUET_PAGEREADER_ENFORCETOTALSIZE_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_ENFORCETOTALSIZE, false);
+
   String PARQUET_COLUMNREADER_ASYNC = "store.parquet.reader.columnreader.async";
   OptionValidator PARQUET_COLUMNREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_COLUMNREADER_ASYNC, false);
 
@@ -167,9 +174,9 @@ public interface ExecConstants {
   String PARQUET_PAGEREADER_USE_BUFFERED_READ = "store.parquet.reader.pagereader.bufferedread";
   OptionValidator PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR = new  BooleanValidator(PARQUET_PAGEREADER_USE_BUFFERED_READ, true);
 
-  // Size in MiB of the buffer the Parquet page reader will use to read from disk. Default is 8 MiB
+  // Size in bytes of the buffer the Parquet page reader will use to read from disk. Default is 1 MiB
   String PARQUET_PAGEREADER_BUFFER_SIZE = "store.parquet.reader.pagereader.buffersize";
-  OptionValidator PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR = new  LongValidator(PARQUET_PAGEREADER_BUFFER_SIZE, 4*1024*1024);
+  OptionValidator PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR = new  LongValidator(PARQUET_PAGEREADER_BUFFER_SIZE, 1*1024*1024);
 
   // try to use fadvise if available
   String PARQUET_PAGEREADER_USE_FADVISE = "store.parquet.reader.pagereader.usefadvise";

http://git-wip-us.apache.org/repos/asf/drill/blob/05201010/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index e288095..8be575f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -273,7 +273,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
     return fragment.getHandle();
   }
 
-  private String getFragIdString() {
+  public String getFragIdString() {
     final FragmentHandle handle = getHandle();
     final String frag = handle != null ? handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId() : "0:0";
     return frag;

http://git-wip-us.apache.org/repos/asf/drill/blob/05201010/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index b565774..7cd4523 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.ops;
 import java.util.Iterator;
 
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.MetricValue;
 import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
 import org.apache.drill.exec.proto.UserBitShared.OperatorProfile.Builder;
@@ -187,6 +188,17 @@ public class OperatorStats {
     }
   }
 
+  public String getId() {
+    StringBuilder s = new StringBuilder();
+    return s.append(this.operatorId)
+        .append(":")
+        .append("[")
+        .append(UserBitShared.CoreOperatorType.valueOf(operatorType))
+        .append("]")
+        .toString();
+  }
+
+
   public OperatorProfile getProfile() {
     final OperatorProfile.Builder b = OperatorProfile //
         .newBuilder() //

http://git-wip-us.apache.org/repos/asf/drill/blob/05201010/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 57c72d5..f8b4334 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -103,6 +103,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR,
       ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR,
       ExecConstants.PARQUET_PAGEREADER_ASYNC_VALIDATOR,
+      ExecConstants.PARQUET_PAGEREADER_QUEUE_SIZE_VALIDATOR,
+      ExecConstants.PARQUET_PAGEREADER_ENFORCETOTALSIZE_VALIDATOR,
       ExecConstants.PARQUET_COLUMNREADER_ASYNC_VALIDATOR,
       ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR,
       ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR,

http://git-wip-us.apache.org/repos/asf/drill/blob/05201010/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
index c2711cc..b1dc0be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
@@ -41,6 +41,9 @@ public class ParquetReaderStats {
 
   public AtomicLong timeDiskScanWait = new AtomicLong();
   public AtomicLong timeDiskScan = new AtomicLong();
+  public AtomicLong timeFixedColumnRead = new AtomicLong();
+  public AtomicLong timeVarColumnRead = new AtomicLong();
+  public AtomicLong timeProcess = new AtomicLong();
 
   public ParquetReaderStats() {
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/05201010/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index e2ba865..2e94f56 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -19,9 +19,10 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 
 import com.google.common.base.Stopwatch;
 import io.netty.buffer.DrillBuf;
-import io.netty.buffer.ByteBufUtil;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DirectDecompressor;
 import org.apache.hadoop.io.compress.GzipCodec;
@@ -41,26 +42,61 @@ import org.xerial.snappy.Snappy;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.parquet.column.Encoding.valueOf;
-
+/**
+ * The AsyncPageReader reads one page of data at a time asynchronously from the provided InputStream. The
+ * first request to the page reader creates a Future Task (AsyncPageReaderTask) and submits it to the
+ * scan thread pool. The result of the Future task (a page) is put into a (blocking) queue and the scan
+ * thread starts processing the data as soon as the Future task is complete.
+ * This is a simple producer-consumer queue, the AsyncPageReaderTask is the producer and the ParquetScan is
+ * the consumer.
+ * The AsyncPageReaderTask submits another Future task for reading the next page as soon as it is done,
+ * while the results queue is not full. Until the queue is full, therefore, the scan thread pool keeps the
+ * disk as busy as possible.
+ * In case the disk is slower than the processing, the queue is never filled up after the processing of the
+ * pages begins. In this case, the next disk read begins immediately after the previous read is completed
+ * and the disk is never idle. The query in this case is effectively bounded by the disk.
+ * If, however, the processing is slower than the disk (can happen with SSDs, data being cached by the
+ * FileSystem, or if the processing requires complex processing that is necessarily slow) the queue fills
+ * up. Once the queue is full, the AsyncPageReaderTask does not submit any new Future tasks. The next Future
+ * task is submitted by the *processing* thread as soon as it pulls a page out of the queue. (Note that the
+ * invariant here is that there is space for at least one more page in the queue before the Future read task
+ * is submitted to the pool). This sequence is important. Not doing so can lead to deadlocks - producer
+ * threads may block on putting data into the queue which is full while the consumer threads might be
+ * blocked trying to read from a queue that has no data.
+ * The first request to the page reader can be either to load a dictionary page or a data page; this leads
+ * to the rather odd looking code in the constructor since the parent PageReader calls
+ * loadDictionaryIfExists in the constructor.
+ * The Future tasks created are kept in a non blocking queue and the Future object is checked for any
+ * exceptions that might have occurred during the execution. The queue of Futures is also used to cancel
+ * any pending Futures at close (this may happen as a result of a cancel).
+ *
+ */
 class AsyncPageReader extends PageReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
 
-
   private ExecutorService threadPool;
-  private Future<ReadStatus> asyncPageRead;
+  private long queueSize;
+  private LinkedBlockingQueue<ReadStatus> pageQueue;
+  private ConcurrentLinkedQueue<Future<Void>> asyncPageRead;
+  private long totalPageValuesRead = 0;
 
   AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path,
       ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
     super(parentStatus, fs, path, columnChunkMetaData);
-    if (threadPool == null) {
+    if (threadPool == null && asyncPageRead == null) {
       threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+      queueSize  = parentColumnReader.parentReader.readQueueSize;
+      pageQueue = new LinkedBlockingQueue<>((int)queueSize);
+      asyncPageRead = new ConcurrentLinkedQueue<>();
+      asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
     }
-    asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
   }
 
   @Override
@@ -74,12 +110,14 @@ class AsyncPageReader extends PageReader {
         handleAndThrowException(e, "Error Reading dictionary page.");
       }
       // parent constructor may call this method before the thread pool is set.
-      if (threadPool == null) {
+      if (threadPool == null && asyncPageRead == null) {
         threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+        queueSize  = parentColumnReader.parentReader.getFragmentContext().getOptions()
+            .getOption(ExecConstants.PARQUET_PAGEREADER_QUEUE_SIZE).num_val;
+        pageQueue = new LinkedBlockingQueue<ReadStatus>((int)queueSize);
+        asyncPageRead = new ConcurrentLinkedQueue<>();
+        asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
       }
-      asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
-      readDictionaryPage(asyncPageRead, parentStatus);
-      asyncPageRead = null; // reset after consuming
     }
   }
 
@@ -109,17 +147,29 @@ class AsyncPageReader extends PageReader {
   }
 
   // Read and decode the dictionary and the header
-  private void readDictionaryPage(final Future<ReadStatus> asyncPageRead,
-      final ColumnReader<?> parentStatus) throws UserException {
+  private void readDictionaryPage( final ColumnReader<?> parentStatus) throws UserException {
     try {
       Stopwatch timer = Stopwatch.createStarted();
-      ReadStatus readStatus = asyncPageRead.get();
+      ReadStatus readStatus = null;
+      synchronized(pageQueue) {
+        boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
+        asyncPageRead.poll().get(); // get the result of execution
+        readStatus = pageQueue.take(); // get the data if no exception has been thrown
+        assert (readStatus.pageData != null);
+        //if the queue was full before we took a page out, then there would
+        // have been no new read tasks scheduled. In that case, schedule a new read.
+        if (pageQueueFull) {
+          asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+        }
+      }
       long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
       stats.timeDiskScanWait.addAndGet(timeBlocked);
       stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
       stats.numDictPageLoads.incrementAndGet();
-      stats.timeDictPageLoads.addAndGet(timeBlocked+readStatus.getDiskScanTime());
+      stats.timeDictPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
       readDictionaryPageData(readStatus, parentStatus);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
     } catch (Exception e) {
       handleAndThrowException(e, "Error reading dictionary page.");
     }
@@ -176,12 +226,28 @@ class AsyncPageReader extends PageReader {
     return pageDataBuf;
   }
 
-  @Override protected void nextInternal() throws IOException {
+  @Override
+  protected void nextInternal() throws IOException {
     ReadStatus readStatus = null;
+    String name = parentColumnReader.columnChunkMetaData.toString();
     try {
       Stopwatch timer = Stopwatch.createStarted();
-      readStatus = asyncPageRead.get();
+      parentColumnReader.parentReader.getOperatorContext().getStats().startWait();
+      asyncPageRead.poll().get(); // get the result of execution
+      synchronized(pageQueue) {
+        boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
+        readStatus = pageQueue.take(); // get the data if no exception has been thrown
+        if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
+          throw new DrillRuntimeException("Unexpected end of data");
+        }
+        //if the queue was full before we took a page out, then there would
+        // have been no new read tasks scheduled. In that case, schedule a new read.
+        if (pageQueueFull) {
+          asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+        }
+      }
       long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
+      parentColumnReader.parentReader.getOperatorContext().getStats().stopWait();
       stats.timeDiskScanWait.addAndGet(timeBlocked);
       stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
       if (readStatus.isDictionaryPage) {
@@ -192,45 +258,74 @@ class AsyncPageReader extends PageReader {
         stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
       }
       pageHeader = readStatus.getPageHeader();
-      // reset this. At the time of calling close, if this is not null then a pending asyncPageRead needs to be consumed
-      asyncPageRead = null;
-    } catch (Exception e) {
-      handleAndThrowException(e, "Error reading page data.");
-    }
 
     // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
     // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
 
-    do {
-      if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
-        readDictionaryPageData(readStatus, parentColumnReader);
-        // Ugly. Use the Async task to make a synchronous read call.
-        readStatus = new AsyncPageReaderTask().call();
-        pageHeader = readStatus.getPageHeader();
-      }
-    } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
-
-    if (parentColumnReader.totalValuesRead + readStatus.getValuesRead()
-        < parentColumnReader.columnChunkMetaData.getValueCount()) {
-      asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
-    }
+      do {
+        if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
+          readDictionaryPageData(readStatus, parentColumnReader);
+          asyncPageRead.poll().get(); // get the result of execution
+          synchronized (pageQueue) {
+            boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
+            readStatus = pageQueue.take(); // get the data if no exception has been thrown
+            if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
+              break;
+            }
+            //if the queue was full before we took a page out, then there would
+            // have been no new read tasks scheduled. In that case, schedule a new read.
+            if (pageQueueFull) {
+              asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
+            }
+          }
+          assert (readStatus.pageData != null);
+          pageHeader = readStatus.getPageHeader();
+        }
+      } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
 
     pageHeader = readStatus.getPageHeader();
     pageData = getDecompressedPageData(readStatus);
-
+    assert(pageData != null);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch (Exception e){
+      handleAndThrowException(e, "Error reading page data");
+    }
 
   }
 
-
   @Override public void clear() {
-    if (asyncPageRead != null) {
+    while (asyncPageRead != null && !asyncPageRead.isEmpty()) {
       try {
-        final ReadStatus readStatus = asyncPageRead.get();
-        readStatus.getPageData().release();
+        Future<Void> f = asyncPageRead.poll();
+        if(!f.isDone() && !f.isCancelled()){
+          f.cancel(true);
+        } else {
+          f.get(1, TimeUnit.MILLISECONDS);
+        }
       } catch (Exception e) {
         // Do nothing.
       }
     }
+
+    //Empty the page queue
+    String name = parentColumnReader.columnChunkMetaData.toString();
+    ReadStatus r;
+    while (!pageQueue.isEmpty()) {
+      r = null;
+      try {
+        r = pageQueue.take();
+        if (r == ReadStatus.EMPTY) {
+          break;
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } finally {
+        if (r != null && r.pageData != null) {
+          r.pageData.release();
+        }
+      }
+    }
     super.clear();
   }
 
@@ -242,6 +337,8 @@ class AsyncPageReader extends PageReader {
     private long valuesRead = 0;
     private long diskScanTime = 0;
 
+    public static final ReadStatus EMPTY = new ReadStatus();
+
     public synchronized PageHeader getPageHeader() {
       return pageHeader;
     }
@@ -282,57 +379,72 @@ class AsyncPageReader extends PageReader {
       this.valuesRead = valuesRead;
     }
 
-    public long getDiskScanTime() {
+    public synchronized long getDiskScanTime() {
       return diskScanTime;
     }
 
-    public void setDiskScanTime(long diskScanTime) {
+    public synchronized void setDiskScanTime(long diskScanTime) {
       this.diskScanTime = diskScanTime;
     }
-  }
 
+  }
 
-  private class AsyncPageReaderTask implements Callable<ReadStatus> {
+  private class AsyncPageReaderTask implements Callable<Void> {
 
     private final AsyncPageReader parent = AsyncPageReader.this;
+    private final LinkedBlockingQueue<ReadStatus> queue;
+    private final String name;
 
-    public AsyncPageReaderTask() {
+    public AsyncPageReaderTask(String name, LinkedBlockingQueue<ReadStatus> queue) {
+      this.name = name;
+      this.queue = queue;
     }
 
-    @Override public ReadStatus call() throws IOException {
+    @Override
+    public Void call() throws IOException {
       ReadStatus readStatus = new ReadStatus();
 
-      String oldname = Thread.currentThread().getName();
-      String name = parent.parentColumnReader.columnChunkMetaData.toString();
-      Thread.currentThread().setName(name);
-
       long bytesRead = 0;
       long valuesRead = 0;
+      final long totalValuesRead = parent.totalPageValuesRead;
       Stopwatch timer = Stopwatch.createStarted();
 
+      final long totalValuesCount = parent.parentColumnReader.columnChunkMetaData.getValueCount();
+
+      // If all values have already been read, put a marker object in the queue and return.
+      logger.trace("[{}]: Total Values COUNT {}  Total Values READ {} ", name, totalValuesCount, totalValuesRead);
+      if (totalValuesRead >= totalValuesCount) {
+        try {
+          queue.put(ReadStatus.EMPTY);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          // Do nothing.
+        }
+        return null;
+      }
+
       DrillBuf pageData = null;
+      timer.reset();
       try {
         long s = parent.dataReader.getPos();
         PageHeader pageHeader = Util.readPageHeader(parent.dataReader);
-        long e = parent.dataReader.getPos();
-        if (logger.isTraceEnabled()) {
-          logger.trace("[{}]: Read Page Header : ReadPos = {} : Bytes Read = {} ", name, s, e - s);
-        }
+        //long e = parent.dataReader.getPos();
+        //if (logger.isTraceEnabled()) {
+        //  logger.trace("[{}]: Read Page Header : ReadPos = {} : Bytes Read = {} ", name, s, e - s);
+        //}
         int compressedSize = pageHeader.getCompressed_page_size();
         s = parent.dataReader.getPos();
         pageData = parent.dataReader.getNext(compressedSize);
-        e = parent.dataReader.getPos();
         bytesRead = compressedSize;
-
-        if (logger.isTraceEnabled()) {
-          DrillBuf bufStart = pageData.slice(0, compressedSize>100?100:compressedSize);
-          int endOffset = compressedSize>100?compressedSize-100:0;
-          DrillBuf bufEnd = pageData.slice(endOffset, compressedSize-endOffset);
-          logger
-              .trace("[{}]: Read Page Data : ReadPos = {} : Bytes Read = {} : Buf Start = {} : Buf End = {} ",
-                  name, s, e - s, ByteBufUtil.hexDump(bufStart), ByteBufUtil.hexDump(bufEnd));
-
-        }
+        //e = parent.dataReader.getPos();
+        //if (logger.isTraceEnabled()) {
+        //  DrillBuf bufStart = pageData.slice(0, compressedSize>100?100:compressedSize);
+        //  int endOffset = compressedSize>100?compressedSize-100:0;
+        //  DrillBuf bufEnd = pageData.slice(endOffset, compressedSize-endOffset);
+        //  logger
+        //      .trace("[{}]: Read Page Data : ReadPos = {} : Bytes Read = {} : Buf Start = {} : Buf End = {} ",
+        //          name, s, e - s, ByteBufUtil.hexDump(bufStart), ByteBufUtil.hexDump(bufEnd));
+        //}
 
         synchronized (parent) {
           if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
@@ -340,6 +452,7 @@ class AsyncPageReader extends PageReader {
             valuesRead += pageHeader.getDictionary_page_header().getNum_values();
           } else {
             valuesRead += pageHeader.getData_page_header().getNum_values();
+            parent.totalPageValuesRead += valuesRead;
           }
           long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
           readStatus.setPageHeader(pageHeader);
@@ -347,16 +460,30 @@ class AsyncPageReader extends PageReader {
           readStatus.setBytesRead(bytesRead);
           readStatus.setValuesRead(valuesRead);
           readStatus.setDiskScanTime(timeToRead);
+          assert (totalValuesRead <= totalValuesCount);
         }
-
+        synchronized (queue) {
+          queue.put(readStatus);
+          // if the queue is not full, schedule another read task immediately. If it is then the consumer
+          // will schedule a new read task as soon as it removes a page from the queue.
+          if (queue.remainingCapacity() > 0) {
+            asyncPageRead.offer(parent.threadPool.submit(new AsyncPageReaderTask(debugName, queue)));
+          }
+        }
+        // Do nothing.
+      } catch (InterruptedException e) {
+        if (pageData != null) {
+          pageData.release();
+        }
+        Thread.currentThread().interrupt();
       } catch (Exception e) {
         if (pageData != null) {
           pageData.release();
         }
-        throw e;
-      }
-      Thread.currentThread().setName(oldname);
-      return readStatus;
+        parent.handleAndThrowException(e, "Exception occurred while reading from disk.");
+      } finally {
+    }
+      return null;
     }
 
   }
@@ -364,7 +491,7 @@ class AsyncPageReader extends PageReader {
   private class DecompressionHelper {
     final CompressionCodecName codecName;
 
-    public DecompressionHelper(CompressionCodecName codecName) {
+    public DecompressionHelper(CompressionCodecName codecName){
       this.codecName = codecName;
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/05201010/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index 73cbc3d..c45642b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -26,7 +26,6 @@ import java.util.concurrent.Future;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -90,8 +89,7 @@ public abstract class ColumnReader<V extends ValueVector> {
     this.isFixedLength = fixedLength;
     this.schemaElement = schemaElement;
     this.valueVec =  v;
-    boolean useAsyncPageReader  = parentReader.getFragmentContext().getOptions()
-        .getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
+    boolean useAsyncPageReader = parentReader.useAsyncPageReader;
     if (useAsyncPageReader) {
       this.pageReader =
           new AsyncPageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(),
@@ -101,7 +99,6 @@ public abstract class ColumnReader<V extends ValueVector> {
           new PageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(),
               columnChunkMetaData);
     }
-
     if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
       if (columnDescriptor.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
         dataTypeLengthInBits = columnDescriptor.getTypeLength() * 8;

http://git-wip-us.apache.org/repos/asf/drill/blob/05201010/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index f71eeae..e11fd65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 
 import com.google.common.base.Stopwatch;
 import io.netty.buffer.ByteBufUtil;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.util.filereader.BufferedDirectBufInputStream;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
@@ -54,7 +53,6 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.parquet.column.Encoding.valueOf;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
 
 // class to keep track of the read position of variable length columns
 class PageReader {
@@ -111,6 +109,9 @@ class PageReader {
   private final boolean useBufferedReader;
   private final int scanBufferSize;
   private final boolean useFadvise;
+  private final boolean enforceTotalSize;
+
+  protected final String debugName;
 
   PageReader(org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
     throws ExecutionSetupException {
@@ -119,23 +120,28 @@ class PageReader {
     codecFactory = parentColumnReader.parentReader.getCodecFactory();
     this.stats = parentColumnReader.parentReader.parquetReaderStats;
     this.fileName = path.toString();
+    debugName = new StringBuilder()
+       .append(this.parentColumnReader.parentReader.getFragmentContext().getFragIdString())
+       .append(":")
+       .append(this.parentColumnReader.parentReader.getOperatorContext().getStats().getId() )
+       .append(this.parentColumnReader.columnChunkMetaData.toString() )
+       .toString();
     try {
       inputStream  = fs.open(path);
       BufferAllocator allocator =  parentColumnReader.parentReader.getOperatorContext().getAllocator();
       columnChunkMetaData.getTotalUncompressedSize();
-      useBufferedReader  = parentColumnReader.parentReader.getFragmentContext().getOptions()
-          .getOption(ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ).bool_val;
-      scanBufferSize = parentColumnReader.parentReader.getFragmentContext().getOptions()
-          .getOption(ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE).num_val.intValue();
-      useFadvise = parentColumnReader.parentReader.getFragmentContext().getOptions()
-          .getOption(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE).bool_val;
+      useBufferedReader  = parentColumnReader.parentReader.useBufferedReader;
+      scanBufferSize = parentColumnReader.parentReader.bufferedReadSize;
+      useFadvise = parentColumnReader.parentReader.useFadvise;
+      enforceTotalSize = parentColumnReader.parentReader.enforceTotalSize;
       if (useBufferedReader) {
         this.dataReader = new BufferedDirectBufInputStream(inputStream, allocator, path.getName(),
             columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), scanBufferSize,
-            useFadvise);
+            enforceTotalSize, useFadvise);
       } else {
         this.dataReader = new DirectBufInputStream(inputStream, allocator, path.getName(),
-            columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), useFadvise);
+            columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), enforceTotalSize,
+            useFadvise);
       }
       dataReader.init();
 
@@ -201,21 +207,22 @@ class PageReader {
       pageDataBuf=allocateTemporaryBuffer(uncompressedSize);
 
       try {
-      timer.start();
-      compressedData = dataReader.getNext(compressedSize);
-      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
-      timer.reset();
-      this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, compressedSize);
-      start=dataReader.getPos();
-      timer.start();
-      codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData
-          .getCodec()).decompress(compressedData.nioBuffer(0, compressedSize), compressedSize,
-          pageDataBuf.nioBuffer(0, uncompressedSize), uncompressedSize);
+        timer.start();
+        compressedData = dataReader.getNext(compressedSize);
+        timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
+
+        timer.reset();
+        this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, compressedSize);
+        start = dataReader.getPos();
+        timer.start();
+        codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec())
+            .decompress(compressedData.nioBuffer(0, compressedSize), compressedSize,
+                pageDataBuf.nioBuffer(0, uncompressedSize), uncompressedSize);
         pageDataBuf.writerIndex(uncompressedSize);
         timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
         this.updateStats(pageHeader, "Decompress", start, timeToRead, compressedSize, uncompressedSize);
       } finally {
-        if(compressedData != null) {
+        if (compressedData != null) {
           compressedData.release();
         }
       }
@@ -247,9 +254,6 @@ class PageReader {
           this.parentColumnReader.parentReader.hadoopPath,
           this.parentColumnReader.columnDescriptor.toString(), start, 0, 0, timeToRead);
       timer.reset();
-      if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
-        readDictionaryPage(pageHeader, parentColumnReader);
-      }
     } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
 
     int compressedSize = pageHeader.getCompressed_page_size();
@@ -272,12 +276,17 @@ class PageReader {
 
     // TODO - the metatdata for total size appears to be incorrect for impala generated files, need to find cause
     // and submit a bug report
-    if(parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) {
+    long totalValueCount = parentColumnReader.columnChunkMetaData.getValueCount();
+    if(parentColumnReader.totalValuesRead >= totalValueCount) {
       return false;
     }
     clearBuffers();
 
     nextInternal();
+    if(pageData == null || pageHeader == null){
+      //TODO: Is this an error condition or a normal condition??
+      return false;
+    }
 
     timer.start();
     currentPageCount = pageHeader.data_page_header.num_values;

http://git-wip-us.apache.org/repos/asf/drill/blob/05201010/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 69f6a62..79901ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -23,9 +23,10 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -119,6 +120,17 @@ public class ParquetRecordReader extends AbstractRecordReader {
   private final FragmentContext fragmentContext;
   ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus;
 
+  public boolean useAsyncColReader;
+  public boolean useAsyncPageReader;
+  public boolean useBufferedReader;
+  public int bufferedReadSize;
+  public boolean useFadvise;
+  public boolean enforceTotalSize;
+  public long readQueueSize;
+
+  private String name;
+
+
   public ParquetReaderStats parquetReaderStats = new ParquetReaderStats();
 
   public enum Metric implements MetricDef {
@@ -138,7 +150,10 @@ public class ParquetRecordReader extends AbstractRecordReader {
     TIME_DICT_PAGES_DECOMPRESSED,  // Time in nanos in decompressing dictionary pages
     TIME_DATA_PAGES_DECOMPRESSED,  // Time in nanos in decompressing data pages
     TIME_DISK_SCAN_WAIT,           // Time in nanos spent in waiting for an async disk read to complete
-    TIME_DISK_SCAN;                // Time in nanos spent in reading data from disk.
+    TIME_DISK_SCAN,                // Time in nanos spent in reading data from disk.
+    TIME_FIXEDCOLUMN_READ,         // Time in nanos spent in converting fixed width data to value vectors
+    TIME_VARCOLUMN_READ,           // Time in nanos spent in converting varwidth data to value vectors
+    TIME_PROCESS;                  // Time in nanos spent in processing
 
     @Override public int metricId() {
       return ordinal();
@@ -182,6 +197,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
       ParquetMetadata footer,
       List<SchemaPath> columns,
       ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException {
+    this.name = path;
     this.hadoopPath = new Path(path);
     this.fileSystem = fs;
     this.codecFactory = codecFactory;
@@ -197,6 +213,21 @@ public class ParquetRecordReader extends AbstractRecordReader {
       assert (numRecordsToRead >= 0);
       this.numRecordsToRead = Math.min(numRecordsToRead, footer.getBlocks().get(rowGroupIndex).getRowCount());
     }
+    useAsyncColReader =
+        fragmentContext.getOptions().getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val;
+    useAsyncPageReader =
+        fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
+    useBufferedReader =
+        fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ).bool_val;
+    bufferedReadSize =
+        fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE).num_val.intValue();
+    useFadvise =
+        fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE).bool_val;
+    readQueueSize =
+        fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_QUEUE_SIZE).num_val;
+    enforceTotalSize =
+        fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ENFORCETOTALSIZE).bool_val;
+
     setColumns(columns);
   }
 
@@ -399,9 +430,11 @@ public class ParquetRecordReader extends AbstractRecordReader {
                 getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, repeatedVector, schemaElement));
           }
           else {
-            columnStatuses.add(ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
+
+           ColumnReader<?> cr = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
                 column, columnChunkMetaData, recordsPerBatch, vector,
-                schemaElement));
+                schemaElement) ;
+            columnStatuses.add(cr);
           }
         } else {
           // create a reader and add it to the appropriate list
@@ -471,13 +504,13 @@ public class ParquetRecordReader extends AbstractRecordReader {
   }
 
  public void readAllFixedFields(long recordsToRead) throws IOException {
-   boolean useAsyncColReader =
-       fragmentContext.getOptions().getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val;
+   Stopwatch timer = Stopwatch.createStarted();
    if(useAsyncColReader){
-    readAllFixedFieldsParallel(recordsToRead) ;
+     readAllFixedFieldsParallel(recordsToRead) ;
    } else {
-     readAllFixedFieldsSerial(recordsToRead); ;
+     readAllFixedFieldsSerial(recordsToRead);
    }
+   parquetReaderStats.timeFixedColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
  }
 
   public void readAllFixedFieldsSerial(long recordsToRead) throws IOException {
@@ -514,6 +547,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
   public int next() {
     resetBatch();
     long recordsToRead = 0;
+    Stopwatch timer = Stopwatch.createStarted();
     try {
       ColumnReader<?> firstColumnStatus;
       if (columnStatuses.size() > 0) {
@@ -530,7 +564,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
       // No columns found in the file were selected, simply return a full batch of null records for each column requested
       if (firstColumnStatus == null) {
         if (mockRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount()) {
-          updateStats();
+          parquetReaderStats.timeProcess.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
           return 0;
         }
         recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH, footer.getBlocks().get(rowGroupIndex).getRowCount() - mockRecordsRead);
@@ -544,7 +578,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
         mockRecordsRead += recordsToRead;
         totalRecordsRead += recordsToRead;
         numRecordsToRead -= recordsToRead;
-        updateStats();
+        parquetReaderStats.timeProcess.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
         return (int) recordsToRead;
       }
 
@@ -576,7 +610,8 @@ public class ParquetRecordReader extends AbstractRecordReader {
 //      logger.debug("So far read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
       totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass();
       numRecordsToRead -= firstColumnStatus.getRecordsReadInCurrentPass();
-      updateStats();
+      parquetReaderStats.timeProcess.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
+
       return firstColumnStatus.getRecordsReadInCurrentPass();
     } catch (Exception e) {
       handleAndRaise("\nHadoop path: " + hadoopPath.toUri().getPath() +
@@ -618,7 +653,9 @@ public class ParquetRecordReader extends AbstractRecordReader {
 
 
     if(parquetReaderStats != null) {
-      logger.trace("ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
+      updateStats();
+      logger.trace(
+          "ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
           hadoopPath,
           parquetReaderStats.numDictPageLoads,
           parquetReaderStats.numDataPageLoads,
@@ -636,7 +673,9 @@ public class ParquetRecordReader extends AbstractRecordReader {
           parquetReaderStats.timeDictPagesDecompressed,
           parquetReaderStats.timeDataPagesDecompressed,
           parquetReaderStats.timeDiskScanWait,
-          parquetReaderStats.timeDiskScan
+          parquetReaderStats.timeDiskScan,
+          parquetReaderStats.timeFixedColumnRead,
+          parquetReaderStats.timeVarColumnRead
       );
       parquetReaderStats=null;
     }
@@ -645,37 +684,40 @@ public class ParquetRecordReader extends AbstractRecordReader {
 
   private void updateStats(){
 
-    operatorContext.getStats().setLongStat(Metric.NUM_DICT_PAGE_LOADS,
+    operatorContext.getStats().addLongStat(Metric.NUM_DICT_PAGE_LOADS,
         parquetReaderStats.numDictPageLoads.longValue());
-    operatorContext.getStats().setLongStat(Metric.NUM_DATA_PAGE_lOADS, parquetReaderStats.numDataPageLoads.longValue());
-    operatorContext.getStats().setLongStat(Metric.NUM_DATA_PAGES_DECODED, parquetReaderStats.numDataPagesDecoded.longValue());
-    operatorContext.getStats().setLongStat(Metric.NUM_DICT_PAGES_DECOMPRESSED,
+    operatorContext.getStats().addLongStat(Metric.NUM_DATA_PAGE_lOADS, parquetReaderStats.numDataPageLoads.longValue());
+    operatorContext.getStats().addLongStat(Metric.NUM_DATA_PAGES_DECODED, parquetReaderStats.numDataPagesDecoded.longValue());
+    operatorContext.getStats().addLongStat(Metric.NUM_DICT_PAGES_DECOMPRESSED,
         parquetReaderStats.numDictPagesDecompressed.longValue());
-    operatorContext.getStats().setLongStat(Metric.NUM_DATA_PAGES_DECOMPRESSED,
+    operatorContext.getStats().addLongStat(Metric.NUM_DATA_PAGES_DECOMPRESSED,
         parquetReaderStats.numDataPagesDecompressed.longValue());
-    operatorContext.getStats().setLongStat(Metric.TOTAL_DICT_PAGE_READ_BYTES,
+    operatorContext.getStats().addLongStat(Metric.TOTAL_DICT_PAGE_READ_BYTES,
         parquetReaderStats.totalDictPageReadBytes.longValue());
-    operatorContext.getStats().setLongStat(Metric.TOTAL_DATA_PAGE_READ_BYTES,
+    operatorContext.getStats().addLongStat(Metric.TOTAL_DATA_PAGE_READ_BYTES,
         parquetReaderStats.totalDataPageReadBytes.longValue());
-    operatorContext.getStats().setLongStat(Metric.TOTAL_DICT_DECOMPRESSED_BYTES,
+    operatorContext.getStats().addLongStat(Metric.TOTAL_DICT_DECOMPRESSED_BYTES,
         parquetReaderStats.totalDictDecompressedBytes.longValue());
-    operatorContext.getStats().setLongStat(Metric.TOTAL_DATA_DECOMPRESSED_BYTES,
+    operatorContext.getStats().addLongStat(Metric.TOTAL_DATA_DECOMPRESSED_BYTES,
         parquetReaderStats.totalDataDecompressedBytes.longValue());
-    operatorContext.getStats().setLongStat(Metric.TIME_DICT_PAGE_LOADS,
+    operatorContext.getStats().addLongStat(Metric.TIME_DICT_PAGE_LOADS,
         parquetReaderStats.timeDictPageLoads.longValue());
-    operatorContext.getStats().setLongStat(Metric.TIME_DATA_PAGE_LOADS,
+    operatorContext.getStats().addLongStat(Metric.TIME_DATA_PAGE_LOADS,
         parquetReaderStats.timeDataPageLoads.longValue());
-    operatorContext.getStats().setLongStat(Metric.TIME_DATA_PAGE_DECODE,
+    operatorContext.getStats().addLongStat(Metric.TIME_DATA_PAGE_DECODE,
         parquetReaderStats.timeDataPageDecode.longValue());
-    operatorContext.getStats().setLongStat(Metric.TIME_DICT_PAGE_DECODE,
+    operatorContext.getStats().addLongStat(Metric.TIME_DICT_PAGE_DECODE,
         parquetReaderStats.timeDictPageDecode.longValue());
-    operatorContext.getStats().setLongStat(Metric.TIME_DICT_PAGES_DECOMPRESSED,
+    operatorContext.getStats().addLongStat(Metric.TIME_DICT_PAGES_DECOMPRESSED,
         parquetReaderStats.timeDictPagesDecompressed.longValue());
-    operatorContext.getStats().setLongStat(Metric.TIME_DATA_PAGES_DECOMPRESSED,
+    operatorContext.getStats().addLongStat(Metric.TIME_DATA_PAGES_DECOMPRESSED,
         parquetReaderStats.timeDataPagesDecompressed.longValue());
-    operatorContext.getStats().setLongStat(Metric.TIME_DISK_SCAN_WAIT,
+    operatorContext.getStats().addLongStat(Metric.TIME_DISK_SCAN_WAIT,
         parquetReaderStats.timeDiskScanWait.longValue());
-    operatorContext.getStats().setLongStat(Metric.TIME_DISK_SCAN, parquetReaderStats.timeDiskScan.longValue());
+    operatorContext.getStats().addLongStat(Metric.TIME_DISK_SCAN, parquetReaderStats.timeDiskScan.longValue());
+    operatorContext.getStats().addLongStat(Metric.TIME_FIXEDCOLUMN_READ, parquetReaderStats.timeFixedColumnRead.longValue());
+    operatorContext.getStats().addLongStat(Metric.TIME_VARCOLUMN_READ, parquetReaderStats.timeVarColumnRead.longValue());
+    operatorContext.getStats().addLongStat(Metric.TIME_PROCESS, parquetReaderStats.timeProcess.longValue());
 
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/05201010/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
index 7bcce11..9bfc3aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -17,15 +17,16 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.vector.ValueVector;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 public class VarLenBinaryReader {
 
@@ -36,8 +37,7 @@ public class VarLenBinaryReader {
   public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn<? extends ValueVector>> columns) {
     this.parentReader = parentReader;
     this.columns = columns;
-    useAsyncTasks = parentReader.getFragmentContext().getOptions()
-        .getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val;
+    useAsyncTasks = parentReader.useAsyncColReader;
   }
 
   /**
@@ -56,6 +56,7 @@ public class VarLenBinaryReader {
     for (VarLengthColumn<?> columnReader : columns) {
       columnReader.reset();
     }
+    Stopwatch timer = Stopwatch.createStarted();
 
     recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass);
     if(useAsyncTasks){
@@ -63,6 +64,9 @@ public class VarLenBinaryReader {
     }else{
       readRecordsSerial(recordsReadInCurrentPass);
     }
+
+    parentReader.parquetReaderStats.timeVarColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
+
     return recordsReadInCurrentPass;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/05201010/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
index 327c9a1..d9f1401 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.util.filereader;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import io.netty.buffer.DrillBuf;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.parquet.hadoop.util.CompatibilityUtil;
@@ -26,6 +27,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
 
 /**
  * <code>BufferedDirectBufInputStream</code>  reads from the
@@ -43,7 +45,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
 
   private static final int DEFAULT_BUFFER_SIZE = 8192 * 1024; // 8 MiB
   private static final int DEFAULT_TEMP_BUFFER_SIZE = 8192; // 8 KiB
-  private static final int SMALL_BUFFER_SIZE = 64 * 1024; // 64 KiB
+  private static final int SMALL_BUFFER_SIZE = 256 * 1024; // 256 KiB
 
   /**
    * The internal buffer to keep data read from the underlying inputStream.
@@ -89,8 +91,8 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
    * with the default (8 MiB) buffer size.
    */
   public BufferedDirectBufInputStream(InputStream in, BufferAllocator allocator, String id,
-      long startOffset, long totalByteSize, boolean enableHints) {
-    this(in, allocator, id, startOffset, totalByteSize, DEFAULT_BUFFER_SIZE, enableHints);
+      long startOffset, long totalByteSize, boolean enforceTotalByteSize, boolean enableHints) {
+    this(in, allocator, id, startOffset, totalByteSize, DEFAULT_BUFFER_SIZE, enforceTotalByteSize, enableHints);
   }
 
   /**
@@ -98,8 +100,8 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
    * with the specified buffer size.
    */
   public BufferedDirectBufInputStream(InputStream in, BufferAllocator allocator, String id,
-      long startOffset, long totalByteSize, int bufSize, boolean enableHints) {
-    super(in, allocator, id, startOffset, totalByteSize, enableHints);
+      long startOffset, long totalByteSize, int bufSize, boolean enforceTotalByteSize, boolean enableHints) {
+    super(in, allocator, id, startOffset, totalByteSize, enforceTotalByteSize, enableHints);
     Preconditions.checkArgument(bufSize >= 0);
     // We make the buffer size the smaller of the buffer Size parameter or the total Byte Size
     // rounded to next highest pwoer of two
@@ -120,10 +122,6 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
     super.init();
     this.internalBuffer = this.allocator.buffer(this.bufSize);
     this.tempBuffer = this.allocator.buffer(DEFAULT_TEMP_BUFFER_SIZE);
-    int bytesRead = getNextBlock();
-    if (bytesRead <= 0) {
-      throw new IOException("End of stream reached while initializing buffered reader.");
-    }
   }
 
   private DrillBuf reallocBuffer(int newSize ){
@@ -148,16 +146,29 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
     buffer.clear();
     this.count = this.curPosInBuffer = 0;
 
+    if(logger.isTraceEnabled()) {
+      logger.trace(
+          "PERF: Disk read start. {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, Count: {}, " + "CurPosInStream: {}, CurPosInBuffer: {}", this.streamId, this.startOffset,
+          this.totalByteSize, this.bufSize, this.count, this.curPosInStream, this.curPosInBuffer);
+    }
+    Stopwatch timer = Stopwatch.createStarted();
+    int bytesToRead = 0;
     // We *cannot* rely on the totalByteSize being correct because
     // metadata for Parquet files is incorrect (sometimes). So we read
     // beyond the totalByteSize parameter. However, to prevent ourselves from reading too
     // much data, we reduce the size of the buffer, down to 64KiB.
-    if (buffer.capacity() >= (totalByteSize + startOffset - curPosInStream)) {
-      if (buffer.capacity() > SMALL_BUFFER_SIZE) {
-        buffer = this.reallocBuffer(SMALL_BUFFER_SIZE);
+    if(enforceTotalByteSize) {
+      bytesToRead = (buffer.capacity() >= (totalByteSize + startOffset - curPosInStream)) ?
+          (int) (totalByteSize + startOffset - curPosInStream ):
+          buffer.capacity();
+    } else {
+      if (buffer.capacity() >= (totalByteSize + startOffset - curPosInStream)) {
+        if (buffer.capacity() > SMALL_BUFFER_SIZE) {
+          buffer = this.reallocBuffer(SMALL_BUFFER_SIZE);
+        }
       }
+      bytesToRead = buffer.capacity();
     }
-    int bytesToRead = buffer.capacity();
 
     ByteBuffer directBuffer = buffer.nioBuffer(curPosInBuffer, bytesToRead);
     // The DFS can return *more* bytes than requested if the capacity of the buffer is greater.
@@ -178,11 +189,13 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
         this.count = nBytes + this.curPosInBuffer;
         this.curPosInStream = getInputStream().getPos();
         bytesRead = nBytes;
-        logger.trace(
-            "Stream: {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, BytesRead: {}, Count: {}, "
-                + "CurPosInStream: {}, CurPosInBuffer: {}", this.streamId, this.startOffset,
-            this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream,
-            this.curPosInBuffer);
+        if(logger.isTraceEnabled()) {
+          logger.trace(
+              "PERF: Disk read complete. {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, BytesRead: {}, Count: {}, "
+                  + "CurPosInStream: {}, CurPosInBuffer: {}, Time: {} ms", this.streamId, this.startOffset,
+              this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream, this.curPosInBuffer, ((double) timer.elapsed(TimeUnit.MICROSECONDS))
+                  / 1000);
+        }
       }
     }
     return this.count - this.curPosInBuffer;

http://git-wip-us.apache.org/repos/asf/drill/blob/05201010/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
index f11ad1f..265f7a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.util.filereader;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import io.netty.buffer.DrillBuf;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.hadoop.fs.ByteBufferReadable;
@@ -30,6 +31,7 @@ import java.io.InputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
 
 public class DirectBufInputStream extends FilterInputStream {
 
@@ -46,13 +48,16 @@ public class DirectBufInputStream extends FilterInputStream {
    */
   protected final long totalByteSize;
 
+  // if true, the input stream will return EOF if we have read up to totalByteSize bytes
+  protected final boolean enforceTotalByteSize;
+
   /**
    * The offset in the underlying stream to start reading from
    */
   protected final long startOffset;
 
   public DirectBufInputStream(InputStream in, BufferAllocator allocator, String id, long startOffset,
-      long totalByteSize, boolean enableHints) {
+      long totalByteSize, boolean enforceTotalByteSize, boolean enableHints) {
     super(in);
     Preconditions.checkArgument(startOffset >= 0);
     Preconditions.checkArgument(totalByteSize >= 0);
@@ -60,6 +65,7 @@ public class DirectBufInputStream extends FilterInputStream {
     this.allocator = allocator;
     this.startOffset = startOffset;
     this.totalByteSize = totalByteSize;
+    this.enforceTotalByteSize = enforceTotalByteSize;
     this.enableHints = enableHints;
   }
 
@@ -81,7 +87,18 @@ public class DirectBufInputStream extends FilterInputStream {
     ByteBuffer directBuffer = buf.nioBuffer(0, len);
     int lengthLeftToRead = len;
     while (lengthLeftToRead > 0) {
-      lengthLeftToRead -= CompatibilityUtil.getBuf(getInputStream(), directBuffer, lengthLeftToRead);
+      if(logger.isTraceEnabled()) {
+        logger.trace("PERF: Disk read start. {}, StartOffset: {}, TotalByteSize: {}", this.streamId, this.startOffset, this.totalByteSize);
+      }
+      Stopwatch timer = Stopwatch.createStarted();
+      int bytesRead = CompatibilityUtil.getBuf(getInputStream(), directBuffer, lengthLeftToRead);
+      lengthLeftToRead -= bytesRead;
+      if(logger.isTraceEnabled()) {
+        logger.trace(
+            "PERF: Disk read complete. {}, StartOffset: {}, TotalByteSize: {}, BytesRead: {}, Time: {} ms",
+            this.streamId, this.startOffset, this.totalByteSize, bytesRead,
+            ((double) timer.elapsed(TimeUnit.MICROSECONDS)) / 1000);
+      }
     }
     buf.writerIndex(len);
     return len;