You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2016/11/05 00:11:30 UTC

[03/10] drill git commit: DRILL-4800: Parallelize column reading. Read/decode fixed-width fields in parallel. Decode var-length columns in parallel. Use simplified decompress method for Gzip and Snappy decompression. Avoids concurrency issue with Parquet ...

DRILL-4800: Parallelize column reading. Read/decode fixed-width fields in parallel. Decode var-length columns in parallel. Use a simplified decompress method for Gzip and Snappy decompression; this avoids a concurrency issue with Parquet decompression (it's also faster). Stress test Parquet read/write. The parallel column reader is disabled by default (it may perform less well under higher concurrency).


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7f5acf8f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7f5acf8f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7f5acf8f

Branch: refs/heads/master
Commit: 7f5acf8f06f4ab2a2efc9801d322b81436794004
Parents: f9a443d
Author: Parth Chandra <pa...@apache.org>
Authored: Wed Aug 24 10:46:37 2016 -0700
Committer: Parth Chandra <pa...@apache.org>
Committed: Wed Nov 2 18:00:55 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/drill/test/DrillTest.java   |   2 +-
 .../org/apache/drill/exec/ExecConstants.java    |   3 +
 .../server/options/SystemOptionManager.java     |   1 +
 .../parquet/columnreaders/AsyncPageReader.java  |  88 +++++++++++-
 .../parquet/columnreaders/ColumnReader.java     | 113 ++++++++++++++-
 .../columnreaders/FixedByteAlignedReader.java   |   2 +-
 .../store/parquet/columnreaders/PageReader.java |   1 +
 .../columnreaders/ParquetRecordReader.java      |  34 ++++-
 .../columnreaders/VarLenBinaryReader.java       | 140 +++++++++++++++++--
 .../physical/impl/writer/TestParquetWriter.java | 109 ++++++++++-----
 .../src/test/resources/supplier_snappy.parquet  | Bin 0 -> 10467 bytes
 11 files changed, 436 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/7f5acf8f/common/src/test/java/org/apache/drill/test/DrillTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/drill/test/DrillTest.java b/common/src/test/java/org/apache/drill/test/DrillTest.java
index 18c2c1a..ccc297d 100644
--- a/common/src/test/java/org/apache/drill/test/DrillTest.java
+++ b/common/src/test/java/org/apache/drill/test/DrillTest.java
@@ -55,7 +55,7 @@ public class DrillTest {
   static MemWatcher memWatcher;
   static String className;
 
-  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(50000);
+  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(50000*10000);
   @Rule public final TestLogReporter logOutcome = LOG_OUTCOME;
 
   @Rule public final TestRule REPEAT_RULE = TestTools.getRepeatRule(false);

http://git-wip-us.apache.org/repos/asf/drill/blob/7f5acf8f/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index a13fd71..5f62781 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -154,6 +154,9 @@ public interface ExecConstants {
   String PARQUET_PAGEREADER_ASYNC = "store.parquet.reader.pagereader.async";
   OptionValidator PARQUET_PAGEREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_ASYNC, true);
 
+  String PARQUET_COLUMNREADER_ASYNC = "store.parquet.reader.columnreader.async";
+  OptionValidator PARQUET_COLUMNREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_COLUMNREADER_ASYNC, false);
+
   // Use a buffering reader for parquet page reader
   String PARQUET_PAGEREADER_USE_BUFFERED_READ = "store.parquet.reader.pagereader.bufferedread";
   OptionValidator PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR = new  BooleanValidator(PARQUET_PAGEREADER_USE_BUFFERED_READ, true);

http://git-wip-us.apache.org/repos/asf/drill/blob/7f5acf8f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 1981d24..d803fa3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -101,6 +101,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR,
       ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR,
       ExecConstants.PARQUET_PAGEREADER_ASYNC_VALIDATOR,
+      ExecConstants.PARQUET_COLUMNREADER_ASYNC_VALIDATOR,
       ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR,
       ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR,
       ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR,

http://git-wip-us.apache.org/repos/asf/drill/blob/7f5acf8f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index 3f47f04..b2bdef3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -19,8 +19,14 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 
 import com.google.common.base.Stopwatch;
 import io.netty.buffer.DrillBuf;
