Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/06 14:40:51 UTC
drill git commit: DRILL-2959: Make sure to close out compression codecs.
Repository: drill
Updated Branches:
refs/heads/DRILL-2959v2 [created] e1fb13f47
DRILL-2959: Make sure to close out compression codecs.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/e1fb13f4
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/e1fb13f4
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/e1fb13f4
Branch: refs/heads/DRILL-2959v2
Commit: e1fb13f47d3332f643e3229a5553652b4afa20ab
Parents: 3b19076
Author: Jacques Nadeau <ja...@apache.org>
Authored: Mon May 4 18:14:38 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Wed May 6 13:33:05 2015 +0100
----------------------------------------------------------------------
.../exec/store/parquet/DirectCodecFactory.java | 379 +++++++++++++++++++
.../exec/store/parquet/DirectCodecPool.java | 187 +++++++++
.../ParquetDirectByteBufferAllocator.java | 13 +-
.../exec/store/parquet/ParquetFormatPlugin.java | 7 -
.../exec/store/parquet/ParquetRecordWriter.java | 11 +-
.../store/parquet/ParquetScanBatchCreator.java | 3 +-
.../store/parquet/columnreaders/PageReader.java | 32 +-
.../columnreaders/ParquetRecordReader.java | 41 +-
.../exec/store/parquet2/DrillParquetReader.java | 23 +-
.../parquet/hadoop/CodecFactoryExposer.java | 160 --------
.../parquet/hadoop/ColumnChunkIncReadStore.java | 7 +-
.../ColumnChunkPageWriteStoreExposer.java | 14 +-
.../exec/store/TestDirectCodecFactory.java | 155 ++++++++
.../store/parquet/ParquetRecordReaderTest.java | 3 +-
pom.xml | 2 +-
15 files changed, 797 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java
new file mode 100644
index 0000000..ed455a2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecFactory.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.IdentityHashMap;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.xerial.snappy.Snappy;
+
+import parquet.bytes.ByteBufferAllocator;
+import parquet.bytes.BytesInput;
+import parquet.hadoop.CodecFactory;
+import parquet.hadoop.CodecFactory.BytesCompressor;
+import parquet.hadoop.HeapCodecFactory.HeapBytesCompressor;
+import parquet.hadoop.HeapCodecFactory.HeapBytesDecompressor;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import com.google.common.base.Preconditions;
+
+public class DirectCodecFactory extends CodecFactory<BytesCompressor, DirectBytesDecompressor> implements AutoCloseable {
+
+ private final ByteBufferAllocator allocator;
+ private final IdentityHashMap<ByteBuffer, Integer> allocatedBuffers = new IdentityHashMap<ByteBuffer, Integer>();
+
+ public DirectCodecFactory(Configuration config, ByteBufferAllocator allocator) {
+ super(config);
+ Preconditions.checkNotNull(allocator);
+ this.allocator = allocator;
+ }
+
+ public DirectCodecFactory(Configuration config, BufferAllocator allocator) {
+ this(config, new ParquetDirectByteBufferAllocator(allocator));
+ }
+
+ private ByteBuffer ensure(ByteBuffer buffer, int size) {
+ if (buffer == null) {
+ buffer = allocator.allocate(size);
+ allocatedBuffers.put(buffer, 0);
+ } else if (buffer.capacity() >= size) {
+ buffer.clear();
+ } else {
+ release(buffer);
+ buffer = allocator.allocate(size);
+ allocatedBuffers.put(buffer, 0);
+ }
+ return buffer;
+ }
+
+ ByteBuffer release(ByteBuffer buffer) {
+ if (buffer != null) {
+ allocator.release(buffer);
+ allocatedBuffers.remove(buffer);
+ }
+ return null;
+ }
+
+ @Override
+ protected BytesCompressor createCompressor(final CompressionCodecName codecName, final CompressionCodec codec,
+ int pageSize) {
+
+ if (codec == null) {
+ return new NoopCompressor();
+ } else if (codecName == CompressionCodecName.SNAPPY) {
+ // avoid using the Parquet Snappy codec since it allocates direct buffers at awkward spots.
+ return new SnappyCompressor();
+ } else {
+
+ // TODO: move zlib above as well, since it also generates allocateDirect calls.
+ return new HeapBytesCompressor(codecName, codec, pageSize);
+ }
+ }
+
+ @Override
+ protected DirectBytesDecompressor createDecompressor(final CompressionCodec codec) {
+ // if (true) {
+ // return new HeapFakeDirect(codec);
+ // }
+
+ if (codec == null) {
+ return new NoopDecompressor();
+ } else if (DirectCodecPool.INSTANCE.codec(codec).supportsDirectDecompression()) {
+ return new FullDirectDecompressor(codec);
+ } else {
+ // return new HeapFakeDirect(codec);
+ return new IndirectDecompressor(codec);
+ }
+ }
+
+ public void close() {
+ release();
+ }
+
+ private class HeapFakeDirect extends DirectBytesDecompressor {
+
+ private final ExposedHeapBytesDecompressor innerCompressor;
+
+ public HeapFakeDirect(CompressionCodec codec){
+ innerCompressor = new ExposedHeapBytesDecompressor(codec);
+ }
+
+ @Override
+ public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize)
+ throws IOException {
+ BytesInput uncompressed = decompress(new ByteBufBytesInput(input), uncompressedSize);
+ output.clear();
+ output.setBytes(0, uncompressed.toByteArray());
+ output.writerIndex((int) uncompressed.size());
+ }
+
+ @Override
+ public BytesInput decompress(BytesInput paramBytesInput, int uncompressedSize) throws IOException {
+ return innerCompressor.decompress(paramBytesInput, uncompressedSize);
+ }
+
+ @Override
+ protected void release() {
+ innerCompressor.release();
+ }
+
+ }
+
+ private class ExposedHeapBytesDecompressor extends HeapBytesDecompressor {
+ public ExposedHeapBytesDecompressor(CompressionCodec codec) {
+ super(codec);
+ }
+
+ public void release() {
+ super.release();
+ }
+ }
+
+ public class IndirectDecompressor extends DirectBytesDecompressor {
+ private final Decompressor decompressor;
+
+ public IndirectDecompressor(CompressionCodec codec) {
+ this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDecompressor();
+ }
+
+ @Override
+ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+ decompressor.reset();
+ byte[] inputBytes = bytes.toByteArray();
+ decompressor.setInput(inputBytes, 0, inputBytes.length);
+ byte[] output = new byte[uncompressedSize];
+ decompressor.decompress(output, 0, uncompressedSize);
+ return BytesInput.from(output);
+ }
+
+ @Override
+ public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize)
+ throws IOException {
+
+ decompressor.reset();
+ byte[] inputBytes = new byte[input.capacity()];
+ input.getBytes(0, inputBytes);
+ decompressor.setInput(inputBytes, 0, inputBytes.length);
+ byte[] outputBytes = new byte[uncompressedSize];
+ decompressor.decompress(outputBytes, 0, uncompressedSize);
+ output.clear();
+ output.writeBytes(outputBytes);
+ }
+
+ @Override
+ protected void release() {
+ DirectCodecPool.INSTANCE.returnDecompressor(decompressor);
+ }
+ }
+
+ public class FullDirectDecompressor extends DirectBytesDecompressor {
+ private final DirectDecompressor decompressor;
+ private ByteBuffer compressedBuffer;
+ private ByteBuffer uncompressedBuffer;
+ private ExposedHeapBytesDecompressor extraDecompressor;
+ public FullDirectDecompressor(CompressionCodec codec){
+ this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDirectDecompressor();
+ this.extraDecompressor = new ExposedHeapBytesDecompressor(codec);
+ }
+
+ @Override
+ public BytesInput decompress(BytesInput compressedBytes, int uncompressedSize) throws IOException {
+
+ if(false){
+ // TODO: fix the direct path (currently this code causes issues when writing complex Parquet files).
+ ByteBuffer bufferIn = compressedBytes.toByteBuffer();
+ uncompressedBuffer = ensure(uncompressedBuffer, uncompressedSize);
+ uncompressedBuffer.clear();
+
+ if (bufferIn.isDirect()) {
+ decompressor.decompress(bufferIn, uncompressedBuffer);
+ } else {
+ compressedBuffer = ensure(this.compressedBuffer, (int) compressedBytes.size());
+ compressedBuffer.clear();
+ compressedBuffer.put(bufferIn);
+ compressedBuffer.flip();
+ decompressor.decompress(compressedBuffer, uncompressedBuffer);
+ }
+ return BytesInput.from(uncompressedBuffer, 0, uncompressedSize);
+
+ } else {
+ return extraDecompressor.decompress(compressedBytes, uncompressedSize);
+ }
+
+
+ }
+
+
+ @Override
+ public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize)
+ throws IOException {
+ output.clear();
+ decompressor.decompress(input.nioBuffer(0, compressedSize), output.nioBuffer(0, uncompressedSize));
+ output.writerIndex(uncompressedSize);
+ }
+
+ @Override
+ protected void release() {
+ compressedBuffer = DirectCodecFactory.this.release(compressedBuffer);
+ uncompressedBuffer = DirectCodecFactory.this.release(uncompressedBuffer);
+ DirectCodecPool.INSTANCE.returnDecompressor(decompressor);
+ extraDecompressor.release();
+ }
+
+ }
+
+ public class NoopDecompressor extends DirectBytesDecompressor {
+
+ @Override
+ public void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize)
+ throws IOException {
+ Preconditions.checkArgument(compressedSize == uncompressedSize,
+ "Non-compressed data did not have matching compressed and uncompressed sizes.");
+ output.clear();
+ output.writeBytes(input, compressedSize);
+ }
+
+ @Override
+ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+ return bytes;
+ }
+
+ @Override
+ protected void release() {
+ }
+
+ }
+
+ public class SnappyCompressor extends BytesCompressor {
+
+ private ByteBuffer incoming;
+ private ByteBuffer outgoing;
+
+ public SnappyCompressor() {
+ super();
+ }
+
+ @Override
+ public BytesInput compress(BytesInput bytes) throws IOException {
+ int maxOutputSize = Snappy.maxCompressedLength((int) bytes.size());
+ ByteBuffer bufferIn = bytes.toByteBuffer();
+ outgoing = ensure(outgoing, maxOutputSize);
+ final int size;
+ if (bufferIn.isDirect()) {
+ size = Snappy.compress(bufferIn, outgoing);
+ } else {
+ this.incoming = ensure(this.incoming, (int) bytes.size());
+ this.incoming.put(bufferIn);
+ this.incoming.flip();
+ size = Snappy.compress(this.incoming, outgoing);
+ }
+
+ return BytesInput.from(outgoing, 0, (int) size);
+ }
+
+ @Override
+ public CompressionCodecName getCodecName() {
+ return CompressionCodecName.SNAPPY;
+ }
+
+ @Override
+ protected void release() {
+ outgoing = DirectCodecFactory.this.release(outgoing);
+ incoming = DirectCodecFactory.this.release(incoming);
+ }
+
+ }
+
+ public static class NoopCompressor extends BytesCompressor {
+
+ @Override
+ public BytesInput compress(BytesInput bytes) throws IOException {
+ return bytes;
+ }
+
+ @Override
+ public CompressionCodecName getCodecName() {
+ return CompressionCodecName.UNCOMPRESSED;
+ }
+
+ @Override
+ protected void release() {
+ }
+
+ }
+
+ public static class ByteBufBytesInput extends BytesInput {
+ private final ByteBuf buf;
+ private final int length;
+
+ public ByteBufBytesInput(ByteBuf buf) {
+ this(buf, 0, buf.capacity());
+ }
+
+ public ByteBufBytesInput(ByteBuf buf, int offset, int length) {
+ super();
+ if(buf.capacity() == length && offset == 0){
+ this.buf = buf;
+ }else{
+ this.buf = buf.slice(offset, length);
+ }
+
+ this.length = length;
+ }
+
+ @Override
+ public void writeAllTo(OutputStream out) throws IOException {
+ final WritableByteChannel outputChannel = Channels.newChannel(out);
+ outputChannel.write(buf.nioBuffer());
+ }
+
+ @Override
+ public ByteBuffer toByteBuffer() throws IOException {
+ return buf.nioBuffer();
+ }
+
+ @Override
+ public long size() {
+ return length;
+ }
+ }
+
+
+ public abstract class DirectBytesDecompressor extends CodecFactory.BytesDecompressor {
+ public abstract void decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize)
+ throws IOException;
+ }
+
+
+
+}
+
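A minimal usage sketch of the factory above, not part of this commit: it assumes an existing Drill BufferAllocator named allocator and some heap bytes named rawBytes, and the codec and page size are illustrative. The try-with-resources form works because DirectCodecFactory implements AutoCloseable, which is the point of DRILL-2959: the factory, and with it the codecs it handed out, gets closed out.

    Configuration conf = new Configuration();
    try (DirectCodecFactory factory = new DirectCodecFactory(conf, allocator)) {
      // write path: compressor for a given codec and page size
      BytesCompressor compressor = factory.getCompressor(CompressionCodecName.SNAPPY, 64 * 1024);
      BytesInput compressed = compressor.compress(BytesInput.from(rawBytes));

      // read path: decompressor for the same codec
      DirectBytesDecompressor decompressor = factory.getDecompressor(CompressionCodecName.SNAPPY);
      BytesInput roundTrip = decompressor.decompress(compressed, rawBytes.length);
    } // close() releases the factory's (de)compressors and any direct buffers it allocated
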
http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java
new file mode 100644
index 0000000..26d97c9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/DirectCodecPool.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.commons.pool.BasePoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressionCodec;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+public class DirectCodecPool {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectCodecPool.class);
+
+ public static final DirectCodecPool INSTANCE = new DirectCodecPool();
+
+ @SuppressWarnings("unchecked")
+ private final Map<CompressionCodec, CodecPool> codecs = (Map<CompressionCodec, CodecPool>) (Object) Collections.synchronizedMap(Maps.newHashMap());
+
+ @SuppressWarnings("unchecked")
+ private final Map<Class<?>, GenericObjectPool> directDePools = (Map<Class<?>, GenericObjectPool>) (Object) Collections
+ .synchronizedMap(Maps.newHashMap());
+ private final Map<Class<?>, GenericObjectPool> dePools = (Map<Class<?>, GenericObjectPool>) (Object) Collections
+ .synchronizedMap(Maps.newHashMap());
+ private final Map<Class<?>, GenericObjectPool> cPools = (Map<Class<?>, GenericObjectPool>) (Object) Collections
+ .synchronizedMap(Maps.newHashMap());
+
+ private DirectCodecPool() {
+ }
+
+ public class CodecPool {
+ private final GenericObjectPool compressorPool;
+ private final GenericObjectPool decompressorPool;
+ private final GenericObjectPool directDecompressorPool;
+ private final boolean supportDirectDecompressor;
+
+ private CodecPool(final CompressionCodec codec){
+ try {
+ boolean supportDirectDecompressor = codec instanceof DirectDecompressionCodec;
+ compressorPool = new GenericObjectPool(new BasePoolableObjectFactory() {
+ public Object makeObject() throws Exception {
+ return codec.createCompressor();
+ }
+ }, Integer.MAX_VALUE);
+
+ Object com = compressorPool.borrowObject();
+ if (com != null) {
+ cPools.put(com.getClass(), compressorPool);
+ compressorPool.returnObject(com);
+ }else{
+ logger.warn("Unable to find compressor for codec {}", codec.getClass().getName());
+ }
+
+ decompressorPool = new GenericObjectPool(new BasePoolableObjectFactory() {
+ public Object makeObject() throws Exception {
+ return codec.createDecompressor();
+ }
+ }, Integer.MAX_VALUE);
+
+ Object decom = decompressorPool.borrowObject();
+ if (decom != null) {
+ dePools.put(decom.getClass(), decompressorPool);
+ decompressorPool.returnObject(decom);
+ } else {
+ logger.warn("Unable to find decompressor for codec {}", codec.getClass().getName());
+ }
+
+ if (supportDirectDecompressor) {
+ directDecompressorPool = new GenericObjectPool(new BasePoolableObjectFactory() {
+ public Object makeObject() throws Exception {
+ return ((DirectDecompressionCodec) codec).createDirectDecompressor();
+ }
+ }, Integer.MAX_VALUE);
+
+ Object ddecom = directDecompressorPool.borrowObject();
+ if (ddecom != null) {
+ directDePools.put(ddecom.getClass(), directDecompressorPool);
+ directDecompressorPool.returnObject(ddecom);
+
+ } else {
+ supportDirectDecompressor = false;
+ logger.warn("Unable to find direct decompressor for codec {}", codec.getClass().getName());
+ }
+
+ } else {
+ directDecompressorPool = null;
+ }
+
+ this.supportDirectDecompressor = supportDirectDecompressor;
+ } catch (Exception e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ public DirectDecompressor borrowDirectDecompressor(){
+ Preconditions.checkArgument(supportDirectDecompressor, "Tried to get a direct Decompressor from a non-direct codec.");
+ try {
+ return (DirectDecompressor) directDecompressorPool.borrowObject();
+ } catch (Exception e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ public boolean supportsDirectDecompression() {
+ return supportDirectDecompressor;
+ }
+
+ public Decompressor borrowDecompressor(){
+ try {
+ return (Decompressor) decompressorPool.borrowObject();
+ } catch (Exception e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+
+ public Compressor borrowCompressor(){
+ try {
+ return (Compressor) compressorPool.borrowObject();
+ } catch (Exception e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
+ }
+
+ public CodecPool codec(CompressionCodec codec){
+ CodecPool pools = codecs.get(codec);
+ if(pools == null){
+ synchronized(this){
+ pools = codecs.get(codec);
+ if(pools == null){
+ pools = new CodecPool(codec);
+ codecs.put(codec, pools);
+ }
+ }
+ }
+ return pools;
+ }
+
+ private void returnToPool(Object obj, Map<Class<?>, GenericObjectPool> pools) {
+ try {
+ GenericObjectPool pool = pools.get(obj.getClass());
+ if (pool == null) {
+ throw new IllegalStateException("Received unexpected compressor or decompressor.");
+ }
+ pool.returnObject(obj);
+ } catch (Exception e) {
+ throw new DrillRuntimeException(e);
+ }
+
+ }
+
+ public void returnCompressor(Compressor compressor) {
+ returnToPool(compressor, cPools);
+ }
+
+ public void returnDecompressor(Decompressor decompressor) {
+ returnToPool(decompressor, dePools);
+ }
+
+ public void returnDecompressor(DirectDecompressor decompressor) {
+ returnToPool(decompressor, directDePools);
+ }
+
+}
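Also not part of the patch, a sketch of the borrow/return discipline the pool above expects; hadoopCodec stands in for any org.apache.hadoop.io.compress.CompressionCodec obtained elsewhere.

    DirectCodecPool.CodecPool pool = DirectCodecPool.INSTANCE.codec(hadoopCodec);

    Decompressor decompressor = pool.borrowDecompressor();
    try {
      // ... use the heap decompressor ...
    } finally {
      DirectCodecPool.INSTANCE.returnDecompressor(decompressor); // return it so it is pooled, not leaked
    }

    if (pool.supportsDirectDecompression()) {
      DirectDecompressor direct = pool.borrowDirectDecompressor();
      try {
        // ... decompress between direct ByteBuffers ...
      } finally {
        DirectCodecPool.INSTANCE.returnDecompressor(direct);
      }
    }
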
http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
index 45a1dc6..79d1b90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetDirectByteBufferAllocator.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.OperatorContext;
import parquet.bytes.ByteBufferAllocator;
@@ -32,17 +33,21 @@ import parquet.bytes.ByteBufferAllocator;
public class ParquetDirectByteBufferAllocator implements ByteBufferAllocator {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetDirectByteBufferAllocator.class);
- private OperatorContext oContext;
- private HashMap<Integer, ByteBuf> allocatedBuffers = new HashMap<Integer, ByteBuf>();
+ private final BufferAllocator allocator;
+ private final HashMap<Integer, ByteBuf> allocatedBuffers = new HashMap<Integer, ByteBuf>();
public ParquetDirectByteBufferAllocator(OperatorContext o){
- oContext=o;
+ allocator = o.getAllocator();
+ }
+
+ public ParquetDirectByteBufferAllocator(BufferAllocator allocator) {
+ this.allocator = allocator;
}
@Override
public ByteBuffer allocate(int sz) {
- ByteBuf bb = oContext.getAllocator().buffer(sz);
+ ByteBuf bb = allocator.buffer(sz);
ByteBuffer b = bb.nioBuffer(0, sz);
allocatedBuffers.put(System.identityHashCode(b), bb);
logger.debug("ParquetDirectByteBufferAllocator: Allocated "+sz+" bytes. Allocated ByteBuffer id: "+System.identityHashCode(b));
http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index cfa4c93..322a88d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactoryExposer;
import parquet.hadoop.ParquetFileWriter;
import com.google.common.collect.ImmutableSet;
@@ -74,7 +73,6 @@ public class ParquetFormatPlugin implements FormatPlugin{
private static final List<MagicString> MAGIC_STRINGS = Lists.newArrayList(new MagicString(0, ParquetFileWriter.MAGIC));
private final DrillbitContext context;
- private final CodecFactoryExposer codecFactoryExposer;
private final Configuration fsConf;
private final ParquetFormatMatcher formatMatcher;
private final ParquetFormatConfig config;
@@ -89,7 +87,6 @@ public class ParquetFormatPlugin implements FormatPlugin{
public ParquetFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
StoragePluginConfig storageConfig, ParquetFormatConfig formatConfig){
this.context = context;
- this.codecFactoryExposer = new CodecFactoryExposer(fsConf);
this.config = formatConfig;
this.formatMatcher = new ParquetFormatMatcher(this);
this.storageConfig = storageConfig;
@@ -171,10 +168,6 @@ public class ParquetFormatPlugin implements FormatPlugin{
return storageConfig;
}
- public CodecFactoryExposer getCodecFactoryExposer() {
- return codecFactoryExposer;
- }
-
public String getName(){
return name;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 3506ffa..8615eb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -80,6 +80,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
private boolean validating = false;
private CompressionCodecName codec = CompressionCodecName.SNAPPY;
private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
+ private DirectCodecFactory codecFactory;
private long recordCount = 0;
private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
@@ -100,6 +101,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{
super();
this.oContext = context.newOperatorContext(writer, true);
+ this.codecFactory = new DirectCodecFactory(writer.getFormatPlugin().getFsConf(), oContext.getAllocator());
}
@Override
@@ -156,10 +158,9 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / this.schema.getColumns().size() / 5);
pageStore = ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(this.oContext,
- codec,
- pageSize,
- this.schema,
- initialBlockBufferSize);
+ codecFactory.getCompressor(codec, pageSize),
+ schema,
+ initialBlockBufferSize);
int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(this.schema);
@@ -332,6 +333,8 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
ColumnChunkPageWriteStoreExposer.close(pageStore);
}
+ codecFactory.close();
+
if (!hasRecords) {
// the very last file is empty, delete it (DRILL-2408)
Path path = getPath();
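Condensing the writer changes above into the intended lifecycle (a sketch using the names from the diff, with error handling omitted): the factory is created once per writer against the operator's allocator, supplies the compressor for the page write store, and is closed in cleanup so the codecs go back to DirectCodecPool.

    // constructor
    this.codecFactory = new DirectCodecFactory(writer.getFormatPlugin().getFsConf(), oContext.getAllocator());

    // per block: the compressor now comes from the long-lived factory
    pageStore = ColumnChunkPageWriteStoreExposer.newColumnChunkPageWriteStore(
        oContext, codecFactory.getCompressor(codec, pageSize), schema, initialBlockBufferSize);

    // cleanup(): close the factory so its codecs are released
    codecFactory.close();
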
http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index d5586ce..d5b7303 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -39,7 +39,6 @@ import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
import org.apache.drill.exec.store.parquet2.DrillParquetReader;
-import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -130,7 +129,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
readers.add(
new ParquetRecordReader(
context, e.getPath(), e.getRowGroupIndex(), fs,
- rowGroupScan.getStorageEngine().getCodecFactoryExposer(),
+ new DirectCodecFactory(fs.getConf(), oContext.getAllocator()),
footers.get(e.getPath()),
rowGroupScan.getColumns()
)
http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 6a41a04..28a8b23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -28,6 +28,9 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.store.parquet.ColumnDataReader;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory.ByteBufBytesInput;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor;
import org.apache.drill.exec.store.parquet.ParquetFormatPlugin;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -45,8 +48,6 @@ import parquet.format.PageHeader;
import parquet.format.PageType;
import parquet.format.Util;
import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactoryExposer;
-import parquet.hadoop.CodecFactoryExposer.HadoopByteBufBytesInput;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.PrimitiveType;
@@ -101,13 +102,13 @@ final class PageReader {
// These need to be held throughout reading of the entire column chunk
List<ByteBuf> allocatedDictionaryBuffers;
- private final CodecFactoryExposer codecFactory;
+ private final DirectCodecFactory codecFactory;
PageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
throws ExecutionSetupException{
this.parentColumnReader = parentStatus;
allocatedDictionaryBuffers = new ArrayList<ByteBuf>();
- codecFactory = parentColumnReader.parentReader.getCodecFactoryExposer();
+ codecFactory = parentColumnReader.parentReader.getCodecFactory();
long start = columnChunkMetaData.getFirstDataPageOffset();
try {
@@ -137,10 +138,12 @@ final class PageReader {
final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size);
try {
dataReader.loadPage(compressedData, pageHeader.compressed_page_size);
- codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+ DirectBytesDecompressor decompressor = codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData
+ .getCodec());
+ decompressor.decompress(
compressedData,
- dictionaryData,
pageHeader.compressed_page_size,
+ dictionaryData,
pageHeader.getUncompressed_page_size());
} finally {
@@ -149,7 +152,7 @@ final class PageReader {
}
DictionaryPage page = new DictionaryPage(
- getBytesInput(dictionaryData),
+ asBytesInput(dictionaryData),
pageHeader.uncompressed_page_size,
pageHeader.dictionary_page_header.num_values,
parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
@@ -158,9 +161,8 @@ final class PageReader {
}
}
- public static BytesInput getBytesInput(DrillBuf uncompressedByteBuf) throws IOException {
- final ByteBuffer outBuffer = uncompressedByteBuf.nioBuffer(0, uncompressedByteBuf.capacity());
- return new HadoopByteBufBytesInput(outBuffer, 0, outBuffer.limit());
+ public static BytesInput asBytesInput(DrillBuf buf) throws IOException {
+ return new ByteBufBytesInput(buf);
}
/**
@@ -197,17 +199,17 @@ final class PageReader {
final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size);
try{
dataReader.loadPage(compressedData, pageHeader.compressed_page_size);
- codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+ codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec()).decompress(
compressedData,
- uncompressedData,
pageHeader.compressed_page_size,
+ uncompressedData,
pageHeader.getUncompressed_page_size());
} finally {
compressedData.release();
}
}
DictionaryPage page = new DictionaryPage(
- getBytesInput(uncompressedData),
+ asBytesInput(uncompressedData),
pageHeader.uncompressed_page_size,
pageHeader.dictionary_page_header.num_values,
parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
@@ -225,10 +227,10 @@ final class PageReader {
}else{
final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size);
dataReader.loadPage(compressedData, pageHeader.compressed_page_size);
- codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+ codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec()).decompress(
compressedData,
- pageData,
pageHeader.compressed_page_size,
+ pageData,
pageHeader.getUncompressed_page_size());
compressedData.release();
}
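One detail in the hunks above that is easy to miss, shown here as a sketch rather than a diff line: DirectBytesDecompressor.decompress takes the compressed size before the output buffer, while the removed CodecFactoryExposer.decompress took the output buffer before the sizes, so each call site swaps those two arguments in addition to switching factories.

    // decompress(DrillBuf input, int compressedSize, DrillBuf output, int uncompressedSize)
    codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec()).decompress(
        compressedData, pageHeader.compressed_page_size,
        pageData, pageHeader.getUncompressed_page_size());
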
http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 11d0042..2072aae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -30,7 +30,6 @@ import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
@@ -39,8 +38,8 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.MaterializedField.Key;
import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
import org.apache.drill.exec.vector.ValueVector;
@@ -51,7 +50,6 @@ import parquet.column.ColumnDescriptor;
import parquet.format.FileMetaData;
import parquet.format.SchemaElement;
import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactoryExposer;
import parquet.hadoop.ParquetFileWriter;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
@@ -103,36 +101,41 @@ public class ParquetRecordReader extends AbstractRecordReader {
// records specified in the row group metadata
long mockRecordsRead;
- private final CodecFactoryExposer codecFactoryExposer;
+ private final DirectCodecFactory codecFactory;
int rowGroupIndex;
long totalRecordsRead;
- public ParquetRecordReader(FragmentContext fragmentContext, //
- String path, //
- int rowGroupIndex, //
- FileSystem fs, //
- CodecFactoryExposer codecFactoryExposer, //
- ParquetMetadata footer, //
+ public ParquetRecordReader(FragmentContext fragmentContext,
+ String path,
+ int rowGroupIndex,
+ FileSystem fs,
+ DirectCodecFactory codecFactory,
+ ParquetMetadata footer,
List<SchemaPath> columns) throws ExecutionSetupException {
- this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactoryExposer, footer,
+ this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactory, footer,
columns);
}
- public ParquetRecordReader(FragmentContext fragmentContext, long batchSize,
- String path, int rowGroupIndex, FileSystem fs,
- CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer,
- List<SchemaPath> columns) throws ExecutionSetupException {
+ public ParquetRecordReader(
+ FragmentContext fragmentContext,
+ long batchSize,
+ String path,
+ int rowGroupIndex,
+ FileSystem fs,
+ DirectCodecFactory codecFactory,
+ ParquetMetadata footer,
+ List<SchemaPath> columns) throws ExecutionSetupException {
this.hadoopPath = new Path(path);
this.fileSystem = fs;
- this.codecFactoryExposer = codecFactoryExposer;
+ this.codecFactory = codecFactory;
this.rowGroupIndex = rowGroupIndex;
this.batchSize = batchSize;
this.footer = footer;
setColumns(columns);
}
- public CodecFactoryExposer getCodecFactoryExposer() {
- return codecFactoryExposer;
+ public DirectCodecFactory getCodecFactory() {
+ return codecFactory;
}
public Path getHadoopPath() {
@@ -452,6 +455,8 @@ public class ParquetRecordReader extends AbstractRecordReader {
}
columnStatuses.clear();
+ codecFactory.close();
+
for (VarLengthColumn r : varLengthReader.columns) {
r.clear();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 921d134..07950df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -18,16 +18,14 @@
package org.apache.drill.exec.store.parquet2;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
import java.util.Collection;
-import java.util.ArrayList;
-import java.util.Map;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
-import com.google.common.collect.Sets;
-
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.PathSegment;
@@ -43,37 +41,30 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.MaterializedField.Key;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import parquet.column.ColumnDescriptor;
import parquet.common.schema.ColumnPath;
-import parquet.hadoop.CodecFactoryExposer;
import parquet.hadoop.ColumnChunkIncReadStore;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.ColumnIOFactory;
-import parquet.io.InvalidRecordException;
import parquet.io.MessageColumnIO;
import parquet.schema.GroupType;
import parquet.schema.MessageType;
import parquet.schema.Type;
-import parquet.schema.PrimitiveType;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-
-import parquet.schema.Types;
+import com.google.common.collect.Sets;
public class DrillParquetReader extends AbstractRecordReader {
@@ -247,7 +238,6 @@ public class DrillParquetReader extends AbstractRecordReader {
paths.put(md.getPath(), md);
}
- CodecFactoryExposer codecFactoryExposer = new CodecFactoryExposer(fileSystem.getConf());
Path filePath = new Path(entry.getPath());
BlockMetaData blockMetaData = footer.getBlocks().get(entry.getRowGroupIndex());
@@ -255,7 +245,8 @@ public class DrillParquetReader extends AbstractRecordReader {
recordCount = (int) blockMetaData.getRowCount();
pageReadStore = new ColumnChunkIncReadStore(recordCount,
- codecFactoryExposer.getCodecFactory(), operatorContext.getAllocator(), fileSystem, filePath);
+ new DirectCodecFactory(fileSystem.getConf(), operatorContext.getAllocator()), operatorContext.getAllocator(),
+ fileSystem, filePath);
for (String[] path : schema.getPaths()) {
Type type = schema.getType(path);
http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
deleted file mode 100644
index 5438660..0000000
--- a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.hadoop;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.DrillBuf;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.DirectDecompressionCodec;
-import org.apache.hadoop.io.compress.DirectDecompressor;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import parquet.bytes.BytesInput;
-import parquet.hadoop.metadata.CompressionCodecName;
-
-public class CodecFactoryExposer{
-
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CodecFactoryExposer.class);
-
- private CodecFactory codecFactory;
- private final Map<String, org.apache.hadoop.io.compress.DirectDecompressionCodec> codecByName = new HashMap<String, org.apache.hadoop.io.compress.DirectDecompressionCodec>();
- private Configuration configuration;
-
- public CodecFactoryExposer(Configuration config){
- codecFactory = new CodecFactory(config);configuration = config;
- }
-
- public CodecFactory getCodecFactory() {
- return codecFactory;
- }
-
- public BytesInput decompress(BytesInput bytes, int uncompressedSize, CompressionCodecName codecName) throws IOException {
- return codecFactory.getDecompressor(codecName).decompress(bytes, uncompressedSize);
- }
-
- public static BytesInput getBytesInput(ByteBuf uncompressedByteBuf, int uncompressedSize) throws IOException {
- ByteBuffer outBuffer=uncompressedByteBuf.nioBuffer(0, uncompressedSize);
- return new HadoopByteBufBytesInput(outBuffer, 0, outBuffer.limit());
- }
-
- public void decompress(CompressionCodecName codecName,
- final DrillBuf compressedByteBuf,
- final DrillBuf uncompressedByteBuf,
- int compressedSize,
- int uncompressedSize) throws IOException {
- final ByteBuffer inpBuffer = compressedByteBuf.nioBuffer(0, compressedSize);
- final ByteBuffer outBuffer = uncompressedByteBuf.nioBuffer(0, uncompressedSize);
- CompressionCodec c = getCodec(codecName);
- //TODO: Create the decompressor only once at init time.
- Class<?> cx = c.getClass();
-
- DirectDecompressionCodec d=null;
- DirectDecompressor decompr=null;
-
- if (DirectDecompressionCodec.class.isAssignableFrom(cx)) {
- d=(DirectDecompressionCodec)c;
- }
-
- if(d!=null) {
- decompr = d.createDirectDecompressor();
- }
-
- if(d!=null && decompr!=null){
- decompr.decompress(inpBuffer, outBuffer);
- }else{
- logger.warn("This Hadoop implementation does not support a " + codecName +
- " direct decompression codec interface. "+
- "Direct decompression is available only on *nix systems with Hadoop 2.3 or greater. "+
- "Read operations will be a little slower. ");
- BytesInput outBytesInp = this.decompress(
- new HadoopByteBufBytesInput(inpBuffer, 0, inpBuffer.limit()),
- uncompressedSize,
- codecName);
- // COPY the data back into the output buffer.
- // (DrillBufs can only refer to direct memory, so we cannot pass back a BytesInput backed
- // by a byte array).
- outBuffer.put(outBytesInp.toByteArray());
- }
- }
-
- private DirectDecompressionCodec getCodec(CompressionCodecName codecName) {
- String codecClassName = codecName.getHadoopCompressionCodecClassName();
- if (codecClassName == null) {
- return null;
- }
- DirectDecompressionCodec codec = codecByName.get(codecClassName);
- if (codec != null) {
- return codec;
- }
-
- try {
- Class<?> codecClass = Class.forName(codecClassName);
- codec = (DirectDecompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
- codecByName.put(codecClassName, codec);
- return codec;
- } catch (ClassNotFoundException e) {
- throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
- }
- }
-
- public static class HadoopByteBufBytesInput extends BytesInput {
-
- private final ByteBuffer byteBuf;
- private final int length;
- private final int offset;
-
- public HadoopByteBufBytesInput(ByteBuffer byteBuf, int offset, int length) {
- super();
- this.byteBuf = byteBuf;
- this.offset = offset;
- this.length = length;
- }
-
- @Override
- public void writeAllTo(OutputStream out) throws IOException {
- final WritableByteChannel outputChannel = Channels.newChannel(out);
- byteBuf.position(offset);
- ByteBuffer tempBuf = byteBuf.slice();
- tempBuf.limit(length);
- outputChannel.write(tempBuf);
- }
-
- @Override
- public ByteBuffer toByteBuffer() throws IOException {
- byteBuf.position(offset);
- ByteBuffer buf = byteBuf.slice();
- buf.limit(length);
- return buf;
- }
-
- @Override
- public long size() {
- return length;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
index 242cd28..6337d4c 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -28,7 +28,7 @@ import java.util.Map;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -52,14 +52,15 @@ public class ColumnChunkIncReadStore implements PageReadStore {
private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
- private CodecFactory codecFactory = new CodecFactory(new Configuration());
+ private DirectCodecFactory codecFactory;
private BufferAllocator allocator;
private FileSystem fs;
private Path path;
private long rowCount;
private List<FSDataInputStream> streams = new ArrayList();
- public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, BufferAllocator allocator, FileSystem fs, Path path) {
+ public ColumnChunkIncReadStore(long rowCount, DirectCodecFactory codecFactory, BufferAllocator allocator,
+ FileSystem fs, Path path) {
this.codecFactory = codecFactory;
this.allocator = allocator;
this.fs = fs;
http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
index 0e9dec0..743d185 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkPageWriteStoreExposer.java
@@ -21,21 +21,19 @@ import java.io.IOException;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
-import org.apache.hadoop.conf.Configuration;
import parquet.column.page.PageWriteStore;
import parquet.hadoop.CodecFactory.BytesCompressor;
-import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.MessageType;
public class ColumnChunkPageWriteStoreExposer {
- public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore(OperatorContext oContext,
- CompressionCodecName codec,
- int pageSize,
- MessageType schema,
- int initialSize) {
- BytesCompressor compressor = new CodecFactory(new Configuration()).getCompressor(codec, pageSize);
+ public static ColumnChunkPageWriteStore newColumnChunkPageWriteStore(
+ OperatorContext oContext,
+ BytesCompressor compressor,
+ MessageType schema,
+ int initialSize
+ ) {
return new ColumnChunkPageWriteStore(compressor, schema, initialSize, new ParquetDirectByteBufferAllocator(oContext));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java
new file mode 100644
index 0000000..644144e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestDirectCodecFactory.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import io.netty.buffer.DrillBuf;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.drill.common.DeferredException;
+import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory.ByteBufBytesInput;
+import org.apache.drill.exec.store.parquet.DirectCodecFactory.DirectBytesDecompressor;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import parquet.bytes.BytesInput;
+import parquet.hadoop.CodecFactory.BytesCompressor;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+public class TestDirectCodecFactory extends ExecTest {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestDirectCodecFactory.class);
+
+ private static enum Decompression {
+ ON_HEAP, OFF_HEAP, DRILLBUF
+ }
+
+ private void test(int size, CompressionCodecName codec, boolean useOnHeapCompression, Decompression decomp) {
+ DrillBuf rawBuf = null;
+ DrillBuf outBuf = null;
+ try (BufferAllocator allocator = new TopLevelAllocator();
+ DirectCodecFactory codecFactory = new DirectCodecFactory(new Configuration(), allocator)) {
+ try {
+ rawBuf = allocator.buffer(size);
+ final byte[] rawArr = new byte[size];
+ outBuf = allocator.buffer(size * 2);
+ Random r = new Random();
+ byte[] random = new byte[1024];
+ int pos = 0;
+ while (pos < size) {
+ r.nextBytes(random);
+ rawBuf.writeBytes(random);
+ System.arraycopy(random, 0, rawArr, pos, random.length);
+ pos += random.length;
+ }
+
+ BytesCompressor c = codecFactory.getCompressor(codec, 64 * 1024);
+ DirectBytesDecompressor d = codecFactory.getDecompressor(codec);
+
+ BytesInput compressed;
+ if (useOnHeapCompression) {
+ compressed = c.compress(BytesInput.from(rawArr));
+ } else {
+ compressed = c.compress(new ByteBufBytesInput(rawBuf));
+ }
+
+ switch (decomp) {
+ case DRILLBUF: {
+ ByteBuffer buf = compressed.toByteBuffer();
+ DrillBuf b = allocator.buffer(buf.capacity());
+ try {
+ b.writeBytes(buf);
+ d.decompress(b, (int) compressed.size(), outBuf, size);
+ for (int i = 0; i < size; i++) {
+ Assert.assertTrue("Data didn't match at " + i, outBuf.getByte(i) == rawBuf.getByte(i));
+ }
+ } finally {
+ b.release();
+ }
+ break;
+ }
+
+ case OFF_HEAP: {
+ ByteBuffer buf = compressed.toByteBuffer();
+ DrillBuf b = allocator.buffer(buf.capacity());
+ try {
+ b.writeBytes(buf);
+ BytesInput input = d.decompress(new ByteBufBytesInput(b), size);
+ Assert.assertArrayEquals(input.toByteArray(), rawArr);
+ } finally {
+ b.release();
+ }
+ break;
+ }
+ case ON_HEAP: {
+ byte[] buf = compressed.toByteArray();
+ BytesInput input = d.decompress(BytesInput.from(buf), size);
+ Assert.assertArrayEquals(input.toByteArray(), rawArr);
+ break;
+ }
+ }
+ } catch (Exception e) {
+ String msg = String.format(
+ "Failure while testing Codec: %s, OnHeapCompressionInput: %s, Decompression Mode: %s, Data Size: %d",
+ codec.name(),
+ useOnHeapCompression, decomp.name(), size);
+ System.out.println(msg);
+ throw new RuntimeException(msg, e);
+ } finally {
+ if (rawBuf != null) {
+ rawBuf.release();
+ }
+ if (outBuf != null) {
+ outBuf.release();
+ }
+ }
+ }
+ }
+
+ @Test
+ public void compressionCodecs() throws Exception {
+ int[] sizes = { 4 * 1024, 1 * 1024 * 1024 };
+ boolean[] comp = { true, false };
+
+ try (DeferredException ex = new DeferredException()) {
+ for (int size : sizes) {
+ for (boolean useOnHeapComp : comp) {
+ for (Decompression decomp : Decompression.values()) {
+ for (CompressionCodecName codec : CompressionCodecName.values()) {
+ if (codec == CompressionCodecName.LZO) {
+ // not installed as gpl.
+ continue;
+ }
+ try {
+ test(size, codec, useOnHeapComp, decomp);
+ } catch (Exception e) {
+ ex.addException(e);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index e50e3fb..83a1cb8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -70,7 +70,6 @@ import parquet.bytes.BytesInput;
import parquet.column.page.DataPageV1;
import parquet.column.page.PageReadStore;
import parquet.column.page.PageReader;
-import parquet.hadoop.CodecFactoryExposer;
import parquet.hadoop.Footer;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
@@ -625,7 +624,7 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
BufferAllocator allocator = new TopLevelAllocator();
for(int i = 0; i < 25; i++) {
ParquetRecordReader rr = new ParquetRecordReader(context, 256000, fileName, 0, fs,
- new CodecFactoryExposer(dfsConfig), f.getParquetMetadata(), columns);
+ new DirectCodecFactory(dfsConfig, allocator), f.getParquetMetadata(), columns);
TestOutputMutator mutator = new TestOutputMutator(allocator);
rr.setup(mutator);
Stopwatch watch = new Stopwatch();
http://git-wip-us.apache.org/repos/asf/drill/blob/e1fb13f4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f0f4bc5..a207f2a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -33,7 +33,7 @@
<proto.cas.path>${project.basedir}/src/main/protobuf/</proto.cas.path>
<dep.junit.version>4.11</dep.junit.version>
<dep.slf4j.version>1.7.6</dep.slf4j.version>
- <parquet.version>1.6.0rc3-drill-r0.1</parquet.version>
+ <parquet.version>1.6.0rc3-drill-r0.3</parquet.version>
</properties>
<scm>