You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2016/11/05 00:11:29 UTC

[02/10] drill git commit: DRILL-4800: Add AsyncPageReader to pipeline PageRead Use non tracking input stream for Parquet scans. Make choice between async and sync reader configurable. Make various options user configurable - choose between sync and async

DRILL-4800: Add AsyncPageReader to pipeline PageRead. Use non-tracking input stream for Parquet scans. Make choice between async and sync reader configurable. Make various options user configurable - choose between sync and async page reader, enable/disable fadvise. Add Parquet Scan metrics to track time spent in various operations.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f9a443d8
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f9a443d8
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f9a443d8

Branch: refs/heads/master
Commit: f9a443d8a3d8e81b7e76f161b611003d16a53a4d
Parents: fe2334e
Author: Parth Chandra <pa...@apache.org>
Authored: Tue Sep 27 14:03:35 2016 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Wed Nov 2 17:57:46 2016 -0700

----------------------------------------------------------------------
 .../src/resources/drill-override-example.conf   |   4 +
 .../org/apache/drill/exec/ExecConstants.java    |  15 +
 .../apache/drill/exec/ops/OperatorContext.java  |   6 +
 .../drill/exec/ops/OperatorContextImpl.java     |  21 +-
 .../drill/exec/ops/OperatorMetricRegistry.java  |   2 +
 .../apache/drill/exec/ops/OperatorStats.java    |  16 +-
 .../drill/exec/server/BootStrapContext.java     |  33 +-
 .../drill/exec/server/DrillbitContext.java      |   6 +
 .../server/options/SystemOptionManager.java     |   5 +-
 .../exec/store/parquet/ParquetReaderStats.java  |  42 +--
 .../store/parquet/ParquetScanBatchCreator.java  |  14 +-
 .../parquet/columnreaders/AsyncPageReader.java  | 332 +++++++++++++++++++
 .../parquet/columnreaders/ColumnReader.java     |  36 +-
 .../store/parquet/columnreaders/PageReader.java | 160 +++++----
 .../columnreaders/ParquetRecordReader.java      | 102 +++++-
 .../columnreaders/VarLenBinaryReader.java       |   6 +-
 .../BufferedDirectBufInputStream.java           |  51 ++-
 .../src/main/resources/drill-module.conf        |   4 +
 .../src/main/resources/rest/profile/profile.ftl |   2 +-
 19 files changed, 684 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/distribution/src/resources/drill-override-example.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf
index 52949db..4be4aa2 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -166,6 +166,10 @@ drill.exec: {
       initial: 20000000
     }
   },
