Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/03 05:03:04 UTC
[2/4] drill git commit: DRILL-2940, DRILL-2847: Improve Memory Characteristics
DRILL-2940, DRILL-2847: Improve Memory Characteristics
- Update large buffer allocation so Drill releases memory immediately rather than waiting for garbage collection
- Remove DrillBuf.wrap() and all references to it.
- Update Parquet Reader to reduce object churn and indirection.
- Add additional metric to memory iterator
- Add large and small buffer metric histogram tracking
- Add memory tracking reporter
- Update Netty to 4.0.27
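
The release-tracking pattern behind the first bullet (and the histogram bullets), as a minimal standalone sketch -- class and counter names are illustrative, not the committed API: when the reference count reaches zero, the wrapper frees its memory and updates shared counters at once instead of leaving reclamation to the garbage collector.

    import java.util.concurrent.atomic.AtomicLong;

    class TrackedBuffer {
      private static final AtomicLong OUTSTANDING_BYTES = new AtomicLong();
      private static final AtomicLong OUTSTANDING_COUNT = new AtomicLong();

      private final AtomicLong refCnt = new AtomicLong(1);
      private final int capacity;

      TrackedBuffer(int capacity) {
        this.capacity = capacity;
        OUTSTANDING_BYTES.addAndGet(capacity);
        OUTSTANDING_COUNT.incrementAndGet();
      }

      boolean release() {
        final long newCnt = refCnt.decrementAndGet();
        if (newCnt < 0) {
          throw new IllegalStateException("Buffer has negative reference count.");
        }
        if (newCnt == 0) {
          // free the underlying direct memory here, then update the metrics
          OUTSTANDING_BYTES.addAndGet(-capacity);
          OUTSTANDING_COUNT.decrementAndGet();
          return true;
        }
        return false;
      }
    }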
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0dd0e833
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0dd0e833
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0dd0e833
Branch: refs/heads/master
Commit: 0dd0e833714120c77e3e7ef34de654f5246953b9
Parents: 88bb051
Author: Jacques Nadeau <ja...@apache.org>
Authored: Tue Apr 28 08:53:12 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sat May 2 19:33:54 2015 -0700
----------------------------------------------------------------------
exec/java-exec/pom.xml | 2 +-
.../src/main/codegen/includes/vv_imports.ftl | 1 +
.../codegen/templates/NullableValueVectors.java | 13 +
.../templates/VariableLengthVectors.java | 12 +
.../src/main/java/io/netty/buffer/DrillBuf.java | 47 +--
.../main/java/io/netty/buffer/LargeBuffer.java | 350 +++++++++++++++++++
.../netty/buffer/PooledByteBufAllocatorL.java | 199 ++++++++++-
.../netty/buffer/UnsafeDirectLittleEndian.java | 80 ++++-
.../apache/drill/exec/TestMemoryRetention.java | 144 ++++++++
.../drill/exec/memory/TopLevelAllocator.java | 2 +-
.../apache/drill/exec/metrics/DrillMetrics.java | 6 +-
.../drill/exec/physical/impl/ScanBatch.java | 6 +
.../apache/drill/exec/rpc/data/DataServer.java | 74 ++--
.../exec/store/parquet/ColumnDataReader.java | 19 +-
.../store/parquet/columnreaders/BitReader.java | 5 +-
.../parquet/columnreaders/ColumnReader.java | 25 +-
.../columnreaders/FixedByteAlignedReader.java | 28 +-
.../columnreaders/FixedWidthRepeatedReader.java | 2 +-
.../columnreaders/NullableBitReader.java | 2 +-
.../columnreaders/NullableColumnReader.java | 6 +-
.../NullableFixedByteAlignedReaders.java | 26 +-
.../NullableVarLengthValuesColumn.java | 6 +-
.../store/parquet/columnreaders/PageReader.java | 249 +++++++------
.../ParquetFixedWidthDictionaryReaders.java | 18 +-
.../columnreaders/VarLengthColumnReaders.java | 94 ++---
.../columnreaders/VarLengthValuesColumn.java | 7 +-
.../drill/exec/store/sys/MemoryIterator.java | 25 +-
.../apache/drill/exec/util/DecimalUtility.java | 8 +
.../exec/work/fragment/FragmentExecutor.java | 4 +-
.../parquet/hadoop/CodecFactoryExposer.java | 19 +-
.../exec/vector/complex/TestEmptyPopulator.java | 14 +-
.../src/test/resources/drill-module.conf | 3 +-
pom.xml | 2 +-
33 files changed, 1134 insertions(+), 364 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 17f5e6e..35df625 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -115,7 +115,7 @@
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
- <version>4.0.24.Final</version>
+ <version>4.0.27.Final</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.ext</groupId>
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
index d0f6291..92c8007 100644
--- a/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
+++ b/exec/java-exec/src/main/codegen/includes/vv_imports.ftl
@@ -57,6 +57,7 @@ import java.util.List;
import java.io.Closeable;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index 9373fc3..9d03efb 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -460,6 +460,19 @@ public final class ${className} extends BaseDataValueVector implements <#if type
<#if type.major == "VarLen">lastSet = index;</#if>
</#if>
}
+
+ public void setSafe(int index, ByteBuffer value, int start, int length) {
+ <#if type.major != "VarLen">
+ throw new UnsupportedOperationException();
+ <#else>
+ fillEmpties(index);
+
+ bits.getMutator().setSafe(index, 1);
+ values.getMutator().setSafe(index, value, start, length);
+ setCount++;
+ <#if type.major == "VarLen">lastSet = index;</#if>
+ </#if>
+ }
public void setNull(int index){
bits.getMutator().setSafe(index, 0);
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 0273304..8a4b663 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -444,6 +444,18 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
offsetVector.getMutator().set(index + 1, currentOffset + length);
data.setBytes(currentOffset, bytes, start, length);
}
+
+ public void setSafe(int index, ByteBuffer bytes, int start, int length) {
+ assert index >= 0;
+
+ int currentOffset = offsetVector.getAccessor().get(index);
+
+ while (data.capacity() < currentOffset + length) {
+ reAlloc();
+ }
+ offsetVector.getMutator().setSafe(index + 1, currentOffset + length);
+ data.setBytes(currentOffset, bytes, start, length);
+ }
public void setSafe(int index, byte[] bytes, int start, int length) {
assert index >= 0;
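
Both template additions give generated vectors a ByteBuffer-based setSafe, so page bytes can be copied in without an intermediate byte[]. A hedged usage sketch against a generated variable-length vector (the wrapper class and helper method are assumptions for illustration; start and length are absolute indices into the ByteBuffer, independent of its position):

    import java.nio.ByteBuffer;

    import org.apache.drill.exec.vector.VarCharVector;

    class ByteBufferSetSafeSketch {
      // 'vector' is assumed to be an already-allocated VarCharVector.
      static void writeRow(VarCharVector vector, byte[] payload) {
        final ByteBuffer buf = ByteBuffer.allocateDirect(payload.length);
        buf.put(payload);
        // Copy bytes [0, payload.length) of the ByteBuffer into row 0;
        // setSafe keeps calling reAlloc() until the backing buffer fits.
        vector.getMutator().setSafe(0, buf, 0, payload.length);
      }
    }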
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
index 43b8b48..2016e1e 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
@@ -36,6 +36,8 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.util.AssertionUtil;
+import com.google.common.base.Preconditions;
+
public final class DrillBuf extends AbstractByteBuf {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillBuf.class);
@@ -71,19 +73,6 @@ public final class DrillBuf extends AbstractByteBuf {
this.allocator = allocator;
}
- private DrillBuf(ByteBuffer bb) {
- super(bb.remaining());
- UnpooledUnsafeDirectByteBuf bytebuf = new UnpooledUnsafeDirectByteBuf(UnpooledByteBufAllocator.DEFAULT, bb, bb.remaining());
- this.acct = FakeAllocator.FAKE_ACCOUNTOR;
- this.addr = bytebuf.memoryAddress();
- this.allocator = FakeAllocator.FAKE_ALLOCATOR;
- this.b = bytebuf;
- this.length = bytebuf.capacity();
- this.offset = 0;
- this.rootBuffer = true;
- this.writerIndex(bb.remaining());
- }
-
private DrillBuf(BufferAllocator allocator, Accountor a) {
super(0);
this.b = new EmptyByteBuf(allocator.getUnderlyingAllocator()).order(ByteOrder.LITTLE_ENDIAN);
@@ -257,7 +246,9 @@ public final class DrillBuf extends AbstractByteBuf {
public synchronized boolean release(int decrement) {
if(rootBuffer){
- if(0 == this.rootRefCnt.addAndGet(-decrement)){
+ final long newRefCnt = this.rootRefCnt.addAndGet(-decrement);
+ Preconditions.checkArgument(newRefCnt > -1, "Buffer has negative reference count.");
+ if (newRefCnt == 0) {
b.release(decrement);
acct.release(this, length);
return true;
@@ -699,6 +690,25 @@ public final class DrillBuf extends AbstractByteBuf {
return this;
}
+ public ByteBuf setBytes(int index, ByteBuffer src, int srcIndex, int length) {
+ if (src.isDirect()) {
+ checkIndex(index, length);
+ PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcIndex, this.memoryAddress() + index,
+ length);
+ } else {
+ if (srcIndex == 0 && src.capacity() == length) {
+ b.setBytes(index + offset, src);
+ } else {
+ ByteBuffer newBuf = src.duplicate();
+ newBuf.position(srcIndex);
+ newBuf.limit(srcIndex + length);
+ b.setBytes(index + offset, newBuf);
+ }
+ }
+
+ return this;
+ }
+
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
b.setBytes(index + offset, src, srcIndex, length);
@@ -735,13 +745,4 @@ public final class DrillBuf extends AbstractByteBuf {
return rootBuffer;
}
- public static DrillBuf wrapByteBuffer(ByteBuffer b) {
- if (!b.isDirect()) {
- throw new IllegalStateException("DrillBufs can only refer to direct memory.");
- } else {
- return new DrillBuf(b);
- }
-
- }
-
}
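
For context, the new DrillBuf.setBytes(ByteBuffer, ...) takes one of two copy paths: direct sources are copied address-to-address, heap sources go through a duplicate so the caller's position and limit are never mutated. A minimal sketch of that branch (illustrative, not the committed method):

    import java.nio.ByteBuffer;

    import io.netty.util.internal.PlatformDependent;

    class CopyPathSketch {
      static void copyFrom(ByteBuffer src, long dstAddr, int srcIndex, int length) {
        if (src.isDirect()) {
          // Raw native memory copy; no heap traffic, no index mutation.
          PlatformDependent.copyMemory(
              PlatformDependent.directBufferAddress(src) + srcIndex, dstAddr, length);
        } else {
          // Work on a duplicate so the caller's ByteBuffer indices survive.
          final ByteBuffer dup = src.duplicate();
          dup.position(srcIndex).limit(srcIndex + length);
          // ... hand 'dup' to the underlying buffer's setBytes(ByteBuffer) ...
        }
      }
    }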
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/io/netty/buffer/LargeBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/LargeBuffer.java b/exec/java-exec/src/main/java/io/netty/buffer/LargeBuffer.java
new file mode 100644
index 0000000..f1d4842
--- /dev/null
+++ b/exec/java-exec/src/main/java/io/netty/buffer/LargeBuffer.java
@@ -0,0 +1,350 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.netty.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This is basically a complete copy of DuplicatedByteBuf. We copy it because we cannot otherwise override the
+ * release methods needed to keep a global count of outstanding large buffers.
+ */
+public class LargeBuffer extends AbstractByteBuf {
+
+ private final AtomicLong hugeBufferSize;
+ private final AtomicLong hugeBufferCount;
+
+ @Override
+ public ByteBuffer nioBuffer(int index, int length) {
+ return unwrap().nioBuffer(index, length);
+ }
+
+ private final ByteBuf buffer;
+ private final int initCap;
+
+ public LargeBuffer(ByteBuf buffer, AtomicLong hugeBufferSize, AtomicLong hugeBufferCount) {
+ super(buffer.maxCapacity());
+ initCap = buffer.capacity();
+ this.hugeBufferCount = hugeBufferCount;
+ this.hugeBufferSize = hugeBufferSize;
+
+ if (buffer instanceof LargeBuffer) {
+ this.buffer = ((LargeBuffer) buffer).buffer;
+ } else {
+ this.buffer = buffer;
+ }
+
+ setIndex(buffer.readerIndex(), buffer.writerIndex());
+ }
+
+ @Override
+ public ByteBuf unwrap() {
+ return buffer;
+ }
+
+ @Override
+ public ByteBufAllocator alloc() {
+ return buffer.alloc();
+ }
+
+ @Override
+ public ByteOrder order() {
+ return buffer.order();
+ }
+
+ @Override
+ public boolean isDirect() {
+ return buffer.isDirect();
+ }
+
+ @Override
+ public int capacity() {
+ return buffer.capacity();
+ }
+
+ @Override
+ public ByteBuf capacity(int newCapacity) {
+ buffer.capacity(newCapacity);
+ return this;
+ }
+
+ @Override
+ public boolean hasArray() {
+ return buffer.hasArray();
+ }
+
+ @Override
+ public byte[] array() {
+ return buffer.array();
+ }
+
+ @Override
+ public int arrayOffset() {
+ return buffer.arrayOffset();
+ }
+
+ @Override
+ public boolean hasMemoryAddress() {
+ return buffer.hasMemoryAddress();
+ }
+
+ @Override
+ public long memoryAddress() {
+ return buffer.memoryAddress();
+ }
+
+ @Override
+ public byte getByte(int index) {
+ return _getByte(index);
+ }
+
+ @Override
+ protected byte _getByte(int index) {
+ return buffer.getByte(index);
+ }
+
+ @Override
+ public short getShort(int index) {
+ return _getShort(index);
+ }
+
+ @Override
+ protected short _getShort(int index) {
+ return buffer.getShort(index);
+ }
+
+ @Override
+ public int getUnsignedMedium(int index) {
+ return _getUnsignedMedium(index);
+ }
+
+ @Override
+ protected int _getUnsignedMedium(int index) {
+ return buffer.getUnsignedMedium(index);
+ }
+
+ @Override
+ public int getInt(int index) {
+ return _getInt(index);
+ }
+
+ @Override
+ protected int _getInt(int index) {
+ return buffer.getInt(index);
+ }
+
+ @Override
+ public long getLong(int index) {
+ return _getLong(index);
+ }
+
+ @Override
+ protected long _getLong(int index) {
+ return buffer.getLong(index);
+ }
+
+ @Override
+ public ByteBuf copy(int index, int length) {
+ return new LargeBuffer(buffer.copy(index, length), hugeBufferSize, hugeBufferCount);
+ }
+
+ @Override
+ public ByteBuf slice(int index, int length) {
+ return new SlicedByteBuf(this, index, length);
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+ buffer.getBytes(index, dst, dstIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+ buffer.getBytes(index, dst, dstIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuffer dst) {
+ buffer.getBytes(index, dst);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setByte(int index, int value) {
+ _setByte(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setByte(int index, int value) {
+ buffer.setByte(index, value);
+ }
+
+ @Override
+ public ByteBuf setShort(int index, int value) {
+ _setShort(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setShort(int index, int value) {
+ buffer.setShort(index, value);
+ }
+
+ @Override
+ public ByteBuf setMedium(int index, int value) {
+ _setMedium(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setMedium(int index, int value) {
+ buffer.setMedium(index, value);
+ }
+
+ @Override
+ public ByteBuf setInt(int index, int value) {
+ _setInt(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setInt(int index, int value) {
+ buffer.setInt(index, value);
+ }
+
+ @Override
+ public ByteBuf setLong(int index, long value) {
+ _setLong(index, value);
+ return this;
+ }
+
+ @Override
+ protected void _setLong(int index, long value) {
+ buffer.setLong(index, value);
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+ buffer.setBytes(index, src, srcIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+ buffer.setBytes(index, src, srcIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuffer src) {
+ buffer.setBytes(index, src);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, OutputStream out, int length)
+ throws IOException {
+ buffer.getBytes(index, out, length);
+ return this;
+ }
+
+ @Override
+ public int getBytes(int index, GatheringByteChannel out, int length)
+ throws IOException {
+ return buffer.getBytes(index, out, length);
+ }
+
+ @Override
+ public int setBytes(int index, InputStream in, int length)
+ throws IOException {
+ return buffer.setBytes(index, in, length);
+ }
+
+ @Override
+ public int setBytes(int index, ScatteringByteChannel in, int length)
+ throws IOException {
+ return buffer.setBytes(index, in, length);
+ }
+
+ @Override
+ public int nioBufferCount() {
+ return buffer.nioBufferCount();
+ }
+
+ @Override
+ public ByteBuffer[] nioBuffers(int index, int length) {
+ return buffer.nioBuffers(index, length);
+ }
+
+ @Override
+ public ByteBuffer internalNioBuffer(int index, int length) {
+ return nioBuffer(index, length);
+ }
+
+ @Override
+ public int forEachByte(int index, int length, ByteBufProcessor processor) {
+ return buffer.forEachByte(index, length, processor);
+ }
+
+ @Override
+ public int forEachByteDesc(int index, int length, ByteBufProcessor processor) {
+ return buffer.forEachByteDesc(index, length, processor);
+ }
+
+ @Override
+ public final int refCnt() {
+ return unwrap().refCnt();
+ }
+
+ @Override
+ public final ByteBuf retain() {
+ unwrap().retain();
+ return this;
+ }
+
+ @Override
+ public final ByteBuf retain(int increment) {
+ unwrap().retain(increment);
+ return this;
+ }
+
+ @Override
+ public boolean release() {
+ return release(1);
+ }
+
+ @Override
+ public boolean release(int decrement) {
+ boolean released = unwrap().release(decrement);
+ if (released) {
+ hugeBufferSize.addAndGet(-initCap);
+ hugeBufferCount.decrementAndGet();
+ }
+ return released;
+ }
+
+}
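
Roughly how the allocator below wires LargeBuffer into the beyond-chunk-size path; a sketch only -- the committed code also wraps the result in UnsafeDirectLittleEndian and records a histogram sample:

    import java.util.concurrent.atomic.AtomicLong;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.LargeBuffer;
    import io.netty.buffer.UnpooledByteBufAllocator;

    class LargeBufferSketch {
      static final AtomicLong HUGE_SIZE = new AtomicLong();
      static final AtomicLong HUGE_COUNT = new AtomicLong();

      static ByteBuf allocateHuge(int capacity) {
        final ByteBuf raw = UnpooledByteBufAllocator.DEFAULT.directBuffer(capacity);
        HUGE_COUNT.incrementAndGet();
        HUGE_SIZE.addAndGet(raw.capacity());
        // When release() drops the refcount to zero, LargeBuffer subtracts
        // its initial capacity from HUGE_SIZE and decrements HUGE_COUNT.
        return new LargeBuffer(raw, HUGE_SIZE, HUGE_COUNT);
      }
    }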
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java b/exec/java-exec/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
index c0de544..2ca79f0 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -17,18 +17,100 @@
*/
package io.netty.buffer;
+import io.netty.util.internal.StringUtil;
+
+import java.lang.reflect.Field;
import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.util.AssertionUtil;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
public class PooledByteBufAllocatorL extends PooledByteBufAllocator{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PooledByteBufAllocatorL.class);
+ private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("drill.allocator");
+ private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;
+
+ private static final String METRIC_PREFIX = "drill.allocator.";
public static final PooledByteBufAllocatorL DEFAULT = new PooledByteBufAllocatorL();
-// public final UnsafeDirectLittleEndian emptyBuf;
+ private final MetricRegistry registry = DrillMetrics.getInstance();
+ private final AtomicLong hugeBufferSize = new AtomicLong(0);
+ private final AtomicLong hugeBufferCount = new AtomicLong(0);
+ private final AtomicLong normalBufferSize = new AtomicLong(0);
+ private final AtomicLong normalBufferCount = new AtomicLong(0);
- public PooledByteBufAllocatorL() {
+ private final PoolArena<ByteBuffer>[] directArenas;
+ private final MemoryStatusThread statusThread;
+ private final Histogram largeBuffersHist;
+ private final Histogram normalBuffersHist;
+
+ private PooledByteBufAllocatorL() {
super(true);
-// emptyBuf = newDirectBuffer(0,0);
+ try {
+ Field f = PooledByteBufAllocator.class.getDeclaredField("directArenas");
+ f.setAccessible(true);
+ this.directArenas = (PoolArena<ByteBuffer>[]) f.get(this);
+ } catch (Exception e) {
+ throw new RuntimeException("Failure while initializing allocator. Unable to retrieve direct arenas field.", e);
+ }
+
+ if (memoryLogger.isTraceEnabled()) {
+ statusThread = new MemoryStatusThread();
+ statusThread.start();
+ } else {
+ statusThread = null;
+ }
+ removeOldMetrics();
+
+ registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return normalBufferSize.get();
+ }
+ });
+
+ registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return normalBufferCount.get();
+ }
+ });
+
+ registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return hugeBufferSize.get();
+ }
+ });
+
+ registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
+ @Override
+ public Long getValue() {
+ return hugeBufferCount.get();
+ }
+ });
+
+ largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
+ normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist");
+
+ }
+
+ private synchronized void removeOldMetrics() {
+ registry.removeMatching(new MetricFilter() {
+ @Override
+ public boolean matches(String name, Metric metric) {
+ return name.startsWith("drill.allocator.");
+ }
+ });
}
@Override
@@ -41,19 +123,42 @@ public class PooledByteBufAllocatorL extends PooledByteBufAllocator{
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
- ByteBuf buf;
if (directArena != null) {
- buf = directArena.allocate(cache, initialCapacity, maxCapacity);
- } else {
- throw new UnsupportedOperationException("Drill requries that the allocator operates in DirectBuffer mode.");
- }
- if(buf instanceof PooledUnsafeDirectByteBuf){
- return new UnsafeDirectLittleEndian( (PooledUnsafeDirectByteBuf) buf);
- }else{
- throw new UnsupportedOperationException("Drill requries that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality.");
+ if (initialCapacity > directArena.chunkSize) {
+ // This is beyond chunk size so we'll allocate separately.
+ ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);
+
+ hugeBufferCount.incrementAndGet();
+ hugeBufferSize.addAndGet(buf.capacity());
+ largeBuffersHist.update(buf.capacity());
+ // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
+ return new UnsafeDirectLittleEndian(new LargeBuffer(buf, hugeBufferSize, hugeBufferCount));
+
+ } else {
+ // within chunk, use arena.
+ ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
+ if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
+ fail();
+ }
+
+ normalBuffersHist.update(buf.capacity());
+ if (AssertionUtil.ASSERT_ENABLED) {
+ normalBufferSize.addAndGet(buf.capacity());
+ normalBufferCount.incrementAndGet();
+ }
+
+ return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount, normalBufferSize);
+ }
+
+ } else {
+ throw fail();
}
+ }
+ private UnsupportedOperationException fail() {
+ return new UnsupportedOperationException(
+ "Drill requries that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality.");
}
@@ -81,5 +186,73 @@ public class PooledByteBufAllocatorL extends PooledByteBufAllocator{
"initialCapacity: %d (expected: not greater than maxCapacity(%d)",
initialCapacity, maxCapacity));
}
-}
+ }
+
+ private class MemoryStatusThread extends Thread {
+
+ public MemoryStatusThread() {
+ super("memory-status-logger");
+ this.setDaemon(true);
+ this.setName("allocation.logger");
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ memoryLogger.trace("Memory Usage: \n{}", PooledByteBufAllocatorL.this.toString());
+ try {
+ Thread.sleep(MEMORY_LOGGER_FREQUENCY_SECONDS * 1000);
+ } catch (InterruptedException e) {
+ return;
+ }
+
+ }
+ }
+
+ }
+
+ public void checkAndReset() {
+ if (hugeBufferCount.get() != 0 || normalBufferCount.get() != 0) {
+ StringBuilder buf = new StringBuilder();
+ buf.append("Large buffers outstanding: ");
+ buf.append(hugeBufferCount.get());
+ buf.append(" totaling ");
+ buf.append(hugeBufferSize.get());
+ buf.append(" bytes.");
+ buf.append('\n');
+ buf.append("Normal buffers outstanding: ");
+ buf.append(normalBufferCount.get());
+ buf.append(" totaling ");
+ buf.append(normalBufferSize.get());
+ buf.append(" bytes.");
+ hugeBufferCount.set(0);
+ normalBufferCount.set(0);
+ hugeBufferSize.set(0);
+ normalBufferSize.set(0);
+ throw new DrillRuntimeException(buf.toString());
+ }
+ }
+
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append(directArenas.length);
+ buf.append(" direct arena(s):");
+ buf.append(StringUtil.NEWLINE);
+ for (PoolArena<ByteBuffer> a : directArenas) {
+ buf.append(a);
+ }
+
+ buf.append("Large buffers outstanding: ");
+ buf.append(this.hugeBufferCount.get());
+ buf.append(" totaling ");
+ buf.append(this.hugeBufferSize.get());
+ buf.append(" bytes.");
+ buf.append('\n');
+ buf.append("Normal buffers outstanding: ");
+ buf.append(this.normalBufferCount.get());
+ buf.append(" totaling ");
+ buf.append(this.normalBufferSize.get());
+ buf.append(" bytes.");
+ return buf.toString();
+ }
}
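
The metric wiring above is plain Dropwizard Metrics: a Gauge reports the current outstanding totals on demand, while a Histogram records the size distribution of individual allocations. A condensed sketch of the same pattern (registry and metric names are illustrative):

    import java.util.concurrent.atomic.AtomicLong;

    import com.codahale.metrics.Gauge;
    import com.codahale.metrics.Histogram;
    import com.codahale.metrics.MetricRegistry;

    class AllocatorMetricsSketch {
      final MetricRegistry registry = new MetricRegistry();
      final AtomicLong hugeSize = new AtomicLong();
      final Histogram hugeHist;

      AllocatorMetricsSketch() {
        // Gauge: sampled lazily whenever a reporter polls the registry.
        registry.register("drill.allocator.huge.size", new Gauge<Long>() {
          @Override
          public Long getValue() {
            return hugeSize.get();
          }
        });
        // Histogram: updated eagerly on every allocation.
        hugeHist = registry.histogram("drill.allocator.huge.hist");
      }

      void onAllocate(int capacity) {
        hugeSize.addAndGet(capacity);
        hugeHist.update(capacity);
      }
    }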
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java b/exec/java-exec/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
index dfdc114..e332b13 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -21,21 +21,39 @@ package io.netty.buffer;
import io.netty.util.internal.PlatformDependent;
import java.nio.ByteOrder;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.drill.exec.util.AssertionUtil;
public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
- private final PooledUnsafeDirectByteBuf wrapped;
- private final long memoryAddress;
-
- UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf) {
- super(buf);
- if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) {
- throw new IllegalStateException("Drill only runs on LittleEndian systems.");
- }
- wrapped = buf;
- this.memoryAddress = buf.memoryAddress();
- }
-
+ private final AbstractByteBuf wrapped;
+ private final long memoryAddress;
+ private AtomicLong bufferCount;
+ private AtomicLong bufferSize;
+ private long initCap = -1;
+
+ UnsafeDirectLittleEndian(LargeBuffer buf) {
+ this(buf, true);
+ }
+
+ UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong bufferCount, AtomicLong bufferSize) {
+ this(buf, true);
+ this.bufferCount = bufferCount;
+ this.bufferSize = bufferSize;
+
+ // initCap is used if we're tracking memory release. If we're in non-debug mode, we'll skip this.
+ this.initCap = AssertionUtil.ASSERT_ENABLED ? capacity() : -1;
+ }
+
+ private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) {
+ super(buf);
+ if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) {
+ throw new IllegalStateException("Drill only runs on LittleEndian systems.");
+ }
+ wrapped = buf;
+ this.memoryAddress = buf.memoryAddress();
+ }
private long addr(int index) {
return memoryAddress + index;
}
@@ -52,7 +70,27 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
return Float.intBitsToFloat(getInt(index));
}
- @Override
+ @Override
+ public ByteBuf slice() {
+ return slice(this.readerIndex(), readableBytes());
+ }
+
+ @Override
+ public ByteBuf slice(int index, int length) {
+ return new SlicedByteBuf(this, index, length);
+ }
+
+ @Override
+ public ByteOrder order() {
+ return ByteOrder.LITTLE_ENDIAN;
+ }
+
+ @Override
+ public ByteBuf order(ByteOrder endianness) {
+ return this;
+ }
+
+ @Override
public double getDouble(int index) {
return Double.longBitsToDouble(getLong(index));
}
@@ -190,4 +228,20 @@ public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
return this;
}
+ @Override
+ public boolean release() {
+ return release(1);
+ }
+
+ @Override
+ public boolean release(int decrement) {
+ boolean released = super.release(decrement);
+ if (released && initCap != -1) {
+ bufferCount.decrementAndGet();
+ bufferSize.addAndGet(-initCap);
+ }
+ return released;
+ }
+
+
}
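
A note on the double-barrelled constructor check above: Netty buffers report BIG_ENDIAN by default, and this wrapper then performs its own little-endian reads and writes in native order. So the guard requires both a natively little-endian platform and a wrapped buffer still in its default big-endian view; in isolation it looks like this:

    import java.nio.ByteOrder;

    class EndianGuardSketch {
      private static final boolean NATIVE_LITTLE_ENDIAN =
          ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;

      static void check(ByteOrder wrappedOrder) {
        // Reject big-endian hosts, and reject buffers that have already been
        // re-ordered away from Netty's big-endian default.
        if (!NATIVE_LITTLE_ENDIAN || wrappedOrder != ByteOrder.BIG_ENDIAN) {
          throw new IllegalStateException("Drill only runs on LittleEndian systems.");
        }
      }
    }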
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/TestMemoryRetention.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/TestMemoryRetention.java b/exec/java-exec/src/main/java/org/apache/drill/exec/TestMemoryRetention.java
new file mode 100644
index 0000000..37e5389
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/TestMemoryRetention.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec;
+
+import io.netty.buffer.DrillBuf;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+
+import com.google.common.collect.Lists;
+
+public class TestMemoryRetention {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestMemoryRetention.class);
+
+ static final int SMALL_AVERAGE_BYTES = 1024 * 32;
+ static final int LARGE_BYTES = 32 * 1024 * 1024;
+ static final int PARALLEL_THREADS = 32;
+ static final double SMALL_ALLOCATION_MEM = 0.20;
+ static final double OVERHEAD_ALLOWANCE = 0.20;
+ static final List<Integer> ALLOCATIONS;
+ static final int MAX_ALLOCS = 100;
+ static final AtomicInteger ALLOCS = new AtomicInteger(0);
+
+ static {
+ Random r = new Random();
+ long maxMemory = DrillConfig.getMaxDirectMemory();
+ long maxPerThread = maxMemory / PARALLEL_THREADS;
+ double smallCount = (maxPerThread * SMALL_ALLOCATION_MEM) / SMALL_AVERAGE_BYTES;
+ double largeCount = (maxPerThread * (1 - SMALL_ALLOCATION_MEM - OVERHEAD_ALLOWANCE)) / LARGE_BYTES;
+ List<Integer> allocations = Lists.newArrayList();
+
+ for (int i = 0; i < smallCount; i++) {
+ allocations.add(SMALL_AVERAGE_BYTES / 2 + r.nextInt(SMALL_AVERAGE_BYTES));
+ }
+
+ for (int i = 0; i < largeCount; i++) {
+ allocations.add(LARGE_BYTES);
+ }
+ Collections.shuffle(allocations);
+ ALLOCATIONS = allocations;
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ final DrillConfig config = DrillConfig.create();
+ final TopLevelAllocator a = new TopLevelAllocator(config);
+ for (int i = 0; i < PARALLEL_THREADS; i++) {
+ Alloc alloc = new Alloc(a);
+ alloc.start();
+ }
+ }
+
+ private static class Alloc extends Thread {
+ final TopLevelAllocator allocator;
+
+ Alloc(TopLevelAllocator allocator) {
+ this.allocator = allocator;
+ }
+
+ @Override
+ public void run() {
+ Random r = new Random();
+ try {
+
+ if (ALLOCS.incrementAndGet() > MAX_ALLOCS) {
+ Thread.sleep(50000000000L);
+ }
+
+ Thread.sleep(r.nextInt(8000));
+ } catch (InterruptedException e) {
+ return;
+ }
+
+ logger.info("Starting alloc.");
+ final List<DrillBuf> bufs = Lists.newLinkedList();
+ for (Integer i : ALLOCATIONS) {
+ bufs.add(allocator.buffer(i));
+ }
+ Collections.shuffle(bufs);
+ logger.info("Finished alloc.");
+
+ final Dealloc d = new Dealloc(bufs, allocator);
+
+ // sometimes we'll deallocate locally, sometimes in a different thread.
+ if (r.nextBoolean()) {
+ d.start();
+ } else {
+ d.run();
+ }
+
+ }
+
+ }
+
+ private static class Dealloc extends Thread {
+ final List<DrillBuf> bufs;
+ final TopLevelAllocator a;
+
+ public Dealloc(List<DrillBuf> bufs, TopLevelAllocator a) {
+ this.bufs = bufs;
+ this.a = a;
+ }
+
+ public void run() {
+ try {
+ Thread.sleep(8000);
+ logger.info("Starting release.");
+ for (DrillBuf buf : bufs) {
+ buf.release();
+ }
+ logger.info("Finished release.");
+
+ } catch (InterruptedException e) {
+ return;
+ }
+
+ // start another.
+ Alloc alloc = new Alloc(a);
+ alloc.start();
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index 22fcb8e..a78deb6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -22,8 +22,8 @@ import io.netty.buffer.DrillBuf;
import io.netty.buffer.PooledByteBufAllocatorL;
import io.netty.buffer.UnsafeDirectLittleEndian;
-import java.util.IdentityHashMap;
import java.util.HashMap;
+import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Map.Entry;
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
index a9799b2..7ef121e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/DrillMetrics.java
@@ -44,12 +44,14 @@ public class DrillMetrics {
private static class RegistryHolder {
public static final MetricRegistry REGISTRY;
-// private static JmxReporter jmxReporter = getJmxReporter();
-// private static Slf4jReporter logReporter = getLogReporter();
+ // private static final JmxReporter JMX_REPORTER;
+ private static final Slf4jReporter LOG_REPORTER;
static {
REGISTRY = new MetricRegistry();
registerSysStats();
+ // JMX_REPORTER = getJmxReporter();
+ LOG_REPORTER = getLogReporter();
}
private static void registerSysStats(){
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 6ea43cd..4700dbd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -94,10 +94,16 @@ public class ScanBatch implements CloseableRecordBatch {
this.oContext = oContext;
this.currentReader.setOperatorContext(this.oContext);
+ boolean setup = false;
try {
oContext.getStats().startProcessing();
this.currentReader.setup(mutator);
+ setup = true;
} finally {
+ // if we had an exception during setup, make sure to release existing data.
+ if (!setup) {
+ currentReader.cleanup();
+ }
oContext.getStats().stopProcessing();
}
this.partitionColumns = partitionColumns.iterator();
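
The ScanBatch change is the standard success-flag try/finally idiom: if setup throws, the finally block can tell and cleans up the partially-initialized reader. Isolated as a sketch (Reader and cleanup are illustrative names):

    class SetupCleanupSketch {
      interface Reader {
        void setup() throws Exception;
        void cleanup();
      }

      static void safeSetup(Reader reader) throws Exception {
        boolean setup = false;
        try {
          reader.setup();
          setup = true;
        } finally {
          if (!setup) {
            // setup threw; release whatever partial state it allocated.
            reader.cleanup();
          }
        }
      }
    }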
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 62f1429..6f8e20b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -17,15 +17,14 @@
*/
package org.apache.drill.exec.rpc.data;
-import java.io.IOException;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
-import io.netty.buffer.UnsafeDirectLittleEndian;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
+import java.io.IOException;
+
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitData.BitClientHandshake;
@@ -105,7 +104,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
private final static FragmentRecordBatch OOM_FRAGMENT = FragmentRecordBatch.newBuilder().setIsOutOfMemory(true).build();
- private FragmentHandle getHandle(FragmentRecordBatch batch, int index){
+ private static FragmentHandle getHandle(FragmentRecordBatch batch, int index) {
return FragmentHandle.newBuilder()
.setQueryId(batch.getQueryId())
.setMajorFragmentId(batch.getReceivingMajorFragmentId())
@@ -138,32 +137,14 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
}
}else{
-
- for(int minor = 0; minor < targetCount; minor++){
- FragmentManager manager = workBus.getFragmentManager(getHandle(fragmentBatch, minor));
- if(manager == null){
- continue;
+ if (targetCount > 1) {
+ for (int minor = 0; minor < targetCount; minor++) {
+ send(fragmentBatch, (DrillBuf) body, minor, ack, true);
}
-
- BufferAllocator allocator = manager.getFragmentContext().getAllocator();
-
- boolean withinMemoryEnvelope = allocator.takeOwnership((DrillBuf) body, out);
-
- if(!withinMemoryEnvelope){
- // if we over reserved, we need to add poison pill before batch.
- dataHandler.handle(manager, OOM_FRAGMENT, null, null);
- }
-
- ack.increment();
- dataHandler.handle(manager, fragmentBatch, out.value, ack);
-
- // make sure to release the reference count we have to the new buffer.
- // dataHandler.handle should have taken any ownership it needed.
- out.value.release();
+ } else {
+ send(fragmentBatch, (DrillBuf) body, 0, ack, false);
}
- out = null;
}
-
} catch (IOException | FragmentSetupException e) {
logger.error("Failure while getting fragment manager. {}",
QueryIdHelper.getQueryIdentifiers(fragmentBatch.getQueryId(),
@@ -181,6 +162,45 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
}
}
+ private void send(final FragmentRecordBatch fragmentBatch, final DrillBuf body, final int minor, final AckSender ack,
+ final boolean shared)
+ throws FragmentSetupException, IOException {
+
+ FragmentManager manager = workBus.getFragmentManager(getHandle(fragmentBatch, minor));
+ if (manager == null) {
+ return;
+ }
+
+ final BufferAllocator allocator = manager.getFragmentContext().getAllocator();
+ final Pointer<DrillBuf> out = new Pointer<DrillBuf>();
+
+ final boolean withinMemoryEnvelope;
+ final DrillBuf submitBody;
+
+ if (shared) {
+ withinMemoryEnvelope = allocator.takeOwnership((DrillBuf) body, out);
+ submitBody = out.value;
+ } else {
+ withinMemoryEnvelope = allocator.takeOwnership((DrillBuf) body.unwrap());
+ submitBody = body;
+ }
+
+ if (!withinMemoryEnvelope) {
+ // if we over-reserved, we need to add a poison pill before the batch.
+ dataHandler.handle(manager, OOM_FRAGMENT, null, null);
+ }
+
+ ack.increment();
+ dataHandler.handle(manager, fragmentBatch, submitBody, ack);
+
+ if (shared) {
+ // make sure to release the reference count we have to the new buffer.
+ // dataHandler.handle should have taken any ownership it needed.
+ out.value.release();
+ }
+
+ }
+
private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> {
private volatile GenericFutureListener<ChannelFuture> handler;
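
The extracted send() method keeps the earlier flow: the receiving fragment's allocator takes ownership of the incoming buffer, and if that puts the fragment over its memory budget, an out-of-memory marker batch is queued ahead of the data. A toy sketch of that ordering (all types here are stand-ins, not Drill's):

    class PoisonPillSketch {
      interface Allocator { boolean takeOwnership(Object buf); }
      interface Handler { void handle(Object batch, Object body); }

      static final Object OOM_MARKER = new Object();

      static void send(Allocator allocator, Handler handler, Object batch, Object body) {
        final boolean withinMemoryEnvelope = allocator.takeOwnership(body);
        if (!withinMemoryEnvelope) {
          // Over budget: deliver the poison pill first so the receiver sees
          // the memory pressure before it consumes the batch.
          handler.handle(OOM_MARKER, null);
        }
        handler.handle(batch, body);
      }
    }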
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
index 1b10b1d..1663cd9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
@@ -17,13 +17,12 @@
*/
package org.apache.drill.exec.store.parquet;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.hadoop.fs.FSDataInputStream;
import parquet.bytes.BytesInput;
@@ -53,19 +52,11 @@ public class ColumnDataReader {
return new HadoopBytesInput(b);
}
- public ByteBuf getPageAsBytesBuf(ByteBuf byteBuf, int pageLength) throws IOException{
- ByteBuffer directBuffer=byteBuf.nioBuffer(0, pageLength);
- int l=directBuffer.remaining();
- int bl=byteBuf.capacity();
- try{
- while (directBuffer.remaining() > 0) {
- CompatibilityUtil.getBuf(input, directBuffer, directBuffer.remaining());
- }
- }catch(Exception e) {
- logger.error("Failed to read data into Direct ByteBuffer with exception: "+e.getMessage());
- throw new DrillRuntimeException(e.getMessage());
+ public void loadPage(DrillBuf target, int pageLength) throws IOException {
+ ByteBuffer directBuffer = target.nioBuffer(0, pageLength);
+ while (directBuffer.remaining() > 0) {
+ CompatibilityUtil.getBuf(input, directBuffer, directBuffer.remaining());
}
- return byteBuf;
}
public void clear(){
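
loadPage() depends on the usual read-fully loop: a single read can return fewer bytes than requested, so it keeps reading until the target buffer has no space left. The same loop with a standard channel in place of Parquet's CompatibilityUtil:

    import java.io.EOFException;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    class ReadFullySketch {
      static void readFully(ReadableByteChannel in, ByteBuffer dst) throws IOException {
        while (dst.remaining() > 0) {
          if (in.read(dst) < 0) {
            throw new EOFException("Stream ended before the page was fully read.");
          }
        }
      }
    }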
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
index 7416463..81b8002 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
@@ -17,10 +17,7 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;
-import io.netty.buffer.ByteBuf;
-
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.BitVector;
import org.apache.drill.exec.vector.ValueVector;
@@ -38,7 +35,7 @@ final class BitReader extends ColumnReader {
@Override
protected void readField(long recordsToReadInThisPass) {
- recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+ recordsReadInThisIteration = Math.min(pageReader.currentPageCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
// A more optimized reader for bit columns was removed to fix the bug
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index d3f1a30..5650ae3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
import java.io.IOException;
@@ -67,7 +67,7 @@ public abstract class ColumnReader<V extends ValueVector> {
int dataTypeLengthInBits;
int bytesReadInCurrentPass;
- protected ByteBuf vectorData;
+ protected DrillBuf vectorData;
// when reading definition levels for nullable columns, it is a one-way stream of integers
// when reading var length data, where we don't know if all of the records will fit until we've read all of them
// we must store the last definition level an use it in at the start of the next batch
@@ -106,14 +106,14 @@ public abstract class ColumnReader<V extends ValueVector> {
do {
determineSize(recordsToReadInThisPass, 0);
- } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.currentPage != null);
+ } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.hasPage());
}
valueVec.getMutator().setValueCount(valuesReadInCurrentPass);
}
public void clear() {
valueVec.clear();
- this.pageReader.clear();
+ pageReader.clear();
}
public void readValues(long recordsToRead) {
@@ -189,11 +189,11 @@ public abstract class ColumnReader<V extends ValueVector> {
// Read a page if we need more data, returns true if we need to exit the read loop
public boolean readPage() throws IOException {
- if (pageReader.currentPage == null
- || totalValuesReadAndReadyToReadInPage() == pageReader.currentPage.getValueCount()) {
+ if (!pageReader.hasPage()
+ || totalValuesReadAndReadyToReadInPage() == pageReader.currentPageCount) {
readRecords(pageReader.valuesReadyToRead);
- if (pageReader.currentPage != null) {
- totalValuesRead += pageReader.currentPage.getValueCount();
+ if (pageReader.hasPage()) {
+ totalValuesRead += pageReader.currentPageCount;
}
if (!pageReader.next()) {
hitRowGroupEnd();
@@ -225,4 +225,13 @@ public abstract class ColumnReader<V extends ValueVector> {
return false;
}
+ // copied out of the parquet library; we didn't want to deal with the unneeded throws clause it declared
+ public static int readIntLittleEndian(DrillBuf in, int offset) {
+ int ch4 = in.getByte(offset) & 0xff;
+ int ch3 = in.getByte(offset + 1) & 0xff;
+ int ch2 = in.getByte(offset + 2) & 0xff;
+ int ch1 = in.getByte(offset + 3) & 0xff;
+ return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+ }
+
}
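
A quick worked check of readIntLittleEndian(): the first byte is the least significant, so bytes 0x78 0x56 0x34 0x12 decode to 0x12345678. The same assembly over a byte[]:

    class LittleEndianCheck {
      static int readIntLittleEndian(byte[] in, int offset) {
        final int ch4 = in[offset] & 0xff;
        final int ch3 = in[offset + 1] & 0xff;
        final int ch2 = in[offset + 2] & 0xff;
        final int ch1 = in[offset + 3] & 0xff;
        return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
      }

      public static void main(String[] args) {
        final byte[] bytes = { 0x78, 0x56, 0x34, 0x12 };
        // Prints 12345678.
        System.out.println(Integer.toHexString(readIntLittleEndian(bytes, 0)));
      }
    }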
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index a425bc1..fe0234b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;
-import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
import java.math.BigDecimal;
@@ -52,21 +51,20 @@ class FixedByteAlignedReader extends ColumnReader {
@Override
protected void readField(long recordsToReadInThisPass) {
- recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+ recordsReadInThisIteration = Math.min(pageReader.currentPageCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
readStartInBytes = pageReader.readPosInBytes;
readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
readLength = (int) Math.ceil(readLengthInBits / 8.0);
- bytebuf = pageReader.pageDataByteArray;
+ bytebuf = pageReader.pageData;
// vectorData is assigned by the superclass read loop method
writeData();
}
protected void writeData() {
- vectorData.writeBytes(bytebuf,
- (int) readStartInBytes, (int) readLength);
+ vectorData.writeBytes(bytebuf, (int) readStartInBytes, (int) readLength);
}
public static class FixedBinaryReader extends FixedByteAlignedReader {
@@ -120,12 +118,12 @@ class FixedByteAlignedReader extends ColumnReader {
public static class DateReader extends ConvertedReader {
- DateVector dateVector;
+ private final DateVector.Mutator mutator;
DateReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
- dateVector = (DateVector) v;
+ mutator = ((DateVector) v).getMutator();
}
@Override
@@ -137,17 +135,9 @@ class FixedByteAlignedReader extends ColumnReader {
intValue = readIntLittleEndian(bytebuf, start);
}
- dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+ mutator.set(index, DateTimeUtils.fromJulianDay(intValue - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
}
- // copied out of parquet library, didn't want to deal with the uneeded throws statement they had declared
- public static int readIntLittleEndian(ByteBuf in, int offset) {
- int ch4 = in.getByte(offset) & 0xff;
- int ch3 = in.getByte(offset + 1) & 0xff;
- int ch2 = in.getByte(offset + 2) & 0xff;
- int ch1 = in.getByte(offset + 3) & 0xff;
- return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
- }
}
public static class Decimal28Reader extends ConvertedReader {
@@ -163,7 +153,8 @@ class FixedByteAlignedReader extends ColumnReader {
@Override
void addNext(int start, int index) {
int width = Decimal28SparseHolder.WIDTH;
- BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale());
+ BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes,
+ schemaElement.getScale());
DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getBuffer(), index * width, schemaElement.getScale(),
schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits);
}
@@ -182,7 +173,8 @@ class FixedByteAlignedReader extends ColumnReader {
@Override
void addNext(int start, int index) {
int width = Decimal38SparseHolder.WIDTH;
- BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale());
+ BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes,
+ schemaElement.getScale());
DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getBuffer(), index * width, schemaElement.getScale(),
schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
index 05f6417..7f8b611 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedWidthRepeatedReader.java
@@ -178,7 +178,7 @@ public class FixedWidthRepeatedReader extends VarLengthColumn {
definitionLevelsRead++;
// we hit the end of this page, without confirmation that we reached the end of the current record
- if (definitionLevelsRead == pageReader.currentPage.getValueCount()) {
+ if (definitionLevelsRead == pageReader.currentPageCount) {
// check that we have not hit the end of the row group (in which case we will not find the repetition level indicating
// the end of this record as there is no next page to check, we have read all the values in this repetition so it is okay
// to add it to the read )
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
index a4143d5..8a8ac29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableBitReader.java
@@ -43,7 +43,7 @@ final class NullableBitReader extends ColumnReader {
@Override
public void readField(long recordsToReadInThisPass) {
- recordsReadInThisIteration = Math.min(pageReader.currentPage.getValueCount()
+ recordsReadInThisIteration = Math.min(pageReader.currentPageCount
- pageReader.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
int defLevel;
for (int i = 0; i < recordsReadInThisIteration; i++){
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
index 9e62520..d721601 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableColumnReader.java
@@ -70,9 +70,9 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
boolean lastRunBrokenByNull = false;
while (indexInOutputVector < recordsToReadInThisPass && indexInOutputVector < valueVec.getValueCapacity()){
// read a page if needed
- if ( pageReader.currentPage == null
+ if (!pageReader.hasPage()
|| ((readStartInBytes + readLength >= pageReader.byteLength && bitsUsed == 0) &&
- definitionLevelsRead >= pageReader.currentPage.getValueCount())) {
+ definitionLevelsRead >= pageReader.currentPageCount)) {
if (!pageReader.next()) {
break;
}
@@ -89,7 +89,7 @@ abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<
// loop to find the longest run of defined values available, can be preceded by several nulls
while(indexInOutputVector < recordsToReadInThisPass
&& indexInOutputVector < valueVec.getValueCapacity()
- && definitionLevelsRead < pageReader.currentPage.getValueCount()){
+ && definitionLevelsRead < pageReader.currentPageCount) {
currentDefinitionLevel = pageReader.definitionLevels.readInteger();
definitionLevelsRead++;
indexInOutputVector++;
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index ff1d7f9..c2221d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;
-import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
import java.math.BigDecimal;
@@ -29,15 +28,15 @@ import org.apache.drill.exec.store.ParquetOutputRecordWriter;
import org.apache.drill.exec.util.DecimalUtility;
import org.apache.drill.exec.vector.NullableBigIntVector;
import org.apache.drill.exec.vector.NullableDateVector;
+import org.apache.drill.exec.vector.NullableDecimal18Vector;
import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
+import org.apache.drill.exec.vector.NullableDecimal9Vector;
import org.apache.drill.exec.vector.NullableFloat4Vector;
import org.apache.drill.exec.vector.NullableFloat8Vector;
import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableDecimal9Vector;
-import org.apache.drill.exec.vector.NullableDecimal18Vector;
-import org.apache.drill.exec.vector.NullableTimeVector;
import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableTimeVector;
import org.apache.drill.exec.vector.ValueVector;
import org.joda.time.DateTimeUtils;
@@ -58,7 +57,7 @@ public class NullableFixedByteAlignedReaders {
// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass) {
- this.bytebuf = pageReader.pageDataByteArray;
+ this.bytebuf = pageReader.pageData;
// fill in data.
vectorData.writeBytes(bytebuf, (int) readStartInBytes, (int) readLength);
@@ -259,7 +258,7 @@ public class NullableFixedByteAlignedReaders {
@Override
protected void readField(long recordsToReadInThisPass) {
- this.bytebuf = pageReader.pageDataByteArray;
+ this.bytebuf = pageReader.pageData;
dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
for (int i = 0; i < recordsToReadInThisPass; i++) {
@@ -292,15 +291,6 @@ public class NullableFixedByteAlignedReaders {
dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(intValue - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
}
- // copied out of the parquet library; didn't want to deal with the unneeded throws statement they had declared
- public static int readIntLittleEndian(ByteBuf in, int offset) {
- int ch4 = in.getByte(offset) & 0xff;
- int ch3 = in.getByte(offset + 1) & 0xff;
- int ch2 = in.getByte(offset + 2) & 0xff;
- int ch1 = in.getByte(offset + 3) & 0xff;
- return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
- }
-
}
public static class NullableDecimal28Reader extends NullableConvertedReader {
@@ -316,7 +306,8 @@ public class NullableFixedByteAlignedReaders {
@Override
void addNext(int start, int index) {
int width = NullableDecimal28SparseHolder.WIDTH;
- BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale());
+ BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes,
+ schemaElement.getScale());
DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getBuffer(), index * width, schemaElement.getScale(),
schemaElement.getPrecision(), NullableDecimal28SparseHolder.nDecimalDigits);
}
@@ -335,7 +326,8 @@ public class NullableFixedByteAlignedReaders {
@Override
void addNext(int start, int index) {
int width = NullableDecimal38SparseHolder.WIDTH;
- BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale());
+ BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes,
+ schemaElement.getScale());
DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getBuffer(), index * width, schemaElement.getScale(),
schemaElement.getPrecision(), NullableDecimal38SparseHolder.nDecimalDigits);
}
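
The removed readIntLittleEndian() helper assembled an int from four bytes, least-significant byte first; presumably it became unnecessary because callers can rely on the buffer's own little-endian getInt(offset). A self-contained sketch of the same arithmetic over a plain byte[] (hypothetical class, not part of this commit):

    final class LittleEndian {
      static int readIntLittleEndian(byte[] in, int offset) {
        int ch4 = in[offset] & 0xff;      // least-significant byte comes first
        int ch3 = in[offset + 1] & 0xff;
        int ch2 = in[offset + 2] & 0xff;
        int ch1 = in[offset + 3] & 0xff;  // most-significant byte comes last
        return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
      }

      public static void main(String[] args) {
        byte[] bytes = {0x78, 0x56, 0x34, 0x12};
        // prints 12345678: the bytes are reassembled in little-endian order
        System.out.println(Integer.toHexString(readIntLittleEndian(bytes, 0)));
      }
    }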
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
index aa3d9c5..528b6db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
@@ -88,10 +88,10 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten
}
else {
// re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
- dataTypeLengthInBits = pageReader.pageDataByteArray.getInt((int) pageReader.readyToReadPosInBytes);
+ dataTypeLengthInBits = pageReader.pageData.getInt((int) pageReader.readyToReadPosInBytes);
}
// I think this also needs to happen if it is null for the random access
- boolean success = setSafe(valuesReadInCurrentPass + pageReader.valuesReadyToRead, pageReader.pageDataByteArray,
+ boolean success = setSafe(valuesReadInCurrentPass + pageReader.valuesReadyToRead, pageReader.pageData,
(int) pageReader.readyToReadPosInBytes + 4, dataTypeLengthInBits);
if ( ! success ) {
return true;
@@ -130,7 +130,7 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten
}
// re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
dataTypeLengthInBits = variableWidthVector.getAccessor().getValueLength(valuesReadInCurrentPass);
- boolean success = setSafe(valuesReadInCurrentPass, pageReader.pageDataByteArray,
+ boolean success = setSafe(valuesReadInCurrentPass, pageReader.pageData,
(int) pageReader.readPosInBytes + 4, dataTypeLengthInBits);
assert success;
}
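
For reference, the var-length readers above walk a length-prefixed layout: each value is a 4-byte little-endian length followed by that many value bytes, and the cursor (readyToReadPosInBytes / readPosInBytes) advances by 4 + length. A self-contained approximation over a java.nio.ByteBuffer (hypothetical code, not part of this commit):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    final class VarLenWalker {
      public static void main(String[] args) {
        ByteBuffer page = ByteBuffer.allocate(64).order(ByteOrder.LITTLE_ENDIAN);
        page.putInt(3).put(new byte[]{'f', 'o', 'o'});  // length prefix, then bytes
        page.putInt(2).put(new byte[]{'h', 'i'});
        int end = page.position();
        int pos = 0;                          // analogue of readyToReadPosInBytes
        while (pos < end) {
          int len = page.getInt(pos);         // read the 4-byte length prefix
          byte[] value = new byte[len];
          for (int i = 0; i < len; i++) {
            value[i] = page.get(pos + 4 + i); // value bytes follow the prefix
          }
          System.out.println(new String(value));
          pos += 4 + len;                     // step past prefix and payload
        }
      }
    }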
http://git-wip-us.apache.org/repos/asf/drill/blob/0dd0e833/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index d260029..6a41a04 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -17,14 +17,15 @@
*/
package org.apache.drill.exec.store.parquet.columnreaders;
+import static parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.store.parquet.ColumnDataReader;
import org.apache.drill.exec.store.parquet.ParquetFormatPlugin;
@@ -34,30 +35,35 @@ import org.apache.hadoop.fs.Path;
import parquet.bytes.BytesInput;
import parquet.column.Dictionary;
+import parquet.column.Encoding;
import parquet.column.ValuesType;
-import parquet.column.page.DataPageV1;
import parquet.column.page.DictionaryPage;
+import parquet.column.statistics.Statistics;
import parquet.column.values.ValuesReader;
import parquet.column.values.dictionary.DictionaryValuesReader;
import parquet.format.PageHeader;
import parquet.format.PageType;
import parquet.format.Util;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.CodecFactoryExposer.HadoopByteBufBytesInput;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.PrimitiveType;
-import static parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
+import com.google.common.base.Preconditions;
// class to keep track of the read position of variable length columns
final class PageReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PageReader.class);
+ public static final ParquetMetadataConverter METADATA_CONVERTER = ParquetFormatPlugin.parquetMetadataConverter;
+
private final ColumnReader parentColumnReader;
private final ColumnDataReader dataReader;
- // store references to the pages that have been uncompressed, but not copied to ValueVectors yet
- DataPageV1 currentPage;
+
// buffer to store bytes of current page
- DrillBuf pageDataByteArray;
+ DrillBuf pageData;
// for variable length data we need to keep track of our current position in the page data
// as the values and lengths are intermixed, making random access to the length data impossible
@@ -90,60 +96,72 @@ final class PageReader {
Dictionary dictionary;
PageHeader pageHeader = null;
- List<ByteBuf> allocatedBuffers;
+ int currentPageCount = -1;
// These need to be held throughout reading of the entire column chunk
List<ByteBuf> allocatedDictionaryBuffers;
- PageReader(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
+ private final CodecFactoryExposer codecFactory;
+
+ PageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
throws ExecutionSetupException{
this.parentColumnReader = parentStatus;
- allocatedBuffers = new ArrayList<ByteBuf>();
allocatedDictionaryBuffers = new ArrayList<ByteBuf>();
+ codecFactory = parentColumnReader.parentReader.getCodecFactoryExposer();
- long totalByteLength = columnChunkMetaData.getTotalUncompressedSize();
long start = columnChunkMetaData.getFirstDataPageOffset();
try {
FSDataInputStream f = fs.open(path);
this.dataReader = new ColumnDataReader(f, start, columnChunkMetaData.getTotalSize());
- if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
- f.seek(columnChunkMetaData.getDictionaryPageOffset());
- PageHeader pageHeader = Util.readPageHeader(f);
- assert pageHeader.type == PageType.DICTIONARY_PAGE;
-
- BytesInput bytesIn;
- ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size());
- allocatedDictionaryBuffers.add(uncompressedData);
- if(parentColumnReader.columnChunkMetaData.getCodec()==CompressionCodecName.UNCOMPRESSED) {
- dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size);
- bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData,
- pageHeader.getUncompressed_page_size());
- }else{
- ByteBuf compressedData=allocateBuffer(pageHeader.compressed_page_size);
- dataReader.getPageAsBytesBuf(compressedData, pageHeader.compressed_page_size);
- bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
- .decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+ loadDictionaryIfExists(parentStatus, columnChunkMetaData, f);
+
+ } catch (IOException e) {
+ throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: "
+ + path.getName(), e);
+ }
+
+ }
+
+ private void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
+ final ColumnChunkMetaData columnChunkMetaData, final FSDataInputStream f) throws IOException {
+ if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
+ f.seek(columnChunkMetaData.getDictionaryPageOffset());
+ final PageHeader pageHeader = Util.readPageHeader(f);
+ assert pageHeader.type == PageType.DICTIONARY_PAGE;
+
+ final DrillBuf dictionaryData = allocateDictionaryBuffer(pageHeader.getUncompressed_page_size());
+
+ if (parentColumnReader.columnChunkMetaData.getCodec() == CompressionCodecName.UNCOMPRESSED) {
+ dataReader.loadPage(dictionaryData, pageHeader.compressed_page_size);
+ } else {
+ final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size);
+ try {
+ dataReader.loadPage(compressedData, pageHeader.compressed_page_size);
+ codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(),
compressedData,
- uncompressedData,
+ dictionaryData,
pageHeader.compressed_page_size,
pageHeader.getUncompressed_page_size());
+
+ } finally {
compressedData.release();
}
- DictionaryPage page = new DictionaryPage(
- bytesIn,
- pageHeader.uncompressed_page_size,
- pageHeader.dictionary_page_header.num_values,
- parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
- );
- this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
}
- } catch (IOException e) {
- throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: "
- + path.getName(), e);
- }
+ DictionaryPage page = new DictionaryPage(
+ getBytesInput(dictionaryData),
+ pageHeader.uncompressed_page_size,
+ pageHeader.dictionary_page_header.num_values,
+ parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
+ );
+ this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
+ }
}
+ public static BytesInput getBytesInput(DrillBuf uncompressedByteBuf) throws IOException {
+ final ByteBuffer outBuffer = uncompressedByteBuf.nioBuffer(0, uncompressedByteBuf.capacity());
+ return new HadoopByteBufBytesInput(outBuffer, 0, outBuffer.limit());
+ }
/**
* Grab the next page.
@@ -153,7 +171,7 @@ final class PageReader {
*/
public boolean next() throws IOException {
- currentPage = null;
+ currentPageCount = -1;
valuesRead = 0;
valuesReadyToRead = 0;
@@ -172,26 +190,24 @@ final class PageReader {
if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
//TODO: Handle buffer allocation exception
- BytesInput bytesIn;
- ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size());
- allocatedDictionaryBuffers.add(uncompressedData);
+ DrillBuf uncompressedData = allocateDictionaryBuffer(pageHeader.getUncompressed_page_size());
if( parentColumnReader.columnChunkMetaData.getCodec()== CompressionCodecName.UNCOMPRESSED) {
- dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size);
- bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData,
- pageHeader.getUncompressed_page_size());
+ dataReader.loadPage(uncompressedData, pageHeader.compressed_page_size);
}else{
- ByteBuf compressedData=allocateBuffer(pageHeader.compressed_page_size);
- dataReader.getPageAsBytesBuf(compressedData, pageHeader.compressed_page_size);
- bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
- .decompress(parentColumnReader.columnChunkMetaData.getCodec(),
- compressedData,
- uncompressedData,
- pageHeader.compressed_page_size,
- pageHeader.getUncompressed_page_size());
- compressedData.release();
+ final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size);
+ try{
+ dataReader.loadPage(compressedData, pageHeader.compressed_page_size);
+ codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+ compressedData,
+ uncompressedData,
+ pageHeader.compressed_page_size,
+ pageHeader.getUncompressed_page_size());
+ } finally {
+ compressedData.release();
+ }
}
DictionaryPage page = new DictionaryPage(
- bytesIn,
+ getBytesInput(uncompressedData),
pageHeader.uncompressed_page_size,
pageHeader.dictionary_page_header.num_values,
parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
@@ -201,47 +217,40 @@ final class PageReader {
} while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
//TODO: Handle buffer allocation exception
- BytesInput bytesIn;
- ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size());
- allocatedBuffers.add(uncompressedData);
+
+ allocatePageData(pageHeader.getUncompressed_page_size());
+
if(parentColumnReader.columnChunkMetaData.getCodec()==CompressionCodecName.UNCOMPRESSED) {
- dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size);
- bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData,
- pageHeader.getUncompressed_page_size());
+ dataReader.loadPage(pageData, pageHeader.compressed_page_size);
}else{
- ByteBuf compressedData=allocateBuffer(pageHeader.compressed_page_size);
- dataReader.getPageAsBytesBuf(compressedData, pageHeader.compressed_page_size);
- bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
- .decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+ final DrillBuf compressedData = allocateTemporaryBuffer(pageHeader.compressed_page_size);
+ dataReader.loadPage(compressedData, pageHeader.compressed_page_size);
+ codecFactory.decompress(parentColumnReader.columnChunkMetaData.getCodec(),
compressedData,
- uncompressedData,
+ pageData,
pageHeader.compressed_page_size,
pageHeader.getUncompressed_page_size());
compressedData.release();
}
- currentPage = new DataPageV1(
- bytesIn,
- pageHeader.data_page_header.num_values,
- pageHeader.uncompressed_page_size,
- fromParquetStatistics(pageHeader.data_page_header.getStatistics(), parentColumnReader.getColumnDescriptor().getType()), // ?
- ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
- ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
- ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
- );
- byteLength = pageHeader.uncompressed_page_size;
+ currentPageCount = pageHeader.data_page_header.num_values;
- if (currentPage == null) {
- return false;
- }
+ final int uncompressedPageSize = pageHeader.uncompressed_page_size;
+ final Statistics<?> stats = fromParquetStatistics(pageHeader.data_page_header.getStatistics(), parentColumnReader
+ .getColumnDescriptor().getType());
+ final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
+
+ final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding);
+ final Encoding valueEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.encoding);
+
+ byteLength = pageHeader.uncompressed_page_size;
- pageDataByteArray = DrillBuf.wrapByteBuffer(currentPage.getBytes().toByteBuffer());
- allocatedBuffers.add(pageDataByteArray);
+ final ByteBuffer pageDataBuffer = pageData.nioBuffer(0, pageData.capacity());
readPosInBytes = 0;
if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
- repetitionLevels = currentPage.getRlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL);
- repetitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+ repetitionLevels = rlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL);
+ repetitionLevels.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
// we know that the first value will be a 0, at the end of each list of repeated values we will hit another 0 indicating
// a new record, although we don't know the length until we hit it (and this is a one way stream of integers) so we
// read the first zero here to simplify the reading processes, and start reading the first value the same as all
@@ -252,25 +261,25 @@ final class PageReader {
}
if (parentColumnReader.columnDescriptor.getMaxDefinitionLevel() != 0){
parentColumnReader.currDefLevel = -1;
- definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
- definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+ definitionLevels = dlEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
+ definitionLevels.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
readPosInBytes = definitionLevels.getNextOffset();
- if ( ! currentPage.getValueEncoding().usesDictionary()) {
- valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
- valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+ if (!valueEncoding.usesDictionary()) {
+ valueReader = valueEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
+ valueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
}
}
if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
- valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
- valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+ valueReader = valueEncoding.getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
+ valueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
}
- if (currentPage.getValueEncoding().usesDictionary()) {
+ if (valueEncoding.usesDictionary()) {
// initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for
// actually copying the values out into the vectors
dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary);
- dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+ dictionaryLengthDeterminingReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
dictionaryValueReader = new DictionaryValuesReader(dictionary);
- dictionaryValueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
+ dictionaryValueReader.initFromPage(currentPageCount, pageDataBuffer, (int) readPosInBytes);
parentColumnReader.usingDictionary = true;
} else {
parentColumnReader.usingDictionary = false;
@@ -283,11 +292,41 @@ final class PageReader {
return true;
}
+ /**
+ * Allocate a page data buffer. Note that only one page data buffer should be active at a time. The reader will ensure
+ * that the page data is released when the reader completes.
+ */
+ private void allocatePageData(int size) {
+ Preconditions.checkArgument(pageData == null);
+ pageData = parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size);
+ }
+
+ /**
+ * Allocate a temporary buffer that the caller must release as soon as it is done with it. The reader does not manage the release of these buffers.
+ */
+ private DrillBuf allocateTemporaryBuffer(int size) {
+ return parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size);
+ }
+
+ /**
+ * Allocate and return a dictionary buffer. These are maintained for the life of the reader and then released when the
+ * reader is cleared.
+ */
+ private DrillBuf allocateDictionaryBuffer(int size) {
+ DrillBuf buf = parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size);
+ allocatedDictionaryBuffers.add(buf);
+ return buf;
+ }
+
+ protected boolean hasPage() {
+ return currentPageCount != -1;
+ }
+
public void clearBuffers() {
- for (ByteBuf b : allocatedBuffers) {
- b.release();
+ if (pageData != null) {
+ pageData.release();
+ pageData = null;
}
- allocatedBuffers.clear();
}
public void clearDictionaryBuffers() {
@@ -306,20 +345,6 @@ final class PageReader {
//}
}
- /*
- Allocate direct memory to read data into
- */
- private ByteBuf allocateBuffer(int size) {
- ByteBuf b;
- try {
- b = parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size);
- //b = UnpooledByteBufAllocator.DEFAULT.heapBuffer(size);
- }catch(Exception e){
- throw new DrillRuntimeException("Unable to allocate "+size+" bytes of memory in the Parquet Reader."+
- "[Exception: "+e.getMessage()+"]"
- );
- }
- return b;
- }
+
}
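
Taken together, the PageReader changes separate three buffer lifecycles: a single live page-data buffer released in clearBuffers(), temporary decompression inputs released by the caller (in a finally block where the diff adds one), and dictionary buffers accumulated for the life of the reader and released in clearDictionaryBuffers(). A minimal sketch of that ownership split, with Allocator and Buf as stand-in types for Drill's allocator and DrillBuf (not Drill's actual interfaces):

    import java.util.ArrayList;
    import java.util.List;

    final class BufferLifecycles {
      interface Buf { void release(); }
      interface Allocator { Buf buffer(int size); }

      private final Allocator allocator;
      private Buf pageData;                           // at most one live page buffer
      private final List<Buf> dictionaryBuffers = new ArrayList<>();

      BufferLifecycles(Allocator allocator) {
        this.allocator = allocator;
      }

      void loadCompressedPage(int compressedSize, int uncompressedSize) {
        pageData = allocator.buffer(uncompressedSize);      // held until clearBuffers()
        Buf compressed = allocator.buffer(compressedSize);  // temporary input buffer
        try {
          // ... read bytes into compressed, decompress into pageData ...
        } finally {
          compressed.release();                             // released immediately
        }
      }

      Buf allocateDictionaryBuffer(int size) {
        Buf buf = allocator.buffer(size);                   // held for the reader's life
        dictionaryBuffers.add(buf);
        return buf;
      }

      void clearBuffers() {
        if (pageData != null) {
          pageData.release();
          pageData = null;
        }
      }

      void clearDictionaryBuffers() {
        for (Buf b : dictionaryBuffers) {
          b.release();
        }
        dictionaryBuffers.clear();
      }
    }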