You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/06 14:40:51 UTC

drill git commit: DRILL-2959: Make sure to close out compression codecs.

Repository: drill
Updated Branches:
  refs/heads/DRILL-2959v2 [created] e1fb13f47


DRILL-2959: Make sure to close out compression codecs.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e1fb13f4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e1fb13f4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e1fb13f4

Branch: refs/heads/DRILL-2959v2
Commit: e1fb13f47d3332f643e3229a5553652b4afa20ab
Parents: 3b19076
Author: Jacques Nadeau <ja...@apache.org>
Authored: Mon May 4 18:14:38 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed May 6 13:33:05 2015 +0100

----------------------------------------------------------------------
 .../exec/store/parquet/DirectCodecFactory.java  | 379 +++++++++++++++++++
 .../exec/store/parquet/DirectCodecPool.java     | 187 +++++++++
 .../ParquetDirectByteBufferAllocator.java       |  13 +-
 .../exec/store/parquet/ParquetFormatPlugin.java |   7 -
 .../exec/store/parquet/ParquetRecordWriter.java |  11 +-
 .../store/parquet/ParquetScanBatchCreator.java  |   3 +-
 .../store/parquet/columnreaders/PageReader.java |  32 +-
 .../columnreaders/ParquetRecordReader.java      |  41 +-
 .../exec/store/parquet2/DrillParquetReader.java |  23 +-
 .../parquet/hadoop/CodecFactoryExposer.java     | 160 --------
 .../parquet/hadoop/ColumnChunkIncReadStore.java |   7 +-
 .../ColumnChunkPageWriteStoreExposer.java       |  14 +-
 .../exec/store/TestDirectCodecFactory.java      | 155 ++++++++
 .../store/parquet/ParquetRecordReaderTest.java  |   3 +-
 pom.xml                                         |   2 +-
 15 files changed, 797 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java
