You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/13 00:44:03 UTC
[1/3] drill git commit: DRILL-3033: Add memory leak fixes found so
far in DRILL-1942 to 1.0.0. Fixes some missing buffer retain() calls and
missing vector clear() calls.
Repository: drill
Updated Branches:
refs/heads/master d10769f47 -> 6d7cda8ea
DRILL-3033: Add memory leak fixes found so far in DRILL-1942 to 1.0.0. Fixes some missing buffer retain() calls and missing vector clear() calls.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/da70b63f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/da70b63f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/da70b63f
Branch: refs/heads/master
Commit: da70b63fb63419414f54b3159f7e92694039d1da
Parents: d10769f
Author: Chris Westin <cw...@yahoo.com>
Authored: Mon May 11 22:06:30 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue May 12 14:06:55 2015 -0700
----------------------------------------------------------------------
.../codegen/templates/FixedValueVectors.java | 9 +-
.../codegen/templates/NullableValueVectors.java | 75 +++---
.../templates/VariableLengthVectors.java | 111 ++++-----
.../cache/VectorAccessibleSerializable.java | 12 +-
.../drill/exec/memory/BufferAllocator.java | 3 +
.../impl/TopN/PriorityQueueTemplate.java | 17 +-
.../exec/physical/impl/TopN/TopNBatch.java | 44 ++--
.../exec/physical/impl/join/MergeJoinBatch.java | 1 +
.../impl/join/MergeJoinBatchBuilder.java | 18 +-
.../impl/mergereceiver/MergingRecordBatch.java | 2 +
.../OrderedPartitionRecordBatch.java | 226 ++++++++++---------
.../exec/physical/impl/sort/SortBatch.java | 1 +
.../impl/sort/SortRecordBatchBuilder.java | 15 +-
.../physical/impl/xsort/ExternalSortBatch.java | 5 +
.../exec/physical/impl/xsort/MSortTemplate.java | 10 +-
.../impl/xsort/PriorityQueueCopierTemplate.java | 7 +-
.../apache/drill/exec/record/WritableBatch.java | 55 ++---
.../org/apache/drill/exec/vector/BitVector.java | 2 +
.../exec/vector/complex/AbstractMapVector.java | 9 +
.../drill/exec/vector/complex/MapVector.java | 10 +
.../exec/record/vector/TestValueVector.java | 37 ++-
21 files changed, 376 insertions(+), 293 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
index 7d85810..0dffa0b 100644
--- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java
@@ -50,7 +50,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
private int allocationValueCount = INITIAL_VALUE_ALLOCATION;
private int allocationMonitor = 0;
-
+
public ${minor.class}Vector(MaterializedField field, BufferAllocator allocator) {
super(field, allocator);
}
@@ -187,6 +187,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
int currentWriterIndex = data.writerIndex();
int startPoint = startIndex * ${type.width};
int sliceLength = length * ${type.width};
+ target.clear();
target.data = this.data.slice(startPoint, sliceLength);
target.data.writerIndex(sliceLength);
target.data.retain();
@@ -749,9 +750,9 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
}
</#if> <#-- type.width -->
-
-
+
+
public void setValueCount(int valueCount) {
int currentValueCapacity = getValueCapacity();
int idx = (${type.width} * valueCount);
@@ -770,7 +771,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
-
+
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index 9d03efb..ce6a3a7 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -79,13 +79,14 @@ public final class ${className} extends BaseDataValueVector implements <#if type
}
return buffers;
}
-
+
@Override
public void clear() {
bits.clear();
values.clear();
+ super.clear();
}
-
+
public int getBufferSize(){
return values.getBufferSize() + bits.getBufferSize();
}
@@ -120,7 +121,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
throw new OutOfMemoryRuntimeException("Failure while allocating buffer.");
}
}
-
+
@Override
public boolean allocateNewSafe() {
if(!values.allocateNewSafe()) return false;
@@ -144,7 +145,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
public int load(int dataBytes, int valueCount, DrillBuf buf){
clear();
int loaded = bits.load(valueCount, buf);
-
+
// remove bits part of buffer.
buf = buf.slice(loaded, buf.capacity() - loaded);
dataBytes -= loaded;
@@ -152,14 +153,14 @@ public final class ${className} extends BaseDataValueVector implements <#if type
this.mutator.lastSet = valueCount;
return loaded;
}
-
+
@Override
public void load(SerializedField metadata, DrillBuf buffer) {
assert this.field.matches(metadata) : String.format("The field %s doesn't match the provided metadata %s.", this.field, metadata);
int loaded = load(metadata.getBufferLength(), metadata.getValueCount(), buffer);
assert metadata.getBufferLength() == loaded : String.format("Expected to load %d bytes but actually loaded %d bytes", metadata.getBufferLength(), loaded);
}
-
+
@Override
public int getByteCapacity(){
return values.getByteCapacity();
@@ -180,7 +181,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
mutator.reset();
accessor.reset();
}
-
+
@Override
public boolean allocateNewSafe() {
@@ -213,22 +214,22 @@ public final class ${className} extends BaseDataValueVector implements <#if type
public int load(int valueCount, DrillBuf buf){
clear();
int loaded = bits.load(valueCount, buf);
-
+
// remove bits part of buffer.
buf = buf.slice(loaded, buf.capacity() - loaded);
loaded += values.load(valueCount, buf);
return loaded;
}
-
+
@Override
public void load(SerializedField metadata, DrillBuf buffer) {
assert this.field.matches(metadata);
int loaded = load(metadata.getValueCount(), buffer);
assert metadata.getBufferLength() == loaded;
}
-
+
</#if>
-
+
public TransferPair getTransferPair(){
return new TransferImpl(getField());
}
@@ -240,7 +241,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
return new TransferImpl((Nullable${minor.class}Vector) to);
}
-
+
public void transferTo(Nullable${minor.class}Vector target){
bits.transferTo(target.bits);
values.transferTo(target.values);
@@ -257,10 +258,10 @@ public final class ${className} extends BaseDataValueVector implements <#if type
target.mutator.lastSet = length - 1;
</#if>
}
-
+
private class TransferImpl implements TransferPair{
Nullable${minor.class}Vector to;
-
+
public TransferImpl(MaterializedField field){
this.to = new Nullable${minor.class}Vector(field, allocator);
}
@@ -272,7 +273,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
public Nullable${minor.class}Vector getTo(){
return to;
}
-
+
public void transfer(){
transferTo(to);
}
@@ -286,15 +287,15 @@ public final class ${className} extends BaseDataValueVector implements <#if type
to.copyFromSafe(fromIndex, toIndex, Nullable${minor.class}Vector.this);
}
}
-
+
public Accessor getAccessor(){
return accessor;
}
-
+
public Mutator getMutator(){
return mutator;
}
-
+
public ${minor.class}Vector convertToRequiredVector(){
${minor.class}Vector v = new ${minor.class}Vector(getField().getOtherNullableVersion(), allocator);
v.data = values.data;
@@ -303,14 +304,14 @@ public final class ${className} extends BaseDataValueVector implements <#if type
return v;
}
-
+
public void copyFrom(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
if (!from.getAccessor().isNull(fromIndex)) {
mutator.set(thisIndex, from.getAccessor().get(fromIndex));
}
}
-
+
public void copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){
<#if type.major == "VarLen">
mutator.fillEmpties(thisIndex);
@@ -318,7 +319,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
values.copyFromSafe(fromIndex, thisIndex, from);
bits.getMutator().setSafe(thisIndex, 1);
}
-
+
public void copyFromSafe(int fromIndex, int thisIndex, Nullable${minor.class}Vector from){
<#if type.major == "VarLen">
mutator.fillEmpties(thisIndex);
@@ -340,7 +341,9 @@ public final class ${className} extends BaseDataValueVector implements <#if type
* @throws NullValueException if the value is null
*/
public <#if type.major == "VarLen">byte[]<#else>${minor.javaType!type.javaType}</#if> get(int index) {
- assert !isNull(index) : "Tried to get null value";
+ if (isNull(index)) {
+ throw new IllegalStateException("Can't get a null value");
+ }
return vAccessor.get(index);
}
@@ -351,7 +354,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
public int isSet(int index){
return bAccessor.get(index);
}
-
+
<#if type.major == "VarLen">
public long getStartEnd(int index){
return vAccessor.getStartEnd(index);
@@ -394,12 +397,12 @@ public final class ${className} extends BaseDataValueVector implements <#if type
public int getValueCount(){
return bits.getAccessor().getValueCount();
}
-
+
public void reset(){}
}
-
+
public final class Mutator extends BaseDataValueVector.BaseMutator implements NullableVectorDefinitionSetter<#if type.major = "VarLen">, VariableWidthVector.VariableWidthMutator</#if> {
-
+
private int setCount;
<#if type.major = "VarLen"> private int lastSet = -1;</#if>
@@ -447,7 +450,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
values.getMutator().setValueLengthSafe(index, length);
}
</#if>
-
+
public void setSafe(int index, byte[] value, int start, int length) {
<#if type.major != "VarLen">
throw new UnsupportedOperationException();
@@ -460,7 +463,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
<#if type.major == "VarLen">lastSet = index;</#if>
</#if>
}
-
+
public void setSafe(int index, ByteBuffer value, int start, int length) {
<#if type.major != "VarLen">
throw new UnsupportedOperationException();
@@ -477,7 +480,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
public void setNull(int index){
bits.getMutator().setSafe(index, 0);
}
-
+
public void setSkipNull(int index, ${minor.class}Holder holder){
values.getMutator().set(index, holder);
}
@@ -485,8 +488,8 @@ public final class ${className} extends BaseDataValueVector implements <#if type
public void setSkipNull(int index, Nullable${minor.class}Holder holder){
values.getMutator().set(index, holder);
}
-
-
+
+
public void set(int index, Nullable${minor.class}Holder holder){
<#if type.major == "VarLen">
for (int i = lastSet + 1; i < index; i++) {
@@ -508,7 +511,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
values.getMutator().set(index, holder);
<#if type.major == "VarLen">lastSet = index;</#if>
}
-
+
public boolean isSafe(int outIndex) {
return outIndex < Nullable${minor.class}Vector.this.getValueCapacity();
}
@@ -524,12 +527,12 @@ public final class ${className} extends BaseDataValueVector implements <#if type
values.getMutator().set(index<#list fields as field><#if field.include!true >, ${field.name}Field</#if></#list>);
<#if type.major == "VarLen">lastSet = index;</#if>
}
-
+
public void setSafe(int index, int isSet<#list fields as field><#if field.include!true >, ${field.type} ${field.name}Field</#if></#list> ) {
<#if type.major == "VarLen">
fillEmpties(index);
</#if>
-
+
bits.getMutator().setSafe(index, isSet);
values.getMutator().setSafe(index<#list fields as field><#if field.include!true >, ${field.name}Field</#if></#list>);
setCount++;
@@ -558,7 +561,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
setCount++;
<#if type.major == "VarLen">lastSet = index;</#if>
}
-
+
<#if !(type.major == "VarLen" || minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense" || minor.class == "Interval" || minor.class == "IntervalDay")>
public void setSafe(int index, ${minor.javaType!type.javaType} value) {
<#if type.major == "VarLen">
@@ -591,7 +594,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
setCount = 0;
<#if type.major = "VarLen">lastSet = -1;</#if>
}
-
+
}
}
</#list>
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 659d99b..529f21b 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -55,15 +55,15 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
private static final int DEFAULT_RECORD_BYTE_COUNT = 8;
private static final int INITIAL_BYTE_COUNT = 4096 * DEFAULT_RECORD_BYTE_COUNT;
private static final int MIN_BYTE_COUNT = 4096;
-
+
private final UInt${type.width}Vector offsetVector;
private final FieldReader reader = new ${minor.class}ReaderImpl(${minor.class}Vector.this);
private final Accessor accessor;
private final Mutator mutator;
-
+
private final UInt${type.width}Vector.Accessor oAccessor;
-
+
private int allocationTotalByteCount = INITIAL_BYTE_COUNT;
private int allocationMonitor = 0;
@@ -85,23 +85,23 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
if (getAccessor().getValueCount() == 0) return 0;
return offsetVector.getBufferSize() + data.writerIndex();
}
-
+
int getSizeFromCount(int valueCount) {
return valueCount * ${type.width};
}
-
+
public int getValueCapacity(){
return offsetVector.getValueCapacity() - 1;
}
-
+
public int getByteCapacity(){
- return data.capacity();
+ return data.capacity();
}
public int getCurrentSizeInBytes() {
return offsetVector.getAccessor().get(getAccessor().getValueCount());
}
-
+
/**
* Return the number of bytes contained in the current var len byte vector.
* @return
@@ -111,7 +111,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
if(valueCount == 0) return 0;
return offsetVector.getAccessor().get(valueCount);
}
-
+
@Override
public SerializedField getMetadata() {
return getMetadataBuilder() //
@@ -126,26 +126,27 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
allocateNew(0,0);
return 0;
}
+ clear();
int loaded = offsetVector.load(valueCount+1, buf);
data = buf.slice(loaded, dataBytes - loaded);
data.retain();
return dataBytes;
}
-
+
@Override
public void load(SerializedField metadata, DrillBuf buffer) {
assert this.field.matches(metadata) : String.format("The field %s doesn't match the provided metadata %s.", this.field, metadata);
int loaded = load(metadata.getBufferLength(), metadata.getValueCount(), buffer);
assert metadata.getBufferLength() == loaded : String.format("Expected to load %d bytes but actually loaded %d bytes", metadata.getBufferLength(), loaded);
}
-
+
@Override
public void clear() {
super.clear();
offsetVector.clear();
}
-
+
@Override
public DrillBuf[] getBuffers(boolean clear) {
DrillBuf[] buffers = ObjectArrays.concat(offsetVector.getBuffers(false), super.getBuffers(false), DrillBuf.class);
@@ -158,15 +159,15 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
}
return buffers;
}
-
+
public long getOffsetAddr(){
return offsetVector.getBuffer().memoryAddress();
}
-
+
public UInt${type.width}Vector getOffsetVector(){
return offsetVector;
}
-
+
public TransferPair getTransferPair(){
return new TransferImpl(getField());
}
@@ -177,7 +178,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
public TransferPair makeTransferPair(ValueVector to) {
return new TransferImpl((${minor.class}Vector) to);
}
-
+
public void transferTo(${minor.class}Vector target){
target.clear();
this.offsetVector.transferTo(target.offsetVector);
@@ -198,25 +199,25 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
target.data.retain();
target.getMutator().setValueCount(length);
}
-
+
protected void copyFrom(int fromIndex, int thisIndex, ${minor.class}Vector from){
int start = from.offsetVector.getAccessor().get(fromIndex);
int end = from.offsetVector.getAccessor().get(fromIndex+1);
int len = end - start;
-
+
int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(thisIndex * ${type.width});
from.data.getBytes(start, data, outputStart, len);
offsetVector.data.set${(minor.javaType!type.javaType)?cap_first}( (thisIndex+1) * ${type.width}, outputStart + len);
}
-
+
public boolean copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){
int start = from.offsetVector.getAccessor().get(fromIndex);
int end = from.offsetVector.getAccessor().get(fromIndex+1);
int len = end - start;
-
+
int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(thisIndex * ${type.width});
-
+
while(data.capacity() < outputStart + len) {
reAlloc();
}
@@ -228,10 +229,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
return true;
}
-
+
private class TransferImpl implements TransferPair{
${minor.class}Vector to;
-
+
public TransferImpl(MaterializedField field){
this.to = new ${minor.class}Vector(field, allocator);
}
@@ -243,7 +244,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
public ${minor.class}Vector getTo(){
return to;
}
-
+
public void transfer(){
transferTo(to);
}
@@ -251,7 +252,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
public void splitAndTransfer(int startIndex, int length) {
splitAndTransferTo(startIndex, length, to);
}
-
+
@Override
public void copyValueSafe(int fromIndex, int toIndex) {
to.copyFromSafe(fromIndex, toIndex, ${minor.class}Vector.this);
@@ -269,7 +270,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
throw new OutOfMemoryRuntimeException("Failure while allocating buffer.");
}
}
-
+
@Override
public boolean allocateNewSafe() {
clear();
@@ -295,7 +296,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
offsetVector.zeroVector();
return true;
}
-
+
public void allocateNew(int totalBytes, int valueCount) {
clear();
assert totalBytes >= 0;
@@ -338,18 +339,18 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
public Accessor getAccessor(){
return accessor;
}
-
+
public Mutator getMutator() {
return mutator;
}
-
+
public final class Accessor extends BaseValueVector.BaseAccessor implements VariableWidthAccessor {
final UInt${type.width}Vector.Accessor oAccessor = offsetVector.getAccessor();
public long getStartEnd(int index){
return oAccessor.getTwoAsLong(index);
}
-
+
public byte[] get(int index) {
assert index >= 0;
int startIdx = oAccessor.get(index);
@@ -363,20 +364,20 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
public int getValueLength(int index) {
return offsetVector.getAccessor().get(index + 1) - offsetVector.getAccessor().get(index);
}
-
+
public void get(int index, ${minor.class}Holder holder){
holder.start = oAccessor.get(index);
holder.end = oAccessor.get(index + 1);
holder.buffer = data;
}
-
+
public void get(int index, Nullable${minor.class}Holder holder){
holder.isSet = 1;
holder.start = oAccessor.get(index);
holder.end = oAccessor.get(index + 1);
holder.buffer = data;
}
-
+
<#switch minor.class>
<#case "VarChar">
@@ -397,9 +398,9 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
}
</#switch>
-
-
-
+
+
+
public int getValueCount() {
return Math.max(offsetVector.getAccessor().getValueCount()-1, 0);
}
@@ -407,12 +408,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
public boolean isNull(int index){
return false;
}
-
+
public UInt${type.width}Vector getOffsetVector(){
return offsetVector;
}
}
-
+
/**
* Mutable${minor.class} implements a vector of variable width values. Elements in the vector
* are accessed by position from the logical start of the vector. A fixed width offsetVector
@@ -464,7 +465,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
offsetVector.getMutator().set(index + 1, currentOffset + length);
data.setBytes(currentOffset, bytes, start, length);
}
-
+
public void setSafe(int index, ByteBuffer bytes, int start, int length) {
assert index >= 0;
@@ -500,51 +501,51 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
public void setSafe(int index, int start, int end, DrillBuf buffer){
int len = end - start;
-
+
int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
-
+
while(data.capacity() < outputStart + len) {
reAlloc();
}
-
+
offsetVector.getMutator().setSafe( index+1, outputStart + len);
buffer.getBytes(start, data, outputStart, len);
}
-
-
+
+
public void setSafe(int index, Nullable${minor.class}Holder holder){
assert holder.isSet == 1;
int start = holder.start;
int end = holder.end;
int len = end - start;
-
+
int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
-
+
while(data.capacity() < outputStart + len) {
reAlloc();
}
-
+
holder.buffer.getBytes(start, data, outputStart, len);
offsetVector.getMutator().setSafe( index+1, outputStart + len);
}
-
+
public void setSafe(int index, ${minor.class}Holder holder){
int start = holder.start;
int end = holder.end;
int len = end - start;
-
+
int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width});
-
+
while(data.capacity() < outputStart + len) {
reAlloc();
}
-
+
holder.buffer.getBytes(start, data, outputStart, len);
offsetVector.getMutator().setSafe( index+1, outputStart + len);
}
-
+
protected void set(int index, int start, int length, DrillBuf buffer){
assert index >= 0;
int currentOffset = offsetVector.getAccessor().get(index);
@@ -559,14 +560,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
offsetVector.getMutator().set(index + 1, currentOffset + length);
data.setBytes(currentOffset, holder.buffer, holder.start, length);
}
-
+
protected void set(int index, ${minor.class}Holder holder){
int length = holder.end - holder.start;
int currentOffset = offsetVector.getAccessor().get(index);
offsetVector.getMutator().set(index + 1, currentOffset + length);
data.setBytes(currentOffset, holder.buffer, holder.start, length);
}
-
+
public void setValueCount(int valueCount) {
int currentByteCapacity = getByteCapacity();
int idx = offsetVector.getAccessor().get(valueCount);
@@ -601,7 +602,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
setValueCount(size);
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 8e2ce96..016cd92 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -117,10 +117,14 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
if (buf == null) {
throw new IOException(new OutOfMemoryException());
}
- buf.writeBytes(input, dataLength);
- ValueVector vector = TypeHelper.getNewVector(field, allocator);
- vector.load(metaData, buf);
- buf.release();
+ final ValueVector vector;
+ try {
+ buf.writeBytes(input, dataLength);
+ vector = TypeHelper.getNewVector(field, allocator);
+ vector.load(metaData, buf);
+ } finally {
+ buf.release();
+ }
vectorList.add(vector);
}
container.addCollection(vectorList);
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index c233ac5..811cceb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -101,6 +101,9 @@ public interface BufferAllocator extends Closeable {
/**
* Not thread safe.
+ *
+ * WARNING: unclaimed pre-allocations leak memory. If you call preAllocate(), you must
+ * make sure to ultimately try to get the buffer and release it.
*/
public interface PreAllocator {
public boolean preAllocate(int bytes);
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index 369c0ec..7e22e65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.physical.impl.TopN;
+import io.netty.buffer.DrillBuf;
+
import java.util.concurrent.TimeUnit;
import javax.inject.Named;
@@ -53,9 +55,8 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
this.limit = limit;
this.context = context;
this.allocator = allocator;
- BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
- preAlloc.preAllocate(4 * (limit + 1));
- heapSv4 = new SelectionVector4(preAlloc.getAllocation(), limit, Character.MAX_VALUE);
+ final DrillBuf drillBuf = allocator.buffer(4 * (limit + 1));
+ heapSv4 = new SelectionVector4(drillBuf, limit, Character.MAX_VALUE);
this.hasSv2 = hasSv2;
}
@@ -70,9 +71,8 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
newContainer.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
this.hyperBatch = new ExpandableHyperContainer(newContainer);
this.batchCount = hyperBatch.iterator().next().getValueVectors().length;
- BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
- preAlloc.preAllocate(4 * (limit + 1));
- this.heapSv4 = new SelectionVector4(preAlloc.getAllocation(), limit, Character.MAX_VALUE);
+ final DrillBuf drillBuf = allocator.buffer(4 * (limit + 1));
+ this.heapSv4 = new SelectionVector4(drillBuf, limit, Character.MAX_VALUE);
for (int i = 0; i < v4.getTotalCount(); i++) {
heapSv4.set(i, v4.get(i));
}
@@ -120,9 +120,8 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
public void generate() throws SchemaChangeException {
Stopwatch watch = new Stopwatch();
watch.start();
- BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
- preAlloc.preAllocate(4 * queueSize);
- finalSv4 = new SelectionVector4(preAlloc.getAllocation(), queueSize, 4000);
+ final DrillBuf drillBuf = allocator.buffer(4 * queueSize);
+ finalSv4 = new SelectionVector4(drillBuf, queueSize, 4000);
for (int i = queueSize - 1; i >= 0; i--) {
finalSv4.set(i, pop());
}
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 1cf6213..349f1b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -278,26 +278,30 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
copier.setupRemover(context, batch, newBatch);
}
SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
- do {
- int count = selectionVector4.getCount();
- int copiedRecords = copier.copyRecords(0, count);
- assert copiedRecords == count;
- for (VectorWrapper<?> v : newContainer) {
- ValueVector.Mutator m = v.getValueVector().getMutator();
- m.setValueCount(count);
- }
- newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
- newContainer.setRecordCount(count);
- builder.add(newBatch);
- } while (selectionVector4.next());
- selectionVector4.clear();
- c.clear();
- VectorContainer newQueue = new VectorContainer();
- builder.canonicalize();
- builder.build(context, newQueue);
- priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent());
- builder.getSv4().clear();
- selectionVector4.clear();
+ try {
+ do {
+ int count = selectionVector4.getCount();
+ int copiedRecords = copier.copyRecords(0, count);
+ assert copiedRecords == count;
+ for (VectorWrapper<?> v : newContainer) {
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(count);
+ }
+ newContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ newContainer.setRecordCount(count);
+ builder.add(newBatch);
+ } while (selectionVector4.next());
+ selectionVector4.clear();
+ c.clear();
+ VectorContainer newQueue = new VectorContainer();
+ builder.canonicalize();
+ builder.build(context, newQueue);
+ priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent());
+ builder.getSv4().clear();
+ selectionVector4.clear();
+ } finally {
+ builder.close();
+ }
logger.debug("Took {} us to purge", watch.elapsed(TimeUnit.MICROSECONDS));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 026d79e..ee2ce7f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -256,6 +256,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
public void resetBatchBuilder() {
+ batchBuilder.close();
batchBuilder = new MergeJoinBatchBuilder(oContext.getAllocator(), status);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
index 1187bd6..2798010 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.physical.impl.join;
+import io.netty.buffer.DrillBuf;
+
import java.util.List;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -33,7 +35,7 @@ import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.ArrayListMultimap;
-public class MergeJoinBatchBuilder {
+public class MergeJoinBatchBuilder implements AutoCloseable {
private final ArrayListMultimap<BatchSchema, RecordBatchData> queuedRightBatches = ArrayListMultimap.create();
private VectorContainer container;
@@ -41,6 +43,7 @@ public class MergeJoinBatchBuilder {
private int runningBatches;
private int recordCount;
private PreAllocator svAllocator;
+ private boolean svAllocatorUsed = false;
private JoinStatus status;
public MergeJoinBatchBuilder(BufferAllocator allocator, JoinStatus status) {
@@ -90,7 +93,9 @@ public class MergeJoinBatchBuilder {
if (queuedRightBatches.size() > Character.MAX_VALUE) {
throw new SchemaChangeException("Join cannot work on more than %d batches at a time.", (int) Character.MAX_VALUE);
}
- status.sv4 = new SelectionVector4(svAllocator.getAllocation(), recordCount, Character.MAX_VALUE);
+ final DrillBuf drillBuf = svAllocator.getAllocation();
+ svAllocatorUsed = true;
+ status.sv4 = new SelectionVector4(drillBuf, recordCount, Character.MAX_VALUE);
BatchSchema schema = queuedRightBatches.keySet().iterator().next();
List<RecordBatchData> data = queuedRightBatches.get(schema);
@@ -140,4 +145,13 @@ public class MergeJoinBatchBuilder {
container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
}
+ @Override
+ public void close() {
+ if (!svAllocatorUsed) {
+ final DrillBuf drillBuf = svAllocator.getAllocation();
+ if (drillBuf != null) {
+ drillBuf.release();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index b28b7b0..611052b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -733,6 +733,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
f.cleanup();
}
}
+
+ super.close();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index ca6d83c..1286fe1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -172,74 +172,84 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// Start collecting batches until recordsToSample records have been collected
SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
- builder.add(incoming);
-
- recordsSampled += incoming.getRecordCount();
-
- outer: while (recordsSampled < recordsToSample) {
- upstream = next(incoming);
- switch (upstream) {
- case NONE:
- case NOT_YET:
- case STOP:
- upstreamNone = true;
- break outer;
- default:
- // fall through
- }
+ WritableBatch batch = null;
+ CachedVectorContainer sampleToSave = null;
+ VectorContainer containerToCache = new VectorContainer();
+ try {
builder.add(incoming);
+
recordsSampled += incoming.getRecordCount();
- if (upstream == IterOutcome.NONE) {
- break;
- }
- }
- VectorContainer sortedSamples = new VectorContainer();
- builder.build(context, sortedSamples);
- // Sort the records according the orderings given in the configuration
+ outer: while (recordsSampled < recordsToSample) {
+ upstream = next(incoming);
+ switch (upstream) {
+ case NONE:
+ case NOT_YET:
+ case STOP:
+ upstreamNone = true;
+ break outer;
+ default:
+ // fall through
+ }
+ builder.add(incoming);
+ recordsSampled += incoming.getRecordCount();
+ if (upstream == IterOutcome.NONE) {
+ break;
+ }
+ }
+ VectorContainer sortedSamples = new VectorContainer();
+ builder.build(context, sortedSamples);
+
+ // Sort the records according to the orderings given in the configuration
+
+ Sorter sorter = SortBatch.createNewSorter(context, popConfig.getOrderings(), sortedSamples);
+ SelectionVector4 sv4 = builder.getSv4();
+ sorter.setup(context, sv4, sortedSamples);
+ sorter.sort(sv4, sortedSamples);
+
+ // Project every Nth record to a new vector container, where N = recordsSampled/(samplingFactor * partitions).
+ // Uses
+ // the expressions from the Orderings to populate each column. There is one column for each Ordering in
+ // popConfig.orderings.
+
+ List<ValueVector> localAllocationVectors = Lists.newArrayList();
+ SampleCopier copier = getCopier(sv4, sortedSamples, containerToCache, popConfig.getOrderings(), localAllocationVectors);
+ int allocationSize = 50;
+ while (true) {
+ for (ValueVector vv : localAllocationVectors) {
+ AllocationHelper.allocate(vv, samplingFactor * partitions, allocationSize);
+ }
+ if (copier.copyRecords(recordsSampled / (samplingFactor * partitions), 0, samplingFactor * partitions)) {
+ break;
+ } else {
+ containerToCache.zeroVectors();
+ allocationSize *= 2;
+ }
+ }
+ for (VectorWrapper<?> vw : containerToCache) {
+ vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
+ }
+ containerToCache.setRecordCount(copier.getOutputRecords());
- Sorter sorter = SortBatch.createNewSorter(context, popConfig.getOrderings(), sortedSamples);
- SelectionVector4 sv4 = builder.getSv4();
- sorter.setup(context, sv4, sortedSamples);
- sorter.sort(sv4, sortedSamples);
+ // Get a distributed multimap handle from the distributed cache, and put the vectors from the new vector container
+ // into a serializable wrapper object, and then add to distributed map
- // Project every Nth record to a new vector container, where N = recordsSampled/(samplingFactor * partitions).
- // Uses the
- // the expressions from the Orderings to populate each column. There is one column for each Ordering in
- // popConfig.orderings.
+ batch = WritableBatch.getBatchNoHVWrap(containerToCache.getRecordCount(), containerToCache, false);
+ sampleToSave = new CachedVectorContainer(batch, context.getAllocator());
- VectorContainer containerToCache = new VectorContainer();
- List<ValueVector> localAllocationVectors = Lists.newArrayList();
- SampleCopier copier = getCopier(sv4, sortedSamples, containerToCache, popConfig.getOrderings(), localAllocationVectors);
- int allocationSize = 50;
- while (true) {
- for (ValueVector vv : localAllocationVectors) {
- AllocationHelper.allocate(vv, samplingFactor * partitions, allocationSize);
+ mmap.put(mapKey, sampleToSave);
+ this.sampledIncomingBatches = builder.getHeldRecordBatches();
+ } finally {
+ builder.clear();
+ builder.close();
+ if (batch != null) {
+ batch.clear();
}
- if (copier.copyRecords(recordsSampled / (samplingFactor * partitions), 0, samplingFactor * partitions)) {
- break;
- } else {
- containerToCache.zeroVectors();
- allocationSize *= 2;
+ containerToCache.clear();
+ if (sampleToSave != null) {
+ sampleToSave.clear();
}
}
- for (VectorWrapper<?> vw : containerToCache) {
- vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
- }
- containerToCache.setRecordCount(copier.getOutputRecords());
-
- // Get a distributed multimap handle from the distributed cache, and put the vectors from the new vector container
- // into a serializable wrapper object, and then add to distributed map
-
- WritableBatch batch = WritableBatch.getBatchNoHVWrap(containerToCache.getRecordCount(), containerToCache, false);
- CachedVectorContainer sampleToSave = new CachedVectorContainer(batch, context.getAllocator());
-
- mmap.put(mapKey, sampleToSave);
- this.sampledIncomingBatches = builder.getHeldRecordBatches();
- builder.clear();
- batch.clear();
- containerToCache.clear();
- sampleToSave.clear();
return true;
@@ -335,57 +345,63 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// Get all samples from distributed map
SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES);
- for (CachedVectorContainer w : mmap.get(mapKey)) {
- containerBuilder.add(w.get());
- }
- VectorContainer allSamplesContainer = new VectorContainer();
- containerBuilder.build(context, allSamplesContainer);
-
- List<Ordering> orderDefs = Lists.newArrayList();
- int i = 0;
- for (Ordering od : popConfig.getOrderings()) {
- SchemaPath sp = SchemaPath.getSimplePath("f" + i++);
- orderDefs.add(new Ordering(od.getDirection(), new FieldReference(sp)));
- }
+ final VectorContainer allSamplesContainer = new VectorContainer();
+ final VectorContainer candidatePartitionTable = new VectorContainer();
+ CachedVectorContainer wrap = null;
+ try {
+ for (CachedVectorContainer w : mmap.get(mapKey)) {
+ containerBuilder.add(w.get());
+ }
+ containerBuilder.build(context, allSamplesContainer);
- // sort the data incoming samples.
- SelectionVector4 newSv4 = containerBuilder.getSv4();
- Sorter sorter = SortBatch.createNewSorter(context, orderDefs, allSamplesContainer);
- sorter.setup(context, newSv4, allSamplesContainer);
- sorter.sort(newSv4, allSamplesContainer);
-
- // Copy every Nth record from the samples into a candidate partition table, where N = totalSampledRecords/partitions
- // Attempt to push this to the distributed map. Only the first candidate to get pushed will be used.
- VectorContainer candidatePartitionTable = new VectorContainer();
- SampleCopier copier = null;
- List<ValueVector> localAllocationVectors = Lists.newArrayList();
- copier = getCopier(newSv4, allSamplesContainer, candidatePartitionTable, orderDefs, localAllocationVectors);
- int allocationSize = 50;
- while (true) {
- for (ValueVector vv : localAllocationVectors) {
- AllocationHelper.allocate(vv, samplingFactor * partitions, allocationSize);
+ List<Ordering> orderDefs = Lists.newArrayList();
+ int i = 0;
+ for (Ordering od : popConfig.getOrderings()) {
+ SchemaPath sp = SchemaPath.getSimplePath("f" + i++);
+ orderDefs.add(new Ordering(od.getDirection(), new FieldReference(sp)));
}
- int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions;
- if (copier.copyRecords(skipRecords, skipRecords, partitions - 1)) {
- assert copier.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier.getOutputRecords(), partitions);
- for (VectorWrapper<?> vw : candidatePartitionTable) {
- vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
+
+ // sort the data incoming samples.
+ SelectionVector4 newSv4 = containerBuilder.getSv4();
+ Sorter sorter = SortBatch.createNewSorter(context, orderDefs, allSamplesContainer);
+ sorter.setup(context, newSv4, allSamplesContainer);
+ sorter.sort(newSv4, allSamplesContainer);
+
+ // Copy every Nth record from the samples into a candidate partition table, where N = totalSampledRecords/partitions
+ // Attempt to push this to the distributed map. Only the first candidate to get pushed will be used.
+ SampleCopier copier = null;
+ List<ValueVector> localAllocationVectors = Lists.newArrayList();
+ copier = getCopier(newSv4, allSamplesContainer, candidatePartitionTable, orderDefs, localAllocationVectors);
+ int allocationSize = 50;
+ while (true) {
+ for (ValueVector vv : localAllocationVectors) {
+ AllocationHelper.allocate(vv, samplingFactor * partitions, allocationSize);
}
- break;
- } else {
- candidatePartitionTable.zeroVectors();
- allocationSize *= 2;
+ int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions;
+ if (copier.copyRecords(skipRecords, skipRecords, partitions - 1)) {
+ assert copier.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier.getOutputRecords(), partitions);
+ for (VectorWrapper<?> vw : candidatePartitionTable) {
+ vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
+ }
+ break;
+ } else {
+ candidatePartitionTable.zeroVectors();
+ allocationSize *= 2;
+ }
+ }
+ candidatePartitionTable.setRecordCount(copier.getOutputRecords());
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false);
+ wrap = new CachedVectorContainer(batch, context.getDrillbitContext().getAllocator());
+ tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
+ } finally {
+ candidatePartitionTable.clear();
+ allSamplesContainer.clear();
+ containerBuilder.clear();
+ containerBuilder.close();
+ if (wrap != null) {
+ wrap.clear();
}
}
- candidatePartitionTable.setRecordCount(copier.getOutputRecords());
- WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false);
- CachedVectorContainer wrap = new CachedVectorContainer(batch, context.getDrillbitContext().getAllocator());
- tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
-
- candidatePartitionTable.clear();
- allSamplesContainer.clear();
- containerBuilder.clear();
- wrap.clear();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 8748aaf..dea6ba8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -85,6 +85,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
@Override
public void close() {
builder.clear();
+ builder.close();
super.close();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index e559ece..00f1992 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -39,7 +39,7 @@ import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
-public class SortRecordBatchBuilder {
+public class SortRecordBatchBuilder implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortRecordBatchBuilder.class);
private final ArrayListMultimap<BatchSchema, RecordBatchData> batches = ArrayListMultimap.create();
@@ -50,6 +50,7 @@ public class SortRecordBatchBuilder {
private final long maxBytes;
private SelectionVector4 sv4;
final PreAllocator svAllocator;
+ private boolean svAllocatorUsed = false;
public SortRecordBatchBuilder(BufferAllocator a, long maxBytes) {
this.maxBytes = maxBytes;
@@ -165,6 +166,7 @@ public class SortRecordBatchBuilder {
if (svBuffer == null) {
throw new OutOfMemoryError("Failed to allocate direct memory for SV4 vector in SortRecordBatchBuilder.");
}
+ svAllocatorUsed = true;
sv4 = new SelectionVector4(svBuffer, recordCount, Character.MAX_VALUE);
BatchSchema schema = batches.keySet().iterator().next();
List<RecordBatchData> data = batches.get(schema);
@@ -228,6 +230,17 @@ public class SortRecordBatchBuilder {
}
}
+ @Override
+ public void close() {
+ // Don't leak unused pre-allocated memory.
+ if (!svAllocatorUsed) {
+ final DrillBuf drillBuf = svAllocator.getAllocation();
+ if (drillBuf != null) {
+ drillBuf.release();
+ }
+ }
+ }
+
public List<VectorContainer> getHeldRecordBatches() {
ArrayList<VectorContainer> containerList = Lists.newArrayList();
for (BatchSchema bs : batches.keySet()) {
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 5cdd2bb..612777e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -160,6 +160,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
if (builder != null) {
builder.clear();
+ builder.close();
}
if (sv4 != null) {
sv4.clear();
@@ -346,6 +347,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
Stopwatch watch = new Stopwatch();
watch.start();
+ if (builder != null) {
+ builder.clear();
+ builder.close();
+ }
builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
for (BatchGroup group : batchGroups) {
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 9b97e1c..9acae9e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.physical.impl.xsort;
+import io.netty.buffer.DrillBuf;
+
import java.util.Queue;
import javax.inject.Named;
@@ -53,7 +55,8 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
doSetup(context, hyperBatch, null);
runStarts.add(0);
int batch = 0;
- for (int i = 0; i < this.vector4.getTotalCount(); i++) {
+ final int totalCount = this.vector4.getTotalCount();
+ for (int i = 0; i < totalCount; i++) {
final int newBatch = this.vector4.get(i) >>> 16;
if (newBatch == batch) {
continue;
@@ -64,9 +67,8 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
throw new UnsupportedOperationException("Missing batch");
}
}
- final BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
- preAlloc.preAllocate(4 * this.vector4.getTotalCount());
- aux = new SelectionVector4(preAlloc.getAllocation(), this.vector4.getTotalCount(), Character.MAX_VALUE);
+ final DrillBuf drillBuf = allocator.buffer(4 * totalCount);
+ aux = new SelectionVector4(drillBuf, totalCount, Character.MAX_VALUE);
}
/**
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index f7786b7..facf192 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.physical.impl.xsort;
+import io.netty.buffer.DrillBuf;
+
import java.util.List;
import javax.inject.Named;
@@ -47,9 +49,8 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
this.outgoing = outgoing;
this.size = batchGroups.size();
- BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
- preAlloc.preAllocate(4 * size);
- vector4 = new SelectionVector4(preAlloc.getAllocation(), size, Character.MAX_VALUE);
+ final DrillBuf drillBuf = allocator.buffer(4 * size);
+ vector4 = new SelectionVector4(drillBuf, size, Character.MAX_VALUE);
doSetup(context, hyperBatch, outgoing);
queueSize = 0;
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 308a8bc..324829a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -68,33 +68,36 @@ public class WritableBatch {
}
DrillBuf newBuf = buffers[0].getAllocator().buffer(len);
-
- /* Copy data from each buffer into the compound buffer */
- int offset = 0;
- for (DrillBuf buf : buffers) {
- newBuf.setBytes(offset, buf);
- offset += buf.capacity();
- buf.release();
- }
-
- List<SerializedField> fields = def.getFieldList();
-
- int bufferOffset = 0;
-
- /*
- * For each value vector slice up the appropriate size from the compound buffer and load it into the value vector
- */
- int vectorIndex = 0;
-
- for (VectorWrapper<?> vv : container) {
- SerializedField fmd = fields.get(vectorIndex);
- ValueVector v = vv.getValueVector();
- DrillBuf bb = newBuf.slice(bufferOffset, fmd.getBufferLength());
+ try {
+ /* Copy data from each buffer into the compound buffer */
+ int offset = 0;
+ for (DrillBuf buf : buffers) {
+ newBuf.setBytes(offset, buf);
+ offset += buf.capacity();
+ buf.release();
+ }
+
+ List<SerializedField> fields = def.getFieldList();
+
+ int bufferOffset = 0;
+
+ /*
+ * For each value vector slice up the appropriate size from the compound buffer and load it into the value vector
+ */
+ int vectorIndex = 0;
+
+ for (VectorWrapper<?> vv : container) {
+ SerializedField fmd = fields.get(vectorIndex);
+ ValueVector v = vv.getValueVector();
+ DrillBuf bb = newBuf.slice(bufferOffset, fmd.getBufferLength());
// v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
- v.load(fmd, bb);
- bb.release();
- vectorIndex++;
- bufferOffset += fmd.getBufferLength();
+ v.load(fmd, bb);
+ vectorIndex++;
+ bufferOffset += fmd.getBufferLength();
+ }
+ } finally {
+ // Any vectors that loaded material from newBuf slices above will retain those.
+ newBuf.release(1);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index ae5fad5..f88a7bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -200,6 +200,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
public void transferTo(BitVector target) {
+ target.clear();
target.data = data;
target.data.retain();
target.valueCount = valueCount;
@@ -212,6 +213,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
int byteSize = getSizeFromCount(length);
int offset = startIndex % 8;
if (offset == 0) {
+ target.clear();
// slice
target.data = (DrillBuf) this.data.slice(firstByte, byteSize);
target.data.retain();
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
index 78846dc..3c01939 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java
@@ -242,4 +242,13 @@ public abstract class AbstractMapVector extends AbstractContainerVector {
}
return actualBufSize;
}
+
+ @Override
+ public void close() {
+ for(final ValueVector valueVector : vectors.values()) {
+ valueVector.close();
+ }
+
+ super.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index b615b66..d0f38c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -342,4 +342,14 @@ public class MapVector extends AbstractMapVector {
v.clear();
}
}
+
+ @Override
+ public void close() {
+ for (final ValueVector v : getChildren()) {
+ v.close();
+ }
+ valueCount = 0;
+
+ super.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/da70b63f/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
index 2b1dff0..037c8c6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestValueVector.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.record.vector;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.nio.charset.Charset;
@@ -102,12 +103,10 @@ public class TestValueVector extends ExecTest {
boolean b = false;
try {
v.getAccessor().get(3);
- } catch(AssertionError e) {
+ } catch(IllegalStateException e) {
b = true;
}finally{
- if(!b){
- assert false;
- }
+ assertTrue(b);
}
}
@@ -139,12 +138,10 @@ public class TestValueVector extends ExecTest {
boolean b = false;
try {
v.getAccessor().get(3);
- } catch(AssertionError e) {
+ } catch(IllegalStateException e) {
b = true;
}finally{
- if(!b){
- assert false;
- }
+ assertTrue(b);
}
}
@@ -154,12 +151,10 @@ public class TestValueVector extends ExecTest {
boolean b = false;
try {
v.getAccessor().get(0);
- } catch(AssertionError e) {
+ } catch(IllegalStateException e) {
b = true;
}finally{
- if(!b){
- assert false;
- }
+ assertTrue(b);
}
}
@@ -180,12 +175,10 @@ public class TestValueVector extends ExecTest {
boolean b = false;
try {
v.getAccessor().get(3);
- } catch(AssertionError e) {
+ } catch(IllegalStateException e) {
b = true;
}finally{
- if(!b){
- assert false;
- }
+ assertTrue(b);
}
}
@@ -217,12 +210,10 @@ public class TestValueVector extends ExecTest {
boolean b = false;
try {
v.getAccessor().get(3);
- } catch(AssertionError e) {
+ } catch(IllegalStateException e) {
b = true;
}finally{
- if(!b){
- assert false;
- }
+ assertTrue(b);
}
}
@@ -231,12 +222,10 @@ public class TestValueVector extends ExecTest {
boolean b = false;
try {
v.getAccessor().get(0);
- } catch(AssertionError e) {
+ } catch(IllegalStateException e) {
b = true;
}finally{
- if(!b){
- assert false;
- }
+ assertTrue(b);
}
}
}
[2/3] drill git commit: DRILL-1662: Decrease drillbit kill -9 timeout
in shutdown script to 120 seconds
Posted by ja...@apache.org.
DRILL-1662: Decrease drillbit kill -9 timeout in shutdown script to 120 seconds
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/720db09b
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/720db09b
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/720db09b
Branch: refs/heads/master
Commit: 720db09b899d65781c3cc83f57094d42ed5c6c8a
Parents: da70b63
Author: rinukonda <ri...@maprtech.com>
Authored: Fri May 8 13:42:38 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue May 12 14:09:00 2015 -0700
----------------------------------------------------------------------
distribution/src/resources/drillbit.sh | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/720db09b/distribution/src/resources/drillbit.sh
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drillbit.sh b/distribution/src/resources/drillbit.sh
index e250c0d..2b324ad 100755
--- a/distribution/src/resources/drillbit.sh
+++ b/distribution/src/resources/drillbit.sh
@@ -28,7 +28,7 @@
# DRILL_IDENT_STRING A string representing this instance of drillbit. $USER by default
# DRILL_NICENESS The scheduling priority for daemons. Defaults to 0.
# DRILL_STOP_TIMEOUT Time, in seconds, after which we kill -9 the server if it has not stopped.
-# Default 1200 seconds.
+# Default 120 seconds.
#
# Modelled after $HADOOP_HOME/bin/hadoop-daemon.sh
@@ -61,8 +61,8 @@ waitForProcessEnd() {
do
echo -n "."
sleep 1;
- # if process persists more than $DRILL_STOP_TIMEOUT (default 1200 sec) no mercy
- if [ $(( `date +%s` - $processedAt )) -gt ${DRILL_STOP_TIMEOUT:-1200} ]; then
+ # if process persists more than $DRILL_STOP_TIMEOUT (default 120 sec) no mercy
+ if [ $(( `date +%s` - $processedAt )) -gt ${DRILL_STOP_TIMEOUT:-120} ]; then
break;
fi
done
[3/3] drill git commit: DRILL-2528 - make sure DrillResultSet is accurately preserved in drill-jdbc-all
Posted by ja...@apache.org.
DRILL-2528 - make sure DrillResultSet is accurately preserved in drill-jdbc-all
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6d7cda8e
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6d7cda8e
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6d7cda8e
Branch: refs/heads/master
Commit: 6d7cda8eaf9963dbedba560aef5d1786ff2b86d9
Parents: 720db09
Author: Patrick Wong <pw...@maprtech.com>
Authored: Wed May 6 00:49:20 2015 +0000
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue May 12 14:09:34 2015 -0700
----------------------------------------------------------------------
exec/jdbc-all/pom.xml | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/6d7cda8e/exec/jdbc-all/pom.xml
----------------------------------------------------------------------
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index 307006f..6dbf097 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -221,6 +221,7 @@
<option>-ignorewarnings</option>
<option>-keep class org.apache.drill.exec.proto.** { *; }</option>
<option>-keep class org.apache.drill.common.types.** { *; }</option>
+ <option>-keep class org.apache.drill.jdbc.DrillResultSet { *; }</option>
<option>-keep class org.apache.drill.jdbc.Driver { *; }</option>
<option>-keep class org.apache.drill.jdbc.DrillJdbc40Factory { *; }</option>
<option>-keep class org.apache.drill.jdbc.DrillJdbc41Factory { *; }</option>