+import io.netty.buffer.ByteBufUtil;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.codec.SnappyCodec;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.drill.exec.util.filereader.DirectBufInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,8 +36,10 @@ import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.PageType;
 import org.apache.parquet.format.Util;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.xerial.snappy.Snappy;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -152,10 +160,24 @@ class AsyncPageReader extends PageReader {
     pageDataBuf = allocateTemporaryBuffer(uncompressedSize);
     try {
       timer.start();
-      codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec())
-          .decompress(compressedData.nioBuffer(0, compressedSize), compressedSize,
-              pageDataBuf.nioBuffer(0, uncompressedSize), uncompressedSize);
-      timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
+      if (logger.isTraceEnabled()) {
+        logger.trace("Decompress (1)==> Col: {}  readPos: {}  compressed_size: {}  compressedPageData: {}",
+            parentColumnReader.columnChunkMetaData.toString(), dataReader.getPos(),
+            pageHeader.getCompressed_page_size(), ByteBufUtil.hexDump(compressedData));
+      }
+      CompressionCodecName codecName = parentColumnReader.columnChunkMetaData.getCodec();
+      ByteBuffer input = compressedData.nioBuffer(0, compressedSize);
+      ByteBuffer output = pageDataBuf.nioBuffer(0, uncompressedSize);
+      DecompressionHelper decompressionHelper = new DecompressionHelper(codecName);
+      decompressionHelper.decompress(input, compressedSize, output, uncompressedSize);
+      pageDataBuf.writerIndex(uncompressedSize);
+      if (logger.isTraceEnabled()) {
+        logger.trace(
+            "Decompress (2)==> Col: {}  readPos: {}  uncompressed_size: {}  uncompressedPageData: {}",
+            parentColumnReader.columnChunkMetaData.toString(), dataReader.getPos(),
+            pageHeader.getUncompressed_page_size(), ByteBufUtil.hexDump(pageDataBuf));
+      }
+      timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
       this.updateStats(pageHeader, "Decompress", 0, timeToRead, compressedSize, uncompressedSize);
     } catch (IOException e) {
       handleAndThrowException(e, "Error decompressing data.");
@@ -204,6 +226,13 @@ class AsyncPageReader extends PageReader {
 
     pageHeader = readStatus.getPageHeader();
     pageData = getDecompressedPageData(readStatus);
+    if (logger.isTraceEnabled()) {
+      logger.trace("AsyncPageReader: Col: {}  pageData: {}",
+          this.parentColumnReader.columnChunkMetaData.toString(), ByteBufUtil.hexDump(pageData));
+      logger.trace("AsyncPageReaderTask==> Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}",
+          parentColumnReader.columnChunkMetaData.toString(), dataReader.getPos(),
+          pageHeader.getUncompressed_page_size(), ByteBufUtil.hexDump(pageData));
+    }
 
   }
 
@@ -324,9 +353,60 @@ class AsyncPageReader extends PageReader {
         throw e;
       }
       Thread.currentThread().setName(oldname);
+      if(logger.isTraceEnabled()) {
+        logger.trace("AsyncPageReaderTask==> Col: {}  readPos: {}  bytesRead: {}  pageData: {}", parent.parentColumnReader.columnChunkMetaData.toString(),
+            parent.dataReader.getPos(), bytesRead, ByteBufUtil.hexDump(pageData));
+      }
       return readStatus;
     }
 
   }
 