new file mode 100644
index 0000000..ed455a2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.IdentityHashMap;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.xerial.snappy.Snappy;
+
+import parquet.bytes.ByteBufferAllocator;
+import parquet.bytes.BytesInput;
+import parquet.hadoop.CodecFactory;
+import parquet.hadoop.CodecFactory.BytesCompressor;
+import parquet.hadoop.HeapCodecFactory.HeapBytesCompressor;
+import parquet.hadoop.HeapCodecFactory.HeapBytesDecompressor;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import com.google.common.base.Preconditions;
+
+public class DirectCodecFactory extends CodecFactory<BytesCompressor, DirectBytesDecompressor> implements AutoCloseable {
+
+  private final ByteBufferAllocator allocator;
+  private final IdentityHashMap<ByteBuffer, Integer> allocatedBuffers = new IdentityHashMap<ByteBuffer, Integer>();
+
+  public DirectCodecFactory(Configuration config, ByteBufferAllocator allocator) { // scratch buffers requested via ensure() come from this allocator
+    super(config);
+    Preconditions.checkNotNull(allocator); // fail fast here rather than on the first allocate() call
+    this.allocator = allocator;
+  }
+
+  public DirectCodecFactory(Configuration config, BufferAllocator allocator) { // convenience overload taking a Drill BufferAllocator
+    this(config, new ParquetDirectByteBufferAllocator(allocator)); // wrap it in the Parquet ByteBufferAllocator implementation
+  }
+
+  private ByteBuffer ensure(ByteBuffer buffer, int size) { // returns a tracked buffer with capacity >= size, reusing 'buffer' when it is large enough
+    if (buffer == null) {
+      buffer = allocator.allocate(size);
+      allocatedBuffers.put(buffer, 0);
+    } else if (buffer.capacity() >= size) {
+      buffer.clear();
+    } else {
+      // Too small: release(buffer) already frees the buffer and untracks it, so it must not also be freed directly here.
+      release(buffer);
+      buffer = allocator.allocate(size);
+      allocatedBuffers.put(buffer, 0);
+    }
+    return buffer;
+  }
+
+  ByteBuffer release(ByteBuffer buffer) { // null-safe; always returns null so callers can write field = release(field)
+    if (buffer != null) {
+      allocator.release(buffer);
+      allocatedBuffers.remove(buffer); // stop tracking the freed buffer
+    }
+    return null;
+  }
+
+  @Override
+  protected BytesCompressor createCompressor(final CompressionCodecName codecName, final CompressionCodec codec,
+      int pageSize) { // picks the compressor implementation for the given codec
+
+    if (codec == null) { // no codec: bytes are passed through unchanged
+      return new NoopCompressor();
+    } else if (codecName == CompressionCodecName.SNAPPY) {
+      // avoid using the Parquet Snappy codec since it allocates direct buffers at awkward spots.
+      return new SnappyCompressor();
+    } else {
+
+      // todo: move zlib above since it also generates allocateDirect calls.
+      return new HeapBytesCompressor(codecName, codec, pageSize); // every other codec uses Parquet's heap-based compressor
+    }
+  }
+
+  @Override
+  protected DirectBytesDecompressor createDecompressor(final CompressionCodec codec) { // picks the decompressor implementation for the given codec
+    // if (true) {
+    // return new HeapFakeDirect(codec);
+    // }
+
+    if (codec == null) { // no codec: data is copied through unchanged
+      return new NoopDecompressor();
+    } else if (DirectCodecPool.INSTANCE.codec(codec).supportsDirectDecompression()) { // the shared pool reports a usable direct decompressor
+      return new FullDirectDecompressor(codec);
+    } else {
+      // return new HeapFakeDirect(codec);
+      return new IndirectDecompressor(codec); // fall back to a pooled Decompressor working on byte arrays
+    }
+  }
+
+  public void close() { // AutoCloseable entry point for this factory
+    release(); // NOTE(review): no-arg release() is not defined in this class; presumably inherited from CodecFactory - confirm what it releases
+  }
+
+  private class HeapFakeDirect extends DirectBytesDecompressor { // DrillBuf-facing adapter over the heap-based decompressor
+
+    private final ExposedHeapBytesDecompressor innerCompressor;
+
+    public HeapFakeDirect(CompressionCodec codec){
+      innerCompressor = new ExposedHeapBytesDecompressor(codec);
+    }
+
+    @Override
+    public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize)
+        throws IOException {
+      BytesInput uncompressed = decompress(new ByteBufBytesInput(input, 0, compressedSize), uncompressedSize); // only the first compressedSize bytes are payload
+      output.clear();
+      output.setBytes(0, uncompressed.toByteArray());
+      output.writerIndex((int) uncompressed.size()); // setBytes(index, ...) is followed by an explicit writer index update
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput paramBytesInput, int uncompressedSize) throws IOException {
+      return innerCompressor.decompress(paramBytesInput, uncompressedSize);
+    }
+
+    @Override
+    protected void release() {
+      innerCompressor.release();
+    }
+
+  }
+
+  private class ExposedHeapBytesDecompressor extends HeapBytesDecompressor { // subclass whose only purpose is to widen release() to public
+    public ExposedHeapBytesDecompressor(CompressionCodec codec) {
+      super(codec);
+    }
+
+    public void release() { // declared public so the sibling decompressors in this file can call it
+      super.release();
+    }
+  }
+
+  public class IndirectDecompressor extends DirectBytesDecompressor { // decompresses through byte arrays using a pooled Decompressor
+    private final Decompressor decompressor;
+
+    public IndirectDecompressor(CompressionCodec codec) {
+      this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDecompressor(); // handed back to the pool in release()
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      decompressor.reset(); // the instance is pooled and reused, so reset it before each use
+      byte[] inputBytes = bytes.toByteArray();
+      decompressor.setInput(inputBytes, 0, inputBytes.length);
+      byte[] output = new byte[uncompressedSize];
+      decompressor.decompress(output, 0, uncompressedSize);
+      return BytesInput.from(output);
+    }
+
+    @Override
+    public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize)
+        throws IOException {
+
+      decompressor.reset();
+      byte[] inputBytes = new byte[compressedSize]; // copy only the compressed payload, not the buffer's whole capacity
+      input.getBytes(0, inputBytes);
+      decompressor.setInput(inputBytes, 0, inputBytes.length);
+      byte[] outputBytes = new byte[uncompressedSize];
+      decompressor.decompress(outputBytes, 0, uncompressedSize);
+      output.clear();
+      output.writeBytes(outputBytes);
+    }
+
+    @Override
+    protected void release() {
+      DirectCodecPool.INSTANCE.returnDecompressor(decompressor);
+    }
+  }
+
+  public class FullDirectDecompressor extends DirectBytesDecompressor { // used when the codec pool reports direct-decompression support
+    private final DirectDecompressor decompressor;
+    private ByteBuffer compressedBuffer;
+    private ByteBuffer uncompressedBuffer;
+    private ExposedHeapBytesDecompressor extraDecompressor; // heap-based path used by decompress(BytesInput, int) while the direct path is disabled
+    public FullDirectDecompressor(CompressionCodec codec){
+      this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDirectDecompressor(); // handed back to the pool in release()
+      this.extraDecompressor = new ExposedHeapBytesDecompressor(codec);
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput compressedBytes, int uncompressedSize) throws IOException {
+
+      if(false){ // direct path deliberately disabled; see the TODO below
+        // TODO: fix direct path (currently, this code causes issues when writing complex Parquet files).
+        ByteBuffer bufferIn = compressedBytes.toByteBuffer();
+        uncompressedBuffer = ensure(uncompressedBuffer, uncompressedSize);
+        uncompressedBuffer.clear();
+
+        if (bufferIn.isDirect()) {
+          decompressor.decompress(bufferIn, uncompressedBuffer);
+        } else {
+          compressedBuffer = ensure(this.compressedBuffer, (int) compressedBytes.size()); // stage non-direct input in an allocator-provided buffer
+          compressedBuffer.clear();
+          compressedBuffer.put(bufferIn);
+          compressedBuffer.flip();
+          decompressor.decompress(compressedBuffer, uncompressedBuffer);
+        }
+        return BytesInput.from(uncompressedBuffer, 0, uncompressedSize);
+
+      } else {
+        return extraDecompressor.decompress(compressedBytes, uncompressedSize); // the only branch that currently runs
+      }
+
+
+    }
+
+
+    @Override
+    public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize)
+        throws IOException {
+      output.clear();
+      decompressor.decompress(input.nioBuffer(0, compressedSize), output.nioBuffer(0, uncompressedSize)); // operate on NIO buffers obtained from the two DrillBufs
+      output.writerIndex(uncompressedSize); // mark the decompressed bytes as written
+    }
+
+    @Override
+    protected void release() { // frees both scratch buffers, returns the pooled decompressor and releases the heap fallback
+      compressedBuffer = DirectCodecFactory.this.release(compressedBuffer);
+      uncompressedBuffer = DirectCodecFactory.this.release(uncompressedBuffer);
+      DirectCodecPool.INSTANCE.returnDecompressor(decompressor);
+      extraDecompressor.release();
+    }
+
+  }
+
+  public class NoopDecompressor extends DirectBytesDecompressor { // returned by createDecompressor when the codec is null
+
+    @Override
+    public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize)
+        throws IOException {
+      Preconditions.checkArgument(compressedSize == uncompressedSize, // throws IllegalArgumentException when the sizes differ
+          "Non-compressed data did not have matching compressed and uncompressed sizes.");
+      output.clear();
+      output.writeBytes(input, compressedSize); // plain copy of compressedSize bytes
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      return bytes; // the input is returned as-is; uncompressedSize is not checked here
+    }
+
+    @Override
+    protected void release() { // nothing is held, so nothing to release
+    }
+
+  }
+
+  public class SnappyCompressor extends BytesCompressor { // Snappy compressor whose buffers come from the factory's allocator via ensure()
+
+    private ByteBuffer incoming; // staging buffer for non-direct input; allocated lazily
+    private ByteBuffer outgoing; // holds the compressed output; reused across calls
+
+    public SnappyCompressor() {
+      super();
+    }
+
+    @Override
+    public BytesInput compress(BytesInput bytes) throws IOException {
+      int maxOutputSize = Snappy.maxCompressedLength((int) bytes.size()); // upper bound used to size the output buffer
+      ByteBuffer bufferIn = bytes.toByteBuffer();
+      outgoing = ensure(outgoing, maxOutputSize);
+      final int size;
+      if (bufferIn.isDirect()) { // direct input is compressed in place
+        size = Snappy.compress(bufferIn, outgoing);
+      } else { // non-direct input is first copied into an allocator-provided buffer
+        this.incoming = ensure(this.incoming, (int) bytes.size());
+        this.incoming.put(bufferIn);
+        this.incoming.flip();
+        size = Snappy.compress(this.incoming, outgoing);
+      }
+
+      return BytesInput.from(outgoing, 0, (int) size); // NOTE(review): the result is built from 'outgoing', which the next compress() call reuses - confirm callers consume it first
+    }
+
+    @Override
+    public CompressionCodecName getCodecName() {
+      return CompressionCodecName.SNAPPY;
+    }
+
+    @Override
+    protected void release() { // returns both buffers to the allocator and nulls the fields
+      outgoing = DirectCodecFactory.this.release(outgoing);
+      incoming = DirectCodecFactory.this.release(incoming);
+    }
+
+  }
+
+  public static class NoopCompressor extends BytesCompressor { // returned by createCompressor when the codec is null
+
+    @Override
+    public BytesInput compress(BytesInput bytes) throws IOException {
+      return bytes; // pass-through: the input is returned unchanged
+    }
+
+    @Override
+    public CompressionCodecName getCodecName() {
+      return CompressionCodecName.UNCOMPRESSED;
+    }
+
+    @Override
+    protected void release() { // nothing is held, so nothing to release
+    }
+
+  }
+
+  public static class ByteBufBytesInput extends BytesInput { // BytesInput view over a Netty ByteBuf
+    private final ByteBuf buf;
+    private final int length; // value reported by size()
+
+    public ByteBufBytesInput(ByteBuf buf) {
+      this(buf, 0, buf.capacity()); // covers the buffer's full capacity
+    }
+
+    public ByteBufBytesInput(ByteBuf buf, int offset, int length) {
+      super();
+      if(buf.capacity() == length && offset == 0){ // requested range is the whole buffer: no slice needed
+        this.buf = buf;
+      }else{
+        this.buf = buf.slice(offset, length);
+      }
+
+      this.length = length;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      final WritableByteChannel outputChannel = Channels.newChannel(out);
+      outputChannel.write(buf.nioBuffer()); // NOTE(review): no-arg nioBuffer() may cover a different range than 'length' when buf is used unsliced - confirm
+    }
+
+    @Override
+    public ByteBuffer toByteBuffer() throws IOException {
+      return buf.nioBuffer();
+    }
+
+    @Override
+    public long size() {
+      return length;
+    }
+  }
+
+
+  public abstract class DirectBytesDecompressor extends CodecFactory.BytesDecompressor { // adds a DrillBuf-to-DrillBuf decompress variant
+    public abstract void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize) // sizes are byte counts for input and output
+        throws IOException;
+  }
+
+
+
+}
+

