You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/07 10:56:37 UTC

[1/7] drill git commit: DRILL-2959: Make sure to close out compression codecs and associated resources.

Repository: drill
Updated Branches:
  refs/heads/master 3ba374a53 -> 6b98db386


DRILL-2959: Make sure to close out compression codecs and associated resources.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/151a7f45
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/151a7f45
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/151a7f45

Branch: refs/heads/master
Commit: 151a7f4506e7ea16be335549b89090e857103b4e
Parents: 3ba374a
Author: Jacques Nadeau <ja...@apache.org>
Authored: Mon May 4 18:14:38 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 7 00:06:32 2015 -0700

----------------------------------------------------------------------
 .../exec/store/parquet/ColumnDataReader.java    |   2 +
 .../exec/store/parquet/DirectCodecFactory.java  | 378 +++++++++++++++++++
 .../exec/store/parquet/DirectCodecPool.java     | 187 +++++++++
 .../ParquetDirectByteBufferAllocator.java       |  17 +-
 .../exec/store/parquet/ParquetFormatPlugin.java |   7 -
 .../exec/store/parquet/ParquetRecordWriter.java |  11 +-
 .../store/parquet/ParquetScanBatchCreator.java  |   3 +-
 .../store/parquet/columnreaders/PageReader.java |  32 +-
 .../columnreaders/ParquetRecordReader.java      |  41 +-
 .../exec/store/parquet2/DrillParquetReader.java |  23 +-
 .../parquet/hadoop/CodecFactoryExposer.java     | 160 --------
 .../parquet/hadoop/ColumnChunkIncReadStore.java |   7 +-
 .../ColumnChunkPageWriteStoreExposer.java       |  14 +-
 .../exec/store/TestDirectCodecFactory.java      | 155 ++++++++
 .../store/parquet/ParquetRecordReaderTest.java  |   3 +-
 pom.xml                                         |   2 +-
 16 files changed, 802 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/151a7f45/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
index 1663cd9..262c94c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
@@ -53,10 +53,12 @@ public class ColumnDataReader {
   }
 
   public void loadPage(DrillBuf target, int pageLength) throws IOException {
+    target.clear();
     ByteBuffer directBuffer = target.nioBuffer(0, pageLength);
     while (directBuffer.remaining() > 0) {
       CompatibilityUtil.getBuf(input, directBuffer, directBuffer.remaining());
     }
+    target.writerIndex(pageLength);
   }
 
   public void clear(){

http://git-wip-us.apache.org/repos/asf/drill/blob/151a7f45/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java
new file mode 100644
index 0000000..7abe05a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.xerial.snappy.Snappy;
+
+import parquet.bytes.ByteBufferAllocator;
+import parquet.bytes.BytesInput;
+import parquet.hadoop.CodecFactory;
+import parquet.hadoop.CodecFactory.BytesCompressor;
+import parquet.hadoop.HeapCodecFactory.HeapBytesCompressor;
+import parquet.hadoop.HeapCodecFactory.HeapBytesDecompressor;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import com.google.common.base.Preconditions;
+
+public class DirectCodecFactory extends CodecFactory<BytesCompressor, DirectBytesDecompressor> implements AutoCloseable {
+
+  private final ByteBufferAllocator allocator;
+
+  public DirectCodecFactory(Configuration config, ByteBufferAllocator allocator) {
+    super(config);
+    Preconditions.checkNotNull(allocator);
+    this.allocator = allocator;
+  }
+
+  public DirectCodecFactory(Configuration config, BufferAllocator allocator) {
+    this(config, new ParquetDirectByteBufferAllocator(allocator));
+  }
+
+  private ByteBuffer ensure(ByteBuffer buffer, int size) {
+    if (buffer == null) {
+      buffer = allocator.allocate(size);
+    } else if (buffer.capacity() >= size) {
+      buffer.clear();
+    } else {
+      allocator.release(buffer);
+      release(buffer);
+      buffer = allocator.allocate(size);
+    }
+    return buffer;
+  }
+
+  ByteBuffer release(ByteBuffer buffer) {
+    if (buffer != null) {
+      allocator.release(buffer);
+    }
+    return null;
+  }
+
+  @Override
+  protected BytesCompressor createCompressor(final CompressionCodecName codecName, final CompressionCodec codec,
+      int pageSize) {
+
+    if (codec == null) {
+      return new NoopCompressor();
+    } else if (codecName == CompressionCodecName.SNAPPY) {
+      // avoid using the Parquet Snappy codec since it allocates direct buffers at awkward spots.
+      return new SnappyCompressor();
+    } else {
+
+      // todo: move zlib above since it also generates allocateDirect calls.
+      return new HeapBytesCompressor(codecName, codec, pageSize);
+    }
+  }
+
+  @Override
+  protected DirectBytesDecompressor createDecompressor(final CompressionCodec codec) {
+    // This is here so that debugging can be done if we see inconsistencies between our decompression and upstream
+    // decompression.
+    // if (true) {
+    // return new HeapFakeDirect(codec);
+    // }
+
+    if (codec == null) {
+      return new NoopDecompressor();
+    } else if (DirectCodecPool.INSTANCE.codec(codec).supportsDirectDecompression()) {
+      return new FullDirectDecompressor(codec);
+    } else {
+      return new IndirectDecompressor(codec);
+    }
+  }
+
+  public void close() {
+    release();
+  }
+
+  /**
+   * Keeping this here for future debugging versus using custom implementations below.
+   */
+  private class HeapFakeDirect extends DirectBytesDecompressor {
+
+    private final ExposedHeapBytesDecompressor innerCompressor;
+
+    public HeapFakeDirect(CompressionCodec codec){
+      innerCompressor = new ExposedHeapBytesDecompressor(codec);
+    }
+
+    @Override
+    public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize)
+        throws IOException {
+      BytesInput uncompressed = decompress(new ByteBufBytesInput(input), uncompressedSize);
+      output.clear();
+      output.setBytes(0, uncompressed.toByteArray());
+      output.writerIndex((int) uncompressed.size());
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput paramBytesInput, int uncompressedSize) throws IOException {
+      return innerCompressor.decompress(paramBytesInput, uncompressedSize);
+    }
+
+    @Override
+    protected void release() {
+      innerCompressor.release();
+    }
+
+  }
+
+  private class ExposedHeapBytesDecompressor extends HeapBytesDecompressor {
+    public ExposedHeapBytesDecompressor(CompressionCodec codec) {
+      super(codec);
+    }
+
+    public void release() {
+      super.release();
+    }
+  }
+
+  public class IndirectDecompressor extends DirectBytesDecompressor {
+    private final Decompressor decompressor;
+
+    public IndirectDecompressor(CompressionCodec codec) {
+      this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDecompressor();
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      decompressor.reset();
+      byte[] inputBytes = bytes.toByteArray();
+      decompressor.setInput(inputBytes, 0, inputBytes.length);
+      byte[] output = new byte[uncompressedSize];
+      decompressor.decompress(output, 0, uncompressedSize);
+      return BytesInput.from(output);
+    }
+
+    @Override
+    public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize)
+        throws IOException {
+
+      decompressor.reset();
+      byte[] inputBytes = new byte[input.capacity()];
+      input.getBytes(0, inputBytes);
+      decompressor.setInput(inputBytes, 0, inputBytes.length);
+      byte[] outputBytes = new byte[uncompressedSize];
+      decompressor.decompress(outputBytes, 0, uncompressedSize);
+      output.clear();
+      output.writeBytes(outputBytes);
+    }
+
+    @Override
+    protected void release() {
+      DirectCodecPool.INSTANCE.returnDecompressor(decompressor);
+    }
+  }
+
+  public class FullDirectDecompressor extends DirectBytesDecompressor {
+    private final DirectDecompressor decompressor;
+    private ByteBuffer compressedBuffer;
+    private ByteBuffer uncompressedBuffer;
+    private ExposedHeapBytesDecompressor extraDecompressor;
+    public FullDirectDecompressor(CompressionCodec codec){
+      this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDirectDecompressor();
+      this.extraDecompressor = new ExposedHeapBytesDecompressor(codec);
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput compressedBytes, int uncompressedSize) throws IOException {
+
+      if(false){
+        // TODO: fix direct path. (currently, this code is causing issues when writing complex Parquet files.
+        ByteBuffer bufferIn = compressedBytes.toByteBuffer();
+        uncompressedBuffer = ensure(uncompressedBuffer, uncompressedSize);
+        uncompressedBuffer.clear();
+
+        if (bufferIn.isDirect()) {
+          decompressor.decompress(bufferIn, uncompressedBuffer);
+        } else {
+          compressedBuffer = ensure(this.compressedBuffer, (int) compressedBytes.size());
+          compressedBuffer.clear();
+          compressedBuffer.put(bufferIn);
+          compressedBuffer.flip();
+          decompressor.decompress(compressedBuffer, uncompressedBuffer);
+        }
+        return BytesInput.from(uncompressedBuffer, 0, uncompressedSize);
+
+      } else {
+        return extraDecompressor.decompress(compressedBytes, uncompressedSize);
+      }
+
+
+    }
+
+
+    @Override
+    public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize)
+        throws IOException {
+      output.clear();
+      decompressor.decompress(input.nioBuffer(0, compressedSize), output.nioBuffer(0, uncompressedSize));
+      output.writerIndex(uncompressedSize);
+    }
+
+    @Override
+    protected void release() {
+      compressedBuffer = DirectCodecFactory.this.release(compressedBuffer);
+      uncompressedBuffer = DirectCodecFactory.this.release(uncompressedBuffer);
+      DirectCodecPool.INSTANCE.returnDecompressor(decompressor);
+      extraDecompressor.release();
+    }
+
+  }
+
+  public class NoopDecompressor extends DirectBytesDecompressor {
+
+    @Override
+    public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize)
+        throws IOException {
+      Preconditions.checkArgument(compressedSize == uncompressedSize,
+          "Non-compressed data did not have matching compressed and uncompressed sizes.");
+      output.clear();
+      output.writeBytes(input, compressedSize);
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      return bytes;
+    }
+
+    @Override
+    protected void release() {
+    }
+
+  }
+
+  public class SnappyCompressor extends BytesCompressor {
+
+    private ByteBuffer incoming;
+    private ByteBuffer outgoing;
+
+    public SnappyCompressor() {
+      super();
+    }
+
+    @Override
+    public BytesInput compress(BytesInput bytes) throws IOException {
+      int maxOutputSize = Snappy.maxCompressedLength((int) bytes.size());
+      ByteBuffer bufferIn = bytes.toByteBuffer();
+      outgoing = ensure(outgoing, maxOutputSize);
+      final int size;
+      if (bufferIn.isDirect()) {
+        size = Snappy.compress(bufferIn, outgoing);
+      } else {
+        this.incoming = ensure(this.incoming, (int) bytes.size());
+        this.incoming.put(bufferIn);
+        this.incoming.flip();
+        size = Snappy.compress(this.incoming, outgoing);
+      }
+
+      return BytesInput.from(outgoing, 0, (int) size);
+    }
+
+    @Override
+    public CompressionCodecName getCodecName() {
+      return CompressionCodecName.SNAPPY;
+    }
+
+    @Override
+    protected void release() {
+      outgoing = DirectCodecFactory.this.release(outgoing);
+      incoming = DirectCodecFactory.this.release(incoming);
+    }
+
+  }
+
+  public static class NoopCompressor extends BytesCompressor {
+
+    @Override
+    public BytesInput compress(BytesInput bytes) throws IOException {
+      return bytes;
+    }
+
+    @Override
+    public CompressionCodecName getCodecName() {
+      return CompressionCodecName.UNCOMPRESSED;
+    }
+
+    @Override
+    protected void release() {
+    }
+
+  }
+
+  public static class ByteBufBytesInput extends BytesInput {
+    private final ByteBuf buf;
+    private final int length;
+
+    public ByteBufBytesInput(ByteBuf buf) {
+      this(buf, 0, buf.capacity());
+    }
+
+    public ByteBufBytesInput(ByteBuf buf, int offset, int length) {
+      super();
+      if(buf.capacity() == length && offset == 0){
+        this.buf = buf;
+      }else{
+        this.buf = buf.slice(offset, length);
+      }
+
+      this.length = length;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      final WritableByteChannel outputChannel = Channels.newChannel(out);
+      outputChannel.write(buf.nioBuffer());
+    }
+
+    @Override
+    public ByteBuffer toByteBuffer() throws IOException {
+      return buf.nioBuffer();
+    }
+
+    @Override
+    public long size() {
+      return length;
+    }
+  }
+
+
+  public abstract class DirectBytesDecompressor extends CodecFactory.BytesDecompressor {
+    public abstract void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize)
+        throws IOException;
+  }
+
+
+
+}
+

http://git-wip-us.apache.org/repos/asf/drill/blob/151a7f45/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java
new file mode 100644
index 0000000..26d97c9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.commons.pool.BasePoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressionCodec;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+public class DirectCodecPool {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectCodecPool.class);
+
+  public static final DirectCodecPool INSTANCE = new DirectCodecPool();
+
+  @SuppressWarnings("unchecked")
+  private final Map<CompressionCodec, CodecPool> codecs = (Map<CompressionCodec, CodecPool>) (Object) Collections.synchronizedMap(Maps.newHashMap());
+
+  @SuppressWarnings("unchecked")
+  private final Map<Class<?>, GenericObjectPool> directDePools = (Map<Class<?>, GenericObjectPool>) (Object) Collections
+      .synchronizedMap(Maps.newHashMap());
+  private final Map<Class<?>, GenericObjectPool> dePools = (Map<Class<?>, GenericObjectPool>) (Object) Collections
+      .synchronizedMap(Maps.newHashMap());
+  private final Map<Class<?>, GenericObjectPool> cPools = (Map<Class<?>, GenericObjectPool>) (Object) Collections
+      .synchronizedMap(Maps.newHashMap());
+
+  private DirectCodecPool() {
+  }
+
+  public class CodecPool {
+    private final GenericObjectPool compressorPool;
+    private final GenericObjectPool decompressorPool;
+    private final GenericObjectPool directDecompressorPool;
+    private final boolean supportDirectDecompressor;
+
+    private CodecPool(final CompressionCodec codec){
+      try {
+        boolean supportDirectDecompressor = codec instanceof DirectDecompressionCodec;
+        compressorPool = new GenericObjectPool(new BasePoolableObjectFactory() {
+          public Object makeObject() throws Exception {
+            return codec.createCompressor();
+          }
+        }, Integer.MAX_VALUE);
+
+        Object com = compressorPool.borrowObject();
+        if (com != null) {
+          cPools.put(com.getClass(), compressorPool);
+          compressorPool.returnObject(com);
+        }else{
+          logger.warn("Unable to find compressor for codec {}", codec.getClass().getName());
+        }
+
+        decompressorPool = new GenericObjectPool(new BasePoolableObjectFactory() {
+          public Object makeObject() throws Exception {
+            return codec.createDecompressor();
+          }
+        }, Integer.MAX_VALUE);
+
+        Object decom = decompressorPool.borrowObject();
+        if (decom != null) {
+          dePools.put(decom.getClass(), decompressorPool);
+          decompressorPool.returnObject(decom);
+        } else {
+          logger.warn("Unable to find decompressor for codec {}", codec.getClass().getName());
+        }
+
+        if (supportDirectDecompressor) {
+          directDecompressorPool = new GenericObjectPool(new BasePoolableObjectFactory() {
+            public Object makeObject() throws Exception {
+              return ((DirectDecompressionCodec) codec).createDirectDecompressor();
+            }
+          }, Integer.MAX_VALUE);
+
+          Object ddecom = directDecompressorPool.borrowObject();
+          if (ddecom != null) {
+            directDePools.put(ddecom.getClass(), directDecompressorPool);
+            directDecompressorPool.returnObject(ddecom);
+
+          } else {
+            supportDirectDecompressor = false;
+            logger.warn("Unable to find direct decompressor for codec {}", codec.getClass().getName());
+          }
+
+        } else {
+          directDecompressorPool = null;
+        }
+
+        this.supportDirectDecompressor = supportDirectDecompressor;
+      } catch (Exception e) {
+        throw new DrillRuntimeException(e);
+      }
+    }
+
+    public DirectDecompressor borrowDirectDecompressor(){
+      Preconditions.checkArgument(supportDirectDecompressor, "Tried to get a direct Decompressor from a non-direct codec.");
+      try {
+        return (DirectDecompressor) directDecompressorPool.borrowObject();
+      } catch (Exception e) {
+        throw new DrillRuntimeException(e);
+      }
+    }
+
+    public boolean supportsDirectDecompression() {
+      return supportDirectDecompressor;
+    }
+
+    public Decompressor borrowDecompressor(){
+      try {
+        return (Decompressor) decompressorPool.borrowObject();
+      } catch (Exception e) {
+        throw new DrillRuntimeException(e);
+      }
+    }
+
+    public Compressor borrowCompressor(){
+      try {
+        return (Compressor) compressorPool.borrowObject();
+      } catch (Exception e) {
+        throw new DrillRuntimeException(e);
+      }
+    }
+  }
+
+  public CodecPool codec(CompressionCodec codec){
+    CodecPool pools = codecs.get(codec);
+    if(pools == null){
+      synchronized(this){
+        pools = codecs.get(codec);
+        if(pools == null){
+          pools = new CodecPool(codec);
+          codecs.put(codec, pools);
+        }
+      }
+    }
+    return pools;
+  }
+
+  private void returnToPool(Object obj, Map<Class<?>, GenericObjectPool> pools) {
+    try {
+      GenericObjectPool pool = pools.get(obj.getClass());
+      if (pool == null) {
+        throw new IllegalStateException("Received unexpected decompressor.");
+      }
+      pool.returnObject(obj);
+    } catch (Exception e) {
+      throw new DrillRuntimeException(e);
+    }
+
+  }
+
+  public void returnCompressor(Compressor compressor) {
+    returnToPool(compressor, cPools);
+  }
+
+  public void returnDecompressor(Decompressor decompressor) {
+    returnToPool(decompressor, dePools);
+  }
+
+  public void returnDecompressor(DirectDecompressor decompressor) {
+    returnToPool(decompressor, directDePools);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/151a7f45/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
index 45a1dc6..cf30db6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
@@ -25,6 +25,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
 import org.apache.drill.exec.ops.OperatorContext;
 
 import parquet.bytes.ByteBufferAllocator;
@@ -32,17 +34,24 @@ import parquet.bytes.ByteBufferAllocator;
 public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
 
-  private OperatorContext oContext;
-  private HashMap<Integer, ByteBuf> allocatedBuffers = new HashMap<Integer, ByteBuf>();
+  private final BufferAllocator allocator;
+  private final HashMap<Integer, ByteBuf> allocatedBuffers = new HashMap<Integer, ByteBuf>();
 
   public ParquetDirectByteBufferAllocator(OperatorContext o){
-    oContext=o;
+    allocator = o.getAllocator();
+  }
+
+  public ParquetDirectByteBufferAllocator(BufferAllocator allocator) {
+    this.allocator = allocator;
   }
 
 
   @Override
   public ByteBuffer allocate(int sz) {
-    ByteBuf bb = oContext.getAllocator().buffer(sz);
+    ByteBuf bb = allocator.buffer(sz);
+    if (bb == null) {
+      throw new OutOfMemoryRuntimeException();
+    }
     ByteBuffer b = bb.nioBuffer(0, sz);
     allocatedBuffers.put(System.identityHashCode(b), bb);
     logger.debug("ParquetDirectByteBufferAllocator: Allocated "+sz+" bytes. Allocated ByteBuffer id: "+System.identityHashCode(b));

http://git-wip-us.apache.org/repos/asf/drill/blob/151a7f45/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index cfa4c93..322a88d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 
 import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.ParquetFileWriter;
 
 import com.google.common.collect.ImmutableSet;
@@ -74,7 +73,6 @@ public class ParquetFormatPlugin implements FormatPlugin{
   private static final List<MagicString> MAGIC_STRINGS = Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC));
 
   private final DrillbitContext context;
-  private final CodecFactoryExposer codecFactoryExposer;
   private final Configuration fsConf;
   private final ParquetFormatMatcher formatMatcher;
   private final ParquetFormatConfig config;
@@ -89,7 +87,6 @@ public class ParquetFormatPlugin implements FormatPlugin{
   public ParquetFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
       StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
     this.context = context;
-    this.codecFactoryExposer = new CodecFactoryExposer(fsConf);
     this.config = formatConfig;
     this.formatMatcher = new ParquetFormatMatcher(this);
     this.storageConfig = storageConfig;
@@ -171,10 +168,6 @@ public class ParquetFormatPlugin implements FormatPlugin{
     return storageConfig;
   }
 
-  public CodecFactoryExposer getCodecFactoryExposer() {
-    return codecFactoryExposer;
-  }
-
   public String getName(){
     return name;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/151a7f45/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 3506ffa..8615eb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -80,6 +80,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private boolean validating = false;
   private CompressionCodecName codec = CompressionCodecName.SNAPPY;
   private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
+  private DirectCodecFactory codecFactory;
 
   private long recordCount = 0;
   private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
@@ -100,6 +101,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{
     super();
     this.oContext = context.newOperatorContext(writer, true);
+    this.codecFactory = new DirectCodecFactory(writer.getFormatPlugin().getFsConf(), oContext.getAllocator());
   }
 
   @Override
@@ -156,10 +158,9 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
     int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / this.schema.getColumns().size() / 5);
     pageStore = ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(this.oContext,
-      codec,
-      pageSize,
-      this.schema,
-      initialBlockBufferSize);
+        codecFactory.getCompressor(codec, pageSize),
+        schema,
+        initialBlockBufferSize);
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
     store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
     MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(this.schema);
@@ -332,6 +333,8 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
       ColumnChunkPageWriteStoreExposer.close(pageStore);
     }
 