+  private class DecompressionHelper {
+    final CompressionCodecName codecName;
+
+    public DecompressionHelper(CompressionCodecName codecName){
+      this.codecName = codecName;
+    }
+
+    public void decompress (ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
+        throws IOException {
+      // GZip != thread_safe, so we go off and do our own thing.
+      // The hadoop interface does not support ByteBuffer so we incur some
+      // expensive copying.
+      if (codecName == CompressionCodecName.GZIP) {
+        GzipCodec codec = new GzipCodec();
+        DirectDecompressor directDecompressor = codec.createDirectDecompressor();
+        if (directDecompressor != null) {
+          logger.debug("Using GZIP direct decompressor.");
+          directDecompressor.decompress(input, output);
+        } else {
+          logger.debug("Using GZIP (in)direct decompressor.");
+          Decompressor decompressor = codec.createDecompressor();
+          decompressor.reset();
+          byte[] inputBytes = new byte[compressedSize];
+          input.position(0);
+          input.get(inputBytes);
+          decompressor.setInput(inputBytes, 0, inputBytes.length);
+          byte[] outputBytes = new byte[uncompressedSize];
+          decompressor.decompress(outputBytes, 0, uncompressedSize);
+          output.clear();
+          output.put(outputBytes);
+        }
+      } else if (codecName == CompressionCodecName.SNAPPY) {
+        // For Snappy, just call the Snappy decompressor directly.
+        // It is thread safe. The Hadoop layers though, appear to be
+        // not quite reliable in a multithreaded environment
+        output.clear();
+        int size = Snappy.uncompress(input, output);
+        output.limit(size);
+      } else {
+        CodecFactory.BytesDecompressor decompressor = codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec());
+        decompressor.decompress(input, compressedSize, output, uncompressedSize);
+      }
+    }
+
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7f5acf8f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index 6572c78..29e23bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -20,6 +20,9 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
@@ -77,6 +80,7 @@ public abstract class ColumnReader<V extends ValueVector> {
 
   // variables for a single read pass
   long readStartInBytes = 0, readLength = 0, readLengthInBits = 0, recordsReadInThisIteration = 0;
+  private ExecutorService threadPool;
 
   protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
       ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
@@ -105,13 +109,20 @@ public abstract class ColumnReader<V extends ValueVector> {
         dataTypeLengthInBits = ParquetRecordReader.getTypeLengthInBits(columnDescriptor.getType());
       }
     }
-
+    if(threadPool == null) {
+      threadPool = parentReader.getOperatorContext().getScanDecodeExecutor();
+    }
   }
 
   public int getRecordsReadInCurrentPass() {
     return valuesReadInCurrentPass;
   }
 
+  public Future<Long> processPagesAsync(long recordsToReadInThisPass){
+    Future<Long> r = threadPool.submit(new ColumnReaderProcessPagesTask(recordsToReadInThisPass));
+    return r;
+  }
+
   public void processPages(long recordsToReadInThisPass) throws IOException {
     reset();
     if(recordsToReadInThisPass>0) {
@@ -120,6 +131,8 @@ public abstract class ColumnReader<V extends ValueVector> {
 
       } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.hasPage());
     }
+    logger.trace("Column Reader: {} - Values read in this pass: {} - ",
+        this.getColumnDescriptor().toString(), valuesReadInCurrentPass);
     valueVec.getMutator().setValueCount(valuesReadInCurrentPass);
   }
 