http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java
new file mode 100644
index 0000000..26d97c9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.commons.pool.BasePoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressionCodec;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+public class DirectCodecPool {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectCodecPool.class);
+
+  public static final DirectCodecPool INSTANCE = new DirectCodecPool();
+
+  @SuppressWarnings("unchecked")
+  private final Map<CompressionCodec, CodecPool> codecs = (Map<CompressionCodec, CodecPool>) (Object) Collections.synchronizedMap(Maps.newHashMap());
+
+  @SuppressWarnings("unchecked")
+  private final Map<Class<?>, GenericObjectPool> directDePools = (Map<Class<?>, GenericObjectPool>) (Object) Collections
+      .synchronizedMap(Maps.newHashMap());
+  private final Map<Class<?>, GenericObjectPool> dePools = (Map<Class<?>, GenericObjectPool>) (Object) Collections
+      .synchronizedMap(Maps.newHashMap());
+  private final Map<Class<?>, GenericObjectPool> cPools = (Map<Class<?>, GenericObjectPool>) (Object) Collections
+      .synchronizedMap(Maps.newHashMap());
+
+  private DirectCodecPool() { // private: the only instance is the INSTANCE field
+  }
+
+  public class CodecPool { // pools of compressors, decompressors and (when supported) direct decompressors for one codec
+    private final GenericObjectPool compressorPool;
+    private final GenericObjectPool decompressorPool;
+    private final GenericObjectPool directDecompressorPool; // null when the codec is not a DirectDecompressionCodec
+    private final boolean supportDirectDecompressor;
+
+    private CodecPool(final CompressionCodec codec){ // builds the pools and registers each under the runtime class of the objects it produces
+      try {
+        boolean supportDirectDecompressor = codec instanceof DirectDecompressionCodec; // may be downgraded to false below
+        compressorPool = new GenericObjectPool(new BasePoolableObjectFactory() {
+          public Object makeObject() throws Exception {
+            return codec.createCompressor();
+          }
+        }, Integer.MAX_VALUE);
+
+        Object com = compressorPool.borrowObject(); // borrow one instance only to learn its concrete class
+        if (com != null) {
+          cPools.put(com.getClass(), compressorPool); // returnCompressor() looks the pool up by this class
+          compressorPool.returnObject(com);
+        }else{
+          logger.warn("Unable to find compressor for codec {}", codec.getClass().getName());
+        }
+
+        decompressorPool = new GenericObjectPool(new BasePoolableObjectFactory() {
+          public Object makeObject() throws Exception {
+            return codec.createDecompressor();
+          }
+        }, Integer.MAX_VALUE);
+
+        Object decom = decompressorPool.borrowObject(); // same probe for the decompressor class
+        if (decom != null) {
+          dePools.put(decom.getClass(), decompressorPool);
+          decompressorPool.returnObject(decom);
+        } else {
+          logger.warn("Unable to find decompressor for codec {}", codec.getClass().getName());
+        }
+
+        if (supportDirectDecompressor) {
+          directDecompressorPool = new GenericObjectPool(new BasePoolableObjectFactory() {
+            public Object makeObject() throws Exception {
+              return ((DirectDecompressionCodec) codec).createDirectDecompressor();
+            }
+          }, Integer.MAX_VALUE);
+
+          Object ddecom = directDecompressorPool.borrowObject(); // same probe for the direct decompressor class
+          if (ddecom != null) {
+            directDePools.put(ddecom.getClass(), directDecompressorPool);
+            directDecompressorPool.returnObject(ddecom);
+
+          } else {
+            supportDirectDecompressor = false; // codec claims support but produced no instance: treat it as unsupported
+            logger.warn("Unable to find direct decompressor for codec {}", codec.getClass().getName());
+          }
+
+        } else {
+          directDecompressorPool = null;
+        }
+
+        this.supportDirectDecompressor = supportDirectDecompressor;
+      } catch (Exception e) {
+        throw new DrillRuntimeException(e); // any failure while building or probing the pools is rethrown unchecked
+      }
+    }
+
+    public DirectDecompressor borrowDirectDecompressor(){ // throws IllegalArgumentException when direct decompression is unsupported
+      Preconditions.checkArgument(supportDirectDecompressor, "Tried to get a direct Decompressor from a non-direct codec.");
+      try {
+        return (DirectDecompressor) directDecompressorPool.borrowObject();
+      } catch (Exception e) {
+        throw new DrillRuntimeException(e);
+      }
+    }
+
+    public boolean supportsDirectDecompression() { // true only if the constructor obtained a direct decompressor instance
+      return supportDirectDecompressor;
+    }
+
+    public Decompressor borrowDecompressor(){ // pool failures are rethrown as DrillRuntimeException
+      try {
+        return (Decompressor) decompressorPool.borrowObject();
+      } catch (Exception e) {
+        throw new DrillRuntimeException(e);
+      }
+    }
+
+    public Compressor borrowCompressor(){ // pool failures are rethrown as DrillRuntimeException
+      try {
+        return (Compressor) compressorPool.borrowObject();
+      } catch (Exception e) {
+        throw new DrillRuntimeException(e);
+      }
+    }
+  }
+
+  public CodecPool codec(CompressionCodec codec){ // returns the CodecPool for this codec, creating it on first use
+    CodecPool pools = codecs.get(codec);
+    if(pools == null){
+      synchronized(this){ // creation is serialized on this instance
+        pools = codecs.get(codec); // re-check: another thread may have created it while we waited for the lock
+        if(pools == null){
+          pools = new CodecPool(codec);
+          codecs.put(codec, pools);
+        }
+      }
+    }
+    return pools;
+  }
+
+  private void returnToPool(Object obj, Map<Class<?>, GenericObjectPool> pools) { // hands obj back to the pool registered for its runtime class
+    try {
+      GenericObjectPool pool = pools.get(obj.getClass());
+      if (pool == null) { // obj was not produced by any pool registered in this map
+        throw new IllegalStateException("Received unexpected codec object of type " + obj.getClass().getName() + ".");
+      }
+      pool.returnObject(obj);
+    } catch (Exception e) {
+      throw new DrillRuntimeException(e); // also wraps the IllegalStateException thrown above
+    }
+
+  }
+
+  public void returnCompressor(Compressor compressor) { // pool is looked up by the compressor's runtime class
+    returnToPool(compressor, cPools);
+  }
+
+  public void returnDecompressor(Decompressor decompressor) { // pool is looked up by the decompressor's runtime class
+    returnToPool(decompressor, dePools);
+  }
+
+  public void returnDecompressor(DirectDecompressor decompressor) { // overload for direct decompressors; uses the separate directDePools map
+    returnToPool(decompressor, directDePools);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
index 45a1dc6..79d1b90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.OperatorContext;
 
 import parquet.bytes.ByteBufferAllocator;
@@ -32,17 +33,21 @@ import parquet.bytes.ByteBufferAllocator;
 public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
 
-  private OperatorContext oContext;
-  private HashMap<Integer, ByteBuf> allocatedBuffers = new HashMap<Integer, ByteBuf>();
+  private final BufferAllocator allocator;
+  private final HashMap<Integer, ByteBuf> allocatedBuffers = new HashMap<Integer, ByteBuf>();
 
   public ParquetDirectByteBufferAllocator(OperatorContext o){
-    oContext=o;
+    allocator = o.getAllocator();
+  }
+
+  public ParquetDirectByteBufferAllocator(BufferAllocator allocator) {
+    this.allocator = allocator;
   }
 
 
   @Override
   public ByteBuffer allocate(int sz) {
-    ByteBuf bb = oContext.getAllocator().buffer(sz);
+    ByteBuf bb = allocator.buffer(sz);
     ByteBuffer b = bb.nioBuffer(0, sz);
     allocatedBuffers.put(System.identityHashCode(b), bb);
     logger.debug("ParquetDirectByteBufferAllocator: Allocated "+sz+" bytes. Allocated ByteBuffer id: "+System.identityHashCode(b));

http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index cfa4c93..322a88d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 
 import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.ParquetFileWriter;
 
 import com.google.common.collect.ImmutableSet;
@@ -74,7 +73,6 @@ public class ParquetFormatPlugin implements FormatPlugin{
   private static final List<MagicString> MAGIC_STRINGS = Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC));
 
   private final DrillbitContext context;
-  private final CodecFactoryExposer codecFactoryExposer;
   private final Configuration fsConf;
   private final ParquetFormatMatcher formatMatcher;
   private final ParquetFormatConfig config;
@@ -89,7 +87,6 @@ public class ParquetFormatPlugin implements FormatPlugin{
   public ParquetFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
       StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
     this.context = context;
-    this.codecFactoryExposer = new CodecFactoryExposer(fsConf);
     this.config = formatConfig;
     this.formatMatcher = new ParquetFormatMatcher(this);
     this.storageConfig = storageConfig;
@@ -171,10 +168,6 @@ public class ParquetFormatPlugin implements FormatPlugin{
     return storageConfig;
   }
 
-  public CodecFactoryExposer getCodecFactoryExposer() {
-    return codecFactoryExposer;
-  }
-
   public String getName(){
     return name;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 3506ffa..8615eb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -80,6 +80,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private boolean validating = false;
   private CompressionCodecName codec = CompressionCodecName.SNAPPY;
   private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
+  private DirectCodecFactory codecFactory;
 
   private long recordCount = 0;
   private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
@@ -100,6 +101,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{
     super();
     this.oContext = context.newOperatorContext(writer, true);
+    this.codecFactory = new DirectCodecFactory(writer.getFormatPlugin().getFsConf(), oContext.getAllocator());
   }
 
   @Override
@@ -156,10 +158,9 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
     int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / this.schema.getColumns().size() / 5);
     pageStore = ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(this.oContext,
-      codec,
-      pageSize,
-      this.schema,
-      initialBlockBufferSize);
+        codecFactory.getCompressor(codec, pageSize),
+        schema,
+        initialBlockBufferSize);
     int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
     store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
     MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(this.schema);
@@ -332,6 +333,8 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
       ColumnChunkPageWriteStoreExposer.close(pageStore);
     }
 