+  scan: {
+    threadpool_size: 8,
+    decode_threadpool_size: 1
+  },
   debug.error_on_leak: true
 }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index ba6b084..a13fd71 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -96,6 +96,10 @@ public interface ExecConstants {
   /** Size of JDBC batch queue (in batches) above which throttling begins. */
   String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
       "drill.jdbc.batch_queue_throttling_threshold";
+  // Thread pool size for scan threads. Used by the Parquet scan.
+  String SCAN_THREADPOOL_SIZE = "drill.exec.scan.threadpool_size";
+  // The size of the thread pool used by a scan to decode the data. Used by Parquet
+  String SCAN_DECODE_THREADPOOL_SIZE = "drill.exec.scan.decode_threadpool_size";
 
   /**
    * Currently if a query is cancelled, but one of the fragments reports the status as FAILED instead of CANCELLED or
@@ -147,10 +151,21 @@ public interface ExecConstants {
   String PARQUET_READER_INT96_AS_TIMESTAMP = "store.parquet.reader.int96_as_timestamp";
   OptionValidator PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR = new BooleanValidator(PARQUET_READER_INT96_AS_TIMESTAMP, false);
 
+  String PARQUET_PAGEREADER_ASYNC = "store.parquet.reader.pagereader.async";
+  OptionValidator PARQUET_PAGEREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_ASYNC, true);
+
   // Use a buffering reader for parquet page reader
   String PARQUET_PAGEREADER_USE_BUFFERED_READ = "store.parquet.reader.pagereader.bufferedread";
   OptionValidator PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR = new  BooleanValidator(PARQUET_PAGEREADER_USE_BUFFERED_READ, true);
 
+  // Size in bytes of the buffer the Parquet page reader will use to read from disk. Default is 4 MiB
+  String PARQUET_PAGEREADER_BUFFER_SIZE = "store.parquet.reader.pagereader.buffersize";
+  OptionValidator PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR = new  LongValidator(PARQUET_PAGEREADER_BUFFER_SIZE, 4*1024*1024);
+
+  // try to use fadvise if available
+  String PARQUET_PAGEREADER_USE_FADVISE = "store.parquet.reader.pagereader.usefadvise";
+  OptionValidator PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR = new  BooleanValidator(PARQUET_PAGEREADER_USE_FADVISE, false);
+
   OptionValidator COMPILE_SCALAR_REPLACEMENT = new BooleanValidator("exec.compile.scalar_replacement", false);
 
   String JSON_ALL_TEXT_MODE = "store.json.all_text_mode";

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 33fa288..92a7269 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -47,10 +47,16 @@ public abstract class OperatorContext {
 
   public abstract ExecutorService getExecutor();
 
+  public abstract ExecutorService getScanExecutor();
+
+  public abstract ExecutorService getScanDecodeExecutor();
+
   public abstract ExecutionControls getExecutionControls();
 
   public abstract DrillFileSystem newFileSystem(Configuration conf) throws IOException;
 
+  public abstract DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException;
+
   /**
    * Run the callable as the given proxy user.
    *

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index 85f0ccb..38ddd16 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -50,6 +50,8 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
   private final BufferManager manager;
   private DrillFileSystem fs;
   private final ExecutorService executor;
+  private final ExecutorService scanExecutor;
+  private final ExecutorService scanDecodeExecutor;
 
   /**
    * This lazily initialized executor service is used to submit a {@link Callable task} that needs a proxy user. There
@@ -70,6 +72,8 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     stats = context.getStats().newOperatorStats(def, allocator);
     executionControls = context.getExecutionControls();
     executor = context.getDrillbitContext().getExecutor();
+    scanExecutor = context.getDrillbitContext().getScanExecutor();
+    scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor();
   }
 
   public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats)
@@ -81,6 +85,8 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     this.stats     = stats;
     executionControls = context.getExecutionControls();
     executor = context.getDrillbitContext().getExecutor();
+    scanExecutor = context.getDrillbitContext().getScanExecutor();
+    scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor();
   }
 
   public DrillBuf replace(DrillBuf old, int newSize) {
@@ -95,10 +101,16 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     return manager.getManagedBuffer(size);
   }
 
-  // Allow and operator to use the thread pool
+  // Allow an operator to use the thread pool
   public ExecutorService getExecutor() {
     return executor;
   }
+  public ExecutorService getScanExecutor() {
+    return scanExecutor;
+  }
+  public ExecutorService getScanDecodeExecutor() {
+    return scanDecodeExecutor;
+  }
 
   public ExecutionControls getExecutionControls() {
     return executionControls;
@@ -179,4 +191,11 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     return fs;
   }
 
+  @Override
+  public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException {
+    Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
+    fs = new DrillFileSystem(conf, null);
+    return fs;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
index b704bb6..0424332 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
 import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
 import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
 import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch;
+import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 
 /**
@@ -47,6 +48,7 @@ public class OperatorMetricRegistry {
     register(CoreOperatorType.HASH_AGGREGATE_VALUE, HashAggTemplate.Metric.class);
     register(CoreOperatorType.HASH_JOIN_VALUE, HashJoinBatch.Metric.class);
     register(CoreOperatorType.EXTERNAL_SORT_VALUE, ExternalSortBatch.Metric.class);
+    register(CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE, ParquetRecordReader.Metric.class);
   }
 
   private static void register(final int operatorType, final Class<? extends MetricDef> metricDef) {

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index 271f734..b565774 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -131,7 +131,7 @@ public class OperatorStats {
   /**
    * Clear stats
    */
-  public void clear() {
+  public synchronized void clear() {
     processingNanos = 0l;
     setupNanos = 0l;
     waitNanos = 0l;
@@ -139,47 +139,47 @@ public class OperatorStats {
     doubleMetrics.clear();
   }
 
-  public void startSetup() {
+  public synchronized void startSetup() {
     assert !inSetup  : assertionError("starting setup");
     stopProcessing();
     inSetup = true;
     setupMark = System.nanoTime();
   }
 
-  public void stopSetup() {
+  public synchronized void stopSetup() {
     assert inSetup :  assertionError("stopping setup");
     startProcessing();
     setupNanos += System.nanoTime() - setupMark;
     inSetup = false;
   }
 
-  public void startProcessing() {
+  public synchronized void startProcessing() {
     assert !inProcessing : assertionError("starting processing");
     processingMark = System.nanoTime();
     inProcessing = true;
   }
 
-  public void stopProcessing() {
+  public synchronized void stopProcessing() {
     assert inProcessing : assertionError("stopping processing");
     processingNanos += System.nanoTime() - processingMark;
     inProcessing = false;
   }
 
-  public void startWait() {
+  public synchronized void startWait() {
     assert !inWait : assertionError("starting waiting");
     stopProcessing();
     inWait = true;
     waitMark = System.nanoTime();
   }
 
-  public void stopWait() {
+  public synchronized void stopWait() {
     assert inWait : assertionError("stopping waiting");
     startProcessing();
     waitNanos += System.nanoTime() - waitMark;
     inWait = false;
   }
 
-  public void batchReceived(int inputIndex, long records, boolean newSchema) {
+  public synchronized void batchReceived(int inputIndex, long records, boolean newSchema) {
     recordsReceivedByInput[inputIndex] += records;
     batchesReceivedByInput[inputIndex]++;
     if(newSchema){

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
index 6554e33..adb6323 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
@@ -20,11 +20,11 @@ package org.apache.drill.exec.server;
 import com.codahale.metrics.MetricRegistry;
 import io.netty.channel.EventLoopGroup;
 
+import java.util.concurrent.Executors;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.SynchronousQueue;
 import org.apache.drill.common.DrillAutoCloseables;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.scanner.persistence.ScanResult;
@@ -37,6 +37,7 @@ import org.apache.drill.exec.rpc.TransportCheck;
 
 public class BootStrapContext implements AutoCloseable {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BootStrapContext.class);
+  private static final int MIN_SCAN_THREADPOOL_SIZE = 8; // Magic num
 
   private final DrillConfig config;
   private final EventLoopGroup loop;
@@ -45,12 +46,15 @@ public class BootStrapContext implements AutoCloseable {
   private final BufferAllocator allocator;
   private final ScanResult classpathScan;
   private final ExecutorService executor;
+  private final ExecutorService scanExecutor;
+  private final ExecutorService scanDecodeExecutor;
 
   public BootStrapContext(DrillConfig config, ScanResult classpathScan) {
     this.config = config;
     this.classpathScan = classpathScan;
     this.loop = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitServer-");
-    this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitClient-");
+    this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS),
+        "BitClient-");
     // Note that metrics are stored in a static instance
     this.metrics = DrillMetrics.getRegistry();
     this.allocator = RootAllocatorFactory.newRoot(config);
@@ -65,12 +69,35 @@ public class BootStrapContext implements AutoCloseable {
         super.afterExecute(r, t);
       }
     };
+    // Set up two thread pools: one for reading raw data from disk and another for decoding the data.
+    // A good guideline is to have the number of threads in the scan pool be a multiple (fractional
+    // numbers are ok) of the number of disks.
+    // A good guideline is to have the number of threads in the decode pool be a small multiple (fractional
+    // numbers are ok) of the number of cores.
+    final int numCores = Runtime.getRuntime().availableProcessors();
+    final int numScanThreads = (int) (config.getDouble(ExecConstants.SCAN_THREADPOOL_SIZE));
+    final int numScanDecodeThreads = (int) config.getDouble(ExecConstants.SCAN_DECODE_THREADPOOL_SIZE);
+    final int scanThreadPoolSize =
+        MIN_SCAN_THREADPOOL_SIZE > numScanThreads ? MIN_SCAN_THREADPOOL_SIZE : numScanThreads;
+    final int scanDecodeThreadPoolSize = numCores > numScanDecodeThreads ? numCores : numScanDecodeThreads;
+
+    this.scanExecutor = Executors.newFixedThreadPool(scanThreadPoolSize, new NamedThreadFactory("scan-"));
+    this.scanDecodeExecutor =
+        Executors.newFixedThreadPool(scanDecodeThreadPoolSize, new NamedThreadFactory("scan-decode-"));
   }
 
   public ExecutorService getExecutor() {
     return executor;
   }
 
+  public ExecutorService getScanExecutor() {
+    return scanExecutor;
+  }
+
+  public ExecutorService getScanDecodeExecutor() {
+    return scanDecodeExecutor;
+  }
+
   public DrillConfig getConfig() {
     return config;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 3eb87ea..ffe6c28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -172,6 +172,12 @@ public class DrillbitContext implements AutoCloseable {
   public ExecutorService getExecutor() {
     return context.getExecutor();
   }
+  public ExecutorService getScanExecutor() {
+    return context.getScanExecutor();
+  }
+  public ExecutorService getScanDecodeExecutor() {
+    return context.getScanDecodeExecutor();
+  }
 
   public LogicalPlanPersistence getLpPersistence() {
     return lpPersistence;

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 8b67fdb..1981d24 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -99,8 +99,11 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING_VALIDATOR,
       ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR,
       ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR,
-      ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR,
       ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR,
+      ExecConstants.PARQUET_PAGEREADER_ASYNC_VALIDATOR,
+      ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR,
+      ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR,
+      ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR,
       ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR,
       ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR,
       ExecConstants.ENABLE_UNION_TYPE,

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
index e95b0c8..c2711cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java
@@ -17,28 +17,30 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 public class ParquetReaderStats {
 
-  public long numDictPageHeaders;
-  public long numPageHeaders;
-  public long numDictPageLoads;
-  public long numPageLoads;
-  public long numDictPagesDecompressed;
-  public long numPagesDecompressed;
-
-  public long totalDictPageHeaderBytes;
-  public long totalPageHeaderBytes;
-  public long totalDictPageReadBytes;
-  public long totalPageReadBytes;
-  public long totalDictDecompressedBytes;
-  public long totalDecompressedBytes;
-
-  public long timeDictPageHeaders;
-  public long timePageHeaders;
-  public long timeDictPageLoads;
-  public long timePageLoads;
-  public long timeDictPagesDecompressed;
-  public long timePagesDecompressed;
+  public AtomicLong numDictPageLoads = new AtomicLong();
+  public AtomicLong numDataPageLoads = new AtomicLong();
+  public AtomicLong numDataPagesDecoded = new AtomicLong();
+  public AtomicLong numDictPagesDecompressed = new AtomicLong();
+  public AtomicLong numDataPagesDecompressed = new AtomicLong();
+
+  public AtomicLong totalDictPageReadBytes = new AtomicLong();
+  public AtomicLong totalDataPageReadBytes = new AtomicLong();
+  public AtomicLong totalDictDecompressedBytes = new AtomicLong();
+  public AtomicLong totalDataDecompressedBytes = new AtomicLong();
+
+  public AtomicLong timeDictPageLoads = new AtomicLong();
+  public AtomicLong timeDataPageLoads = new AtomicLong();
+  public AtomicLong timeDataPageDecode = new AtomicLong();
+  public AtomicLong timeDictPageDecode = new AtomicLong();
+  public AtomicLong timeDictPagesDecompressed = new AtomicLong();
+  public AtomicLong timeDataPagesDecompressed = new AtomicLong();
+
+  public AtomicLong timeDiskScanWait = new AtomicLong();
+  public AtomicLong timeDiskScan = new AtomicLong();
 
   public ParquetReaderStats() {
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index a98c660..a14bab5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -73,10 +73,18 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
 
     DrillFileSystem fs;
     try {
-      fs = oContext.newFileSystem(rowGroupScan.getStorageEngine().getFsConf());
-    } catch(IOException e) {
-      throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
+      boolean useAsyncPageReader =
+          context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
+      if (useAsyncPageReader) {
+        fs = oContext.newNonTrackingFileSystem(rowGroupScan.getStorageEngine().getFsConf());
+      } else {
+        fs = oContext.newFileSystem(rowGroupScan.getStorageEngine().getFsConf());
+      }
+    } catch (IOException e) {
+      throw new ExecutionSetupException(
+          String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
     }
+
     Configuration conf = new Configuration(fs.getConf());
     conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
     conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
new file mode 100644
index 0000000..3f47f04
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -0,0 +1,332 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import com.google.common.base.Stopwatch;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.drill.exec.util.filereader.DirectBufInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.parquet.column.Encoding.valueOf;
+
+class AsyncPageReader extends PageReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class);
+
+
+  private ExecutorService threadPool;
+  private Future<ReadStatus> asyncPageRead;
+
+  AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path,
+      ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException {
+    super(parentStatus, fs, path, columnChunkMetaData);
+    if (threadPool == null) {
+      threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+    }
+    asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+  }
+
+  @Override protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
+      final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException {
+    if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+      try {
+        dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos());
+      } catch (IOException e) {
+        handleAndThrowException(e, "Error Reading dictionary page.");
+      }
+      // parent constructor may call this method before the thread pool is set.
+      if (threadPool == null) {
+        threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
+      }
+      asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+      readDictionaryPage(asyncPageRead, parentStatus);
+      asyncPageRead = null; // reset after consuming
+    }
+  }
+
+  private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
+    DrillBuf data;
+    boolean isDictionary = false;
+    synchronized (this) {
+      data = readStatus.getPageData();
+      readStatus.setPageData(null);
+      isDictionary = readStatus.isDictionaryPage;
+    }
+    if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) {
+      DrillBuf uncompressedData = data;
+      data = decompress(readStatus.getPageHeader(), uncompressedData);
+      synchronized (this) {
+        readStatus.setPageData(null);
+      }
+      uncompressedData.release();
+    } else {
+      if (isDictionary) {
+        stats.totalDictPageReadBytes.addAndGet(readStatus.bytesRead);
+      } else {
+        stats.totalDataPageReadBytes.addAndGet(readStatus.bytesRead);
+      }
+    }
+    return data;
+  }
+
+  // Read and decode the dictionary and the header
+  private void readDictionaryPage(final Future<ReadStatus> asyncPageRead,
+      final ColumnReader<?> parentStatus) throws UserException {
+    try {
+      Stopwatch timer = Stopwatch.createStarted();
+      ReadStatus readStatus = asyncPageRead.get();
+      long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
+      stats.timeDiskScanWait.addAndGet(timeBlocked);
+      stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
+      stats.numDictPageLoads.incrementAndGet();
+      stats.timeDictPageLoads.addAndGet(timeBlocked+readStatus.getDiskScanTime());
+      readDictionaryPageData(readStatus, parentStatus);
+    } catch (Exception e) {
+      handleAndThrowException(e, "Error reading dictionary page.");
+    }
+  }
+
+  // Read and decode the dictionary data
+  private void readDictionaryPageData(final ReadStatus readStatus, final ColumnReader<?> parentStatus)
+      throws UserException {
+    try {
+      pageHeader = readStatus.getPageHeader();
+      int uncompressedSize = pageHeader.getUncompressed_page_size();
+      final DrillBuf dictionaryData = getDecompressedPageData(readStatus);
+      Stopwatch timer = Stopwatch.createStarted();
+      allocatedDictionaryBuffers.add(dictionaryData);
+      DictionaryPage page = new DictionaryPage(asBytesInput(dictionaryData, 0, uncompressedSize),
+          pageHeader.uncompressed_page_size, pageHeader.dictionary_page_header.num_values,
+          valueOf(pageHeader.dictionary_page_header.encoding.name()));
+      this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
+      long timeToDecode = timer.elapsed(TimeUnit.NANOSECONDS);
+      stats.timeDictPageDecode.addAndGet(timeToDecode);
+    } catch (Exception e) {
+      handleAndThrowException(e, "Error decoding dictionary page.");
+    }
+  }
+
+  /**
+   * Wraps {@code e} in a data-read {@link UserException} that carries the row group start, the
+   * column name and the file name of this reader, and throws it. Never returns normally.
+   *
+   * @param e   the failure to report
+   * @param msg message for the resulting exception
+   * @throws UserException always
+   */
+  private void handleAndThrowException(Exception e, String msg) throws UserException {
+    final long rowGroupStart = parentColumnReader.columnChunkMetaData.getStartingPos();
+    final String columnName = parentColumnReader.schemaElement.getName();
+    throw UserException.dataReadError(e)
+        .message(msg)
+        .pushContext("Row Group Start: ", rowGroupStart)
+        .pushContext("Column: ", columnName)
+        .pushContext("File: ", fileName)
+        .build(logger);
+  }
+
+  /**
+   * Decompresses a page into a newly allocated buffer using the codec of the parent column chunk.
+   *
+   * @param pageHeader     header of the page; supplies the compressed and uncompressed sizes
+   * @param compressedData buffer holding the compressed page bytes; it is not released here
+   * @return a buffer of the uncompressed page size holding the decompressed data
+   * @throws UserException if the decompressor reports an I/O error
+   */
+  private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) {
+    final int compressedSize = pageHeader.getCompressed_page_size();
+    final int uncompressedSize = pageHeader.getUncompressed_page_size();
+    final DrillBuf pageDataBuf = allocateTemporaryBuffer(uncompressedSize);
+    try {
+      final Stopwatch timer = Stopwatch.createStarted();
+      codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec())
+          .decompress(compressedData.nioBuffer(0, compressedSize), compressedSize,
+              pageDataBuf.nioBuffer(0, uncompressedSize), uncompressedSize);
+      // The decompression timers are reported in nanoseconds, like every other reader timing.
+      final long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
+      this.updateStats(pageHeader, "Decompress", 0, timeToRead, compressedSize, uncompressedSize);
+    } catch (IOException e) {
+      // On failure the buffer never reaches the caller, so release it here instead of leaking it.
+      pageDataBuf.release();
+      handleAndThrowException(e, "Error decompressing data.");
+    }
+    return pageDataBuf;
+  }
+
+  /**
+   * Consumes the pending asynchronous page read, decodes any dictionary pages that precede the data
+   * page, submits the read of the following page when the stream has data left and fewer values
+   * than the column chunk's value count have been read, and stores the data page's header and
+   * page data in {@code pageHeader} and {@code pageData}.
+   *
+   * @throws IOException   if the synchronous read that follows a dictionary page fails
+   * @throws UserException if waiting for the pending read fails or a dictionary page cannot be decoded
+   */
+  @Override protected void nextInternal() throws IOException {
+    ReadStatus readStatus = null;
+    try {
+      final Stopwatch timer = Stopwatch.createStarted();
+      readStatus = asyncPageRead.get();
+      final long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
+      final long timeOnDisk = readStatus.getDiskScanTime();
+      stats.timeDiskScanWait.addAndGet(timeBlocked);
+      stats.timeDiskScan.addAndGet(timeOnDisk);
+      if (readStatus.isDictionaryPage()) {
+        stats.numDictPageLoads.incrementAndGet();
+        stats.timeDictPageLoads.addAndGet(timeBlocked + timeOnDisk);
+      } else {
+        stats.numDataPageLoads.incrementAndGet();
+        stats.timeDataPageLoads.addAndGet(timeBlocked + timeOnDisk);
+      }
+      pageHeader = readStatus.getPageHeader();
+      // reset this. At the time of calling close, if this is not null then a pending asyncPageRead needs to be consumed
+      asyncPageRead = null;
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Wrapping the exception would hide the interrupt from callers; restore the flag first.
+        Thread.currentThread().interrupt();
+      }
+      handleAndThrowException(e, "Error reading page data.");
+    }
+
+    // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
+    // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
+
+    while (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
+      readDictionaryPageData(readStatus, parentColumnReader);
+      // Ugly. Use the Async task to make a synchronous read call.
+      readStatus = new AsyncPageReaderTask().call();
+      pageHeader = readStatus.getPageHeader();
+    }
+
+    // Submit the read of the following page before this page's data is processed.
+    if (dataReader.hasRemainder() && parentColumnReader.totalValuesRead + readStatus.getValuesRead()
+        < parentColumnReader.columnChunkMetaData.getValueCount()) {
+      asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
+    }
+
+    pageHeader = readStatus.getPageHeader();
+    pageData = getDecompressedPageData(readStatus);
+
+  }
+
+
+  /**
+   * Waits for a page read that is still pending, releases the buffer it produced, and then clears
+   * the state held by the parent class.
+   */
+  @Override public void clear() {
+    if (asyncPageRead != null) {
+      // Do not cancel first: get() on a cancelled future throws CancellationException, so the buffer
+      // of a read that still completes could never be released. Let the read finish instead.
+      try {
+        final ReadStatus pending = asyncPageRead.get();
+        if (pending != null && pending.getPageData() != null) {
+          pending.getPageData().release();
+        }
+      } catch (InterruptedException e) {
+        // Preserve the interrupt for the caller; the cleanup below still runs.
+        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        // A failed read releases its own buffer inside the task, so nothing is left to free here.
+        logger.debug("Ignoring failure of a pending page read while clearing the page reader.", e);
+      } finally {
+        asyncPageRead = null;
+      }
+    }
+    super.clear();
+  }
+
+  /**
+   * Result of reading one page: the page header, the page data, whether the page is a dictionary
+   * page, and the number of bytes read, the number of values read and the time the read took.
+   * All accessors synchronize on the instance.
+   */
+  public static class ReadStatus {
+    private PageHeader pageHeader;
+    private DrillBuf pageData;
+    private boolean isDictionaryPage = false;
+    private long bytesRead = 0;
+    private long valuesRead = 0;
+    private long diskScanTime = 0;
+
+    public synchronized PageHeader getPageHeader() {
+      return pageHeader;
+    }
+
+    public synchronized void setPageHeader(PageHeader pageHeader) {
+      this.pageHeader = pageHeader;
+    }
+
+    public synchronized DrillBuf getPageData() {
+      return pageData;
+    }
+
+    public synchronized void setPageData(DrillBuf pageData) {
+      this.pageData = pageData;
+    }
+
+    public synchronized boolean isDictionaryPage() {
+      return isDictionaryPage;
+    }
+
+    public synchronized void setIsDictionaryPage(boolean isDictionaryPage) {
+      this.isDictionaryPage = isDictionaryPage;
+    }
+
+    public synchronized long getBytesRead() {
+      return bytesRead;
+    }
+
+    public synchronized void setBytesRead(long bytesRead) {
+      this.bytesRead = bytesRead;
+    }
+
+    public synchronized long getValuesRead() {
+      return valuesRead;
+    }
+
+    public synchronized void setValuesRead(long valuesRead) {
+      this.valuesRead = valuesRead;
+    }
+
+    // Synchronized like every other accessor of this class.
+    public synchronized long getDiskScanTime() {
+      return diskScanTime;
+    }
+
+    // Synchronized like every other accessor of this class.
+    public synchronized void setDiskScanTime(long diskScanTime) {
+      this.diskScanTime = diskScanTime;
+    }
+  }
+
+
+  /**
+   * Task that reads the next page header and that page's {@code compressed_page_size} bytes of data
+   * from the enclosing reader's input stream and reports them in a {@link ReadStatus}. While it
+   * runs, the executing thread is named after the column chunk being read.
+   */
+  private class AsyncPageReaderTask implements Callable<ReadStatus> {
+
+    private final AsyncPageReader parent = AsyncPageReader.this;
+
+    public AsyncPageReaderTask() {
+    }
+
+    /**
+     * Reads one page header and its page data.
+     *
+     * @return the header, data, value count, byte count and read time of the page
+     * @throws IOException if reading fails; a buffer obtained before the failure is released
+     */
+    @Override public ReadStatus call() throws IOException {
+      ReadStatus readStatus = new ReadStatus();
+
+      String oldname = Thread.currentThread().getName();
+      Thread.currentThread().setName(parent.parentColumnReader.columnChunkMetaData.toString());
+
+      long bytesRead = 0;
+      long valuesRead = 0;
+      Stopwatch timer = Stopwatch.createStarted();
+
+      DrillBuf pageData = null;
+      try {
+        PageHeader pageHeader = Util.readPageHeader(parent.dataReader);
+        int compressedSize = pageHeader.getCompressed_page_size();
+        pageData = parent.dataReader.getNext(compressedSize);
+        bytesRead = compressedSize;
+        synchronized (parent) {
+          if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
+            readStatus.setIsDictionaryPage(true);
+            valuesRead += pageHeader.getDictionary_page_header().getNum_values();
+          } else {
+            valuesRead += pageHeader.getData_page_header().getNum_values();
+          }
+          long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
+          readStatus.setPageHeader(pageHeader);
+          readStatus.setPageData(pageData);
+          readStatus.setBytesRead(bytesRead);
+          readStatus.setValuesRead(valuesRead);
+          readStatus.setDiskScanTime(timeToRead);
+        }
+
+      } catch (Exception e) {
+        // The buffer is not handed to anyone when the read fails, so release it here.
+        if (pageData != null) {
+          pageData.release();
+        }
+        throw e;
+      } finally {
+        // Restore the name on failure too, so the executing thread does not keep this column's name.
+        Thread.currentThread().setName(oldname);
+      }
+      return readStatus;
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index f62f424..6572c78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -22,6 +22,8 @@ import io.netty.buffer.DrillBuf;
 import java.io.IOException;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -70,7 +72,7 @@ public abstract class ColumnReader<V extends ValueVector> {
   protected DrillBuf vectorData;
   // when reading definition levels for nullable columns, it is a one-way stream of integers
   // when reading var length data, where we don't know if all of the records will fit until we've read all of them
-  // we must store the last definition level an use it in at the start of the next batch
+  // we must store the last definition level and use it at the start of the next batch
   int currDefLevel;
 
   // variables for a single read pass
@@ -84,7 +86,17 @@ public abstract class ColumnReader<V extends ValueVector> {
     this.isFixedLength = fixedLength;
     this.schemaElement = schemaElement;
     this.valueVec =  v;
-    this.pageReader = new PageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(), columnChunkMetaData);
+    boolean useAsyncPageReader  = parentReader.getFragmentContext().getOptions()
+        .getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
+    if (useAsyncPageReader) {
+      this.pageReader =
+          new AsyncPageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(),
+              columnChunkMetaData);
+    } else {
+      this.pageReader =
+          new PageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(),
+              columnChunkMetaData);
+    }
 
     if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
       if (columnDescriptor.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
@@ -117,11 +129,23 @@ public abstract class ColumnReader<V extends ValueVector> {
   }
 
   public void readValues(long recordsToRead) {
-    readField(recordsToRead);
+    try {
+      readField(recordsToRead);
+
+      valuesReadInCurrentPass += recordsReadInThisIteration;
+      pageReader.valuesRead += recordsReadInThisIteration;
+      pageReader.readPosInBytes = readStartInBytes + readLength;
+    } catch (Exception e) {
+      UserException ex = UserException.dataReadError(e)
+          .message("Error reading from Parquet file")
+          .pushContext("Row Group Start: ", this.columnChunkMetaData.getStartingPos())
+          .pushContext("Column: ", this.schemaElement.getName())
+          .pushContext("File: ", this.parentReader.getHadoopPath().toString() )
+          .build(logger);
+      throw ex;
+
+    }
 
-    valuesReadInCurrentPass += recordsReadInThisIteration;
-    pageReader.valuesRead += recordsReadInThisIteration;
-    pageReader.readPosInBytes = readStartInBytes + readLength;
   }
 
   protected abstract void readField(long recordsToRead);

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 078e4ce..c34ebd1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.util.filereader.BufferedDirectBufInputStream;
@@ -36,7 +35,6 @@ import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.ValuesType;
 import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.dictionary.DictionaryValuesReader;
 import org.apache.parquet.format.PageHeader;
@@ -58,25 +56,23 @@ import static org.apache.parquet.column.Encoding.valueOf;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
 
 // class to keep track of the read position of variable length columns
-final class PageReader {
+class PageReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(
       org.apache.drill.exec.store.parquet.columnreaders.PageReader.class);
 
   public static final ParquetMetadataConverter METADATA_CONVERTER = ParquetFormatPlugin.parquetMetadataConverter;
 
-  private final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentColumnReader;
+  protected final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentColumnReader;
   //private final ColumnDataReader dataReader;
-  private final DirectBufInputStream dataReader;
+  protected final DirectBufInputStream dataReader;
   //der; buffer to store bytes of current page
-  DrillBuf pageData;
+  protected DrillBuf pageData;
 
   // for variable length data we need to keep track of our current position in the page data
   // as the values and lengths are intermixed, making random access to the length data impossible
   long readyToReadPosInBytes;
   // read position in the current page, stored in the ByteBuf in ParquetRecordReader called bufferWithAllData
   long readPosInBytes;
-  // bit shift needed for the next page if the last one did not line up with a byte boundary
-  int bitShift;
   // storage space for extra bits at the end of a page if they did not line up with a byte boundary
   // prevents the need to keep the entire last page, as these pageDataByteArray need to be added to the next batch
   //byte extraBits;
@@ -103,14 +99,18 @@ final class PageReader {
 
   int currentPageCount = -1;
 
-  private FSDataInputStream inputStream;
+  protected FSDataInputStream inputStream;
 
   // These need to be held throughout reading of the entire column chunk
   List<ByteBuf> allocatedDictionaryBuffers;
 
-  private final CodecFactory codecFactory;
+  protected final CodecFactory codecFactory;
+  protected final String fileName;
 
-  private final ParquetReaderStats stats;
+  protected final ParquetReaderStats stats;
+  private final boolean useBufferedReader;
+  private final int scanBufferSize;
+  private final boolean useFadvise;
 
   PageReader(org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
     throws ExecutionSetupException {
@@ -118,21 +118,24 @@ final class PageReader {
     allocatedDictionaryBuffers = new ArrayList<ByteBuf>();
     codecFactory = parentColumnReader.parentReader.getCodecFactory();
     this.stats = parentColumnReader.parentReader.parquetReaderStats;
-    long start = columnChunkMetaData.getFirstDataPageOffset();
+    this.fileName = path.toString();
     try {
       inputStream  = fs.open(path);
       BufferAllocator allocator =  parentColumnReader.parentReader.getOperatorContext().getAllocator();
-      //TODO: make read batch size configurable
       columnChunkMetaData.getTotalUncompressedSize();
-      boolean useBufferedReader  = parentColumnReader.parentReader.getFragmentContext().getOptions()
+      useBufferedReader  = parentColumnReader.parentReader.getFragmentContext().getOptions()
           .getOption(ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ).bool_val;
+      scanBufferSize = parentColumnReader.parentReader.getFragmentContext().getOptions()
+          .getOption(ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE).num_val.intValue();
+      useFadvise = parentColumnReader.parentReader.getFragmentContext().getOptions()
+          .getOption(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE).bool_val;
       if (useBufferedReader) {
         this.dataReader = new BufferedDirectBufInputStream(inputStream, allocator, path.getName(),
-            columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), 8 * 1024 * 1024,
-            true);
+            columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), scanBufferSize,
+            useFadvise);
       } else {
         this.dataReader = new DirectBufInputStream(inputStream, allocator, path.getName(),
-            columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), true);
+            columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), useFadvise);
       }
       dataReader.init();
 
@@ -145,7 +148,7 @@ final class PageReader {
 
   }
 
-  private void loadDictionaryIfExists(final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentStatus,
+  protected void loadDictionaryIfExists(final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentStatus,
       final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws IOException {
     Stopwatch timer = Stopwatch.createUnstarted();
     if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
@@ -153,7 +156,7 @@ final class PageReader {
       long start=dataReader.getPos();
       timer.start();
       final PageHeader pageHeader = Util.readPageHeader(f);
-      long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+      long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
       long pageHeaderBytes=dataReader.getPos()-start;
       this.updateStats(pageHeader, "Page Header", start, timeToRead, pageHeaderBytes, pageHeaderBytes);
       assert pageHeader.type == PageType.DICTIONARY_PAGE;
@@ -178,7 +181,7 @@ final class PageReader {
     this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
   }
 
-  public DrillBuf readPage(PageHeader pageHeader, int compressedSize, int uncompressedSize) throws IOException {
+  private DrillBuf readPage(PageHeader pageHeader, int compressedSize, int uncompressedSize) throws IOException {
     DrillBuf pageDataBuf = null;
     Stopwatch timer = Stopwatch.createUnstarted();
     long timeToRead;
@@ -186,7 +189,7 @@ final class PageReader {
     if (parentColumnReader.columnChunkMetaData.getCodec() == CompressionCodecName.UNCOMPRESSED) {
       timer.start();
       pageDataBuf = dataReader.getNext(compressedSize);
-      timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
       this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, uncompressedSize);
     } else {
       DrillBuf compressedData = null;
@@ -195,8 +198,7 @@ final class PageReader {
       try {
       timer.start();
       compressedData = dataReader.getNext(compressedSize);
-       // dataReader.loadPage(compressedData, compressedSize);
-      timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
       timer.reset();
       this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, compressedSize);
       start=dataReader.getPos();
@@ -204,7 +206,7 @@ final class PageReader {
       codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData
           .getCodec()).decompress(compressedData.nioBuffer(0, compressedSize), compressedSize,
           pageDataBuf.nioBuffer(0, uncompressedSize), uncompressedSize);
-        timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+        timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
         this.updateStats(pageHeader, "Decompress", start, timeToRead, compressedSize, uncompressedSize);
       } finally {
         if(compressedData != null) {
@@ -219,25 +221,12 @@ final class PageReader {
     return BytesInput.from(buf.nioBuffer(offset, length), 0, length);
   }
 
+
   /**
-   * Grab the next page.
-   *
-   * @return - if another page was present
-   * @throws IOException
+   * Get the page header and the pageData (uncompressed) for the next page
    */
-  public boolean next() throws IOException {
+  protected void nextInternal() throws IOException{
     Stopwatch timer = Stopwatch.createUnstarted();
-    currentPageCount = -1;
-    valuesRead = 0;
-    valuesReadyToRead = 0;
-
-    // TODO - the metatdata for total size appears to be incorrect for impala generated files, need to find cause
-    // and submit a bug report
-    if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) {
-      return false;
-    }
-    clearBuffers();
-
     // next, we need to decompress the bytes
     // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
     // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
@@ -245,7 +234,7 @@ final class PageReader {
       long start=dataReader.getPos();
       timer.start();
       pageHeader = Util.readPageHeader(dataReader);
-      long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+      long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
       long pageHeaderBytes=dataReader.getPos()-start;
       this.updateStats(pageHeader, "Page Header", start, timeToRead, pageHeaderBytes, pageHeaderBytes);
       logger.trace("ParquetTrace,{},{},{},{},{},{},{},{}","Page Header Read","",
@@ -264,14 +253,33 @@ final class PageReader {
     int uncompressedSize = pageHeader.getUncompressed_page_size();
     pageData = readPage(pageHeader, compressedSize, uncompressedSize);
 
-    currentPageCount = pageHeader.data_page_header.num_values;
-    final int uncompressedPageSize = pageHeader.uncompressed_page_size;
-    final Statistics<?> stats = fromParquetStatistics(pageHeader.data_page_header.getStatistics(), parentColumnReader
-        .getColumnDescriptor().getType());
+  }
+
+  /**
+   * Grab the next page.
+   *
+   * @return - if another page was present
+   * @throws IOException
+   */
+  public boolean next() throws IOException {
+    Stopwatch timer = Stopwatch.createUnstarted();
+    currentPageCount = -1;
+    valuesRead = 0;
+    valuesReadyToRead = 0;
 
+    // TODO - the metatdata for total size appears to be incorrect for impala generated files, need to find cause
+    // and submit a bug report
+    if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) {
+      return false;
+    }
+    clearBuffers();
 
-    final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
+    nextInternal();
 
+    timer.start();
+    currentPageCount = pageHeader.data_page_header.num_values;
+
+    final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
     final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding);
     final Encoding valueEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.encoding);
 
@@ -321,40 +329,24 @@ final class PageReader {
     // fit one record at a time, such as for variable length data. Both operations must start in the same location after the
     // definition and repetition level data which is stored alongside the page data itself
     readyToReadPosInBytes = readPosInBytes;
+    long timeDecode = timer.elapsed(TimeUnit.NANOSECONDS);
+    stats.numDataPagesDecoded.incrementAndGet();
+    stats.timeDataPageDecode.addAndGet(timeDecode);
     return true;
   }
 
   /**
-   * Allocate a page data buffer. Note that only one page data buffer should be active at a time. The reader will ensure
-   * that the page data is released after the reader is completed.
-   */
-  private void allocatePageData(int size) {
-    Preconditions.checkArgument(pageData == null);
-    pageData = parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size);
-  }
-
-  /**
    * Allocate a buffer which the user should release immediately. The reader does not manage release of these buffers.
    */
-  private DrillBuf allocateTemporaryBuffer(int size) {
+  protected DrillBuf allocateTemporaryBuffer(int size) {
     return parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size);
   }
 
-  /**
-   * Allocate and return a dictionary buffer. These are maintained for the life of the reader and then released when the
-   * reader is cleared.
-   */
-  private DrillBuf allocateDictionaryBuffer(int size) {
-    DrillBuf buf = parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size);
-    allocatedDictionaryBuffers.add(buf);
-    return buf;
-  }
-
   protected boolean hasPage() {
     return currentPageCount != -1;
   }
 
-  private void updateStats(PageHeader pageHeader, String op, long start, long time, long bytesin, long bytesout) {
+  protected void updateStats(PageHeader pageHeader, String op, long start, long time, long bytesin, long bytesout) {
     String pageType = "Data Page";
     if (pageHeader.type == PageType.DICTIONARY_PAGE) {
       pageType = "Dictionary Page";
@@ -362,37 +354,38 @@ final class PageReader {
     logger.trace("ParquetTrace,{},{},{},{},{},{},{},{}", op, pageType.toString(),
         this.parentColumnReader.parentReader.hadoopPath,
         this.parentColumnReader.columnDescriptor.toString(), start, bytesin, bytesout, time);
+
     if (pageHeader.type != PageType.DICTIONARY_PAGE) {
       if (bytesin == bytesout) {
-        this.stats.timePageLoads += time;
-        this.stats.numPageLoads++;
-        this.stats.totalPageReadBytes += bytesin;
+        this.stats.timeDataPageLoads.addAndGet(time);
+        this.stats.numDataPageLoads.incrementAndGet();
+        this.stats.totalDataPageReadBytes.addAndGet(bytesin);
       } else {
-        this.stats.timePagesDecompressed += time;
-        this.stats.numPagesDecompressed++;
-        this.stats.totalDecompressedBytes += bytesin;
+        this.stats.timeDataPagesDecompressed.addAndGet(time);
+        this.stats.numDataPagesDecompressed.incrementAndGet();
+        this.stats.totalDataDecompressedBytes.addAndGet(bytesin);
       }
     } else {
       if (bytesin == bytesout) {
-        this.stats.timeDictPageLoads += time;
-        this.stats.numDictPageLoads++;
-        this.stats.totalDictPageReadBytes += bytesin;
+        this.stats.timeDictPageLoads.addAndGet(time);
+        this.stats.numDictPageLoads.incrementAndGet();
+        this.stats.totalDictPageReadBytes.addAndGet(bytesin);
       } else {
-        this.stats.timeDictPagesDecompressed += time;
-        this.stats.numDictPagesDecompressed++;
-        this.stats.totalDictDecompressedBytes += bytesin;
+        this.stats.timeDictPagesDecompressed.addAndGet(time);
+        this.stats.numDictPagesDecompressed.incrementAndGet();
+        this.stats.totalDictDecompressedBytes.addAndGet(bytesin);
       }
     }
   }
 
-  public void clearBuffers() {
+  protected void clearBuffers() {
     if (pageData != null) {
       pageData.release();
       pageData = null;
     }
   }
 
-  public void clearDictionaryBuffers() {
+  protected void clearDictionaryBuffers() {
     for (ByteBuf b : allocatedDictionaryBuffers) {
       b.release();
     }
@@ -401,15 +394,14 @@ final class PageReader {
 
   public void clear(){
     try {
+      this.inputStream.close();
       this.dataReader.close();
     } catch (IOException e) {
-      //TODO: Throw UserException
+      //Swallow the exception which is OK for input streams
     }
     // Free all memory, including fixed length types. (Data is being copied for all types not just var length types)
-    //if(!this.parentColumnReader.isFixedLength) {
     clearBuffers();
     clearDictionaryBuffers();
-    //}
   }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 924887e..1eca00f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -35,6 +35,7 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.MetricDef;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
@@ -117,16 +118,39 @@ public class ParquetRecordReader extends AbstractRecordReader {
 
   public ParquetReaderStats parquetReaderStats = new ParquetReaderStats();
 
+  public enum Metric implements MetricDef { // metricId() is the ordinal, so the declaration order is significant
+    NUM_DICT_PAGE_LOADS,         // Number of dictionary pages read
+    NUM_DATA_PAGE_lOADS,         // Number of data pages read (the lowercase 'l' is part of the constant's name)
+    NUM_DATA_PAGES_DECODED,      // Number of data pages decoded
+    NUM_DICT_PAGES_DECOMPRESSED, // Number of dictionary pages decompressed
+    NUM_DATA_PAGES_DECOMPRESSED, // Number of data pages decompressed
+    TOTAL_DICT_PAGE_READ_BYTES,  // Total bytes read from disk for dictionary pages
+    TOTAL_DATA_PAGE_READ_BYTES,  // Total bytes read from disk for data pages
+    TOTAL_DICT_DECOMPRESSED_BYTES, // Total bytes decompressed for dictionary pages (same as compressed bytes on disk)
+    TOTAL_DATA_DECOMPRESSED_BYTES, // Total bytes decompressed for data pages (same as compressed bytes on disk)
+    TIME_DICT_PAGE_LOADS,          // Time in nanos spent reading dictionary pages from disk
+    TIME_DATA_PAGE_LOADS,          // Time in nanos spent reading data pages from disk
+    TIME_DATA_PAGE_DECODE,         // Time in nanos spent decoding data pages
+    TIME_DICT_PAGE_DECODE,         // Time in nanos spent decoding dictionary pages
+    TIME_DICT_PAGES_DECOMPRESSED,  // Time in nanos spent decompressing dictionary pages
+    TIME_DATA_PAGES_DECOMPRESSED,  // Time in nanos spent decompressing data pages
+    TIME_DISK_SCAN_WAIT,           // Time in nanos spent waiting for an async disk read to complete
+    TIME_DISK_SCAN;                // Time in nanos spent reading data from disk
+
+    @Override public int metricId() {
+      return ordinal(); // the id is the constant's declaration position
+    }
+  }
+
   public ParquetRecordReader(FragmentContext fragmentContext,
       String path,
       int rowGroupIndex,
-                             long numRecordsToRead,
+      long numRecordsToRead,
       FileSystem fs,
       CodecFactory codecFactory,
       ParquetMetadata footer,
-                             List<SchemaPath> columns,
-                             ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus)
-                             throws ExecutionSetupException {
+      List<SchemaPath> columns,
+      ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException {
     this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, numRecordsToRead,
          path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
   }
@@ -470,6 +494,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
       // No columns found in the file were selected, simply return a full batch of null records for each column requested
       if (firstColumnStatus == null) {
         if (mockRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount()) {
+          updateStats();
           return 0;
         }
         recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH, footer.getBlocks().get(rowGroupIndex).getRowCount() - mockRecordsRead);
@@ -483,6 +508,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
         mockRecordsRead += recordsToRead;
         totalRecordsRead += recordsToRead;
         numRecordsToRead -= recordsToRead;
+        updateStats();
         return (int) recordsToRead;
       }
 
@@ -514,6 +540,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
 //      logger.debug("So far read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
       totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass();
       numRecordsToRead -= firstColumnStatus.getRecordsReadInCurrentPass();
+      updateStats();
       return firstColumnStatus.getRecordsReadInCurrentPass();
     } catch (Exception e) {
       handleAndRaise("\nHadoop path: " + hadoopPath.toUri().getPath() +
@@ -530,7 +557,8 @@ public class ParquetRecordReader extends AbstractRecordReader {
 
   @Override
   public void close() {
-    logger.debug("Read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath());
+    logger.debug("Read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex,
+        hadoopPath.toUri().getPath());
     // enable this for debugging when it is know that a whole file will be read
     // limit kills upstream operators once it has enough records, so this assert will fail
 //    assert totalRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount();
@@ -552,29 +580,67 @@ public class ParquetRecordReader extends AbstractRecordReader {
       varLengthReader = null;
     }
 
+
     if(parquetReaderStats != null) {
-      logger.trace("ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
+      logger.trace("ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}",
           hadoopPath,
-          parquetReaderStats.numDictPageHeaders,
-          parquetReaderStats.numPageHeaders,
           parquetReaderStats.numDictPageLoads,
-          parquetReaderStats.numPageLoads,
+          parquetReaderStats.numDataPageLoads,
+          parquetReaderStats.numDataPagesDecoded,
           parquetReaderStats.numDictPagesDecompressed,
-          parquetReaderStats.numPagesDecompressed,
-          parquetReaderStats.totalDictPageHeaderBytes,
-          parquetReaderStats.totalPageHeaderBytes,
+          parquetReaderStats.numDataPagesDecompressed,
           parquetReaderStats.totalDictPageReadBytes,
-          parquetReaderStats.totalPageReadBytes,
+          parquetReaderStats.totalDataPageReadBytes,
           parquetReaderStats.totalDictDecompressedBytes,
-          parquetReaderStats.totalDecompressedBytes,
-          parquetReaderStats.timeDictPageHeaders,
-          parquetReaderStats.timePageHeaders,
+          parquetReaderStats.totalDataDecompressedBytes,
           parquetReaderStats.timeDictPageLoads,
-          parquetReaderStats.timePageLoads,
+          parquetReaderStats.timeDataPageLoads,
+          parquetReaderStats.timeDataPageDecode,
+          parquetReaderStats.timeDictPageDecode,
           parquetReaderStats.timeDictPagesDecompressed,
-          parquetReaderStats.timePagesDecompressed);
+          parquetReaderStats.timeDataPagesDecompressed,
+          parquetReaderStats.timeDiskScanWait,
+          parquetReaderStats.timeDiskScan
+      );
       parquetReaderStats=null;
     }
+
+  }
+
+  private void updateStats(){
+
+    operatorContext.getStats().setLongStat(Metric.NUM_DICT_PAGE_LOADS,
+        parquetReaderStats.numDictPageLoads.longValue());
+    operatorContext.getStats().setLongStat(Metric.NUM_DATA_PAGE_lOADS, parquetReaderStats.numDataPageLoads.longValue());
+    operatorContext.getStats().setLongStat(Metric.NUM_DATA_PAGES_DECODED, parquetReaderStats.numDataPagesDecoded.longValue());
+    operatorContext.getStats().setLongStat(Metric.NUM_DICT_PAGES_DECOMPRESSED,
+        parquetReaderStats.numDictPagesDecompressed.longValue());
+    operatorContext.getStats().setLongStat(Metric.NUM_DATA_PAGES_DECOMPRESSED,
+        parquetReaderStats.numDataPagesDecompressed.longValue());
+    operatorContext.getStats().setLongStat(Metric.TOTAL_DICT_PAGE_READ_BYTES,
+        parquetReaderStats.totalDictPageReadBytes.longValue());
+    operatorContext.getStats().setLongStat(Metric.TOTAL_DATA_PAGE_READ_BYTES,
+        parquetReaderStats.totalDataPageReadBytes.longValue());
+    operatorContext.getStats().setLongStat(Metric.TOTAL_DICT_DECOMPRESSED_BYTES,
+        parquetReaderStats.totalDictDecompressedBytes.longValue());
+    operatorContext.getStats().setLongStat(Metric.TOTAL_DATA_DECOMPRESSED_BYTES,
+        parquetReaderStats.totalDataDecompressedBytes.longValue());
+    operatorContext.getStats().setLongStat(Metric.TIME_DICT_PAGE_LOADS,
+        parquetReaderStats.timeDictPageLoads.longValue());
+    operatorContext.getStats().setLongStat(Metric.TIME_DATA_PAGE_LOADS,
+        parquetReaderStats.timeDataPageLoads.longValue());
+    operatorContext.getStats().setLongStat(Metric.TIME_DATA_PAGE_DECODE,
+        parquetReaderStats.timeDataPageDecode.longValue());
+    operatorContext.getStats().setLongStat(Metric.TIME_DICT_PAGE_DECODE,
+        parquetReaderStats.timeDictPageDecode.longValue());
+    operatorContext.getStats().setLongStat(Metric.TIME_DICT_PAGES_DECOMPRESSED,
+        parquetReaderStats.timeDictPagesDecompressed.longValue());
+    operatorContext.getStats().setLongStat(Metric.TIME_DATA_PAGES_DECOMPRESSED,
+        parquetReaderStats.timeDataPagesDecompressed.longValue());
+    operatorContext.getStats().setLongStat(Metric.TIME_DISK_SCAN_WAIT,
+        parquetReaderStats.timeDiskScanWait.longValue());
+    operatorContext.getStats().setLongStat(Metric.TIME_DISK_SCAN, parquetReaderStats.timeDiskScan.longValue());
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
index 6ca0205..e03d930 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -17,15 +17,17 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import org.apache.drill.exec.vector.ValueVector;
+
 import java.io.IOException;
 import java.util.List;
 
 public class VarLenBinaryReader {
 
   ParquetRecordReader parentReader;
-  final List<VarLengthColumn<?>> columns;
+  final List<VarLengthColumn<? extends ValueVector>> columns;
 
-  public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn<?>> columns) {
+  public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn<? extends ValueVector>> columns) {
     this.parentReader = parentReader;
     this.columns = columns;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
index 6aa968a..a5a6b81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
@@ -23,7 +23,6 @@ import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -73,7 +72,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
   /**
    * The current read position in the buffer; the index of the next
    * character to be read from the <code>internalBuffer</code> array.
-   * <p>
+   * <p/>
    * This value is always in the range <code>[0,count]</code>.
    * If <code>curPosInBuffer</code> is equal to <code>count></code> then we have read
    * all the buffered data and the next read (or skip) will require more data to be read
@@ -128,8 +127,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
 
   }
 
-  @Override
-  public void init() throws UnsupportedOperationException, IOException {
+  @Override public void init() throws UnsupportedOperationException, IOException {
     super.init();
     this.internalBuffer = this.allocator.buffer(this.bufSize);
     this.tempBuffer = this.allocator.buffer(defaultTempBufferSize);
@@ -180,10 +178,10 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
         this.curPosInStream = getInputStream().getPos();
         bytesRead = nBytes;
         logger.trace(
-            "Stream: {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, BytesRead: {}, Count: {}, " +
-            "CurPosInStream: {}, CurPosInBuffer: {}",
-            this.streamId, this.startOffset, this.totalByteSize, this.bufSize, bytesRead, this.count,
-            this.curPosInStream, this.curPosInBuffer);
+            "Stream: {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, BytesRead: {}, Count: {}, "
+                + "CurPosInStream: {}, CurPosInBuffer: {}", this.streamId, this.startOffset,
+            this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream,
+            this.curPosInBuffer);
       }
     }
     return this.count - this.curPosInBuffer;
@@ -252,8 +250,8 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
   }
 
   /**
-   Has the same contract as {@link java.io.InputStream#read(byte[], int, int)}
-   Except with DrillBuf
+   * Has the same contract as {@link java.io.InputStream#read(byte[], int, int)}
+   * Except with DrillBuf
    */
   public synchronized int read(DrillBuf buf, int off, int len) throws IOException {
     checkInputStreamState();
@@ -296,7 +294,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
       return 0;
     }
     DrillBuf byteBuf;
-    if(len <= defaultTempBufferSize){
+    if (len <= defaultTempBufferSize) {
       byteBuf = tempBuffer;
     } else {
       byteBuf = this.allocator.buffer(len);
@@ -318,7 +316,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
       }
     } while (bytesRead < len);
 
-    if(len > defaultTempBufferSize){
+    if (len > defaultTempBufferSize) {
       byteBuf.release();
     }
 
@@ -327,12 +325,11 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
 
 
   /**
-   Has the same contract as {@link java.io.InputStream#skip(long)}
+   * Has the same contract as {@link java.io.InputStream#skip(long)}
    * Skips upto the next n bytes.
    * Skip may return with less than n bytes skipped
    */
-  @Override
-  public synchronized long skip(long n) throws IOException {
+  @Override public synchronized long skip(long n) throws IOException {
     checkInputStreamState();
     long bytesAvailable = this.count - this.curPosInBuffer;
     long bytesSkipped = 0;
@@ -353,8 +350,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
   }
 
 
-  @Override
-  public synchronized int available() throws IOException {
+  @Override public synchronized int available() throws IOException {
     checkInputStreamState();
     int bytesAvailable = this.count - this.curPosInBuffer;
     int underlyingAvailable = getInputStream().available();
@@ -365,18 +361,15 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
     return available;
   }
 
-  @Override
-  public synchronized void mark(int readlimit) {
+  @Override public synchronized void mark(int readlimit) {
     throw new UnsupportedOperationException("Mark/reset is not supported.");
   }
 
-  @Override
-  public synchronized void reset() throws IOException {
+  @Override public synchronized void reset() throws IOException {
     throw new UnsupportedOperationException("Mark/reset is not supported.");
   }
 
-  @Override
-  public boolean markSupported() {
+  @Override public boolean markSupported() {
     return false;
   }
 
@@ -384,7 +377,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
     Returns the current position from the beginning of the underlying input stream
    */
   public long getPos() throws IOException {
-    return curPosInBuffer+startOffset;
+    return curPosInBuffer + startOffset;
   }
 
   public boolean hasRemainder() throws IOException {
@@ -412,6 +405,11 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
     }
   }
 
+  /**
+   * Uncomment for testing with Parquet files that are too big to use in unit tests
+   * @param args
+   */
+  /*
   public static void main(String[] args) {
     final DrillConfig config = DrillConfig.create();
     final BufferAllocator allocator = RootAllocatorFactory.newRoot(config);
@@ -433,8 +431,8 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
           long totalByteSize = columnMetadata.getTotalSize();
           String streamId = fileName + ":" + columnMetadata.toString();
           BufferedDirectBufInputStream reader =
-              new BufferedDirectBufInputStream(inputStream, allocator, streamId, startOffset,
-                  totalByteSize, BUFSZ, true);
+              new BufferedDirectBufInputStream(inputStream, allocator, streamId, startOffset, totalByteSize,
+                  BUFSZ, true);
           reader.init();
           while (true) {
             try {
@@ -457,4 +455,5 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
     allocator.close();
     return;
   }
+  */
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 60dcf15..deb31b3 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -194,6 +194,10 @@ drill.exec: {
   },
   debug: {
     return_error_for_failure_in_cancelled_fragments: false
+  },
+  scan: {
+    threadpool_size: 8,
+    decode_threadpool_size: 1
   }
   udf: {
     retry-attempts: 5,

http://git-wip-us.apache.org/repos/asf/drill/blob/f9a443d8/exec/java-exec/src/main/resources/rest/profile/profile.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/rest/profile/profile.ftl b/exec/java-exec/src/main/resources/rest/profile/profile.ftl
index ef971fb..c0f2d8e 100644
--- a/exec/java-exec/src/main/resources/rest/profile/profile.ftl
+++ b/exec/java-exec/src/main/resources/rest/profile/profile.ftl
@@ -221,7 +221,7 @@
             </h4>
           </div>
           <div id="${op.getId()}-metrics" class="panel-collapse collapse">
-            <div class="panel-body">
+            <div class="panel-body" style="display:block;overflow-x:auto">
               ${op.getMetricsTable()}
             </div>
           </div>