@@ -150,13 +163,22 @@ public abstract class ColumnReader<V extends ValueVector> {
 
   protected abstract void readField(long recordsToRead);
 
+  /*
+  public Future<Boolean> determineSizeAsync(long recordsReadInCurrentPass,
+      Integer lengthVarFieldsInCurrentRecord) throws IOException {
+    Future<Boolean> r = threadPool.submit(
+        new ColumnReaderDetermineSizeTask(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord));
+    return r;
+  }
+  */
+
   /**
    * Determines the size of a single value in a variable column.
    *
    * Return value indicates if we have finished a row group and should stop reading
    *
    * @param recordsReadInCurrentPass
-   * @param lengthVarFieldsInCurrentRecord
+   * @ param lengthVarFieldsInCurrentRecord
    * @return - true if we should stop reading
    * @throws IOException
    */
@@ -172,7 +194,8 @@ public abstract class ColumnReader<V extends ValueVector> {
       return true;
     }
 
-    lengthVarFieldsInCurrentRecord += dataTypeLengthInBits;
+    //lengthVarFieldsInCurrentRecord += dataTypeLengthInBits;
+    lengthVarFieldsInCurrentRecord = -1;
 
     doneReading = checkVectorCapacityReached();
     if (doneReading) {
@@ -182,6 +205,11 @@ public abstract class ColumnReader<V extends ValueVector> {
     return false;
   }
 
+  protected Future<Integer> readRecordsAsync(int recordsToRead){
+    Future<Integer> r = threadPool.submit(new ColumnReaderReadRecordsTask(recordsToRead));
+    return r;
+  }
+
   protected void readRecords(int recordsToRead) {
     for (int i = 0; i < recordsToRead; i++) {
       readField(i);
@@ -211,6 +239,15 @@ public abstract class ColumnReader<V extends ValueVector> {
     return (int) (valueVec.getValueCapacity() * dataTypeLengthInBits / 8.0);
   }
 
+  public Future<Boolean> readPageAsync() {
+    Future<Boolean> f = threadPool.submit(new Callable<Boolean>() {
+      @Override public Boolean call() throws Exception {
+        return new Boolean(readPage());
+      }
+    });
+    return f;
+  }
+
   // Read a page if we need more data, returns true if we need to exit the read loop
   public boolean readPage() throws IOException {
     if (!pageReader.hasPage()
@@ -258,4 +295,74 @@ public abstract class ColumnReader<V extends ValueVector> {
     return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
   }
 
+  private class ColumnReaderProcessPagesTask implements Callable<Long> {
+
+    private final ColumnReader parent = ColumnReader.this;
+    private final long recordsToReadInThisPass;
+
+    public ColumnReaderProcessPagesTask(long recordsToReadInThisPass){
+      this.recordsToReadInThisPass = recordsToReadInThisPass;
+    }
+
+    @Override public Long call() throws IOException{
+
+      String oldname = Thread.currentThread().getName();
+      Thread.currentThread().setName(oldname+"Decode-"+this.parent.columnChunkMetaData.toString());
+
+      this.parent.processPages(recordsToReadInThisPass);
+
+      Thread.currentThread().setName(oldname);
+      return recordsToReadInThisPass;
+    }
+
+  }
+
+  /*
+  private class ColumnReaderDetermineSizeTask implements Callable<Boolean> {
+
+    private final ColumnReader parent = ColumnReader.this;
+    private final long recordsReadInCurrentPass;
+    private final Integer lengthVarFieldsInCurrentRecord;
+
+    public ColumnReaderDetermineSizeTask(long recordsReadInCurrentPass, Integer lengthVarFieldsInCurrentRecord){
+      this.recordsReadInCurrentPass = recordsReadInCurrentPass;
+      this.lengthVarFieldsInCurrentRecord = lengthVarFieldsInCurrentRecord;
+    }
+
+    @Override public Boolean call() throws IOException{
+
+      String oldname = Thread.currentThread().getName();
+      Thread.currentThread().setName(oldname+"Decode-"+this.parent.columnChunkMetaData.toString());
+
+      boolean b = this.parent.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord);
+
+      Thread.currentThread().setName(oldname);
+      return b;
+    }
+
+  }
+  */
+
+  private class ColumnReaderReadRecordsTask implements Callable<Integer> {
+
+    private final ColumnReader parent = ColumnReader.this;
+    private final int recordsToRead;
+
+    public ColumnReaderReadRecordsTask(int recordsToRead){
+      this.recordsToRead = recordsToRead;
+    }
+
+    @Override public Integer call() throws IOException{
+
+      String oldname = Thread.currentThread().getName();
+      Thread.currentThread().setName("Decode-"+this.parent.columnChunkMetaData.toString());
+
+      this.parent.readRecords(recordsToRead);
+
+      Thread.currentThread().setName(oldname);
+      return recordsToRead;
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7f5acf8f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index f806ee4..0416a05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -251,4 +251,4 @@ class FixedByteAlignedReader<V extends ValueVector> extends ColumnReader<V> {
       valueVec.getMutator().setSafe(index, bytebuf.getInt(start), bytebuf.getInt(start + 4), bytebuf.getInt(start + 8));
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7f5acf8f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index c34ebd1..0736f01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -206,6 +206,7 @@ class PageReader {
       codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData
           .getCodec()).decompress(compressedData.nioBuffer(0, compressedSize), compressedSize,
           pageDataBuf.nioBuffer(0, uncompressedSize), uncompressedSize);
+        pageDataBuf.writerIndex(uncompressedSize);
         timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
         this.updateStats(pageHeader, "Decompress", start, timeToRead, compressedSize, uncompressedSize);
       } finally {

http://git-wip-us.apache.org/repos/asf/drill/blob/7f5acf8f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 1eca00f..4f0e3b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -23,6 +23,8 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import com.google.common.collect.ImmutableList;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -32,6 +34,7 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -468,12 +471,37 @@ public class ParquetRecordReader extends AbstractRecordReader {
   }
 
  public void readAllFixedFields(long recordsToRead) throws IOException {
-
-   for (ColumnReader<?> crs : columnStatuses) {
-     crs.processPages(recordsToRead);
+   boolean useAsyncColReader =
+       fragmentContext.getOptions().getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val;
+   if(useAsyncColReader){
+    readAllFixedFieldsParallel(recordsToRead) ;
+   } else {
+     readAllFixedFieldsiSerial(recordsToRead); ;
    }
  }
 
+  public void readAllFixedFieldsiSerial(long recordsToRead) throws IOException {
+    for (ColumnReader<?> crs : columnStatuses) {
+      crs.processPages(recordsToRead);
+    }
+  }
+
+  public void readAllFixedFieldsParallel(long recordsToRead) throws IOException {
+    ArrayList<Future<Long>> futures = Lists.newArrayList();
+    for (ColumnReader<?> crs : columnStatuses) {
+      Future<Long> f = crs.processPagesAsync(recordsToRead);
+      futures.add(f);
+    }
+    for(Future f: futures){
+      try {
+        f.get();
+      } catch (Exception e) {
+        f.cancel(true);
+        handleAndRaise(null, e);
+      }
+    }
+  }
+
   @Override
   public int next() {
     resetBatch();

http://git-wip-us.apache.org/repos/asf/drill/blob/7f5acf8f/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
index e03d930..c78dc7a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -17,19 +17,27 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.vector.ValueVector;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Future;
 
 public class VarLenBinaryReader {
 
   ParquetRecordReader parentReader;
   final List<VarLengthColumn<? extends ValueVector>> columns;
+  final boolean useAsyncTasks;
 
   public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn<? extends ValueVector>> columns) {
     this.parentReader = parentReader;
     this.columns = columns;
+    useAsyncTasks = parentReader.getFragmentContext().getOptions()
+        .getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val;
   }
 
   /**
@@ -43,29 +51,92 @@ public class VarLenBinaryReader {
   public long readFields(long recordsToReadInThisPass, ColumnReader<?> firstColumnStatus) throws IOException {
 
     long recordsReadInCurrentPass = 0;
-    int lengthVarFieldsInCurrentRecord;
-    long totalVariableLengthData = 0;
-    boolean exitLengthDeterminingLoop = false;
+
     // write the first 0 offset
     for (VarLengthColumn<?> columnReader : columns) {
       columnReader.reset();
     }
 
+    //if(useAsyncTasks){
+    //  recordsReadInCurrentPass = determineSizesParallel(recordsToReadInThisPass);
+    //} else {
+      recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass);
+    //}
+    if(useAsyncTasks){
+      readRecordsParallel(recordsReadInCurrentPass);
+    }else{
+      readRecordsSerial(recordsReadInCurrentPass);
+    }
+    return recordsReadInCurrentPass;
+  }
+
+
+  private long determineSizesSerial(long recordsToReadInThisPass) throws IOException {
+    int lengthVarFieldsInCurrentRecord = 0;
+    boolean exitLengthDeterminingLoop = false;
+    long totalVariableLengthData = 0;
+    long recordsReadInCurrentPass = 0;
     do {
-      lengthVarFieldsInCurrentRecord = 0;
       for (VarLengthColumn<?> columnReader : columns) {
-        if ( !exitLengthDeterminingLoop ) {
-          exitLengthDeterminingLoop = columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord);
+        if (!exitLengthDeterminingLoop) {
+          exitLengthDeterminingLoop =
+              columnReader.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord);
         } else {
           break;
         }
       }
       // check that the next record will fit in the batch
-      if (exitLengthDeterminingLoop || (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() + totalVariableLengthData
-          + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) {
+      if (exitLengthDeterminingLoop ||
+          (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields()
+              + totalVariableLengthData + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) {
+        break;
+      }
+      for (VarLengthColumn<?> columnReader : columns) {
+        columnReader.updateReadyToReadPosition();
+        columnReader.currDefLevel = -1;
+      }
+      recordsReadInCurrentPass++;
+      totalVariableLengthData += lengthVarFieldsInCurrentRecord;
+    } while (recordsReadInCurrentPass < recordsToReadInThisPass);
+
+    return recordsReadInCurrentPass;
+  }
+
+
+  public long determineSizesParallel(long recordsToReadInThisPass ) throws IOException {
+    boolean doneReading = false;
+    int lengthVarFieldsInCurrentRecord = 0;
+    boolean exitLengthDeterminingLoop = false;
+    long totalVariableLengthData = 0;
+    long recordsReadInCurrentPass = 0;
+
+    do {
+    doneReading = readPagesParallel();
+
+    if (!doneReading) {
+      lengthVarFieldsInCurrentRecord = 0;
+      for (VarLengthColumn<?> columnReader : columns) {
+        doneReading = columnReader.processPageData((int) recordsReadInCurrentPass);
+        if(doneReading) {
+          break;
+        }
+        lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
+        doneReading = columnReader.checkVectorCapacityReached();
+        if(doneReading) {
+          break;
+        }
+      }
+    }
+
+    exitLengthDeterminingLoop = doneReading;
+
+      // check that the next record will fit in the batch
+      if (exitLengthDeterminingLoop ||
+          (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields()
+              + totalVariableLengthData + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) {
         break;
       }
-      for (VarLengthColumn<?> columnReader : columns ) {
+      for (VarLengthColumn<?> columnReader : columns) {
         columnReader.updateReadyToReadPosition();
         columnReader.currDefLevel = -1;
       }
@@ -73,13 +144,60 @@ public class VarLenBinaryReader {
       totalVariableLengthData += lengthVarFieldsInCurrentRecord;
     } while (recordsReadInCurrentPass < recordsToReadInThisPass);
 
+    return recordsReadInCurrentPass;
+  }
+
+  public boolean readPagesParallel() {
+
+    boolean isDone = false;
+    ArrayList<Future<Boolean>> futures = Lists.newArrayList();
+    for (VarLengthColumn<?> columnReader : columns) {
+      Future<Boolean> f = columnReader.readPageAsync();
+      futures.add(f);
+    }
+    for (Future<Boolean> f : futures) {
+      try {
+        isDone = isDone || f.get().booleanValue();
+      } catch (Exception e) {
+        f.cancel(true);
+        handleAndRaise(null, e);
+      }
+    }
+    return isDone;
+  }
+
+
+  private void readRecordsSerial(long recordsReadInCurrentPass) {
     for (VarLengthColumn<?> columnReader : columns) {
       columnReader.readRecords(columnReader.pageReader.valuesReadyToRead);
     }
     for (VarLengthColumn<?> columnReader : columns) {
-      columnReader.valueVec.getMutator().setValueCount((int) recordsReadInCurrentPass);
+      columnReader.valueVec.getMutator().setValueCount((int)recordsReadInCurrentPass);
     }
-    return recordsReadInCurrentPass;
+  }
+
+  private void readRecordsParallel(long recordsReadInCurrentPass){
+    ArrayList<Future<Integer>> futures = Lists.newArrayList();
+    for (VarLengthColumn<?> columnReader : columns) {
+      Future<Integer> f = columnReader.readRecordsAsync(columnReader.pageReader.valuesReadyToRead);
+      futures.add(f);
+    }
+    for (Future f : futures) {
+      try {
+        f.get();
+      } catch (Exception e) {
+        f.cancel(true);
+        handleAndRaise(null, e);
+      }
+    }
+    for (VarLengthColumn<?> columnReader : columns) {
+      columnReader.valueVec.getMutator().setValueCount((int)recordsReadInCurrentPass);
+    }
+  }
+
+  protected void handleAndRaise(String s, Exception e) {
+    String message = "Error in parquet record reader.\nMessage: " + s;
+    throw new DrillRuntimeException(message, e);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7f5acf8f/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 4556b4e..56b94d7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -25,7 +25,9 @@ import java.io.File;
 import java.io.FileWriter;
 import java.math.BigDecimal;
 import java.sql.Date;
+import java.util.Arrays;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -38,13 +40,12 @@ import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.fn.interp.TestConstantFolding;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.store.parquet.ParquetRecordWriter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
+import org.apache.log4j.Level;
+import org.apache.parquet.Log;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.joda.time.DateTime;
@@ -55,10 +56,19 @@ import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class TestParquetWriter extends BaseTestQuery {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestParquetWriter.class);
 
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() { // one parameter set; its single value feeds the 'repeat' field
+    return Arrays.asList(new Object[][] { {100} });
+  }
+
   @Rule
   public TemporaryFolder folder = new TemporaryFolder();
   static FileSystem fs;
@@ -106,6 +116,9 @@ public class TestParquetWriter extends BaseTestQuery {
 
   private String allTypesTable = "cp.`/parquet/alltypes.json`";
 
+  @Parameterized.Parameter
+  public int repeat = 1;
+
   @BeforeClass
   public static void initFs() throws Exception {
     Configuration conf = new Configuration();
@@ -360,7 +373,7 @@ public class TestParquetWriter extends BaseTestQuery {
     runTestAndValidate("*", "*", inputTable, "nullable_test");
   }
 
-  @Ignore("Binary file too large for version control, TODO - make available on S3 bucket or similar service")
+  @Ignore("Test file not available")
   @Test
   public void testBitError_Drill_2031() throws Exception {
     compareParquetReadersHyperVector("*", "dfs.`/tmp/wide2/0_0_3.parquet`");
@@ -372,8 +385,7 @@ public class TestParquetWriter extends BaseTestQuery {
         "cast(salary as decimal(24,2)) as decimal24, cast(salary as decimal(38,2)) as decimal38";
     String validateSelection = "decimal8, decimal15, decimal24, decimal38";
     String inputTable = "cp.`employee.json`";
-    runTestAndValidate(selection, validateSelection, inputTable,
-        "parquet_decimal");
+    runTestAndValidate(selection, validateSelection, inputTable, "parquet_decimal");
   }
 
   @Test
@@ -440,17 +452,13 @@ public class TestParquetWriter extends BaseTestQuery {
       testBuilder()
         .ordered()
         .sqlQuery(query)
-        .optionSettingQueriesForTestQuery(
-            "alter system set `store.parquet.use_new_reader` = false")
+        .optionSettingQueriesForTestQuery("alter system set `store.parquet.use_new_reader` = false")
         .sqlBaselineQuery(query)
-        .optionSettingQueriesForBaseline(
-            "alter system set `store.parquet.use_new_reader` = true")
+        .optionSettingQueriesForBaseline("alter system set `store.parquet.use_new_reader` = true")
         .build().run();
     } finally {
-      test("alter system set `%s` = %b",
-          ExecConstants.PARQUET_NEW_RECORD_READER,
-          ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR
-              .getDefault().bool_val);
+      test("alter system set `%s` = %b", ExecConstants.PARQUET_NEW_RECORD_READER,
+          ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR.getDefault().bool_val);
     }
   }
 
@@ -476,47 +484,44 @@ public class TestParquetWriter extends BaseTestQuery {
     }
   }
 
-  @Ignore
+  @Ignore("Binary file too large for version control")
   @Test
   public void testReadVoter() throws Exception {
     compareParquetReadersHyperVector("*", "dfs.`/tmp/voter.parquet`");
   }
 
-  @Ignore
+  @Ignore("Test file not available")
   @Test
   public void testReadSf_100_supplier() throws Exception {
     compareParquetReadersHyperVector("*", "dfs.`/tmp/sf100_supplier.parquet`");
   }
 
-  @Ignore
+  @Ignore("Binary file too large for version control")
   @Test
   public void testParquetRead_checkNulls_NullsFirst() throws Exception {
     compareParquetReadersColumnar("*",
         "dfs.`/tmp/parquet_with_nulls_should_sum_100000_nulls_first.parquet`");
   }
 
-  @Ignore
+  @Ignore("Test file not available")
   @Test
   public void testParquetRead_checkNulls() throws Exception {
-    compareParquetReadersColumnar("*",
-        "dfs.`/tmp/parquet_with_nulls_should_sum_100000.parquet`");
+    compareParquetReadersColumnar("*", "dfs.`/tmp/parquet_with_nulls_should_sum_100000.parquet`");
   }
 
-  @Ignore
+  @Ignore("Binary file too large for version control")
   @Test
   public void test958_sql() throws Exception {
-    compareParquetReadersHyperVector("ss_ext_sales_price",
-        "dfs.`/tmp/store_sales`");
+    compareParquetReadersHyperVector("ss_ext_sales_price", "dfs.`/tmp/store_sales`");
   }
 
-  @Ignore
+  @Ignore("Binary file too large for version control")
   @Test
   public void testReadSf_1_supplier() throws Exception {
-    compareParquetReadersHyperVector("*",
-        "dfs.`/tmp/orders_part-m-00001.parquet`");
+    compareParquetReadersHyperVector("*", "dfs.`/tmp/orders_part-m-00001.parquet`");
   }
 
-  @Ignore
+  @Ignore("Binary file too large for version control")
   @Test
   public void test958_sql_all_columns() throws Exception {
     compareParquetReadersHyperVector("*", "dfs.`/tmp/store_sales`");
@@ -527,13 +532,13 @@ public class TestParquetWriter extends BaseTestQuery {
 //        "dfs.`/tmp/store_sales`");
   }
 
-  @Ignore
+  @Ignore("Binary file too large for version control")
   @Test
   public void testDrill_1314() throws Exception {
     compareParquetReadersColumnar("l_partkey ", "dfs.`/tmp/drill_1314.parquet`");
   }
 
-  @Ignore
+  @Ignore("Binary file too large for version control")
   @Test
   public void testDrill_1314_all_columns() throws Exception {
     compareParquetReadersHyperVector("*", "dfs.`/tmp/drill_1314.parquet`");
@@ -542,19 +547,19 @@ public class TestParquetWriter extends BaseTestQuery {
         "dfs.`/tmp/drill_1314.parquet`");
   }
 
-  @Ignore
+  @Ignore("Test file not available")
   @Test
   public void testParquetRead_checkShortNullLists() throws Exception {
     compareParquetReadersColumnar("*", "dfs.`/tmp/short_null_lists.parquet`");
   }
 
-  @Ignore
+  @Ignore("Test file not available")
   @Test
   public void testParquetRead_checkStartWithNull() throws Exception {
     compareParquetReadersColumnar("*", "dfs.`/tmp/start_with_null.parquet`");
   }
 
-  @Ignore
+  @Ignore("Binary file too large for version control")
   @Test
   public void testParquetReadWebReturns() throws Exception {
     compareParquetReadersColumnar("wr_returning_customer_sk", "dfs.`/tmp/web_returns`");
@@ -753,7 +758,7 @@ public class TestParquetWriter extends BaseTestQuery {
       compareParquetReadersColumnar("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`");
     } finally {
       test("alter session reset %s", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
-    }
+  }
   }
 
   /*
@@ -823,7 +828,8 @@ public class TestParquetWriter extends BaseTestQuery {
   */
   @Test
   public void testHiveParquetTimestampAsInt96_compare() throws Exception {
-    compareParquetReadersColumnar("convert_from(timestamp_field, 'TIMESTAMP_IMPALA')", "cp.`parquet/part1/hive_all_types.parquet`");
+    compareParquetReadersColumnar("convert_from(timestamp_field, 'TIMESTAMP_IMPALA')",
+        "cp.`parquet/part1/hive_all_types.parquet`");
   }
 
   /*
@@ -923,5 +929,40 @@ public class TestParquetWriter extends BaseTestQuery {
     }
   }
 
+  @Test
+  public void testTPCHReadWriteRunRepeated() throws Exception {
+    for (int iteration = 1; iteration <= repeat; iteration++) { // 'repeat' comes from the test parameters
+      if (iteration % 100 == 0) {
+        System.out.println("\n\n Iteration : " + iteration + "\n");
+      }
+      testTPCHReadWriteGzip();
+      testTPCHReadWriteSnappy();
+    }
+  }
+
+  @Test
+  public void testTPCHReadWriteGzip() throws Exception {
+    try {
+      test("alter session set `%s` = 'gzip'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+      // Runs runTestAndValidate over every column ("*") with the writer compression set to gzip.
+      runTestAndValidate("*", "*", "cp.`tpch/supplier.parquet`", "suppkey_parquet_dict_gzip");
+    } finally { // put the writer compression option back to its default value
+      test("alter session set `%s` = '%s'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE,
+          ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE_VALIDATOR.getDefault().string_val);
+    }
+  }
+
+  @Test
+  public void testTPCHReadWriteSnappy() throws Exception {
+    try {
+      test("alter session set `%s` = 'snappy'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE);
+      // Runs runTestAndValidate over every column ("*") with the writer compression set to snappy.
+      runTestAndValidate("*", "*", "cp.`supplier_snappy.parquet`", "suppkey_parquet_dict_snappy");
+    } finally { // put the writer compression option back to its default value
+      test("alter session set `%s` = '%s'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE,
+          ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE_VALIDATOR.getDefault().string_val);
+    }
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7f5acf8f/exec/java-exec/src/test/resources/supplier_snappy.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/supplier_snappy.parquet b/exec/java-exec/src/test/resources/supplier_snappy.parquet
new file mode 100644
index 0000000..5a01d9a
Binary files /dev/null and b/exec/java-exec/src/test/resources/supplier_snappy.parquet differ