+    codecFactory.close();
+
     if (!hasRecords) {
       // the very last file is empty, delete it (DRILL-2408)
       Path path = getPath();

http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index d5586ce..d5b7303 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -39,7 +39,6 @@ import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
-import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
@@ -130,7 +129,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
           readers.add(
               new ParquetRecordReader(
                   context, e.getPath(), e.getRowGroupIndex(), fs,
-                  rowGroupScan.getStorageEngine().getCodecFactoryExposer(),
+                  new DirectCodecFactory(fs.getConf(), oContext.getAllocator()),
                   footers.get(e.getPath()),
                   rowGroupScan.getColumns()
               )

http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 6a41a04..28a8b23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -28,6 +28,9 @@ import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.store.parquet.ColumnDataReader;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory.ByteBufBytesInput;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor;
 import org.apache.drill.exec.store.parquet.ParquetFormatPlugin;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,8 +48,6 @@ import parquet.format.PageHeader;
 import parquet.format.PageType;
 import parquet.format.Util;
 import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactoryExposer;
-import parquet.hadoop.CodecFactoryExposer.HadoopByteBufBytesInput;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.schema.PrimitiveType;
@@ -101,13 +102,13 @@ final class PageReader {
   // These need to be held throughout reading of the entire column chunk
   List<ByteBuf> allocatedDictionaryBuffers;
 
-  private final CodecFactoryExposer codecFactory;
+  private final DirectCodecFactory codecFactory;
 
   PageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
     throws ExecutionSetupException{
     this.parentColumnReader = parentStatus;
     allocatedDictionaryBuffers = new ArrayList<ByteBuf>();
-    codecFactory = parentColumnReader.parentReader.getCodecFactoryExposer();
+    codecFactory = parentColumnReader.parentReader.getCodecFactory();
 
     long start = columnChunkMetaData.getFirstDataPageOffset();
     try {
@@ -137,10 +138,12 @@ final class PageReader {
         final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size);
         try {
           dataReader.loadPage(compressedData, pageHeader.compressed_page_size);
-          codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+          DirectBytesDecompressor decompressor = codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData
+              .getCodec());
+          decompressor.decompress(
               compressedData,
-              dictionaryData,
               pageHeader.compressed_page_size,
+              dictionaryData,
               pageHeader.getUncompressed_page_size());
 
         } finally {
@@ -149,7 +152,7 @@ final class PageReader {
       }
 
       DictionaryPage page = new DictionaryPage(
-          getBytesInput(dictionaryData),
+          asBytesInput(dictionaryData),
           pageHeader.uncompressed_page_size,
           pageHeader.dictionary_page_header.num_values,
           parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
@@ -158,9 +161,8 @@ final class PageReader {
     }
   }
 
-  public static BytesInput getBytesInput(DrillBuf uncompressedByteBuf) throws IOException {
-    final ByteBuffer outBuffer = uncompressedByteBuf.nioBuffer(0, uncompressedByteBuf.capacity());
-    return new HadoopByteBufBytesInput(outBuffer, 0, outBuffer.limit());
+  public static BytesInput asBytesInput(DrillBuf buf) throws IOException {
+    return new ByteBufBytesInput(buf);
   }
 
   /**
@@ -197,17 +199,17 @@ final class PageReader {
           final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size);
           try{
             dataReader.loadPage(compressedData, pageHeader.compressed_page_size);
-            codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+            codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec()).decompress(
                 compressedData,
-                uncompressedData,
                 pageHeader.compressed_page_size,
+                uncompressedData,
                 pageHeader.getUncompressed_page_size());
           } finally {
             compressedData.release();
           }
         }
         DictionaryPage page = new DictionaryPage(
-            getBytesInput(uncompressedData),
+            asBytesInput(uncompressedData),
             pageHeader.uncompressed_page_size,
             pageHeader.dictionary_page_header.num_values,
             parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
@@ -225,10 +227,10 @@ final class PageReader {
     }else{
       final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size);
       dataReader.loadPage(compressedData, pageHeader.compressed_page_size);
-      codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+      codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec()).decompress(
           compressedData,
-          pageData,
           pageHeader.compressed_page_size,
+          pageData,
           pageHeader.getUncompressed_page_size());
       compressedData.release();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 11d0042..2072aae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -30,7 +30,6 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -39,8 +38,8 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.MaterializedField.Key;
 import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
 import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -51,7 +50,6 @@ import parquet.column.ColumnDescriptor;
 import parquet.format.FileMetaData;
 import parquet.format.SchemaElement;
 import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.ParquetFileWriter;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
@@ -103,36 +101,41 @@ public class ParquetRecordReader extends AbstractRecordReader {
   // records specified in the row group metadata
   long mockRecordsRead;
 
-  private final CodecFactoryExposer codecFactoryExposer;
+  private final DirectCodecFactory codecFactory;
   int rowGroupIndex;
   long totalRecordsRead;
 
-  public ParquetRecordReader(FragmentContext fragmentContext, //
-                             String path, //
-                             int rowGroupIndex, //
-                             FileSystem fs, //
-                             CodecFactoryExposer codecFactoryExposer, //
-                             ParquetMetadata footer, //
+  public ParquetRecordReader(FragmentContext fragmentContext,
+      String path,
+      int rowGroupIndex,
+      FileSystem fs,
+      DirectCodecFactory codecFactory,
+      ParquetMetadata footer,
                              List<SchemaPath> columns) throws ExecutionSetupException {
-    this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactoryExposer, footer,
+    this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactory, footer,
         columns);
   }
 
-  public ParquetRecordReader(FragmentContext fragmentContext, long batchSize,
-                             String path, int rowGroupIndex, FileSystem fs,
-                             CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer,
-                             List<SchemaPath> columns) throws ExecutionSetupException {
+  public ParquetRecordReader(
+      FragmentContext fragmentContext,
+      long batchSize,
+      String path,
+      int rowGroupIndex,
+      FileSystem fs,
+      DirectCodecFactory codecFactory,
+      ParquetMetadata footer,
+      List<SchemaPath> columns) throws ExecutionSetupException {
     this.hadoopPath = new Path(path);
     this.fileSystem = fs;
-    this.codecFactoryExposer = codecFactoryExposer;
+    this.codecFactory = codecFactory;
     this.rowGroupIndex = rowGroupIndex;
     this.batchSize = batchSize;
     this.footer = footer;
     setColumns(columns);
   }
 
-  public CodecFactoryExposer getCodecFactoryExposer() {
-    return codecFactoryExposer;
+  public DirectCodecFactory getCodecFactory() {
+    return codecFactory;
   }
 
   public Path getHadoopPath() {
@@ -452,6 +455,8 @@ public class ParquetRecordReader extends AbstractRecordReader {
     }
     columnStatuses.clear();
 
+    codecFactory.close();
+
     for (VarLengthColumn r : varLengthReader.columns) {
       r.clear();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 921d134..07950df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -18,16 +18,14 @@
 package org.apache.drill.exec.store.parquet2;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Collection;
-import java.util.ArrayList;
-import java.util.Map;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
-import com.google.common.collect.Sets;
-
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.PathSegment;
@@ -43,37 +41,30 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.MaterializedField.Key;
 import org.apache.drill.exec.store.AbstractRecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
 import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
 import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.BaseValueVector;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import parquet.column.ColumnDescriptor;
 import parquet.common.schema.ColumnPath;
-import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.ColumnChunkIncReadStore;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.io.ColumnIOFactory;
-import parquet.io.InvalidRecordException;
 import parquet.io.MessageColumnIO;
 import parquet.schema.GroupType;
 import parquet.schema.MessageType;
 import parquet.schema.Type;
-import parquet.schema.PrimitiveType;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-
-import parquet.schema.Types;
+import com.google.common.collect.Sets;
 
 public class DrillParquetReader extends AbstractRecordReader {
 
@@ -247,7 +238,6 @@ public class DrillParquetReader extends AbstractRecordReader {
         paths.put(md.getPath(), md);
       }
 
-      CodecFactoryExposer codecFactoryExposer = new CodecFactoryExposer(fileSystem.getConf());
       Path filePath = new Path(entry.getPath());
 
       BlockMetaData blockMetaData = footer.getBlocks().get(entry.getRowGroupIndex());
@@ -255,7 +245,8 @@ public class DrillParquetReader extends AbstractRecordReader {
       recordCount = (int) blockMetaData.getRowCount();
 
       pageReadStore = new ColumnChunkIncReadStore(recordCount,
-              codecFactoryExposer.getCodecFactory(), operatorContext.getAllocator(), fileSystem, filePath);
+          new DirectCodecFactory(fileSystem.getConf(), operatorContext.getAllocator()), operatorContext.getAllocator(),
+          fileSystem, filePath);
 
       for (String[] path : schema.getPaths()) {
         Type type = schema.getType(path);

http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
deleted file mode 100644
index 5438660..0000000
--- a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.hadoop;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.DrillBuf;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DirectDecompressionCodec;
-import org.apache.hadoop.io.compress.DirectDecompressor;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import parquet.bytes.BytesInput;
-import parquet.hadoop.metadata.CompressionCodecName;
-
-public class CodecFactoryExposer{
-
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CodecFactoryExposer.class);
-
-  private CodecFactory codecFactory;
-  private final Map<String, org.apache.hadoop.io.compress.DirectDecompressionCodec> codecByName = new HashMap<String, org.apache.hadoop.io.compress.DirectDecompressionCodec>();
-  private Configuration configuration;
-
-  public CodecFactoryExposer(Configuration config){
-    codecFactory = new CodecFactory(config);configuration = config;
-  }
-
-  public CodecFactory getCodecFactory() {
-    return codecFactory;
-  }
-
-  public BytesInput decompress(BytesInput bytes, int uncompressedSize, CompressionCodecName codecName) throws IOException {
-    return codecFactory.getDecompressor(codecName).decompress(bytes, uncompressedSize);
-  }
-
-  public static BytesInput getBytesInput(ByteBuf uncompressedByteBuf, int uncompressedSize) throws IOException {
-    ByteBuffer outBuffer=uncompressedByteBuf.nioBuffer(0, uncompressedSize);
-    return new HadoopByteBufBytesInput(outBuffer, 0, outBuffer.limit());
-  }
-
-  public void decompress(CompressionCodecName codecName,
-      final DrillBuf compressedByteBuf,
-      final DrillBuf uncompressedByteBuf,
-                               int compressedSize,
-                               int uncompressedSize) throws IOException {
-    final ByteBuffer inpBuffer = compressedByteBuf.nioBuffer(0, compressedSize);
-    final ByteBuffer outBuffer = uncompressedByteBuf.nioBuffer(0, uncompressedSize);
-    CompressionCodec c = getCodec(codecName);
-    //TODO: Create the decompressor only once at init time.
-    Class<?> cx = c.getClass();
-
-    DirectDecompressionCodec d=null;
-    DirectDecompressor decompr=null;
-
-    if (DirectDecompressionCodec.class.isAssignableFrom(cx)) {
-      d=(DirectDecompressionCodec)c;
-    }
-
-    if(d!=null) {
-      decompr = d.createDirectDecompressor();
-    }
-
-    if(d!=null && decompr!=null){
-      decompr.decompress(inpBuffer, outBuffer);
-    }else{
-      logger.warn("This Hadoop implementation does not support a " + codecName +
-        " direct decompression codec interface. "+
-        "Direct decompression is available only on *nix systems with Hadoop 2.3 or greater. "+
-        "Read operations will be a little slower. ");
-      BytesInput outBytesInp = this.decompress(
-        new HadoopByteBufBytesInput(inpBuffer, 0, inpBuffer.limit()),
-        uncompressedSize,
-        codecName);
-      // COPY the data back into the output buffer.
-      // (DrillBufs can only refer to direct memory, so we cannot pass back a BytesInput backed
-      // by a byte array).
-      outBuffer.put(outBytesInp.toByteArray());
-    }
-  }
-
-  private DirectDecompressionCodec getCodec(CompressionCodecName codecName) {
-    String codecClassName = codecName.getHadoopCompressionCodecClassName();
-    if (codecClassName == null) {
-      return null;
-    }
-    DirectDecompressionCodec codec = codecByName.get(codecClassName);
-    if (codec != null) {
-      return codec;
-    }
-
-    try {
-      Class<?> codecClass = Class.forName(codecClassName);
-      codec = (DirectDecompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
-      codecByName.put(codecClassName, codec);
-      return codec;
-    } catch (ClassNotFoundException e) {
-      throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
-    }
-  }
-
-  public static class HadoopByteBufBytesInput extends BytesInput {
-
-    private final ByteBuffer byteBuf;
-    private final int length;
-    private final int offset;
-
-    public HadoopByteBufBytesInput(ByteBuffer byteBuf, int offset, int length) {
-      super();
-      this.byteBuf = byteBuf;
-      this.offset = offset;
-      this.length = length;
-    }
-
-    @Override
-    public void writeAllTo(OutputStream out) throws IOException {
-      final WritableByteChannel outputChannel = Channels.newChannel(out);
-      byteBuf.position(offset);
-      ByteBuffer tempBuf = byteBuf.slice();
-      tempBuf.limit(length);
-      outputChannel.write(tempBuf);
-    }
-
-    @Override
-    public ByteBuffer toByteBuffer() throws IOException {
-      byteBuf.position(offset);
-      ByteBuffer buf = byteBuf.slice();
-      buf.limit(length);
-      return buf;
-    }
-
-    @Override
-    public long size() {
-      return length;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
index 242cd28..6337d4c 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -28,7 +28,7 @@ import java.util.Map;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -52,14 +52,15 @@ public class ColumnChunkIncReadStore implements PageReadStore {
 
   private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
 
-  private CodecFactory codecFactory = new CodecFactory(new Configuration());
+  private DirectCodecFactory codecFactory;
   private BufferAllocator allocator;
   private FileSystem fs;
   private Path path;
   private long rowCount;
   private List<FSDataInputStream> streams = new ArrayList();
 
-  public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, BufferAllocator allocator, FileSystem fs, Path path) {
+  public ColumnChunkIncReadStore(long rowCount, DirectCodecFactory codecFactory, BufferAllocator allocator,
+      FileSystem fs, Path path) {
     this.codecFactory = codecFactory;
     this.allocator = allocator;
     this.fs = fs;

http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
index 0e9dec0..743d185 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
@@ -21,21 +21,19 @@ import java.io.IOException;
 
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
-import org.apache.hadoop.conf.Configuration;
 
 import parquet.column.page.PageWriteStore;
 import parquet.hadoop.CodecFactory.BytesCompressor;
-import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.schema.MessageType;
 
 public class ColumnChunkPageWriteStoreExposer {
 
-  public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore(OperatorContext oContext,
-                                                                       CompressionCodecName codec,
-                                                                       int pageSize,
-                                                                       MessageType schema,
-                                                                       int initialSize) {
-    BytesCompressor compressor = new CodecFactory(new Configuration()).getCompressor(codec, pageSize);
+  public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore(
+      OperatorContext oContext,
+      BytesCompressor compressor,
+      MessageType schema,
+      int initialSize
+      ) {
     return new ColumnChunkPageWriteStore(compressor, schema, initialSize, new ParquetDirectByteBufferAllocator(oContext));
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java
new file mode 100644
index 0000000..644144e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import io.netty.buffer.DrillBuf;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.drill.common.DeferredException;
+import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory.ByteBufBytesInput;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import parquet.bytes.BytesInput;
+import parquet.hadoop.CodecFactory.BytesCompressor;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+public class TestDirectCodecFactory extends ExecTest {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestDirectCodecFactory.class);
+
+  private static enum Decompression {
+    ON_HEAP, OFF_HEAP, DRILLBUF
+  }
+
+  private void test(int size, CompressionCodecName codec, boolean useOnHeapCompression, Decompression decomp) {
+    DrillBuf rawBuf = null;
+    DrillBuf outBuf = null;
+    try (BufferAllocator allocator = new TopLevelAllocator();
+        DirectCodecFactory codecFactory = new DirectCodecFactory(new Configuration(), allocator)) {
+      try {
+        rawBuf = allocator.buffer(size);
+        final byte[] rawArr = new byte[size];
+        outBuf = allocator.buffer(size * 2);
+        Random r = new Random();
+        byte[] random = new byte[1024];
+        int pos = 0;
+        while (pos < size) {
+          r.nextBytes(random);
+          rawBuf.writeBytes(random);
+          System.arraycopy(random, 0, rawArr, pos, random.length);
+          pos += random.length;
+        }
+
+        BytesCompressor c = codecFactory.getCompressor(codec, 64 * 1024);
+        DirectBytesDecompressor d = codecFactory.getDecompressor(codec);
+
+        BytesInput compressed;
+        if (useOnHeapCompression) {
+          compressed = c.compress(BytesInput.from(rawArr));
+        } else {
+          compressed = c.compress(new ByteBufBytesInput(rawBuf));
+        }
+
+        switch (decomp) {
+        case DRILLBUF: {
+          ByteBuffer buf = compressed.toByteBuffer();
+          DrillBuf b = allocator.buffer(buf.capacity());
+          try {
+            b.writeBytes(buf);
+            d.decompress(b, (int) compressed.size(), outBuf, size);
+            for (int i = 0; i < size; i++) {
+              Assert.assertTrue("Data didn't match at " + i, outBuf.getByte(i) == rawBuf.getByte(i));
+            }
+          } finally {
+            b.release();
+          }
+          break;
+        }
+
+        case OFF_HEAP: {
+          ByteBuffer buf = compressed.toByteBuffer();
+          DrillBuf b = allocator.buffer(buf.capacity());
+          try {
+            b.writeBytes(buf);
+            BytesInput input = d.decompress(new ByteBufBytesInput(b), size);
+            Assert.assertArrayEquals(input.toByteArray(), rawArr);
+          } finally {
+            b.release();
+          }
+          break;
+        }
+        case ON_HEAP: {
+          byte[] buf = compressed.toByteArray();
+          BytesInput input = d.decompress(BytesInput.from(buf), size);
+          Assert.assertArrayEquals(input.toByteArray(), rawArr);
+          break;
+        }
+        }
+      } catch (Exception e) {
+        String msg = String.format(
+            "Failure while testing Codec: %s, OnHeapCompressionInput: %s, Decompression Mode: %s, Data Size: %d",
+            codec.name(),
+            useOnHeapCompression, decomp.name(), size);
+        System.out.println(msg);
+        throw new RuntimeException(msg, e);
+      } finally {
+        if (rawBuf != null) {
+          rawBuf.release();
+        }
+        if (outBuf != null) {
+          outBuf.release();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void compressionCodecs() throws Exception {
+    int[] sizes = { 4 * 1024, 1 * 1024 * 1024 };
+    boolean[] comp = { true, false };
+
+    try (DeferredException ex = new DeferredException()) {
+      for (int size : sizes) {
+        for (boolean useOnHeapComp : comp) {
+          for (Decompression decomp : Decompression.values()) {
+            for (CompressionCodecName codec : CompressionCodecName.values()) {
+              if (codec == CompressionCodecName.LZO) {
+                // The LZO codec is GPL-licensed and therefore not installed; skip it.
+                continue;
+              }
+              try {
+                test(size, codec, useOnHeapComp, decomp);
+              } catch (Exception e) {
+                ex.addException(e);
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index e50e3fb..83a1cb8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -70,7 +70,6 @@ import parquet.bytes.BytesInput;
 import parquet.column.page.DataPageV1;
 import parquet.column.page.PageReadStore;
 import parquet.column.page.PageReader;
-import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.Footer;
 import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.metadata.ParquetMetadata;
@@ -625,7 +624,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
     BufferAllocator allocator = new TopLevelAllocator();
     for(int i = 0; i < 25; i++) {
       ParquetRecordReader rr = new ParquetRecordReader(context, 256000, fileName, 0, fs,
-          new CodecFactoryExposer(dfsConfig), f.getParquetMetadata(), columns);
+          new DirectCodecFactory(dfsConfig, allocator), f.getParquetMetadata(), columns);
       TestOutputMutator mutator = new TestOutputMutator(allocator);
       rr.setup(mutator);
       Stopwatch watch = new Stopwatch();

http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f0f4bc5..a207f2a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
     <proto.cas.path>${project.basedir}/src/main/protobuf/</proto.cas.path>
     <dep.junit.version>4.11</dep.junit.version>
     <dep.slf4j.version>1.7.6</dep.slf4j.version>
-    <parquet.version>1.6.0rc3-drill-r0.1</parquet.version>
+    <parquet.version>1.6.0rc3-drill-r0.3</parquet.version>
   </properties>
 
   <scm>