+    codecFactory.close();
+
     if (!hasRecords) {
       // the very last file is empty, delete it (DRILL-2408)
       Path path = getPath();

http://git-wip-us.apache.org/repos/asf/drill/blob/151a7f45/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index d5586ce..d5b7303 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -39,7 +39,6 @@ import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
-import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
@@ -130,7 +129,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
           readers.add(
               new ParquetRecordReader(
                   context, e.getPath(), e.getRowGroupIndex(), fs,
-                  rowGroupScan.getStorageEngine().getCodecFactoryExposer(),
+                  new DirectCodecFactory(fs.getConf(), oContext.getAllocator()),
                   footers.get(e.getPath()),
                   rowGroupScan.getColumns()
               )

http://git-wip-us.apache.org/repos/asf/drill/blob/151a7f45/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 6a41a04..8c73b2a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -28,6 +28,9 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.store.parquet.ColumnDataReader;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory.ByteBufBytesInput;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor;
 import org.apache.drill.exec.store.parquet.ParquetFormatPlugin;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,8 +48,6 @@ import parquet.format.PageHeader;
 import parquet.format.PageType;
 import parquet.format.Util;
 import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactoryExposer;
-import parquet.hadoop.CodecFactoryExposer.HadoopByteBufBytesInput;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.schema.PrimitiveType;
@@ -101,13 +102,13 @@ final class PageReader {
   // These need to be held throughout reading of the entire column chunk
   List<ByteBuf> allocatedDictionaryBuffers;
 
-  private final CodecFactoryExposer codecFactory;
+  private final DirectCodecFactory codecFactory;
 
   PageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
     throws ExecutionSetupException{
     this.parentColumnReader = parentStatus;
     allocatedDictionaryBuffers = new ArrayList<ByteBuf>();
-    codecFactory = parentColumnReader.parentReader.getCodecFactoryExposer();
+    codecFactory = parentColumnReader.parentReader.getCodecFactory();
 
     long start = columnChunkMetaData.getFirstDataPageOffset();
     try {
@@ -137,10 +138,12 @@ final class PageReader {
         final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size);
         try {
           dataReader.loadPage(compressedData, pageHeader.compressed_page_size);
-          codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+          DirectBytesDecompressor decompressor = codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData
+              .getCodec());
+          decompressor.decompress(
               compressedData,
-              dictionaryData,
               pageHeader.compressed_page_size,
+              dictionaryData,
               pageHeader.getUncompressed_page_size());
 
         } finally {
@@ -149,7 +152,7 @@ final class PageReader {
       }
 
       DictionaryPage page = new DictionaryPage(
-          getBytesInput(dictionaryData),
+          asBytesInput(dictionaryData, 0, pageHeader.uncompressed_page_size),
           pageHeader.uncompressed_page_size,
           pageHeader.dictionary_page_header.num_values,
           parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
@@ -158,9 +161,8 @@ final class PageReader {
     }
   }
 
-  public static BytesInput getBytesInput(DrillBuf uncompressedByteBuf) throws IOException {
-    final ByteBuffer outBuffer = uncompressedByteBuf.nioBuffer(0, uncompressedByteBuf.capacity());
-    return new HadoopByteBufBytesInput(outBuffer, 0, outBuffer.limit());
+  public static BytesInput asBytesInput(DrillBuf buf, int offset, int length) throws IOException {
+    return new ByteBufBytesInput(buf);
   }
 
   /**
@@ -197,17 +199,17 @@ final class PageReader {
           final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size);
           try{
             dataReader.loadPage(compressedData, pageHeader.compressed_page_size);
-            codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+            codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec()).decompress(
                 compressedData,
-                uncompressedData,
                 pageHeader.compressed_page_size,
+                uncompressedData,
                 pageHeader.getUncompressed_page_size());
           } finally {
             compressedData.release();
           }
         }
         DictionaryPage page = new DictionaryPage(
-            getBytesInput(uncompressedData),
+            asBytesInput(uncompressedData, 0, pageHeader.uncompressed_page_size),
             pageHeader.uncompressed_page_size,
             pageHeader.dictionary_page_header.num_values,
             parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
@@ -225,10 +227,10 @@ final class PageReader {
     }else{
       final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size);
       dataReader.loadPage(compressedData, pageHeader.compressed_page_size);
-      codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+      codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec()).decompress(
           compressedData,
-          pageData,
           pageHeader.compressed_page_size,
+          pageData,
           pageHeader.getUncompressed_page_size());
       compressedData.release();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/151a7f45/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 11d0042..2072aae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -30,7 +30,6 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -39,8 +38,8 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.MaterializedField.Key;
 import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
 import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -51,7 +50,6 @@ import parquet.column.ColumnDescriptor;
 import parquet.format.FileMetaData;
 import parquet.format.SchemaElement;
 import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.ParquetFileWriter;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
@@ -103,36 +101,41 @@ public class ParquetRecordReader extends AbstractRecordReader {
   // records specified in the row group metadata
   long mockRecordsRead;
 
-  private final CodecFactoryExposer codecFactoryExposer;
+  private final DirectCodecFactory codecFactory;
   int rowGroupIndex;
   long totalRecordsRead;
 
-  public ParquetRecordReader(FragmentContext fragmentContext, //
-                             String path, //
-                             int rowGroupIndex, //
-                             FileSystem fs, //
-                             CodecFactoryExposer codecFactoryExposer, //
-                             ParquetMetadata footer, //
+  public ParquetRecordReader(FragmentContext fragmentContext,
+      String path,
+      int rowGroupIndex,
+      FileSystem fs,
+      DirectCodecFactory codecFactory,
+      ParquetMetadata footer,
                              List<SchemaPath> columns) throws ExecutionSetupException {
-    this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactoryExposer, footer,
+    this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactory, footer,
         columns);
   }
 
-  public ParquetRecordReader(FragmentContext fragmentContext, long batchSize,
-                             String path, int rowGroupIndex, FileSystem fs,
-                             CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer,
-                             List<SchemaPath> columns) throws ExecutionSetupException {
+  public ParquetRecordReader(
+      FragmentContext fragmentContext,
+      long batchSize,
+      String path,
+      int rowGroupIndex,
+      FileSystem fs,
+      DirectCodecFactory codecFactory,
+      ParquetMetadata footer,
+      List<SchemaPath> columns) throws ExecutionSetupException {
     this.hadoopPath = new Path(path);
     this.fileSystem = fs;
-    this.codecFactoryExposer = codecFactoryExposer;
+    this.codecFactory = codecFactory;
     this.rowGroupIndex = rowGroupIndex;
     this.batchSize = batchSize;
     this.footer = footer;
     setColumns(columns);
   }
 
-  public CodecFactoryExposer getCodecFactoryExposer() {
-    return codecFactoryExposer;
+  public DirectCodecFactory getCodecFactory() {
+    return codecFactory;
   }
 
   public Path getHadoopPath() {
@@ -452,6 +455,8 @@ public class ParquetRecordReader extends AbstractRecordReader {
     }
     columnStatuses.clear();
 
+    codecFactory.close();
+
     for (VarLengthColumn r : varLengthReader.columns) {
       r.clear();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/151a7f45/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 921d134..07950df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -18,16 +18,14 @@
 package org.apache.drill.exec.store.parquet2;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Collection;
-import java.util.ArrayList;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
-import com.google.common.collect.Sets;
-
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.PathSegment;
@@ -43,37 +41,30 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.MaterializedField.Key;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
 import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
 import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import parquet.column.ColumnDescriptor;
 import parquet.common.schema.ColumnPath;
-import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.ColumnChunkIncReadStore;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.io.ColumnIOFactory;
-import parquet.io.InvalidRecordException;
 import parquet.io.MessageColumnIO;
 import parquet.schema.GroupType;
 import parquet.schema.MessageType;
 import parquet.schema.Type;
-import parquet.schema.PrimitiveType;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-
-import parquet.schema.Types;
+import com.google.common.collect.Sets;
 
 public class DrillParquetReader extends AbstractRecordReader {
 
@@ -247,7 +238,6 @@ public class DrillParquetReader extends AbstractRecordReader {
         paths.put(md.getPath(), md);
       }
 
-      CodecFactoryExposer codecFactoryExposer = new CodecFactoryExposer(fileSystem.getConf());
       Path filePath = new Path(entry.getPath());
 
       BlockMetaData blockMetaData = footer.getBlocks().get(entry.getRowGroupIndex());
@@ -255,7 +245,8 @@ public class DrillParquetReader extends AbstractRecordReader {
       recordCount = (int) blockMetaData.getRowCount();
 
       pageReadStore = new ColumnChunkIncReadStore(recordCount,
-              codecFactoryExposer.getCodecFactory(), operatorContext.getAllocator(), fileSystem, filePath);
+          new DirectCodecFactory(fileSystem.getConf(), operatorContext.getAllocator()), operatorContext.getAllocator(),
+          fileSystem, filePath);
 
       for (String[] path : schema.getPaths()) {
         Type type = schema.getType(path);

http://git-wip-us.apache.org/repos/asf/drill/blob/151a7f45/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
deleted file mode 100644
index 5438660..0000000
--- a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.hadoop;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.DrillBuf;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DirectDecompressionCodec;
-import org.apache.hadoop.io.compress.DirectDecompressor;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import parquet.bytes.BytesInput;
-import parquet.hadoop.metadata.CompressionCodecName;
-
-public class CodecFactoryExposer{
-
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CodecFactoryExposer.class);
-
-  private CodecFactory codecFactory;
-  private final Map<String, org.apache.hadoop.io.compress.DirectDecompressionCodec> codecByName = new HashMap<String, org.apache.hadoop.io.compress.DirectDecompressionCodec>();
-  private Configuration configuration;
-
-  public CodecFactoryExposer(Configuration config){
-    codecFactory = new CodecFactory(config);configuration = config;
-  }
-
-  public CodecFactory getCodecFactory() {
-    return codecFactory;
-  }
-
-  public BytesInput decompress(BytesInput bytes, int uncompressedSize, CompressionCodecName codecName) throws IOException {
-    return codecFactory.getDecompressor(codecName).decompress(bytes, uncompressedSize);
-  }
-
-  public static BytesInput getBytesInput(ByteBuf uncompressedByteBuf, int uncompressedSize) throws IOException {
-    ByteBuffer outBuffer=uncompressedByteBuf.nioBuffer(0, uncompressedSize);
-    return new HadoopByteBufBytesInput(outBuffer, 0, outBuffer.limit());
-  }
-
-  public void decompress(CompressionCodecName codecName,
-      final DrillBuf compressedByteBuf,
-      final DrillBuf uncompressedByteBuf,
-                               int compressedSize,
-                               int uncompressedSize) throws IOException {
-    final ByteBuffer inpBuffer = compressedByteBuf.nioBuffer(0, compressedSize);
-    final ByteBuffer outBuffer = uncompressedByteBuf.nioBuffer(0, uncompressedSize);
-    CompressionCodec c = getCodec(codecName);
-    //TODO: Create the decompressor only once at init time.
-    Class<?> cx = c.getClass();
-
-    DirectDecompressionCodec d=null;
-    DirectDecompressor decompr=null;
-
-    if (DirectDecompressionCodec.class.isAssignableFrom(cx)) {
-      d=(DirectDecompressionCodec)c;
-    }
-
-    if(d!=null) {
-      decompr = d.createDirectDecompressor();
-    }
-
-    if(d!=null && decompr!=null){
-      decompr.decompress(inpBuffer, outBuffer);
-    }else{
-      logger.warn("This Hadoop implementation does not support a " + codecName +
-        " direct decompression codec interface. "+
-        "Direct decompression is available only on *nix systems with Hadoop 2.3 or greater. "+
-        "Read operations will be a little slower. ");
-      BytesInput outBytesInp = this.decompress(
-        new HadoopByteBufBytesInput(inpBuffer, 0, inpBuffer.limit()),
-        uncompressedSize,
-        codecName);
-      // COPY the data back into the output buffer.
-      // (DrillBufs can only refer to direct memory, so we cannot pass back a BytesInput backed
-      // by a byte array).
-      outBuffer.put(outBytesInp.toByteArray());
-    }
-  }
-
-  private DirectDecompressionCodec getCodec(CompressionCodecName codecName) {
-    String codecClassName = codecName.getHadoopCompressionCodecClassName();
-    if (codecClassName == null) {
-      return null;
-    }
-    DirectDecompressionCodec codec = codecByName.get(codecClassName);
-    if (codec != null) {
-      return codec;
-    }
-
-    try {
-      Class<?> codecClass = Class.forName(codecClassName);
-      codec = (DirectDecompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
-      codecByName.put(codecClassName, codec);
-      return codec;
-    } catch (ClassNotFoundException e) {
-      throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
-    }
-  }
-
-  public static class HadoopByteBufBytesInput extends BytesInput {
-
-    private final ByteBuffer byteBuf;
-    private final int length;
-    private final int offset;
-
-    public HadoopByteBufBytesInput(ByteBuffer byteBuf, int offset, int length) {
-      super();
-      this.byteBuf = byteBuf;
-      this.offset = offset;
-      this.length = length;
-    }
-
-    @Override
-    public void writeAllTo(OutputStream out) throws IOException {
-      final WritableByteChannel outputChannel = Channels.newChannel(out);
-      byteBuf.position(offset);
-      ByteBuffer tempBuf = byteBuf.slice();
-      tempBuf.limit(length);
-      outputChannel.write(tempBuf);
-    }
-
-    @Override
-    public ByteBuffer toByteBuffer() throws IOException {
-      byteBuf.position(offset);
-      ByteBuffer buf = byteBuf.slice();
-      buf.limit(length);
-      return buf;
-    }
-
-    @Override
-    public long size() {
-      return length;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/151a7f45/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
index 242cd28..6337d4c 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -28,7 +28,7 @@ import java.util.Map;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -52,14 +52,15 @@ public class ColumnChunkIncReadStore implements PageReadStore {
 
   private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
 
-  private CodecFactory codecFactory = new CodecFactory(new Configuration());
+  private DirectCodecFactory codecFactory;
   private BufferAllocator allocator;
   private FileSystem fs;
   private Path path;
   private long rowCount;
   private List<FSDataInputStream> streams = new ArrayList();
 
-  public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, BufferAllocator allocator, FileSystem fs, Path path) {
+  public ColumnChunkIncReadStore(long rowCount, DirectCodecFactory codecFactory, BufferAllocator allocator,
+      FileSystem fs, Path path) {
     this.codecFactory = codecFactory;
     this.allocator = allocator;
     this.fs = fs;

http://git-wip-us.apache.org/repos/asf/drill/blob/151a7f45/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
index 0e9dec0..743d185 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
@@ -21,21 +21,19 @@ import java.io.IOException;
 
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
-import org.apache.hadoop.conf.Configuration;
 
 import parquet.column.page.PageWriteStore;
 import parquet.hadoop.CodecFactory.BytesCompressor;
-import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.schema.MessageType;
 
 public class ColumnChunkPageWriteStoreExposer {
 
-  public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore(OperatorContext oContext,
-                                                                       CompressionCodecName codec,
-                                                                       int pageSize,
-                                                                       MessageType schema,
-                                                                       int initialSize) {
-    BytesCompressor compressor = new CodecFactory(new Configuration()).getCompressor(codec, pageSize);
+  public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore(
+      OperatorContext oContext,
+      BytesCompressor compressor,
+      MessageType schema,
+      int initialSize
+      ) {
     return new ColumnChunkPageWriteStore(compressor, schema, initialSize, new ParquetDirectByteBufferAllocator(oContext));
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/151a7f45/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java
new file mode 100644
index 0000000..644144e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import io.netty.buffer.DrillBuf;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.drill.common.DeferredException;
+import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory.ByteBufBytesInput;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import parquet.bytes.BytesInput;
+import parquet.hadoop.CodecFactory.BytesCompressor;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+public class TestDirectCodecFactory extends ExecTest {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestDirectCodecFactory.class);
+
+  private static enum Decompression {
+    ON_HEAP, OFF_HEAP, DRILLBUF
+  }
+
+  private void test(int size, CompressionCodecName codec, boolean useOnHeapCompression, Decompression decomp) {
+    DrillBuf rawBuf = null;
+    DrillBuf outBuf = null;
+    try (BufferAllocator allocator = new TopLevelAllocator();
+        DirectCodecFactory codecFactory = new DirectCodecFactory(new Configuration(), allocator)) {
+      try {
+        rawBuf = allocator.buffer(size);
+        final byte[] rawArr = new byte[size];
+        outBuf = allocator.buffer(size * 2);
+        Random r = new Random();
+        byte[] random = new byte[1024];
+        int pos = 0;
+        while (pos < size) {
+          r.nextBytes(random);
+          rawBuf.writeBytes(random);
+          System.arraycopy(random, 0, rawArr, pos, random.length);
+          pos += random.length;
+        }
+
+        BytesCompressor c = codecFactory.getCompressor(codec, 64 * 1024);
+        DirectBytesDecompressor d = codecFactory.getDecompressor(codec);
+
+        BytesInput compressed;
+        if (useOnHeapCompression) {
+          compressed = c.compress(BytesInput.from(rawArr));
+        } else {
+          compressed = c.compress(new ByteBufBytesInput(rawBuf));
+        }
+
+        switch (decomp) {
+        case DRILLBUF: {
+          ByteBuffer buf = compressed.toByteBuffer();
+          DrillBuf b = allocator.buffer(buf.capacity());
+          try {
+            b.writeBytes(buf);
+            d.decompress(b, (int) compressed.size(), outBuf, size);
+            for (int i = 0; i < size; i++) {
+              Assert.assertTrue("Data didn't match at " + i, outBuf.getByte(i) == rawBuf.getByte(i));
+            }
+          } finally {
+            b.release();
+          }
+          break;
+        }
+
+        case OFF_HEAP: {
+          ByteBuffer buf = compressed.toByteBuffer();
+          DrillBuf b = allocator.buffer(buf.capacity());
+          try {
+            b.writeBytes(buf);
+            BytesInput input = d.decompress(new ByteBufBytesInput(b), size);
+            Assert.assertArrayEquals(input.toByteArray(), rawArr);
+          } finally {
+            b.release();
+          }
+          break;
+        }
+        case ON_HEAP: {
+          byte[] buf = compressed.toByteArray();
+          BytesInput input = d.decompress(BytesInput.from(buf), size);
+          Assert.assertArrayEquals(input.toByteArray(), rawArr);
+          break;
+        }
+        }
+      } catch (Exception e) {
+        String msg = String.format(
+            "Failure while testing Codec: %s, OnHeapCompressionInput: %s, Decompression Mode: %s, Data Size: %d",
+            codec.name(),
+            useOnHeapCompression, decomp.name(), size);
+        System.out.println(msg);
+        throw new RuntimeException(msg, e);
+      } finally {
+        if (rawBuf != null) {
+          rawBuf.release();
+        }
+        if (outBuf != null) {
+          outBuf.release();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void compressionCodecs() throws Exception {
+    int[] sizes = { 4 * 1024, 1 * 1024 * 1024 };
+    boolean[] comp = { true, false };
+
+    try (DeferredException ex = new DeferredException()) {
+      for (int size : sizes) {
+        for (boolean useOnHeapComp : comp) {
+          for (Decompression decomp : Decompression.values()) {
+            for (CompressionCodecName codec : CompressionCodecName.values()) {
+              if (codec == CompressionCodecName.LZO) {
+                // not installed as gpl.
+                continue;
+              }
+              try {
+                test(size, codec, useOnHeapComp, decomp);
+              } catch (Exception e) {
+                ex.addException(e);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/151a7f45/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index e50e3fb..83a1cb8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -70,7 +70,6 @@ import parquet.bytes.BytesInput;
 import parquet.column.page.DataPageV1;
 import parquet.column.page.PageReadStore;
 import parquet.column.page.PageReader;
-import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.Footer;
 import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.metadata.ParquetMetadata;
@@ -625,7 +624,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
     BufferAllocator allocator = new TopLevelAllocator();
     for(int i = 0; i < 25; i++) {
       ParquetRecordReader rr = new ParquetRecordReader(context, 256000, fileName, 0, fs,
-          new CodecFactoryExposer(dfsConfig), f.getParquetMetadata(), columns);
+          new DirectCodecFactory(dfsConfig, allocator), f.getParquetMetadata(), columns);
       TestOutputMutator mutator = new TestOutputMutator(allocator);
       rr.setup(mutator);
       Stopwatch watch = new Stopwatch();

http://git-wip-us.apache.org/repos/asf/drill/blob/151a7f45/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index defa42f..7fdc794 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
     <proto.cas.path>${project.basedir}/src/main/protobuf/</proto.cas.path>
     <dep.junit.version>4.11</dep.junit.version>
     <dep.slf4j.version>1.7.6</dep.slf4j.version>
-    <parquet.version>1.6.0rc3-drill-r0.1</parquet.version>
+    <parquet.version>1.6.0rc3-drill-r0.3</parquet.version>
   </properties>
 
   <scm>


[2/7] drill git commit: DRILL-2006: Updated Text reader. Increases variations of text files Drill can work with.

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
new file mode 100644
index 0000000..e411461
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingSettings.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.univocity.parsers.common.TextParsingException;
+
+public class TextParsingSettings {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextParsingSettings.class);
+
+  public static final TextParsingSettings DEFAULT = new TextParsingSettings();
+
+  private String emptyValue = null;
+  private boolean parseUnescapedQuotes = true;
+  private byte quote = b('"');
+  private byte quoteEscape = b('"');
+  private byte delimiter = b(',');
+  private byte comment = b('#');
+
+  private long maxCharsPerColumn = Character.MAX_VALUE;
+  private byte normalizedNewLine = b('\n');
+  private byte[] newLineDelimiter = {normalizedNewLine};
+  private boolean ignoreLeadingWhitespaces = false;
+  private boolean ignoreTrailingWhitespaces = false;
+  private String lineSeparatorString = "\n";
+  private boolean skipFirstLine = false;
+
+  // these options are not yet supported
+  private boolean headerExtractionEnabled = false;
+  private boolean useRepeatedVarChar = true;
+  private int numberOfRecordsToRead = -1;
+
+  public void set(TextFormatConfig config){
+    this.quote = bSafe(config.getQuote(), "quote");
+    this.quoteEscape = bSafe(config.getEscape(), "escape");
+    this.newLineDelimiter = config.getLineDelimiter().getBytes(Charsets.UTF_8);
+    Preconditions.checkArgument(newLineDelimiter.length == 1 || newLineDelimiter.length == 2,
+        String.format("Line delimiter must be 1 or 2 bytes in length.  The provided delimiter was %d bytes long.", newLineDelimiter.length));
+    this.delimiter = bSafe(config.getFieldDelimiter(), "fieldDelimiter");
+    this.comment = bSafe(config.getComment(), "comment");
+    this.skipFirstLine = config.isSkipFirstLine();
+  }
+
+  public byte getComment(){
+    return comment;
+  }
+
+  public boolean isSkipFirstLine() {
+    return skipFirstLine;
+  }
+
+  public void setSkipFirstLine(boolean skipFirstLine) {
+    this.skipFirstLine = skipFirstLine;
+  }
+
+  public boolean isUseRepeatedVarChar() {
+    return useRepeatedVarChar;
+  }
+
+  public void setUseRepeatedVarChar(boolean useRepeatedVarChar) {
+    this.useRepeatedVarChar = useRepeatedVarChar;
+  }
+
+
+  private static byte bSafe(char c, String name){
+    if(c > Byte.MAX_VALUE) {
+      throw new IllegalArgumentException(String.format("Failure validating configuration option %s.  Expected a "
+          + "character between 0 and 127 but value was actually %d.", name, (int) c));
+    }
+    return (byte) c;
+  }
+
+  private static byte b(char c){
+    return (byte) c;
+  }
+
+  public byte[] getNewLineDelimiter() {
+    return newLineDelimiter;
+  }
+
+  /**
+   * Returns the character used for escaping values where the field delimiter is part of the value. Defaults to '"'
+   * @return the quote character
+   */
+  public byte getQuote() {
+    return quote;
+  }
+
+  /**
+   * Defines the character used for escaping values where the field delimiter is part of the value. Defaults to '"'
+   * @param quote the quote character
+   */
+  public void setQuote(byte quote) {
+    this.quote = quote;
+  }
+
+  public String getLineSeparatorString(){
+    return lineSeparatorString;
+  }
+
+
+  /**
+   * Identifies whether or not a given character is used for escaping values where the field delimiter is part of the value
+   * @param ch the character to be verified
+   * @return true if the given character is the character used for escaping values, false otherwise
+   */
+  public boolean isQuote(byte ch) {
+    return this.quote == ch;
+  }
+
+  /**
+   * Returns the character used for escaping quotes inside an already quoted value. Defaults to '"'
+   * @return the quote escape character
+   */
+  public byte getQuoteEscape() {
+    return quoteEscape;
+  }
+
+  /**
+   * Defines the character used for escaping quotes inside an already quoted value. Defaults to '"'
+   * @param quoteEscape the quote escape character
+   */
+  public void setQuoteEscape(byte quoteEscape) {
+    this.quoteEscape = quoteEscape;
+  }
+
+  /**
+   * Identifies whether or not a given character is used for escaping quotes inside an already quoted value.
+   * @param ch the character to be verified
+   * @return true if the given character is the quote escape character, false otherwise
+   */
+  public boolean isQuoteEscape(byte ch) {
+    return this.quoteEscape == ch;
+  }
+
+  /**
+   * Returns the field delimiter character. Defaults to ','
+   * @return the field delimiter character
+   */
+  public byte getDelimiter() {
+    return delimiter;
+  }
+
+  /**
+   * Defines the field delimiter character. Defaults to ','
+   * @param delimiter the field delimiter character
+   */
+  public void setDelimiter(byte delimiter) {
+    this.delimiter = delimiter;
+  }
+
+  /**
+   * Identifies whether or not a given character represents a field delimiter
+   * @param ch the character to be verified
+   * @return true if the given character is the field delimiter character, false otherwise
+   */
+  public boolean isDelimiter(byte ch) {
+    return this.delimiter == ch;
+  }
+
+  /**
+   * Returns the String representation of an empty value (defaults to null)
+   *
+   * <p>When reading, if the parser does not read any character from the input, and the input is within quotes, the empty is used instead of an empty string
+   *
+   * @return the String representation of an empty value
+   */
+  public String getEmptyValue() {
+    return emptyValue;
+  }
+
+  /**
+   * Sets the String representation of an empty value (defaults to null)
+   *
+   * <p>When reading, if the parser does not read any character from the input, and the input is within quotes, the empty is used instead of an empty string
+   *
+   * @param emptyValue the String representation of an empty value
+   */
+  public void setEmptyValue(String emptyValue) {
+    this.emptyValue = emptyValue;
+  }
+
+
+  /**
+   * Indicates whether the CSV parser should accept unescaped quotes inside quoted values and parse them normally. Defaults to {@code true}.
+   * @return a flag indicating whether or not the CSV parser should accept unescaped quotes inside quoted values.
+   */
+  public boolean isParseUnescapedQuotes() {
+    return parseUnescapedQuotes;
+  }
+
+  /**
+   * Configures how to handle unescaped quotes inside quoted values. If set to {@code true}, the parser will parse the quote normally as part of the value.
+   * If set the {@code false}, a {@link TextParsingException} will be thrown. Defaults to {@code true}.
+   * @param parseUnescapedQuotes indicates whether or not the CSV parser should accept unescaped quotes inside quoted values.
+   */
+  public void setParseUnescapedQuotes(boolean parseUnescapedQuotes) {
+    this.parseUnescapedQuotes = parseUnescapedQuotes;
+  }
+
+  /**
+   * Indicates whether or not the first valid record parsed from the input should be considered as the row containing the names of each column
+   * @return true if the first valid record parsed from the input should be considered as the row containing the names of each column, false otherwise
+   */
+  public boolean isHeaderExtractionEnabled() {
+    return headerExtractionEnabled;
+  }
+
+  /**
+   * Defines whether or not the first valid record parsed from the input should be considered as the row containing the names of each column
+   * @param headerExtractionEnabled a flag indicating whether the first valid record parsed from the input should be considered as the row containing the names of each column
+   */
+  public void setHeaderExtractionEnabled(boolean headerExtractionEnabled) {
+    this.headerExtractionEnabled = headerExtractionEnabled;
+  }
+
+  /**
+   * The number of valid records to be parsed before the process is stopped. A negative value indicates there's no limit (defaults to -1).
+   * @return the number of records to read before stopping the parsing process.
+   */
+  public int getNumberOfRecordsToRead() {
+    return numberOfRecordsToRead;
+  }
+
+  /**
+   * Defines the number of valid records to be parsed before the process is stopped. A negative value indicates there's no limit (defaults to -1).
+   * @param numberOfRecordsToRead the number of records to read before stopping the parsing process.
+   */
+  public void setNumberOfRecordsToRead(int numberOfRecordsToRead) {
+    this.numberOfRecordsToRead = numberOfRecordsToRead;
+  }
+
+  public long getMaxCharsPerColumn() {
+    return maxCharsPerColumn;
+  }
+
+  public void setMaxCharsPerColumn(long maxCharsPerColumn) {
+    this.maxCharsPerColumn = maxCharsPerColumn;
+  }
+
+  public void setComment(byte comment) {
+    this.comment = comment;
+  }
+
+  public byte getNormalizedNewLine() {
+    return normalizedNewLine;
+  }
+
+  public void setNormalizedNewLine(byte normalizedNewLine) {
+    this.normalizedNewLine = normalizedNewLine;
+  }
+
+  public boolean isIgnoreLeadingWhitespaces() {
+    return ignoreLeadingWhitespaces;
+  }
+
+  public void setIgnoreLeadingWhitespaces(boolean ignoreLeadingWhitespaces) {
+    this.ignoreLeadingWhitespaces = ignoreLeadingWhitespaces;
+  }
+
+  public boolean isIgnoreTrailingWhitespaces() {
+    return ignoreTrailingWhitespaces;
+  }
+
+  public void setIgnoreTrailingWhitespaces(boolean ignoreTrailingWhitespaces) {
+    this.ignoreTrailingWhitespaces = ignoreTrailingWhitespaces;
+  }
+
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
new file mode 100644
index 0000000..fec0ab4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextReader.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import io.netty.buffer.DrillBuf;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.UserException;
+
+import com.univocity.parsers.common.TextParsingException;
+import com.univocity.parsers.csv.CsvParserSettings;
+
+/*******************************************************************************
+ * Portions Copyright 2014 uniVocity Software Pty Ltd
+ ******************************************************************************/
+
+/**
+ * A byte-based Text parser implementation. Builds heavily upon the uniVocity parsers. Customized for UTF8 parsing and
+ * DrillBuf support.
+ */
+final class TextReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextReader.class);
+
+  private static final byte NULL_BYTE = (byte) '\0';
+
+  private final TextParsingContext context;
+
+  private final long recordsToRead;
+  private final TextParsingSettings settings;
+
+  private final TextInput input;
+  private final TextOutput output;
+  private final DrillBuf workBuf;
+
+  private byte ch;
+
+  // index of the field within this record
+  private int fieldIndex;
+
+  /** Behavior settings **/
+  private final boolean ignoreTrailingWhitespace;
+  private final boolean ignoreLeadingWhitespace;
+  private final boolean parseUnescapedQuotes;
+
+  /** Key Characters **/
+  private final byte comment;
+  private final byte delimiter;
+  private final byte quote;
+  private final byte quoteEscape;
+  private final byte newLine;
+
+  /**
+   * The CsvParser supports all settings provided by {@link CsvParserSettings}, and requires this configuration to be
+   * properly initialized.
+   * @param settings  the parser configuration
+   * @param input  input stream
+   * @param output  interface to produce output record batch
+   * @param workBuf  working buffer to handle whitespaces
+   */
+  public TextReader(TextParsingSettings settings, TextInput input, TextOutput output, DrillBuf workBuf) {
+    this.context = new TextParsingContext(input, output);
+    this.workBuf = workBuf;
+    this.settings = settings;
+
+    this.recordsToRead = settings.getNumberOfRecordsToRead() == -1 ? Long.MAX_VALUE : settings.getNumberOfRecordsToRead();
+
+    this.ignoreTrailingWhitespace = settings.isIgnoreTrailingWhitespaces();
+    this.ignoreLeadingWhitespace = settings.isIgnoreLeadingWhitespaces();
+    this.parseUnescapedQuotes = settings.isParseUnescapedQuotes();
+    this.delimiter = settings.getDelimiter();
+    this.quote = settings.getQuote();
+    this.quoteEscape = settings.getQuoteEscape();
+    this.newLine = settings.getNormalizedNewLine();
+    this.comment = settings.getComment();
+
+    this.input = input;
+    this.output = output;
+
+  }
+
+  public TextOutput getOutput(){
+    return output;
+  }
+
+  /* Check if the given byte is a white space. As per the univocity text reader
+   * any ASCII <= ' ' is considered a white space. However since byte in JAVA is signed
+   * we have an additional check to make sure its not negative
+   */
+  static final boolean isWhite(byte b){
+    return b <= ' ' && b > -1;
+  }
+
+  // Inform the output interface to indicate we are starting a new record batch
+  public void resetForNextBatch(){
+    output.startBatch();
+  }
+
+  public long getPos(){
+    return input.getPos();
+  }
+
+  /**
+   * Function encapsulates parsing an entire record, delegates parsing of the
+   * fields to parseField() function.
+   * We mark the start of the record and if there are any failures encountered (OOM for eg)
+   * then we reset the input stream to the marked position
+   * @return  true if parsing this record was successful; false otherwise
+   * @throws IOException
+   */
+  private boolean parseRecord() throws IOException {
+    final byte newLine = this.newLine;
+    final TextInput input = this.input;
+
+    input.mark();
+
+    fieldIndex = 0;
+    if (isWhite(ch) && ignoreLeadingWhitespace) {
+      skipWhitespace();
+    }
+
+    int fieldsWritten = 0;
+    try{
+      boolean earlyTerm = false;
+      while (ch != newLine) {
+        earlyTerm = !parseField();
+        fieldsWritten++;
+        if (ch != newLine) {
+          ch = input.nextChar();
+          if (ch == newLine) {
+            output.endEmptyField();
+            break;
+          }
+        }
+        if(earlyTerm){
+          if(ch != newLine){
+            input.skipLines(1);
+          }
+          break;
+        }
+      }
+    }catch(StreamFinishedPseudoException e){
+      // if we've written part of a field or all of a field, we should send this row.
+      if(fieldsWritten == 0 && !output.rowHasData()){
+        throw e;
+      }
+    }
+
+    output.finishRecord();
+    return true;
+  }
+
+  /**
+   * Function parses an individual field and ignores any white spaces encountered
+   * by not appending it to the output vector
+   * @throws IOException
+   */
+  private void parseValueIgnore() throws IOException {
+    final byte newLine = this.newLine;
+    final byte delimiter = this.delimiter;
+    final TextOutput output = this.output;
+    final TextInput input = this.input;
+
+    byte ch = this.ch;
+    while (ch != delimiter && ch != newLine) {
+      output.appendIgnoringWhitespace(ch);
+//      fieldSize++;
+      ch = input.nextChar();
+    }
+    this.ch = ch;
+  }
+
+  /**
+   * Function parses an individual field and appends all characters till the delimeter (or newline)
+   * to the output, including white spaces
+   * @throws IOException
+   */
+  private void parseValueAll() throws IOException {
+    final byte newLine = this.newLine;
+    final byte delimiter = this.delimiter;
+    final TextOutput output = this.output;
+    final TextInput input = this.input;
+
+    byte ch = this.ch;
+    while (ch != delimiter && ch != newLine) {
+      output.append(ch);
+      ch = input.nextChar();
+    }
+    this.ch = ch;
+  }
+
+  /**
+   * Function simply delegates the parsing of a single field to the actual implementation based on parsing config
+   * @throws IOException
+   */
+  private void parseValue() throws IOException {
+    if (ignoreTrailingWhitespace) {
+      parseValueIgnore();
+    }else{
+      parseValueAll();
+    }
+  }
+
+  /**
+   * Recursive function invoked when a quote is encountered. Function also
+   * handles the case when there are non-white space characters in the field
+   * after the quoted value.
+   * @param prev  previous byte read
+   * @throws IOException
+   */
+  private void parseQuotedValue(byte prev) throws IOException {
+    final byte newLine = this.newLine;
+    final byte delimiter = this.delimiter;
+    final TextOutput output = this.output;
+    final TextInput input = this.input;
+    final byte quote = this.quote;
+
+    ch = input.nextChar();
+
+    while (!(prev == quote && (ch == delimiter || ch == newLine || isWhite(ch)))) {
+      if (ch != quote) {
+        if (prev == quote) { // unescaped quote detected
+          if (parseUnescapedQuotes) {
+            output.append(quote);
+            output.append(ch);
+            parseQuotedValue(ch);
+            break;
+          } else {
+            throw new TextParsingException(
+                context,
+                "Unescaped quote character '"
+                    + quote
+                    + "' inside quoted value of CSV field. To allow unescaped quotes, set 'parseUnescapedQuotes' to 'true' in the CSV parser settings. Cannot parse CSV input.");
+          }
+        }
+        output.append(ch);
+        prev = ch;
+      } else if (prev == quoteEscape) {
+        output.append(quote);
+        prev = NULL_BYTE;
+      } else {
+        prev = ch;
+      }
+      ch = input.nextChar();
+    }
+
+    // handles whitespaces after quoted value: whitespaces are ignored. Content after whitespaces may be parsed if
+    // 'parseUnescapedQuotes' is enabled.
+    if (ch != newLine && ch <= ' ') {
+      final DrillBuf workBuf = this.workBuf;
+      workBuf.resetWriterIndex();
+      do {
+        // saves whitespaces after value
+        workBuf.writeByte(ch);
+        ch = input.nextChar();
+        // found a new line, go to next record.
+        if (ch == newLine) {
+          return;
+        }
+      } while (ch <= ' ');
+
+      // there's more stuff after the quoted value, not only empty spaces.
+      if (!(ch == delimiter || ch == newLine) && parseUnescapedQuotes) {
+
+        output.append(quote);
+        for(int i =0; i < workBuf.writerIndex(); i++){
+          output.append(workBuf.getByte(i));
+        }
+        // the next character is not the escape character, put it there
+        if (ch != quoteEscape) {
+          output.append(ch);
+        }
+        // sets this character as the previous character (may be escaping)
+        // calls recursively to keep parsing potentially quoted content
+        parseQuotedValue(ch);
+      }
+    }
+
+    if (!(ch == delimiter || ch == newLine)) {
+      throw new TextParsingException(context, "Unexpected character '" + ch
+          + "' following quoted value of CSV field. Expecting '" + delimiter + "'. Cannot parse CSV input.");
+    }
+  }
+
+  /**
+   * Captures the entirety of parsing a single field and based on the input delegates to the appropriate function
+   * @return
+   * @throws IOException
+   */
+  private final boolean parseField() throws IOException {
+
+    output.startField(fieldIndex++);
+
+    if (isWhite(ch) && ignoreLeadingWhitespace) {
+      skipWhitespace();
+    }
+
+    if (ch == delimiter) {
+      return output.endEmptyField();
+    } else {
+      if (ch == quote) {
+        parseQuotedValue(NULL_BYTE);
+      } else {
+        parseValue();
+      }
+
+      return output.endField();
+    }
+
+  }
+
+  /**
+   * Helper function to skip white spaces occurring at the current input stream.
+   * @throws IOException
+   */
+  private void skipWhitespace() throws IOException {
+    final byte delimiter = this.delimiter;
+    final byte newLine = this.newLine;
+    final TextInput input = this.input;
+
+    while (isWhite(ch) && ch != delimiter && ch != newLine) {
+      ch = input.nextChar();
+    }
+  }
+
+  /**
+   * Starting point for the reader. Sets up the input interface.
+   * @throws IOException
+   */
+  public final void start() throws IOException {
+    context.stopped = false;
+    input.start();
+  }
+
+
+  /**
+   * Parses the next record from the input. Will skip the line if its a comment,
+   * this is required when the file contains headers
+   * @throws IOException
+   */
+  public final boolean parseNext() throws IOException {
+    try {
+      while (!context.stopped) {
+        ch = input.nextChar();
+        if (ch == comment) {
+          input.skipLines(1);
+          continue;
+        }
+        break;
+      }
+      final long initialLineNumber = input.lineCount();
+      boolean success = parseRecord();
+      if (initialLineNumber + 1 < input.lineCount()) {
+        throw new TextParsingException(context, "Cannot use newline character within quoted string");
+      }
+
+      if(success){
+        if (recordsToRead > 0 && context.currentRecord() >= recordsToRead) {
+          context.stop();
+        }
+        return true;
+      }else{
+        return false;
+      }
+
+    } catch (StreamFinishedPseudoException ex) {
+      stopParsing();
+      return false;
+    } catch (Exception ex) {
+      try {
+        throw handleException(ex);
+      } finally {
+        stopParsing();
+      }
+    }
+  }
+
+  private void stopParsing(){
+
+  }
+
+  private String displayLineSeparators(String str, boolean addNewLine) {
+    if (addNewLine) {
+      if (str.contains("\r\n")) {
+        str = str.replaceAll("\\r\\n", "[\\\\r\\\\n]\r\n\t");
+      } else if (str.contains("\n")) {
+        str = str.replaceAll("\\n", "[\\\\n]\n\t");
+      } else {
+        str = str.replaceAll("\\r", "[\\\\r]\r\t");
+      }
+    } else {
+      str = str.replaceAll("\\n", "\\\\n");
+      str = str.replaceAll("\\r", "\\\\r");
+    }
+    return str;
+  }
+
+  /**
+   * Helper method to handle exceptions caught while processing text files and generate better error messages associated with
+   * the exception.
+   * @param ex  Exception raised
+   * @return
+   * @throws IOException
+   */
+  private TextParsingException handleException(Exception ex) throws IOException {
+
+    if (ex instanceof TextParsingException) {
+      throw (TextParsingException) ex;
+    }
+
+    if (ex instanceof ArrayIndexOutOfBoundsException) {
+      ex = UserException
+          .dataReadError(ex)
+          .message(
+              "Drill failed to read your text file.  Drill supports up to %d columns in a text file.  Your file appears to have more than that.",
+              RepeatedVarCharOutput.MAXIMUM_NUMBER_COLUMNS)
+          .build();
+    }
+
+    String message = null;
+    String tmp = input.getStringSinceMarkForError();
+    char[] chars = tmp.toCharArray();
+    if (chars != null) {
+      int length = chars.length;
+      if (length > settings.getMaxCharsPerColumn()) {
+        message = "Length of parsed input (" + length
+            + ") exceeds the maximum number of characters defined in your parser settings ("
+            + settings.getMaxCharsPerColumn() + "). ";
+      }
+
+      if (tmp.contains("\n") || tmp.contains("\r")) {
+        tmp = displayLineSeparators(tmp, true);
+        String lineSeparator = displayLineSeparators(settings.getLineSeparatorString(), false);
+        message += "\nIdentified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '"
+            + lineSeparator + "'. Parsed content:\n\t" + tmp;
+      }
+
+      int nullCharacterCount = 0;
+      // ensuring the StringBuilder won't grow over Integer.MAX_VALUE to avoid OutOfMemoryError
+      int maxLength = length > Integer.MAX_VALUE / 2 ? Integer.MAX_VALUE / 2 - 1 : length;
+      StringBuilder s = new StringBuilder(maxLength);
+      for (int i = 0; i < maxLength; i++) {
+        if (chars[i] == '\0') {
+          s.append('\\');
+          s.append('0');
+          nullCharacterCount++;
+        } else {
+          s.append(chars[i]);
+        }
+      }
+      tmp = s.toString();
+
+      if (nullCharacterCount > 0) {
+        message += "\nIdentified "
+            + nullCharacterCount
+            + " null characters ('\0') on parsed content. This may indicate the data is corrupt or its encoding is invalid. Parsed content:\n\t"
+            + tmp;
+      }
+
+    }
+
+    throw new TextParsingException(context, message, ex);
+  }
+
+  /**
+   * Finish the processing of a batch, indicates to the output
+   * interface to wrap up the batch
+   */
+  public void finishBatch(){
+    output.finishBatch();
+//    System.out.println(String.format("line %d, cnt %d", input.getLineCount(), output.getRecordCount()));
+  }
+
+  /**
+   * Invoked once there are no more records and we are done with the
+   * current record reader to clean up state.
+   * @throws IOException
+   */
+  public void close() throws IOException{
+    input.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index 43e6416..fd97c48 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -82,7 +82,7 @@ public class MockRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
     try {
       this.output = output;
       int estimateRowSize = getEstimatedRecordSize(config.getTypes());

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index d5b7303..5e9c4ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -165,9 +165,6 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
     ScanBatch s =
         new ScanBatch(rowGroupScan, context, oContext, readers.iterator(), partitionColumns, selectedPartitionColumns);
 
-    for(RecordReader r  : readers){
-      r.setOperatorContext(s.getOperatorContext());
-    }
 
     return s;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 2072aae..ebfbd54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -204,10 +204,11 @@ public class ParquetRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    this.operatorContext = context;
     if (!isStarQuery()) {
       columnsFound = new boolean[getColumns().size()];
-      nullFilledVectors = new ArrayList();
+      nullFilledVectors = new ArrayList<>();
     }
     columnStatuses = new ArrayList<>();
 //    totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 07950df..8ad0d4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -199,9 +199,10 @@ public class DrillParquetReader extends AbstractRecordReader {
   }
 
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
 
     try {
+      this.operatorContext = context;
       schema = footer.getFileMetaData().getSchema();
       MessageType projection = null;
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index b7ffbf0..cf98b83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -73,7 +73,7 @@ public class PojoRecordReader<T> extends AbstractRecordReader {
   }
 
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
     try {
       Field[] fields = pojoClass.getDeclaredFields();
       List<PojoWriter> writers = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
index 04838bd..819f895 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
@@ -22,7 +22,7 @@ import org.apache.drill.exec.store.dfs.easy.FileWork;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-public class CompleteFileWork implements FileWork, CompleteWork{
+public class CompleteFileWork implements FileWork, CompleteWork {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompleteFileWork.class);
 
   private long start;
@@ -40,7 +40,22 @@ public class CompleteFileWork implements FileWork, CompleteWork{
 
   @Override
   public int compareTo(CompleteWork o) {
+    if(o instanceof CompleteFileWork){
+      CompleteFileWork c = (CompleteFileWork) o;
+      int cmp = path.compareTo(c.getPath());
+      if(cmp != 0){
+        return cmp;
+      }
+
+      cmp = Long.compare(start,  c.getStart());
+      if(cmp != 0){
+        return cmp;
+      }
+
+    }
+
     return Long.compare(getTotalBytes(), o.getTotalBytes());
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 87c78b2..0322f36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -131,7 +131,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
     MaterializedField field = MaterializedField.create(ref, Types.repeated(TypeProtos.MinorType.VARCHAR));
     try {
       vector = output.addField(field, RepeatedVarCharVector.class);
@@ -192,7 +192,7 @@ public class DrillTextRecordReader extends AbstractRecordReader {
         v.getMutator().setValueCount(recordCount);
       }
       vector.getMutator().setValueCount(recordCount);
-      logger.debug("text scan batch size {}", batchSize);
+//      logger.debug("text scan batch size {}", batchSize);
       return recordCount;
     } catch(Exception e) {
       cleanup();

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 83a1cb8..8fdaa72 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -626,7 +626,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
       ParquetRecordReader rr = new ParquetRecordReader(context, 256000, fileName, 0, fs,
           new DirectCodecFactory(dfsConfig, allocator), f.getParquetMetadata(), columns);
       TestOutputMutator mutator = new TestOutputMutator(allocator);
-      rr.setup(mutator);
+      rr.setup(null, mutator);
       Stopwatch watch = new Stopwatch();
       watch.start();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
new file mode 100644
index 0000000..76674f9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestNewTextReader.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.text;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestNewTextReader extends BaseTestQuery {
+
+  @Test
+  public void fieldDelimiterWithinQuotes() throws Exception {
+    test("select columns[1] as col1 from cp.`textinput/input1.csv`");
+    testBuilder()
+        .sqlQuery("select columns[1] as col1 from cp.`textinput/input1.csv`")
+        .unOrdered()
+        .baselineColumns("col1")
+        .baselineValues("foo,bar")
+        .go();
+  }
+
+  @Test
+  public void ensureFailureOnNewLineDelimiterWithinQuotes() throws Exception {
+    try {
+      test("select columns[1] as col1 from cp.`textinput/input2.csv`");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Cannot use newline character within quoted string"));
+      return;
+    }
+    Assert.fail("Expected exception not thrown.");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
index f07cf3b..882033a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
@@ -45,11 +45,12 @@ public class TestTextColumn extends BaseTestQuery{
       "from dfs_test.`[WORKING_PATH]/src/test/resources/store/text/data/letters.txt`");
 
     List<List<String>> expectedOutput = Arrays.asList(
-      Arrays.asList("\"a, b,\",\"c\",\"d,, \\n e\""),
-      Arrays.asList("\"d, e,\",\"f\",\"g,, \\n h\""),
-      Arrays.asList("\"g, h,\",\"i\",\"j,, \\n k\""));
+      Arrays.asList("a, b,\",\"c\",\"d,, \\n e"),
+      Arrays.asList("d, e,\",\"f\",\"g,, \\n h"),
+      Arrays.asList("g, h,\",\"i\",\"j,, \\n k"));
 
     List<List<String>> actualOutput = getOutput(batches);
+    System.out.println(actualOutput);
     validateOutput(expectedOutput, actualOutput);
   }
 
@@ -59,9 +60,9 @@ public class TestTextColumn extends BaseTestQuery{
       "columns[3] as col4 from dfs_test.`[WORKING_PATH]/src/test/resources/store/text/data/letters.csv`");
 
     List<List<String>> expectedOutput = Arrays.asList(
-      Arrays.asList("\"a, b,\"", "\"c\"", "\"d,, \\n e\"","\"f\\\"g\""),
-      Arrays.asList("\"d, e,\"", "\"f\"", "\"g,, \\n h\"","\"i\\\"j\""),
-      Arrays.asList("\"g, h,\"", "\"i\"", "\"j,, \\n k\"","\"l\\\"m\""));
+      Arrays.asList("a, b,", "c", "d,, \\n e","f\\\"g"),
+      Arrays.asList("d, e,", "f", "g,, \\n h","i\\\"j"),
+      Arrays.asList("g, h,", "i", "j,, \\n k","l\\\"m"));
 
     List<List<String>> actualOutput = getOutput(batches);
     validateOutput(expectedOutput, actualOutput);
@@ -81,7 +82,8 @@ public class TestTextColumn extends BaseTestQuery{
           output.add(new ArrayList<String>());
           for (VectorWrapper<?> vw: loader) {
             ValueVector.Accessor accessor = vw.getValueVector().getAccessor();
-            output.get(last).add(accessor.getObject(i).toString());
+            Object o = accessor.getObject(i);
+            output.get(last).add(o == null ? null: o.toString());
           }
           ++last;
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
index d4d81f6..4a7a53f 100644
--- a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
@@ -37,7 +37,8 @@
         },
         "txt" : {
           type : "text",
-          extensions: [ "txt" ]
+          extensions: [ "txt" ],
+          delimiter: "\u0000"
         },
         "avro" : {
           type: "avro"

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/resources/store/text/data/letters.txt
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/store/text/data/letters.txt b/exec/java-exec/src/test/resources/store/text/data/letters.txt
index 14b9cb6..5421114 100644
--- a/exec/java-exec/src/test/resources/store/text/data/letters.txt
+++ b/exec/java-exec/src/test/resources/store/text/data/letters.txt
@@ -1,3 +1,3 @@
-"a, b,","c","d,, \n e"
-"d, e,","f","g,, \n h"
-"g, h,","i","j,, \n k"
\ No newline at end of file
+"a, b,"",""c"",""d,, \n e"
+"d, e,"",""f"",""g,, \n h"
+"g, h,"",""i"",""j,, \n k"

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/resources/textinput/input1.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/textinput/input1.csv b/exec/java-exec/src/test/resources/textinput/input1.csv
new file mode 100644
index 0000000..4b70783
--- /dev/null
+++ b/exec/java-exec/src/test/resources/textinput/input1.csv
@@ -0,0 +1 @@
+1,"foo,bar"

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/test/resources/textinput/input2.csv
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/textinput/input2.csv b/exec/java-exec/src/test/resources/textinput/input2.csv
new file mode 100644
index 0000000..20c8d3c
--- /dev/null
+++ b/exec/java-exec/src/test/resources/textinput/input2.csv
@@ -0,0 +1,2 @@
+1,"a
+b"


[6/7] drill git commit: DRILL-2944: Update drill-env to set fixed heap and use G1 garbage collection to reduce GC overhead.

Posted by ja...@apache.org.
DRILL-2944: Update drill-env to set fixed heap and use G1 garbage collection to reduce GC overhead.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e14d9896
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e14d9896
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e14d9896

Branch: refs/heads/master
Commit: e14d9896ebb1d3e6e35b4490cfa0d45b10f81164
Parents: b24b23e
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun May 3 10:27:57 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 7 00:12:26 2015 -0700

----------------------------------------------------------------------
 distribution/src/resources/drill-env.sh | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e14d9896/distribution/src/resources/drill-env.sh
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-env.sh b/distribution/src/resources/drill-env.sh
index 1783d6d..2aede3f 100644
--- a/distribution/src/resources/drill-env.sh
+++ b/distribution/src/resources/drill-env.sh
@@ -14,10 +14,10 @@
 # limitations under the License.
 
 DRILL_MAX_DIRECT_MEMORY="8G"
-DRILL_MAX_HEAP="4G"
+DRILL_HEAP="4G"
 
-export DRILL_JAVA_OPTS="-Xms1G -Xmx$DRILL_MAX_HEAP -XX:MaxDirectMemorySize=$DRILL_MAX_DIRECT_MEMORY -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=1G -ea"
+export DRILL_JAVA_OPTS="-Xms$DRILL_HEAP -Xmx$DRILL_HEAP -XX:MaxDirectMemorySize=$DRILL_MAX_DIRECT_MEMORY -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=1G -ea"
 
 # Class unloading is disabled by default in Java 7
 # http://hg.openjdk.java.net/jdk7u/jdk7u60/hotspot/file/tip/src/share/vm/runtime/globals.hpp#l1622
-export SERVER_GC_OPTS="-XX:+CMSClassUnloadingEnabled -XX:+UseConcMarkSweepGC"
+export SERVER_GC_OPTS="-XX:+CMSClassUnloadingEnabled -XX:+UseG1GC "


[4/7] drill git commit: DRILL-2927: Correctly timeout query if a queue doesn't deplete within expected time.

Posted by ja...@apache.org.
DRILL-2927: Correctly timeout query if a queue doesn't deplete within expected time.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/21992b6b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/21992b6b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/21992b6b

Branch: refs/heads/master
Commit: 21992b6b6946f42d89924725dd65e301eebc7397
Parents: 7ec9987
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed May 6 14:09:01 2015 +0100
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 7 00:12:20 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    | 21 +++++++-------------
 .../apache/drill/exec/work/foreman/Foreman.java | 21 +++++++++++++++-----
 2 files changed, 23 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/21992b6b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index a577815..fb764c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -194,20 +194,13 @@ public interface ExecConstants {
   public static final String AVERAGE_FIELD_WIDTH_KEY = "planner.memory.average_field_width";
   public static final OptionValidator AVERAGE_FIELD_WIDTH = new PositiveLongValidator(AVERAGE_FIELD_WIDTH_KEY, Long.MAX_VALUE, 8);
 
-  public static final String ENABLE_QUEUE_KEY = "exec.queue.enable";
-  public static final OptionValidator ENABLE_QUEUE = new BooleanValidator(ENABLE_QUEUE_KEY, false);
-
-  public static final String LARGE_QUEUE_KEY = "exec.queue.large";
-  public static final OptionValidator LARGE_QUEUE_SIZE = new PositiveLongValidator(LARGE_QUEUE_KEY, 1000, 10);
-
-  public static final String SMALL_QUEUE_KEY = "exec.queue.small";
-  public static final OptionValidator SMALL_QUEUE_SIZE = new PositiveLongValidator(SMALL_QUEUE_KEY, 100000, 100);
-
-  public static final String QUEUE_THRESHOLD_KEY = "exec.queue.threshold";
-  public static final OptionValidator QUEUE_THRESHOLD_SIZE = new PositiveLongValidator(QUEUE_THRESHOLD_KEY, Long.MAX_VALUE, 30000000);
-
-  public static final String QUEUE_TIMEOUT_KEY = "exec.queue.timeout_millis";
-  public static final OptionValidator QUEUE_TIMEOUT = new PositiveLongValidator(QUEUE_TIMEOUT_KEY, Long.MAX_VALUE, 60*1000*5);
+  public static final BooleanValidator ENABLE_QUEUE = new BooleanValidator("exec.queue.enable", false);
+  public static final LongValidator LARGE_QUEUE_SIZE = new PositiveLongValidator("exec.queue.large", 1000, 10);
+  public static final LongValidator SMALL_QUEUE_SIZE = new PositiveLongValidator("exec.queue.small", 100000, 100);
+  public static final LongValidator QUEUE_THRESHOLD_SIZE = new PositiveLongValidator("exec.queue.threshold",
+      Long.MAX_VALUE, 30000000);
+  public static final LongValidator QUEUE_TIMEOUT = new PositiveLongValidator("exec.queue.timeout_millis",
+      Long.MAX_VALUE, 60 * 1000 * 5);
 
   public static final String ENABLE_VERBOSE_ERRORS_KEY = "exec.errors.verbose";
   public static final OptionValidator ENABLE_VERBOSE_ERRORS = new BooleanValidator(ENABLE_VERBOSE_ERRORS_KEY, false);

http://git-wip-us.apache.org/repos/asf/drill/blob/21992b6b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index d678cc5..49d0c94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -406,14 +406,16 @@ public class Foreman implements Runnable {
    */
   private void acquireQuerySemaphore(final PhysicalPlan plan) throws ForemanSetupException {
     final OptionManager optionManager = queryContext.getOptions();
-    final boolean queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE_KEY).bool_val;
+    final boolean queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE);
     if (queuingEnabled) {
-      final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_KEY).num_val;
+      final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
       double totalCost = 0;
       for (final PhysicalOperator ops : plan.getSortedOperators()) {
         totalCost += ops.getCost();
       }
 
+      final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT);
+
       try {
         @SuppressWarnings("resource")
         final ClusterCoordinator clusterCoordinator = drillbitContext.getClusterCoordinator();
@@ -421,18 +423,27 @@ public class Foreman implements Runnable {
 
         // get the appropriate semaphore
         if (totalCost > queueThreshold) {
-          final int largeQueue = optionManager.getOption(ExecConstants.LARGE_QUEUE_KEY).num_val.intValue();
+          final int largeQueue = (int) optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE);
           distributedSemaphore = clusterCoordinator.getSemaphore("query.large", largeQueue);
         } else {
-          final int smallQueue = optionManager.getOption(ExecConstants.SMALL_QUEUE_KEY).num_val.intValue();
+          final int smallQueue = (int) optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE);
           distributedSemaphore = clusterCoordinator.getSemaphore("query.small", smallQueue);
         }
 
-        final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT_KEY).num_val;
+
         lease = distributedSemaphore.acquire(queueTimeout, TimeUnit.MILLISECONDS);
       } catch (final Exception e) {
         throw new ForemanSetupException("Unable to acquire slot for query.", e);
       }
+
+      if (lease == null) {
+        throw UserException
+            .resourceError()
+            .message("Unable to acquire queue resources for query within timeout.  Timeout was set at %d seconds.",
+                queueTimeout / 1000)
+            .build();
+      }
+
     }
   }
 


[5/7] drill git commit: DRILL-2969: Have scanners report metrics in profile.

Posted by ja...@apache.org.
DRILL-2969: Have scanners report metrics in profile.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/b24b23e0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/b24b23e0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/b24b23e0

Branch: refs/heads/master
Commit: b24b23e0ea42b08259edde6722a06387a22134ea
Parents: 21992b6
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed May 6 14:31:46 2015 +0100
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 7 00:12:25 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/physical/impl/OutputMutator.java    | 3 +--
 .../java/org/apache/drill/exec/physical/impl/ScanBatch.java   | 7 ++++++-
 .../impl/validate/IteratorValidatorBatchIterator.java         | 4 ++++
 3 files changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/b24b23e0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
index d3449ee..0fe79d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -19,8 +19,6 @@ package org.apache.drill.exec.physical.impl;
 
 import io.netty.buffer.DrillBuf;
 
-import java.util.List;
-
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.vector.ValueVector;
@@ -55,6 +53,7 @@ public interface OutputMutator {
    * Whether or not the fields added to the OutputMutator generated a new schema event.
    * @return
    */
+  // TODO(DRILL-2970)
   public boolean isNewSchema();
 
   /**

http://git-wip-us.apache.org/repos/asf/drill/blob/b24b23e0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 97e8d28..f56dae3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -207,7 +207,12 @@ public class ScanBatch implements CloseableRecordBatch {
       }
 
       populatePartitionVectors();
-      if (mutator.isNewSchema()) {
+
+      // this is a slight misuse of this metric but it will allow Readers to report how many records they generated.
+      final boolean isNewSchema = mutator.isNewSchema();
+      oContext.getStats().batchReceived(0, getRecordCount(), isNewSchema);
+
+      if (isNewSchema) {
         container.buildSchema(SelectionVectorMode.NONE);
         schema = container.getSchema();
         return IterOutcome.OK_NEW_SCHEMA;

http://git-wip-us.apache.org/repos/asf/drill/blob/b24b23e0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 2ae53aa..efd155e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -122,6 +122,10 @@ public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
 
     if (state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) {
       BatchSchema schema = incoming.getSchema();
+      if (schema == null) {
+        return state;
+      }
+
       if (schema.getFieldCount() == 0) {
         throw new IllegalStateException ("Incoming batch has an empty schema. This is not allowed.");
       }


[3/7] drill git commit: DRILL-2006: Updated Text reader. Increases variations of text files Drill can work with.

Posted by ja...@apache.org.
DRILL-2006: Updated Text reader.  Increases variations of text files Drill can work with.

Text reader is heavily inspired by uniVocity parser although it is now byte based and customized for Drill's memory representations.

Also updated the RecordReader interface so that OperatorContext is presented at setup time rather than being a separate call.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7ec99871
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7ec99871
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7ec99871

Branch: refs/heads/master
Commit: 7ec99871b97c70793e2e5eb2e795040c5b6ade66
Parents: 151a7f4
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed May 6 02:56:45 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 7 00:06:33 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/common/config/DrillConfig.java |   3 +
 .../exec/store/hbase/HBaseRecordReader.java     |  11 +-
 .../drill/exec/store/hive/HiveRecordReader.java |  11 +-
 .../exec/store/mongo/MongoRecordReader.java     |  11 +-
 exec/java-exec/pom.xml                          |   7 +
 .../codegen/templates/FixedValueVectors.java    |   2 +-
 .../codegen/templates/RepeatedValueVectors.java |  10 +-
 .../templates/VariableLengthVectors.java        |  24 +-
 .../src/main/java/io/netty/buffer/DrillBuf.java |  16 +-
 .../org/apache/drill/exec/ExecConstants.java    |   3 +
 .../exec/client/PrintingResultsListener.java    |  16 +-
 .../drill/exec/physical/impl/ScanBatch.java     |   6 +-
 .../UnorderedReceiverBatch.java                 |   2 +-
 .../server/options/SystemOptionManager.java     |   1 +
 .../exec/store/LocalSyncableFileSystem.java     |  34 +-
 .../apache/drill/exec/store/RecordReader.java   |  10 +-
 .../drill/exec/store/avro/AvroRecordReader.java |  32 +-
 .../drill/exec/store/dfs/DrillFileSystem.java   |  20 +-
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |   9 +-
 .../exec/store/dfs/easy/EasyGroupScan.java      |   6 +-
 .../exec/store/easy/json/JSONRecordReader.java  |  36 +-
 .../exec/store/easy/text/TextFormatPlugin.java  | 123 ++++-
 .../compliant/CompliantTextRecordReader.java    | 152 ++++++
 .../text/compliant/RepeatedVarCharOutput.java   | 324 ++++++++++++
 .../StreamFinishedPseudoException.java          |  29 ++
 .../store/easy/text/compliant/TextInput.java    | 392 +++++++++++++++
 .../store/easy/text/compliant/TextOutput.java   |  87 ++++
 .../easy/text/compliant/TextParsingContext.java | 124 +++++
 .../text/compliant/TextParsingSettings.java     | 291 +++++++++++
 .../store/easy/text/compliant/TextReader.java   | 498 +++++++++++++++++++
 .../drill/exec/store/mock/MockRecordReader.java |   2 +-
 .../store/parquet/ParquetScanBatchCreator.java  |   3 -
 .../columnreaders/ParquetRecordReader.java      |   5 +-
 .../exec/store/parquet2/DrillParquetReader.java |   3 +-
 .../drill/exec/store/pojo/PojoRecordReader.java |   2 +-
 .../exec/store/schedule/CompleteFileWork.java   |  17 +-
 .../exec/store/text/DrillTextRecordReader.java  |   4 +-
 .../store/parquet/ParquetRecordReaderTest.java  |   2 +-
 .../exec/store/text/TestNewTextReader.java      |  50 ++
 .../drill/exec/store/text/TestTextColumn.java   |  16 +-
 .../resources/bootstrap-storage-plugins.json    |   3 +-
 .../test/resources/store/text/data/letters.txt  |   6 +-
 .../src/test/resources/textinput/input1.csv     |   1 +
 .../src/test/resources/textinput/input2.csv     |   2 +
 44 files changed, 2248 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index 2b9b740..522303f 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -49,6 +49,9 @@ public final class DrillConfig extends NestedConfig{
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class);
   private final ObjectMapper mapper;
   private final ImmutableList<String> startupArguments;
+
+  public static final boolean ON_OSX = System.getProperty("os.name").contains("OS X");
+
   @SuppressWarnings("restriction")  private static final long MAX_DIRECT_MEMORY = sun.misc.VM.maxDirectMemory();
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 42038e8..9458db2 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -126,16 +126,9 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
     return transformed;
   }
 
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
-
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    this.operatorContext = context;
     this.outputMutator = output;
     familyVectorMap = new HashMap<String, MapVector>();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index 8c400ea..a4ad0c4 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -249,16 +249,9 @@ public class HiveRecordReader extends AbstractRecordReader {
     }
   }
 
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
-
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    this.operatorContext = context;
     try {
       for (int i = 0; i < selectedColumnNames.size(); i++) {
         MajorType type = getMajorTypeFromHiveTypeInfo(selectedColumnTypes.get(i), true);

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
index 3c4472c..53c576e 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/MongoRecordReader.java
@@ -147,7 +147,8 @@ public class MongoRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void setup(OutputMutator output) throws ExecutionSetupException {
+  public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+    this.operatorContext = context;
     this.writer = new VectorContainerWriter(output);
     this.jsonReader = new JsonReader(fragmentContext.getManagedBuffer(), Lists.newArrayList(getColumns()), enableAllTextMode, false);
     logger.info("Filters Applied : " + filters);
@@ -190,13 +191,5 @@ public class MongoRecordReader extends AbstractRecordReader {
   public void cleanup() {
   }
 
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 8839c54..57cd572 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -54,6 +54,13 @@
       <artifactId>commons-pool2</artifactId>
       <version>2.1</version>
     </dependency>
+    
+    <dependency>
+      <groupId>com.univocity</groupId>
+      <artifactId>univocity-parsers</artifactId>
+      <version>1.3.0</version>
+    </dependency>
+    
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math</artifactId>

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index 6a924b7..1059bfb 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -218,7 +218,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
     copyFrom(fromIndex, thisIndex, from);
   }
 
-  private void decrementAllocationMonitor() {
+  public void decrementAllocationMonitor() {
     if (allocationMonitor > 0) {
       allocationMonitor = 0;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index c06e29c..c0fba66 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -84,8 +84,16 @@ public final class Repeated${minor.class}Vector extends BaseValueVector implemen
     return offsets.getBufferSize() + values.getBufferSize();
   }
 
+  public UInt4Vector getOffsetVector(){
+    return offsets;
+  }
+  
+  public ${minor.class}Vector getValuesVector(){
+    return values;
+  }
+  
   public DrillBuf getBuffer(){
-      return values.getBuffer();
+    return values.getBuffer();
   }
   
   public TransferPair getTransferPair(){

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 8a4b663..7aa7415 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -159,6 +159,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     return buffers;
   }
   
+  public long getOffsetAddr(){
+    return offsetVector.getBuffer().memoryAddress();
+  }
+  
+  public UInt${type.width}Vector getOffsetVector(){
+    return offsetVector;
+  }
+  
   public TransferPair getTransferPair(){
     return new TransferImpl(getField());
   }
@@ -304,16 +312,16 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
       data = newBuf;
     }
 
-    private void decrementAllocationMonitor() {
-      if (allocationMonitor > 0) {
-        allocationMonitor = 0;
-      }
-      --allocationMonitor;
+  public void decrementAllocationMonitor() {
+    if (allocationMonitor > 0) {
+      allocationMonitor = 0;
     }
+    --allocationMonitor;
+  }
 
-    private void incrementAllocationMonitor() {
-      ++allocationMonitor;
-    }
+  private void incrementAllocationMonitor() {
+    ++allocationMonitor;
+  }
 
   public Accessor getAccessor(){
     return accessor;

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
index 2016e1e..7f80f7a 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
@@ -38,7 +38,7 @@ import org.apache.drill.exec.util.AssertionUtil;
 
 import com.google.common.base.Preconditions;
 
-public final class DrillBuf extends AbstractByteBuf {
+public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillBuf.class);
 
   private static final boolean BOUNDS_CHECKING_ENABLED = AssertionUtil.BOUNDS_CHECKING_ENABLED;
@@ -614,6 +614,15 @@ public final class DrillBuf extends AbstractByteBuf {
     return this;
   }
 
+  public void setByte(int index, byte b){
+    PlatformDependent.putByte(addr(index), b);
+  }
+
+  public void writeByteUnsafe(byte b){
+    PlatformDependent.putByte(addr(readerIndex), b);
+    readerIndex++;
+  }
+
   @Override
   protected byte _getByte(int index) {
     return getByte(index);
@@ -745,4 +754,9 @@ public final class DrillBuf extends AbstractByteBuf {
     return rootBuffer;
   }
 
+  @Override
+  public void close() throws Exception {
+    release();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 9f87b0b..a577815 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -212,6 +212,9 @@ public interface ExecConstants {
   public static final String ENABLE_VERBOSE_ERRORS_KEY = "exec.errors.verbose";
   public static final OptionValidator ENABLE_VERBOSE_ERRORS = new BooleanValidator(ENABLE_VERBOSE_ERRORS_KEY, false);
 
+  public static final String ENABLE_NEW_TEXT_READER_KEY = "exec.storage.enable_new_text_reader";
+  public static final OptionValidator ENABLE_NEW_TEXT_READER = new BooleanValidator(ENABLE_NEW_TEXT_READER_KEY, true);
+
   public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = "bootstrap-storage-plugins.json";
   public static final String MAX_LOADING_CACHE_SIZE_CONFIG = "drill.exec.compile.cache_max_size";
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
index 875160b..f5a119d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
@@ -17,18 +17,20 @@
  */
 package org.apache.drill.exec.client;
 
+import io.netty.buffer.DrillBuf;
+
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.client.QuerySubmitter.Format;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryData;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
@@ -36,6 +38,8 @@ import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.util.VectorUtil;
 
+import com.google.common.base.Stopwatch;
+
 public class PrintingResultsListener implements UserResultsListener {
   AtomicInteger count = new AtomicInteger();
   private CountDownLatch latch = new CountDownLatch(1);
@@ -45,6 +49,7 @@ public class PrintingResultsListener implements UserResultsListener {
   BufferAllocator allocator;
   volatile UserException exception;
   QueryId queryId;
+  Stopwatch w = new Stopwatch();
 
   public PrintingResultsListener(DrillConfig config, Format format, int columnWidth) {
     this.allocator = new TopLevelAllocator(config);
@@ -56,7 +61,8 @@ public class PrintingResultsListener implements UserResultsListener {
   @Override
   public void submissionFailed(UserException ex) {
     exception = ex;
-    System.out.println("Exception (no rows returned): " + ex );
+    System.out.println("Exception (no rows returned): " + ex + ".  Returned in " + w.elapsed(TimeUnit.MILLISECONDS)
+        + "ms.");
     latch.countDown();
   }
 
@@ -64,7 +70,8 @@ public class PrintingResultsListener implements UserResultsListener {
   public void queryCompleted(QueryState state) {
     allocator.close();
     latch.countDown();
-    System.out.println("Total rows returned: " + count.get());
+    System.out.println("Total rows returned : " + count.get() + ".  Returned in " + w.elapsed(TimeUnit.MILLISECONDS)
+        + "ms.");
   }
 
   @Override
@@ -113,6 +120,7 @@ public class PrintingResultsListener implements UserResultsListener {
 
   @Override
   public void queryIdArrived(QueryId queryId) {
+    w.start();
     this.queryId = queryId;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 4700dbd..97e8d28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -92,12 +92,11 @@ public class ScanBatch implements CloseableRecordBatch {
     }
     this.currentReader = readers.next();
     this.oContext = oContext;
-    this.currentReader.setOperatorContext(this.oContext);
 
     boolean setup = false;
     try {
       oContext.getStats().startProcessing();
-      this.currentReader.setup(mutator);
+      this.currentReader.setup(oContext, mutator);
       setup = true;
     } finally {
       // if we had an exception during setup, make sure to release existing data.
@@ -188,8 +187,7 @@ public class ScanBatch implements CloseableRecordBatch {
           currentReader.cleanup();
           currentReader = readers.next();
           partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
-          currentReader.setup(mutator);
-          currentReader.setOperatorContext(oContext);
+          currentReader.setup(oContext, mutator);
           try {
             currentReader.allocate(fieldVectorMap);
           } catch (OutOfMemoryException e) {

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index c9d9c11..66a2092 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -80,7 +80,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
     oContext = context.newOperatorContext(config, false);
     this.batchLoader = new RecordBatchLoader(oContext.getAllocator());
 
-    this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 1), null);
+    this.stats = oContext.getStats();
     this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
     this.config = config;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 33b2a4c..5dff828 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -108,6 +108,7 @@ public class SystemOptionManager extends BaseOptionManager {
       ExecConstants.ENABLE_WINDOW_FUNCTIONS_VALIDATOR,
       ExecConstants.DRILLBIT_CONTROLS_VALIDATOR,
       ClassTransformer.SCALAR_REPLACEMENT_VALIDATOR,
+      ExecConstants.ENABLE_NEW_TEXT_READER
   };
 
   private final PStoreConfig<OptionValue> config;

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
index 37b9c9d..b88cc28 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
@@ -28,8 +28,10 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -161,7 +163,7 @@ public class LocalSyncableFileSystem extends FileSystem {
     }
   }
 
-  public class LocalInputStream extends InputStream implements Seekable, PositionedReadable {
+  public class LocalInputStream extends InputStream implements Seekable, PositionedReadable, ByteBufferReadable {
 
     private BufferedInputStream input;
 
@@ -200,6 +202,36 @@ public class LocalSyncableFileSystem extends FileSystem {
       throw new IOException("seekToNewSource not supported");
     }
 
+
+
+    @Override
+    public int read(ByteBuffer buf) throws IOException {
+      buf.reset();
+
+      if(buf.hasArray()){
+        int read = read(buf.array(), buf.arrayOffset(), buf.capacity());
+        buf.limit(read);
+        return read;
+      }else{
+        byte[] b = new byte[buf.capacity()];
+        int read = read(b);
+        buf.put(b);
+        return read;
+      }
+
+    }
+
+
+    @Override
+    public int read(byte[] b) throws IOException {
+      return input.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      return input.read(b, off, len);
+    }
+
     @Override
     public int read() throws IOException {
       byte[] b = new byte[1];

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index 09495f5..61ccac5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -39,19 +39,11 @@ public interface RecordReader {
    *          mutating the set of schema values for that particular record.
    * @throws ExecutionSetupException
    */
-  public abstract void setup(OutputMutator output) throws ExecutionSetupException;
+  public abstract void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException;
 
   public abstract void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException;
 
   /**
-   * Set the operator context. The Reader can use this to access the operator context and allocate direct memory
-   * if needed
-   * @param operatorContext
-   */
-  public abstract void setOperatorContext(OperatorContext operatorContext);
-
-
-  /**
    * Increment record reader forward, writing into the provided output batch.
    *
    * @return The number of additional records added to the output.

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index 59999ba..a52fd22 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -17,16 +17,20 @@
  */
 package org.apache.drill.exec.store.avro;
 
-import com.google.common.base.Stopwatch;
-
 import io.netty.buffer.DrillBuf;
 
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
-import org.apache.avro.generic.GenericContainer;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.FsInput;
 import org.apache.avro.util.Utf8;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -46,15 +50,10 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
+import com.google.common.base.Stopwatch;
 
 /**
  * A RecordReader implementation for Avro data files.
@@ -96,8 +95,8 @@ public class AvroRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void setup(final OutputMutator output) throws ExecutionSetupException {
-
+  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+    operatorContext = context;
     writer = new VectorContainerWriter(output);
 
     try {
@@ -108,15 +107,6 @@ public class AvroRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void setOperatorContext(OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
-
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  @Override
   public int next() {
     final Stopwatch watch = new Stopwatch().start();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index f8afe3f..b6a9c30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -19,14 +19,12 @@ package org.apache.drill.exec.store.dfs;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.util.AssertionUtil;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -53,11 +51,17 @@ import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Progressable;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 /**
  * DrillFileSystem is the wrapper around the actual FileSystem implementation.
  *
@@ -94,6 +98,7 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
 
   private final FileSystem underlyingFs;
   private final OperatorStats operatorStats;
+  private final CompressionCodecFactory codecFactory;
 
   public DrillFileSystem(Configuration fsConf) throws IOException {
     this(fsConf, null);
@@ -101,6 +106,7 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
 
   public DrillFileSystem(Configuration fsConf, OperatorStats operatorStats) throws IOException {
     this.underlyingFs = FileSystem.get(fsConf);
+    this.codecFactory = new CompressionCodecFactory(fsConf);
     this.operatorStats = operatorStats;
   }
 
@@ -717,6 +723,14 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
     }
   }
 
+  public InputStream openPossiblyCompressedStream(Path path) throws IOException {
+    CompressionCodec codec = codecFactory.getCodec(path); // infers from file ext.
+    if (codec != null) {
+      return codec.createInputStream(open(path));
+    } else {
+      return open(path);
+    }
+  }
   @Override
   public void fileOpened(Path path, DrillFSDataInputStream fsDataInputStream) {
     openedFiles.put(fsDataInputStream, new DebugStackTrace(path, Thread.currentThread().getStackTrace()));

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index b4efe70..762760a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -49,7 +49,6 @@ import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
@@ -66,7 +65,6 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   private final StoragePluginConfig storageConfig;
   protected final FormatPluginConfig formatConfig;
   private final String name;
-  protected final CompressionCodecFactory codecFactory;
   private final boolean compressible;
 
   protected EasyFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
@@ -82,7 +80,6 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     this.storageConfig = storageConfig;
     this.formatConfig = formatConfig;
     this.name = name == null ? defaultName : name;
-    this.codecFactory = new CompressionCodecFactory(new Configuration(fsConf));
   }
 
   @Override
@@ -148,8 +145,10 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
         newColumns.add(AbstractRecordReader.STAR_COLUMN);
       }
       // Create a new sub scan object with the new set of columns;
-      scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(), newColumns,
-          scan.getSelectionRoot());
+      EasySubScan newScan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(),
+          newColumns, scan.getSelectionRoot());
+      newScan.setOperatorId(scan.getOperatorId());
+      scan = newScan;
     }
 
     int numParts = 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 39dc073..5f9e02b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.BlockMapBuilder;
 import org.apache.drill.exec.store.schedule.CompleteFileWork;
 import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;
+import org.apache.drill.exec.util.ImpersonationUtil;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -49,7 +50,6 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
-import org.apache.drill.exec.util.ImpersonationUtil;
 
 @JsonTypeName("fs-scan")
 public class EasyGroupScan extends AbstractFileGroupScan{
@@ -208,7 +208,9 @@ public class EasyGroupScan extends AbstractFileGroupScan{
     Preconditions.checkArgument(!filesForMinor.isEmpty(),
         String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
 
-    return new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, columns, selectionRoot);
+    EasySubScan subScan = new EasySubScan(getUserName(), convert(filesForMinor), formatPlugin, columns, selectionRoot);
+    subScan.setOperatorId(this.getOperatorId());
+    return subScan;
   }
 
   private List<FileWorkImpl> convert(List<CompleteFileWork> list) {

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 2666b2e..554c633 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -21,10 +21,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 
-import com.google.common.collect.ImmutableList;
-
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.memory.OutOfMemoryException;
@@ -38,10 +36,7 @@ import org.apache.drill.exec.store.easy.json.reader.CountingJsonReader;
 import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -114,9 +109,13 @@ public class JSONRecordReader extends AbstractRecordReader {
   }
 
   @Override
-  public void setup(final OutputMutator output) throws ExecutionSetupException {
+  public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
+    this.operatorContext = context;
     try{
-      setupData();
+      if (hadoopPath != null) {
+        this.stream = fileSystem.openPossiblyCompressedStream(hadoopPath);
+      }
+
       this.writer = new VectorContainerWriter(output);
       if (isSkipQuery()) {
         this.jsonReader = new CountingJsonReader(fragmentContext.getManagedBuffer());
@@ -129,18 +128,6 @@ public class JSONRecordReader extends AbstractRecordReader {
     }
   }
 
-  private void setupData() throws IOException{
-    if(hadoopPath != null){
-      final CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
-      final CompressionCodec codec = factory.getCodec(hadoopPath); // infers from file ext.
-      if (codec != null) {
-        this.stream = codec.createInputStream(fileSystem.open(hadoopPath));
-      } else {
-        this.stream = fileSystem.open(hadoopPath);
-      }
-    }
-  }
-
   private void setupParser() throws IOException{
     if(hadoopPath != null){
       jsonReader.setSource(stream);
@@ -171,15 +158,6 @@ public class JSONRecordReader extends AbstractRecordReader {
     throw exceptionBuilder.build();
   }
 
-
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
-  }
-
-  public void setOperatorContext(final OperatorContext operatorContext) {
-    this.operatorContext = operatorContext;
-  }
-
   @Override
   public int next() {
     writer.allocate();

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 722650d..5756a6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -33,13 +34,15 @@ import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.RecordWriter;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FileSystemConfig;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
 import org.apache.drill.exec.store.dfs.easy.EasyWriter;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.easy.text.compliant.CompliantTextRecordReader;
+import org.apache.drill.exec.store.easy.text.compliant.TextParsingSettings;
 import org.apache.drill.exec.store.text.DrillTextRecordReader;
 import org.apache.drill.exec.store.text.DrillTextRecordWriter;
 import org.apache.hadoop.conf.Configuration;
@@ -47,8 +50,11 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileSplit;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 
 public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
@@ -71,9 +77,15 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
       List<SchemaPath> columns) throws ExecutionSetupException {
     Path path = dfs.makeQualified(new Path(fileWork.getPath()));
     FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
-    Preconditions.checkArgument(((TextFormatConfig)formatConfig).getDelimiter().length() == 1, "Only single character delimiter supported");
-    return new DrillTextRecordReader(split, getFsConf(), context,
-        ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns);
+
+    if (context.getOptions().getOption(ExecConstants.ENABLE_NEW_TEXT_READER_KEY).bool_val == true) {
+      TextParsingSettings settings = new TextParsingSettings();
+      settings.set((TextFormatConfig)formatConfig);
+      return new CompliantTextRecordReader(split, dfs, context, settings, columns);
+    } else {
+      char delim = ((TextFormatConfig)formatConfig).getFieldDelimiter();
+      return new DrillTextRecordReader(split, dfs.getConf(), context, delim, columns);
+    }
   }
 
   @Override
@@ -92,7 +104,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
     options.put("prefix", fragmentId);
 
-    options.put("separator", ((TextFormatConfig)getConfig()).getDelimiter());
+    options.put("separator", ((TextFormatConfig)getConfig()).getFieldDelimiterAsString());
     options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig)writer.getStorageConfig()).connection);
 
     options.put("extension", ((TextFormatConfig)getConfig()).getExtensions().get(0));
@@ -103,42 +115,117 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
     return recordWriter;
   }
 
-  @JsonTypeName("text")
+  @JsonTypeName("text") @JsonInclude(Include.NON_DEFAULT)
   public static class TextFormatConfig implements FormatPluginConfig {
 
     public List<String> extensions;
-    public String delimiter = "\n";
+    public String lineDelimiter = "\n";
+    public char fieldDelimiter = '\n';
+    public char quote = '"';
+    public char escape = '"';
+    public char comment = '#';
+    public boolean skipFirstLine = false;
+
 
     public List<String> getExtensions() {
       return extensions;
     }
 
-    public String getDelimiter() {
-      return delimiter;
+    public char getQuote() {
+      return quote;
+    }
+
+    public char getEscape() {
+      return escape;
+    }
+
+    public char getComment() {
+      return comment;
+    }
+
+    public String getLineDelimiter() {
+      return lineDelimiter;
+    }
+
+    public char getFieldDelimiter() {
+      return fieldDelimiter;
+    }
+
+    @JsonIgnore
+    public String getFieldDelimiterAsString(){
+      return new String(new char[]{fieldDelimiter});
+    }
+
+    @Deprecated
+    @JsonProperty("delimiter")
+    public void setFieldDelimiter(char delimiter){
+      this.fieldDelimiter = delimiter;
+    }
+
+    public boolean isSkipFirstLine() {
+      return skipFirstLine;
     }
 
     @Override
     public int hashCode() {
-      return 33;
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + comment;
+      result = prime * result + escape;
+      result = prime * result + ((extensions == null) ? 0 : extensions.hashCode());
+      result = prime * result + fieldDelimiter;
+      result = prime * result + ((lineDelimiter == null) ? 0 : lineDelimiter.hashCode());
+      result = prime * result + quote;
+      result = prime * result + (skipFirstLine ? 1231 : 1237);
+      return result;
     }
 
     @Override
     public boolean equals(Object obj) {
       if (this == obj) {
         return true;
-      } else if (obj == null) {
+      }
+      if (obj == null) {
         return false;
-      } else if (!(obj instanceof TextFormatConfig)) {
+      }
+      if (getClass() != obj.getClass()) {
         return false;
       }
-
-      TextFormatConfig that = (TextFormatConfig) obj;
-      if (this.delimiter.equals(that.delimiter)) {
-        return true;
+      TextFormatConfig other = (TextFormatConfig) obj;
+      if (comment != other.comment) {
+        return false;
       }
-      return false;
+      if (escape != other.escape) {
+        return false;
+      }
+      if (extensions == null) {
+        if (other.extensions != null) {
+          return false;
+        }
+      } else if (!extensions.equals(other.extensions)) {
+        return false;
+      }
+      if (fieldDelimiter != other.fieldDelimiter) {
+        return false;
+      }
+      if (lineDelimiter == null) {
+        if (other.lineDelimiter != null) {
+          return false;
+        }
+      } else if (!lineDelimiter.equals(other.lineDelimiter)) {
+        return false;
+      }
+      if (quote != other.quote) {
+        return false;
+      }
+      if (skipFirstLine != other.skipFirstLine) {
+        return false;
+      }
+      return true;
     }
 
+
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
new file mode 100644
index 0000000..b2af32d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import io.netty.buffer.DrillBuf;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.mapred.FileSplit;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+// New text reader, complies with the RFC 4180 standard for text/csv files
+public class CompliantTextRecordReader extends AbstractRecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompliantTextRecordReader.class);
+
+  private static final int MAX_RECORDS_PER_BATCH = 8096;
+  static final int READ_BUFFER = 1024*1024;
+  private static final int WHITE_SPACE_BUFFER = 64*1024;
+
+  // settings to be used while parsing
+  private TextParsingSettings settings;
+  // Chunk of the file to be read by this reader
+  private FileSplit split;
+  // text reader implementation
+  private TextReader reader;
+  // input buffer
+  private DrillBuf readBuffer;
+  // working buffer to handle whitespaces
+  private DrillBuf whitespaceBuffer;
+  private DrillFileSystem dfs;
+
+  public CompliantTextRecordReader(FileSplit split, DrillFileSystem dfs, FragmentContext context, TextParsingSettings settings, List<SchemaPath> columns) {
+    this.split = split;
+    this.settings = settings;
+    this.dfs = dfs;
+    setColumns(columns);
+  }
+
+  // checks to see if we are querying all columns(star) or individual columns
+  @Override
+  public boolean isStarQuery() {
+    if(settings.isUseRepeatedVarChar()){
+      return super.isStarQuery() || Iterables.tryFind(getColumns(), new Predicate<SchemaPath>() {
+        @Override
+        public boolean apply(@Nullable SchemaPath path) {
+          return path.equals(RepeatedVarCharOutput.COLUMNS);
+        }
+      }).isPresent();
+    }else{
+      return isStarQuery();
+    }
+  }
+
+  /**
+   * Performs the initial setup required for the record reader.
+   * Initializes the input stream, handling of the output record batch
+   * and the actual reader to be used.
+   * @param context  operator context from which buffer's will be allocated and managed
+   * @param outputMutator  Used to create the schema in the output record batch
+   * @throws ExecutionSetupException
+   */
+  @Override
+  public void setup(OperatorContext context, OutputMutator outputMutator) throws ExecutionSetupException {
+
+
+    readBuffer = context.getManagedBuffer(READ_BUFFER);
+    whitespaceBuffer = context.getManagedBuffer(WHITE_SPACE_BUFFER);
+
+    try {
+      InputStream stream = dfs.openPossiblyCompressedStream(split.getPath());
+      TextInput input = new TextInput(settings,  stream, readBuffer, split.getStart(), split.getStart() + split.getLength());
+
+      TextOutput output = null;
+      if(settings.isUseRepeatedVarChar()){
+        output = new RepeatedVarCharOutput(outputMutator, getColumns(), isStarQuery());
+      }else{
+        //TODO: Add field output.
+        throw new UnsupportedOperationException();
+      }
+
+      this.reader = new TextReader(settings, input, output, whitespaceBuffer);
+      reader.start();
+    } catch (SchemaChangeException | IOException e) {
+      throw new ExecutionSetupException(String.format("Failure while setting up text reader for file %s", split.getPath()), e);
+    }
+  }
+
+
+  /**
+   * Generates the next record batch
+   * @return  number of records in the batch
+   *
+   */
+  @Override
+  public int next() {
+    reader.resetForNextBatch();
+    int cnt = 0;
+
+    try{
+      while(cnt < MAX_RECORDS_PER_BATCH && reader.parseNext()){
+        cnt++;
+      }
+      reader.finishBatch();
+      return cnt;
+    }catch(IOException e){
+      throw new DrillRuntimeException(String.format("Failure while setting up text reader for file %s.  Happened at or shortly before byte position %d.", split.getPath(), reader.getPos()), e);
+    }
+  }
+
+  /**
+   * Cleanup state once we are finished processing all the records.
+   * This would internally close the input stream we are reading from.
+   */
+  @Override
+  public void cleanup() {
+    try {
+      reader.close();
+    } catch (IOException e) {
+      logger.warn("Exception while closing stream.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
new file mode 100644
index 0000000..3ad5c2a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/RepeatedVarCharOutput.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import io.netty.buffer.DrillBuf;
+import io.netty.util.internal.PlatformDependent;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.vector.RepeatedVarCharVector;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Class is responsible for generating record batches for text file inputs. We generate
+ * a record batch with a single vector of type repeated varchar vector. Each record is a single
+ * value within the vector containing all the fields in the record as individual array elements.
+ */
+class RepeatedVarCharOutput extends TextOutput {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatedVarCharOutput.class);
+
+  static final String COL_NAME = "columns";
+  static final FieldReference REF = new FieldReference(COL_NAME);
+  static final SchemaPath COLUMNS = SchemaPath.getSimplePath("columns");
+  public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024;
+
+  // output vector
+  private final RepeatedVarCharVector vector;
+
+  // mutator for the output vector
+  private final RepeatedVarCharVector.Mutator mutator;
+
+  // boolean array indicating which fields are selected (if star query entire array is set to true)
+  private final boolean[] collectedFields;
+
+  // pointer to keep track of the offsets per record
+  private long repeatedOffset;
+
+  // pointer to keep track of the original offsets per record
+  private long repeatedOffsetOriginal;
+
+  // pointer to end of the offset buffer
+  private long repeatedOffsetMax;
+
+  // pointer to the start of the actual data buffer
+  private long characterDataOriginal;
+
+  // pointer to the current location of the data buffer
+  private long characterData;
+
+  // pointer to the end of the data buffer
+  private long characterDataMax;
+
+  // current pointer into the buffer that keeps track of the length of individual fields
+  private long charLengthOffset;
+
+  // pointer to the start of the length buffer
+  private long charLengthOffsetOriginal;
+
+  // pointer to the end of length buffer
+  private long charLengthOffsetMax;
+
+  // pointer to the beginning of the record
+  private long recordStart;
+
+  // total number of records processed (across batches)
+  private long recordCount;
+
+  // number of records processed in this current batch
+  private int batchIndex;
+
+  // current index of the field being processed within the record
+  private int fieldIndex = -1;
+
+  /* boolean to indicate if we are currently appending data to the output vector
+   * Its set to false when we have hit out of memory or we are not interested in
+   * the particular field
+   */
+  private boolean collect;
+
+  // are we currently appending to a field
+  private boolean fieldOpen;
+
+  // maximum number of fields/columns
+  private final int maxField;
+
+  /**
+   * We initialize and add the repeated varchar vector to the record batch in this
+   * constructor. Perform some sanity checks if the selected columns are valid or not.
+   * @param outputMutator  Used to create/modify schema in the record batch
+   * @param columns  List of columns selected in the query
+   * @param isStarQuery  boolean to indicate if all fields are selected or not
+   * @throws SchemaChangeException
+   */
+  public RepeatedVarCharOutput(OutputMutator outputMutator, Collection<SchemaPath> columns, boolean isStarQuery) throws SchemaChangeException {
+    super();
+
+    MaterializedField field = MaterializedField.create(REF, Types.repeated(TypeProtos.MinorType.VARCHAR));
+    this.vector = outputMutator.addField(field, RepeatedVarCharVector.class);
+
+    this.mutator = vector.getMutator();
+
+
+    { // setup fields
+      List<Integer> columnIds = new ArrayList<Integer>();
+      if (!isStarQuery) {
+        String pathStr;
+        for (SchemaPath path : columns) {
+          assert path.getRootSegment().isNamed();
+          pathStr = path.getRootSegment().getPath();
+          Preconditions.checkArgument(pathStr.equals(COL_NAME) || (pathStr.equals("*") && path.getRootSegment().getChild() == null),
+              "Selected column(s) must have name 'columns' or must be plain '*'");
+
+          if (path.getRootSegment().getChild() != null) {
+            Preconditions.checkArgument(path.getRootSegment().getChild().isArray(), "Selected column must be an array index");
+            int index = path.getRootSegment().getChild().getArraySegment().getIndex();
+            columnIds.add(index);
+          }
+        }
+        Collections.sort(columnIds);
+
+      }
+
+      boolean[] fields = new boolean[MAXIMUM_NUMBER_COLUMNS];
+
+      int maxField = fields.length;
+
+      if(isStarQuery){
+        Arrays.fill(fields, true);
+      }else{
+        for(Integer i : columnIds){
+          maxField = 0;
+          maxField = Math.max(maxField, i);
+          fields[i] = true;
+        }
+      }
+      this.collectedFields = fields;
+      this.maxField = maxField;
+    }
+
+
+  }
+
+  /**
+   * Start a new record batch. Resets all the offsets and pointers that
+   * store buffer addresses
+   */
+  public void startBatch() {
+    this.recordStart = characterDataOriginal;
+    this.fieldOpen = false;
+    this.batchIndex = 0;
+    this.fieldIndex = -1;
+    this.collect = true;
+
+    loadRepeatedOffsetAddress();
+    loadVarCharOffsetAddress();
+    loadVarCharDataAddress();
+  }
+
+  private void loadRepeatedOffsetAddress(){
+    DrillBuf buf = vector.getOffsetVector().getBuffer();
+    checkBuf(buf);
+    this.repeatedOffset = buf.memoryAddress() + 4;
+    this.repeatedOffsetOriginal = buf.memoryAddress() + 4;
+    this.repeatedOffsetMax = buf.memoryAddress() + buf.capacity();
+  }
+
+  private void loadVarCharDataAddress(){
+    DrillBuf buf = vector.getValuesVector().getBuffer();
+    checkBuf(buf);
+    this.characterData = buf.memoryAddress();
+    this.characterDataOriginal = buf.memoryAddress();
+    this.characterDataMax = buf.memoryAddress() + buf.capacity();
+  }
+
+  private void loadVarCharOffsetAddress(){
+    DrillBuf buf = vector.getValuesVector().getOffsetVector().getBuffer();
+    checkBuf(buf);
+    this.charLengthOffset = buf.memoryAddress() + 4;
+    this.charLengthOffsetOriginal = buf.memoryAddress() + 4; // add four as offsets conceptually start at 1. (first item is 0..1)
+    this.charLengthOffsetMax = buf.memoryAddress() + buf.capacity();
+  }
+
+  private void expandVarCharOffsets(){
+    vector.getValuesVector().getOffsetVector().reAlloc();
+    long diff = charLengthOffset - charLengthOffsetOriginal;
+    loadVarCharOffsetAddress();
+    charLengthOffset += diff;
+  }
+
+  private void expandVarCharData(){
+    vector.getValuesVector().reAlloc();
+    long diff = characterData - characterDataOriginal;
+    loadVarCharDataAddress();
+    characterData += diff;
+  }
+
+  private void expandRepeatedOffsets(){
+    vector.getOffsetVector().reAlloc();
+    long diff = repeatedOffset - repeatedOffsetOriginal;
+    loadRepeatedOffsetAddress();
+    repeatedOffset += diff;
+  }
+
+  /**
+   * Helper method to check if the buffer we are accessing
+   * has a minimum reference count and has not been deallocated
+   * @param b  working drill buffer
+   */
+  private void checkBuf(DrillBuf b){
+    if(b.refCnt() < 1){
+      throw new IllegalStateException("Cannot access a dereferenced buffer.");
+    }
+  }
+
+  @Override
+  public void startField(int index) {
+    fieldIndex = index;
+    collect = collectedFields[index];
+    fieldOpen = true;
+  }
+
+  @Override
+  public boolean endField() {
+    fieldOpen = false;
+
+    if(charLengthOffset >= charLengthOffsetMax){
+      expandVarCharOffsets();
+    }
+
+    int newOffset = (int) (characterData - characterDataOriginal);
+    PlatformDependent.putInt(charLengthOffset, newOffset);
+    charLengthOffset += 4;
+    return fieldIndex < maxField;
+  }
+
+  @Override
+  public boolean endEmptyField() {
+    return endField();
+  }
+
+  @Override
+  public void append(byte data) {
+    if(!collect){
+      return;
+    }
+
+    if(characterData >= characterDataMax){
+      expandVarCharData();
+    }
+
+    PlatformDependent.putByte(characterData, data);
+    characterData++;
+
+  }
+
+  @Override
+  public long getRecordCount() {
+    return recordCount;
+  }
+
+  @Override
+  public boolean rowHasData() {
+    return this.recordStart < characterData;
+  }
+
+  @Override
+  public void finishRecord() {
+    this.recordStart = characterData;
+
+    if(fieldOpen){
+      endField();
+    }
+
+    if(repeatedOffset >= repeatedOffsetMax){
+      expandRepeatedOffsets();
+    }
+
+    int newOffset = ((int) (charLengthOffset - charLengthOffsetOriginal))/4;
+    PlatformDependent.putInt(repeatedOffset, newOffset);
+    repeatedOffset += 4;
+
+    // if there were no defined fields, skip.
+    if(fieldIndex > -1){
+      batchIndex++;
+      recordCount++;
+    }
+
+
+  }
+
+
+  // Sets the record count in this batch within the value vector
+  @Override
+  public void finishBatch() {
+    mutator.setValueCount(batchIndex);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/StreamFinishedPseudoException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/StreamFinishedPseudoException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/StreamFinishedPseudoException.java
new file mode 100644
index 0000000..ab9ee0d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/StreamFinishedPseudoException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+class StreamFinishedPseudoException extends RuntimeException {
+
+  public static final StreamFinishedPseudoException INSTANCE = new StreamFinishedPseudoException();
+
+  private StreamFinishedPseudoException() {
+    super("", null, false, true);
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java
new file mode 100644
index 0000000..c764f56
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextInput.java
@@ -0,0 +1,392 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+/*******************************************************************************
+ * Copyright 2014 uniVocity Software Pty Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+import io.netty.buffer.DrillBuf;
+import io.netty.util.internal.PlatformDependent;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.util.AssertionUtil;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+
+import com.google.common.base.Preconditions;
+import com.univocity.parsers.common.Format;
+
+/**
+ * Class that fronts an InputStream to provide a byte consumption interface.
+ * Also manages only reading lines to and from each split.
+ */
+final class TextInput {
+
+  private static final byte NULL_BYTE = (byte) '\0';
+  private final byte lineSeparator1;
+  private final byte lineSeparator2;
+  private final byte normalizedLineSeparator;
+  private final TextParsingSettings settings;
+
+  private long lineCount;
+  private long charCount;
+
+  /**
+   * The starting position in the file.
+   */
+  private final long startPos;
+  private final long endPos;
+
+  private int bufferMark;
+  private long streamMark;
+
+  private long streamPos;
+
+  private final Seekable seekable;
+  private final FSDataInputStream inputFS;
+  private final InputStream input;
+
+  private final DrillBuf buffer;
+  private final ByteBuffer underlyingBuffer;
+  private final long bStart;
+  private final long bStartMinus1;
+
+  private final boolean bufferReadable;
+
+  /**
+   * Whether there was a possible partial line separator on the previous
+   * read so we dropped it and it should be appended to next read.
+   */
+  private boolean remByte = false;
+
+  /**
+   * The current position in the buffer.
+   */
+  public int bufferPtr;
+
+  /**
+   * The quantity of valid data in the buffer.
+   */
+  public int length = -1;
+
+  private boolean endFound = false;
+
+  /**
+   * Creates a new instance with the mandatory characters for handling newlines transparently.
+   * @param lineSeparator the sequence of characters that represent a newline, as defined in {@link Format#getLineSeparator()}
+   * @param normalizedLineSeparator the normalized newline character (as defined in {@link Format#getNormalizedNewline()}) that is used to replace any lineSeparator sequence found in the input.
+   */
+  public TextInput(TextParsingSettings settings, InputStream input, DrillBuf readBuffer, long startPos, long endPos) {
+    byte[] lineSeparator = settings.getNewLineDelimiter();
+    byte normalizedLineSeparator = settings.getNormalizedNewLine();
+    Preconditions.checkArgument(lineSeparator != null && (lineSeparator.length == 1 || lineSeparator.length == 2), "Invalid line separator. Expected 1 to 2 characters");
+    Preconditions.checkArgument(input instanceof Seekable, "Text input only supports an InputStream that supports Seekable.");
+    boolean isCompressed = input instanceof CompressionInputStream ;
+    Preconditions.checkArgument(!isCompressed || startPos == 0, "Cannot use split on compressed stream.");
+
+    // splits aren't allowed with compressed data.  The split length will be the compressed size which means we'll normally end prematurely.
+    if(isCompressed && endPos > 0){
+      endPos = Long.MAX_VALUE;
+    }
+
+    this.input = input;
+    this.seekable = (Seekable) input;
+    this.settings = settings;
+
+    if(input instanceof FSDataInputStream){
+      this.inputFS = (FSDataInputStream) input;
+      this.bufferReadable = inputFS.getWrappedStream() instanceof ByteBufferReadable;
+    }else{
+      this.inputFS = null;
+      this.bufferReadable = false;
+    }
+
+    this.startPos = startPos;
+    this.endPos = endPos;
+
+    this.lineSeparator1 = lineSeparator[0];
+    this.lineSeparator2 = lineSeparator.length == 2 ? lineSeparator[1] : NULL_BYTE;
+    this.normalizedLineSeparator = normalizedLineSeparator;
+
+    this.buffer = readBuffer;
+    this.bStart = buffer.memoryAddress();
+    this.bStartMinus1 = bStart -1;
+    this.underlyingBuffer = buffer.nioBuffer(0, buffer.capacity());
+  }
+
+  /**
+   * Test the input to position for read start.  If the input is a non-zero split or
+   * splitFirstLine is enabled, input will move to appropriate complete line.
+   * @throws IOException
+   */
+  final void start() throws IOException {
+    lineCount = 0;
+    if(startPos > 0){
+      seekable.seek(startPos);
+    }
+
+    updateBuffer();
+    if (length > 0) {
+      if(startPos > 0 || settings.isSkipFirstLine()){
+
+        // move to next full record.
+        skipLines(1);
+      }
+    }
+  }
+
+
+  /**
+   * Helper method to get the most recent characters consumed since the last record started.
+   * May get an incomplete string since we don't support stream rewind.  Returns empty string for now.
+   * @return String of last few bytes.
+   * @throws IOException
+   */
+  public String getStringSinceMarkForError() throws IOException {
+    return " ";
+  }
+
+  long getPos(){
+    return streamPos + bufferPtr;
+  }
+
+  public void mark(){
+    streamMark = streamPos;
+    bufferMark = bufferPtr;
+  }
+
+  /**
+   * read some more bytes from the stream.  Uses the zero copy interface if available.  Otherwise, does byte copy.
+   * @throws IOException
+   */
+  private final void read() throws IOException {
+    if(bufferReadable){
+
+      if(remByte){
+        underlyingBuffer.put(lineSeparator1);
+        remByte = false;
+      }
+      length = inputFS.read(underlyingBuffer);
+
+    }else{
+
+      byte[] b = new byte[underlyingBuffer.capacity()];
+      if(remByte){
+        b[0] = lineSeparator1;
+        length = input.read(b, 1, b.length - 1);
+        remByte = false;
+      }else{
+        length = input.read(b);
+      }
+
+      underlyingBuffer.put(b);
+    }
+  }
+
+
+  /**
+   * Read more data into the buffer.  Will also manage split end conditions.
+   * @throws IOException
+   */
+  private final void updateBuffer() throws IOException {
+    streamPos = seekable.getPos();
+    underlyingBuffer.clear();
+
+    if(endFound){
+      length = -1;
+      return;
+    }
+
+    read();
+
+    // check our data read allowance.
+    if(streamPos + length >= this.endPos){
+      updateLengthBasedOnConstraint();
+    }
+
+    charCount += bufferPtr;
+    bufferPtr = 1;
+
+    buffer.writerIndex(underlyingBuffer.limit());
+    buffer.readerIndex(underlyingBuffer.position());
+
+  }
+
+  /**
+   * Checks to see if we can go over the end of our bytes constraint on the data.  If so,
+   * adjusts so that we can only read to the last character of the first line that crosses
+   * the split boundary.
+   */
+  private void updateLengthBasedOnConstraint(){
+    // we've run over our alotted data.
+    final byte lineSeparator1 = this.lineSeparator1;
+    final byte lineSeparator2 = this.lineSeparator2;
+
+    // find the next line separator:
+    final long max = bStart + length;
+
+    for(long m = this.bStart + (endPos - streamPos); m < max; m++){
+      if(PlatformDependent.getByte(m) == lineSeparator1){
+        // we found a potential line break.
+
+        if(lineSeparator2 == NULL_BYTE){
+          // we found a line separator and don't need to consult the next byte.
+          length = (int)(m - bStart);
+          endFound = true;
+          break;
+        }else{
+          // this is a two byte line separator.
+
+          long mPlus = m+1;
+          if(mPlus < max){
+            // we can check next byte and see if the second lineSeparator is correct.
+            if(lineSeparator2 == PlatformDependent.getByte(mPlus)){
+              length = (int)(mPlus - bStart);
+              endFound = true;
+              break;
+            }else{
+              // this was a partial line break.
+              continue;
+            }
+          }else{
+            // the last character of the read was a remnant byte.  We'll hold off on dealing with this byte until the next read.
+            remByte = true;
+            length -= 1;
+            break;
+          }
+
+        }
+      }
+    }
+  }
+
+  /**
+   * Get next byte from stream.  Also maintains the current line count.  Will throw a StreamFinishedPseudoException
+   * when the stream has run out of bytes.
+   * @return next byte from stream.
+   * @throws IOException
+   */
+  public final byte nextChar() throws IOException {
+    final byte lineSeparator1 = this.lineSeparator1;
+    final byte lineSeparator2 = this.lineSeparator2;
+
+    if (length == -1) {
+      throw StreamFinishedPseudoException.INSTANCE;
+    }
+
+    if(AssertionUtil.BOUNDS_CHECKING_ENABLED){
+      buffer.checkBytes(bufferPtr - 1, bufferPtr);
+    }
+
+    byte byteChar = PlatformDependent.getByte(bStartMinus1 + bufferPtr);
+
+    if (bufferPtr >= length) {
+      if (length != -1) {
+        updateBuffer();
+        bufferPtr--;
+      } else {
+        throw StreamFinishedPseudoException.INSTANCE;
+      }
+    }
+
+    bufferPtr++;
+
+    // monitor for next line.
+    if (lineSeparator1 == byteChar && (lineSeparator2 == NULL_BYTE || lineSeparator2 == buffer.getByte(bufferPtr - 1))) {
+      lineCount++;
+
+      if (lineSeparator2 != NULL_BYTE) {
+        byteChar = normalizedLineSeparator;
+
+        if (bufferPtr >= length) {
+          if (length != -1) {
+            updateBuffer();
+          } else {
+            throw StreamFinishedPseudoException.INSTANCE;
+          }
+        }
+      }
+    }
+    return byteChar;
+  }
+
+  /**
+   * Number of lines read since the start of this split.
+   * @return
+   */
+  public final long lineCount() {
+    return lineCount;
+  }
+
+  /**
+   * Skip forward the number of line delimiters.  If you are in the middle of a line,
+   * a value of 1 will skip to the start of the next record.
+   * @param lines Number of lines to skip.
+   * @throws IOException
+   */
+  public final void skipLines(int lines) throws IOException {
+    if (lines < 1) {
+      return;
+    }
+    long expectedLineCount = this.lineCount + lines;
+
+    try {
+      do {
+        nextChar();
+      } while (lineCount < expectedLineCount);
+      if (lineCount < lines) {
+        throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached");
+      }
+    } catch (EOFException ex) {
+      throw new IllegalArgumentException("Unable to skip " + lines + " lines from line " + (expectedLineCount - lines) + ". End of input reached");
+    }
+  }
+
+  public final long charCount() {
+    return charCount + bufferPtr;
+  }
+
+  public long getLineCount() {
+    return lineCount;
+  }
+
+  public void close() throws IOException{
+    input.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextOutput.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextOutput.java
new file mode 100644
index 0000000..66b1165
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextOutput.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+/* Base class for producing output record batches while dealing with
+ * Text files.
+ */
+abstract class TextOutput {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TextOutput.class);
+
+  /**
+   * Start processing a new field within a record.
+   * @param index  index within the record
+   */
+  public abstract void startField(int index);
+
+  /**
+   * End processing a field within a record.
+   * @return  true if engine should continue processing record.  false if rest of record can be skipped.
+   */
+  public abstract boolean endField();
+
+  /**
+   * Shortcut that lets the output know that we are closing ending a field with no data.
+   * @return true if engine should continue processing record.  false if rest of record can be skipped.
+   */
+  public abstract boolean endEmptyField();
+
+  /**
+   * Add the provided data but drop any whitespace.
+   * @param data
+   */
+  public void appendIgnoringWhitespace(byte data){
+    if(TextReader.isWhite(data)){
+      // noop
+    }else{
+      append(data);
+    }
+  }
+
+  /**
+   * This function appends the byte to the output character data buffer
+   * @param data  current byte read
+   */
+  public abstract void append(byte data);
+
+  /**
+   * Completes the processing of a given record. Also completes the processing of the
+   * last field being read.
+   */
+  public abstract void finishRecord();
+
+  /**
+   *  Return the total number of records (across batches) processed
+   */
+  public abstract long getRecordCount();
+
+  /**
+   * Informs output to setup for new record batch.
+   */
+  public abstract void startBatch();
+
+  /**
+   * Does any final cleanup that is required for closing a batch.  Example might include closing the last field.
+   */
+  public abstract void finishBatch();
+
+  /**
+   * Helper method to check if the current record has any non-empty fields
+   */
+  public abstract boolean rowHasData();
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/7ec99871/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingContext.java
new file mode 100644
index 0000000..7b95ee2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/TextParsingContext.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text.compliant;
+
+import java.io.IOException;
+
+import com.univocity.parsers.common.ParserOutput;
+import com.univocity.parsers.common.ParsingContext;
+import com.univocity.parsers.common.input.CharInputReader;
+
+class TextParsingContext implements ParsingContext {
+
+  private final TextInput input;
+  private final TextOutput output;
+  protected boolean stopped = false;
+
+  private int[] extractedIndexes = null;
+
+  public TextParsingContext(TextInput input, TextOutput output) {
+    this.input = input;
+    this.output = output;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void stop() {
+    stopped = true;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean isStopped() {
+    return stopped;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long currentLine() {
+    return input.lineCount();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long currentChar() {
+    return input.charCount();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int currentColumn() {
+    return -1;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String[] headers() {
+    return new String[]{};
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int[] extractedFieldIndexes() {
+    return extractedIndexes;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long currentRecord() {
+    return output.getRecordCount();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String currentParsedContent() {
+    try {
+      return input.getStringSinceMarkForError();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void skipLines(int lines) {
+  }
+
+  @Override
+  public boolean columnsReordered() {
+    return false;
+  }
+}
+


[7/7] drill git commit: DRILL-2942: Enable access to epoll mode using system property -Ddrill.exec.enable-epoll=true.

Posted by ja...@apache.org.
DRILL-2942: Enable access to epoll mode using system property -Ddrill.exec.enable-epoll=true.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6b98db38
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6b98db38
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6b98db38

Branch: refs/heads/master
Commit: 6b98db386204ca40769aad9ef2c4c3ba89147668
Parents: e14d989
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sat May 2 17:32:12 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 7 00:12:26 2015 -0700

----------------------------------------------------------------------
 distribution/src/resources/drill-env.sh                  |  2 +-
 .../main/java/org/apache/drill/exec/ExecConstants.java   |  1 +
 .../java/org/apache/drill/exec/rpc/TransportCheck.java   | 11 +++++------
 3 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/6b98db38/distribution/src/resources/drill-env.sh
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-env.sh b/distribution/src/resources/drill-env.sh
index 2aede3f..345938e 100644
--- a/distribution/src/resources/drill-env.sh
+++ b/distribution/src/resources/drill-env.sh
@@ -16,7 +16,7 @@
 DRILL_MAX_DIRECT_MEMORY="8G"
 DRILL_HEAP="4G"
 
-export DRILL_JAVA_OPTS="-Xms$DRILL_HEAP -Xmx$DRILL_HEAP -XX:MaxDirectMemorySize=$DRILL_MAX_DIRECT_MEMORY -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=1G -ea"
+export DRILL_JAVA_OPTS="-Xms$DRILL_HEAP -Xmx$DRILL_HEAP -XX:MaxDirectMemorySize=$DRILL_MAX_DIRECT_MEMORY -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=1G -Ddrill.exec.enable-epoll=true -ea"
 
 # Class unloading is disabled by default in Java 7
 # http://hg.openjdk.java.net/jdk7u/jdk7u60/hotspot/file/tip/src/share/vm/runtime/globals.hpp#l1622

http://git-wip-us.apache.org/repos/asf/drill/blob/6b98db38/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index fb764c7..1591079 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -87,6 +87,7 @@ public interface ExecConstants {
   public static final String ENABLE_FRAGMENT_MEMORY_LIMIT = "drill.exec.memory.enable_frag_limit";
   public static final String FRAGMENT_MEM_OVERCOMMIT_FACTOR = "drill.exec.memory.frag_mem_overcommit_factor";
 
+  public static final String USE_LINUX_EPOLL = "drill.exec.enable-epoll";
 
   public static final String CLIENT_SUPPORT_COMPLEX_TYPES = "drill.client.supports-complex-types";
 

http://git-wip-us.apache.org/repos/asf/drill/blob/6b98db38/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java
index 6401518..34da53c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/TransportCheck.java
@@ -30,10 +30,11 @@ import io.netty.util.internal.SystemPropertyUtil;
 
 import java.util.Locale;
 
+import org.apache.drill.exec.ExecConstants;
+
 /**
  * TransportCheck decides whether or not to use the native EPOLL mechanism for communication.
  */
-@SuppressWarnings("unused")
 public class TransportCheck {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TransportCheck.class);
 
@@ -43,12 +44,10 @@ public class TransportCheck {
 
     String name = SystemPropertyUtil.get("os.name").toLowerCase(Locale.US).trim();
 
-    if (!name.startsWith("linux") || SystemPropertyUtil.getBoolean("drill.exec.disable-linux-epoll", false) //
-        /* disable epoll */  || true //
-        ) {
-      SUPPORTS_EPOLL = false;
-    }else{
+    if (name.startsWith("linux") && SystemPropertyUtil.getBoolean(ExecConstants.USE_LINUX_EPOLL, false)) {
       SUPPORTS_EPOLL = true;
+    } else {
+      SUPPORTS_EPOLL = false;
     }
   }