You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:51 UTC
[13/27] git commit: fixes for memory management and rpc throttling
fixes for memory management and rpc throttling
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/402be7e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/402be7e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/402be7e0
Branch: refs/heads/master
Commit: 402be7e04e744004beb16e9222cf649f2da6fc93
Parents: 0a2f997
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu Aug 15 08:45:20 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 16:59:09 2013 -0700
----------------------------------------------------------------------
.../drill/common/logical/LogicalPlan.java | 6 +-
sandbox/prototype/exec/bufferl/pom.xml | 37 +-
.../main/java/io/netty/buffer/PoolArenaL.java | 3 +-
.../java/io/netty/buffer/PooledByteBufL.java | 3 +-
.../java/io/netty/buffer/PooledHeapBufferL.java | 282 +++++++++
.../buffer/PooledUnsafeDirectByteBufL.java | 3 +-
sandbox/prototype/exec/java-exec/pom.xml | 6 +-
.../templates/FixedValueVectors.java | 1 -
.../templates/NullableValueVectors.java | 6 +-
.../templates/RepeatedValueVectors.java | 8 +-
.../templates/VariableLengthVectors.java | 9 +-
.../apache/drill/exec/client/DrillClient.java | 11 +-
.../apache/drill/exec/opt/BasicOptimizer.java | 37 +-
.../exec/physical/config/MockStorageEngine.java | 52 --
.../drill/exec/physical/impl/ScanBatch.java | 3 +
.../drill/exec/physical/impl/ScreenCreator.java | 23 +-
.../exec/physical/impl/SingleSenderCreator.java | 2 -
.../exec/physical/impl/WireRecordBatch.java | 1 +
.../exec/record/FragmentWritableBatch.java | 6 +-
.../drill/exec/record/RawFragmentBatch.java | 4 +
.../drill/exec/record/VectorContainer.java | 1 +
.../apache/drill/exec/record/WritableBatch.java | 1 -
.../exec/rpc/AbstractHandshakeHandler.java | 2 +
.../drill/exec/rpc/CoordinationQueue.java | 10 +-
.../drill/exec/rpc/DrillRpcFutureImpl.java | 1 -
.../apache/drill/exec/rpc/RemoteConnection.java | 22 +-
.../java/org/apache/drill/exec/rpc/RpcBus.java | 12 +-
.../drill/exec/rpc/user/QueryResultBatch.java | 6 +-
.../drill/exec/store/AbstractStorageEngine.java | 6 +-
.../drill/exec/store/StorageEngineRegistry.java | 2 +-
.../exec/store/mock/MockStorageEngine.java | 51 ++
.../exec/store/parquet/ParquetRecordReader.java | 7 +-
.../store/parquet/ParquetStorageEngine.java | 5 +-
.../exec/work/RemoteFragmentRunnerListener.java | 2 +-
.../org/apache/drill/exec/work/WorkManager.java | 2 +-
.../work/batch/AbstractFragmentCollector.java | 2 +-
.../work/batch/UnlimitedRawBatchBuffer.java | 73 +++
.../exec/work/batch/UnlmitedRawBatchBuffer.java | 73 ---
.../physical/impl/TestSimpleFragmentRun.java | 2 +-
.../apache/drill/exec/store/MockScantTest.java | 115 ----
.../exec/store/ParquetRecordReaderTest.java | 594 -------------------
.../store/parquet/ParquetRecordReaderTest.java | 347 +++++++++++
.../exec/store/parquet/TestFileGenerator.java | 210 +++++++
.../src/test/resources/scan_screen_logical.json | 8 +-
.../exec/java-exec/src/test/sh/logback.xml | 35 ++
.../prototype/exec/java-exec/src/test/sh/runbit | 4 +-
sandbox/prototype/pom.xml | 10 +
47 files changed, 1168 insertions(+), 938 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
index 742001a..6692661 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
@@ -17,8 +17,8 @@
******************************************************************************/
package org.apache.drill.common.logical;
-import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,8 +38,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
@JsonPropertyOrder({ "head", "storage", "query" })
public class LogicalPlan {
@@ -53,7 +51,7 @@ public class LogicalPlan {
public LogicalPlan(@JsonProperty("head") PlanProperties head,
@JsonProperty("storage") Map<String, StorageEngineConfig> storageEngineMap,
@JsonProperty("query") List<LogicalOperator> operators) {
- this.storageEngineMap = storageEngineMap;
+ this.storageEngineMap = storageEngineMap != null ? storageEngineMap : new HashMap<String, StorageEngineConfig>();
this.properties = head;
this.graph = Graph.newGraph(operators, SinkOperator.class, SourceOperator.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/bufferl/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/pom.xml b/sandbox/prototype/exec/bufferl/pom.xml
index baa2c3d..11eef91 100644
--- a/sandbox/prototype/exec/bufferl/pom.xml
+++ b/sandbox/prototype/exec/bufferl/pom.xml
@@ -1,25 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Copyright 2012 The Netty Project
- ~
- ~ The Netty Project licenses this file to you under the Apache License,
- ~ version 2.0 (the "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at:
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- ~ License for the specific language governing permissions and limitations
- ~ under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<!-- ~ Copyright 2012 The Netty Project ~ ~ The Netty Project licenses this
+ file to you under the Apache License, ~ version 2.0 (the "License"); you
+ may not use this file except in compliance ~ with the License. You may obtain
+ a copy of the License at: ~ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ ~ Unless required by applicable law or agreed to in writing, software ~
+ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ ~ License for the specific language governing permissions and limitations
+ ~ under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.drill.exec</groupId>
+ <artifactId>exec-parent</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ </parent>
+
<groupId>org.apache.drill.exec</groupId>
- <version>4.0.3.Final</version>
+ <version>4.0.7.Final</version>
<artifactId>netty-bufferl</artifactId>
<name>Netty/Drill/Buffer</name>
@@ -31,5 +32,5 @@
<version>${project.version}</version>
</dependency>
</dependencies>
-
+
</project>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
index db9818d..12fd1ae 100644
--- a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
+++ b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
@@ -354,8 +354,7 @@ abstract class PoolArenaL<T> {
@Override
protected PooledByteBufL<byte[]> newByteBuf(int maxCapacity) {
- throw new UnsupportedOperationException();
-// return PooledHeapByteBufL.newInstance(maxCapacity);
+ return PooledHeapByteBufL.newInstance(maxCapacity);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
index c25c2e9..ded7c62 100644
--- a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
+++ b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
@@ -148,7 +148,7 @@ abstract class PooledByteBufL<T> extends AbstractReferenceCountedByteBuf {
this.handle = -1;
memory = null;
chunk.arena.free(chunk, handle);
- if (ResourceLeakDetector.ENABLED) {
+ if (leak != null) {
leak.close();
} else {
recycle();
@@ -160,7 +160,6 @@ abstract class PooledByteBufL<T> extends AbstractReferenceCountedByteBuf {
private void recycle() {
Recycler.Handle recyclerHandle = this.recyclerHandle;
if (recyclerHandle != null) {
- setRefCnt(1);
((Recycler<Object>) recycler()).recycle(this, recyclerHandle);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledHeapBufferL.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledHeapBufferL.java b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledHeapBufferL.java
new file mode 100644
index 0000000..70b517c
--- /dev/null
+++ b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledHeapBufferL.java
@@ -0,0 +1,282 @@
+package io.netty.buffer;
+
+/*
+ * Copyright 2012 The Netty Project
+ *
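+ * The Netty Project licenses this file to you under the Apache License, version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at: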
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import io.netty.util.Recycler;
+import io.netty.util.internal.PlatformDependent;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+final class PooledHeapByteBufL extends PooledByteBufL<byte[]> {
+
+ private static final Recycler<PooledHeapByteBufL> RECYCLER = new Recycler<PooledHeapByteBufL>() {
+ @Override
+ protected PooledHeapByteBufL newObject(Handle handle) {
+ return new PooledHeapByteBufL(handle, 0);
+ }
+ };
+
+ static PooledHeapByteBufL newInstance(int maxCapacity) {
+ PooledHeapByteBufL buf = RECYCLER.get();
+ buf.setRefCnt(1);
+ buf.maxCapacity(maxCapacity);
+ return buf;
+ }
+
+ private PooledHeapByteBufL(Recycler.Handle recyclerHandle, int maxCapacity) {
+ super(recyclerHandle, maxCapacity);
+ }
+
+ @Override
+ public boolean isDirect() {
+ return false;
+ }
+
+ @Override
+ protected byte _getByte(int index) {
+ return memory[idx(index)];
+ }
+
+ @Override
+ protected short _getShort(int index) {
+ index = idx(index);
+ return (short) (memory[index] << 8 | memory[index + 1] & 0xFF);
+ }
+
+ @Override
+ protected int _getUnsignedMedium(int index) {
+ index = idx(index);
+ return (memory[index] & 0xff) << 16 |
+ (memory[index + 1] & 0xff) << 8 |
+ memory[index + 2] & 0xff;
+ }
+
+ @Override
+ protected int _getInt(int index) {
+ index = idx(index);
+ return (memory[index] & 0xff) << 24 |
+ (memory[index + 1] & 0xff) << 16 |
+ (memory[index + 2] & 0xff) << 8 |
+ memory[index + 3] & 0xff;
+ }
+
+ @Override
+ protected long _getLong(int index) {
+ index = idx(index);
+ return ((long) memory[index] & 0xff) << 56 |
+ ((long) memory[index + 1] & 0xff) << 48 |
+ ((long) memory[index + 2] & 0xff) << 40 |
+ ((long) memory[index + 3] & 0xff) << 32 |
+ ((long) memory[index + 4] & 0xff) << 24 |
+ ((long) memory[index + 5] & 0xff) << 16 |
+ ((long) memory[index + 6] & 0xff) << 8 |
+ (long) memory[index + 7] & 0xff;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+ checkDstIndex(index, length, dstIndex, dst.capacity());
+ if (dst.hasMemoryAddress()) {
+ PlatformDependent.copyMemory(memory, idx(index), dst.memoryAddress() + dstIndex, length);
+ } else if (dst.hasArray()) {
+ getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
+ } else {
+ dst.setBytes(dstIndex, memory, idx(index), length);
+ }
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+ checkDstIndex(index, length, dstIndex, dst.length);
+ System.arraycopy(memory, idx(index), dst, dstIndex, length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, ByteBuffer dst) {
+ checkIndex(index);
+ dst.put(memory, idx(index), Math.min(capacity() - index, dst.remaining()));
+ return this;
+ }
+
+ @Override
+ public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+ checkIndex(index, length);
+ out.write(memory, idx(index), length);
+ return this;
+ }
+
+ @Override
+ public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
+ checkIndex(index, length);
+ index = idx(index);
+ return out.write((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length));
+ }
+
+ @Override
+ protected void _setByte(int index, int value) {
+ memory[idx(index)] = (byte) value;
+ }
+
+ @Override
+ protected void _setShort(int index, int value) {
+ index = idx(index);
+ memory[index] = (byte) (value >>> 8);
+ memory[index + 1] = (byte) value;
+ }
+
+ @Override
+ protected void _setMedium(int index, int value) {
+ index = idx(index);
+ memory[index] = (byte) (value >>> 16);
+ memory[index + 1] = (byte) (value >>> 8);
+ memory[index + 2] = (byte) value;
+ }
+
+ @Override
+ protected void _setInt(int index, int value) {
+ index = idx(index);
+ memory[index] = (byte) (value >>> 24);
+ memory[index + 1] = (byte) (value >>> 16);
+ memory[index + 2] = (byte) (value >>> 8);
+ memory[index + 3] = (byte) value;
+ }
+
+ @Override
+ protected void _setLong(int index, long value) {
+ index = idx(index);
+ memory[index] = (byte) (value >>> 56);
+ memory[index + 1] = (byte) (value >>> 48);
+ memory[index + 2] = (byte) (value >>> 40);
+ memory[index + 3] = (byte) (value >>> 32);
+ memory[index + 4] = (byte) (value >>> 24);
+ memory[index + 5] = (byte) (value >>> 16);
+ memory[index + 6] = (byte) (value >>> 8);
+ memory[index + 7] = (byte) value;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+ checkSrcIndex(index, length, srcIndex, src.capacity());
+ if (src.hasMemoryAddress()) {
+ PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, memory, idx(index), length);
+ } else if (src.hasArray()) {
+ setBytes(index, src.array(), src.arrayOffset() + srcIndex, length);
+ } else {
+ src.getBytes(srcIndex, memory, idx(index), length);
+ }
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+ checkSrcIndex(index, length, srcIndex, src.length);
+ System.arraycopy(src, srcIndex, memory, idx(index), length);
+ return this;
+ }
+
+ @Override
+ public ByteBuf setBytes(int index, ByteBuffer src) {
+ int length = src.remaining();
+ checkIndex(index, length);
+ src.get(memory, idx(index), length);
+ return this;
+ }
+
+ @Override
+ public int setBytes(int index, InputStream in, int length) throws IOException {
+ checkIndex(index, length);
+ return in.read(memory, idx(index), length);
+ }
+
+ @Override
+ public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
+ checkIndex(index, length);
+ index = idx(index);
+ try {
+ return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length));
+ } catch (ClosedChannelException e) {
+ return -1;
+ }
+ }
+
+ @Override
+ public ByteBuf copy(int index, int length) {
+ checkIndex(index, length);
+ ByteBuf copy = alloc().heapBuffer(length, maxCapacity());
+ copy.writeBytes(memory, idx(index), length);
+ return copy;
+ }
+
+ @Override
+ public int nioBufferCount() {
+ return 1;
+ }
+
+ @Override
+ public ByteBuffer[] nioBuffers(int index, int length) {
+ return new ByteBuffer[] { nioBuffer(index, length) };
+ }
+
+ @Override
+ public ByteBuffer internalNioBuffer(int index, int length) {
+ checkIndex(index, length);
+ index = idx(index);
+ return (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length);
+ }
+
+ @Override
+ public boolean hasArray() {
+ return true;
+ }
+
+ @Override
+ public byte[] array() {
+ return memory;
+ }
+
+ @Override
+ public int arrayOffset() {
+ return offset;
+ }
+
+ @Override
+ public boolean hasMemoryAddress() {
+ return false;
+ }
+
+ @Override
+ public long memoryAddress() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected ByteBuffer newInternalNioBuffer(byte[] memory) {
+ return ByteBuffer.wrap(memory);
+ }
+
+ @Override
+ protected Recycler<?> recycler() {
+ return RECYCLER;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
index 949f9fb..99daf62 100644
--- a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
+++ b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
@@ -30,7 +30,7 @@ import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
final class PooledUnsafeDirectByteBufL extends PooledByteBufL<ByteBuffer> {
-
+
private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
private static final Recycler<PooledUnsafeDirectByteBufL> RECYCLER = new Recycler<PooledUnsafeDirectByteBufL>() {
@@ -42,6 +42,7 @@ final class PooledUnsafeDirectByteBufL extends PooledByteBufL<ByteBuffer> {
static PooledUnsafeDirectByteBufL newInstance(int maxCapacity) {
PooledUnsafeDirectByteBufL buf = RECYCLER.get();
+ buf.setRefCnt(1);
buf.maxCapacity(maxCapacity);
return buf;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/pom.xml b/sandbox/prototype/exec/java-exec/pom.xml
index cd9bc9a..a2e8501 100644
--- a/sandbox/prototype/exec/java-exec/pom.xml
+++ b/sandbox/prototype/exec/java-exec/pom.xml
@@ -65,7 +65,7 @@
</dependency>
<dependency>
<groupId>org.apache.drill.exec</groupId>
- <version>4.0.3.Final</version>
+ <version>4.0.7.Final</version>
<artifactId>netty-bufferl</artifactId>
</dependency>
<dependency>
@@ -166,8 +166,8 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>4.0.3.Final</version>
+ <artifactId>netty-handler</artifactId>
+ <version>4.0.7.Final</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
index 147762e..311e715 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
@@ -61,7 +61,6 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
public void allocateNew(int valueCount) {
clear();
this.data = allocator.buffer(valueCount * ${type.width});
- this.data.retain();
this.data.readerIndex(0);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
index 483166b..ec7af46 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
@@ -10,6 +10,7 @@ import java.lang.UnsupportedOperationException;
package org.apache.drill.exec.vector;
+
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import io.netty.buffer.ByteBuf;
@@ -30,6 +31,7 @@ import org.apache.drill.exec.vector.UInt2Vector;
import org.apache.drill.exec.vector.UInt4Vector;
import com.google.common.collect.Lists;
+import com.google.common.collect.ObjectArrays;
/**
* Nullable${minor.class} implements a vector of values which could be null. Elements in the vector
@@ -59,7 +61,9 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
@Override
public ByteBuf[] getBuffers() {
- return ArrayUtils.addAll(bits.getBuffers(), values.getBuffers());
+ ByteBuf[] buffers = ObjectArrays.concat(bits.getBuffers(), values.getBuffers(), ByteBuf.class);
+ clear();
+ return buffers;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
index c1660e8..44d036b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
@@ -24,8 +24,11 @@ import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.record.DeadBuf;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
+
import java.util.List;
+
import com.google.common.collect.Lists;
+import com.google.common.collect.ObjectArrays;
@SuppressWarnings("unused")
/**
@@ -176,8 +179,11 @@ import com.google.common.collect.Lists;
}
</#if>
+ @Override
public ByteBuf[] getBuffers() {
- return ArrayUtils.addAll(offsets.getBuffers(), values.getBuffers());
+ ByteBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(), values.getBuffers(), ByteBuf.class);
+ clear();
+ return buffers;
}
public void clear(){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
index 7ceafe4..3be6dc2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
import com.google.common.base.Charsets;
+import com.google.common.collect.ObjectArrays;
/**
@@ -74,7 +75,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
@Override
public FieldMetadata getMetadata() {
- int len = valueCount * ${type.width} + getVarByteLength();
+ int len = (valueCount+1) * ${type.width} + getVarByteLength();
return FieldMetadata.newBuilder()
.setDef(getField().getDef())
.setValueCount(valueCount)
@@ -104,9 +105,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
offsetVector.clear();
}
+
@Override
public ByteBuf[] getBuffers() {
- return ArrayUtils.addAll(offsetVector.getBuffers(), super.getBuffers());
+ ByteBuf[] buffers = ObjectArrays.concat(offsetVector.getBuffers(), super.getBuffers(), ByteBuf.class);
+ clear();
+ return buffers;
}
public TransferPair getTransferPair(){
@@ -151,7 +155,6 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
clear();
assert totalBytes >= 0;
data = allocator.buffer(totalBytes);
- data.retain();
data.readerIndex(0);
offsetVector.allocateNew(valueCount+1);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 1b49d54..cf99abd 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -34,6 +34,7 @@ import java.util.Vector;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ZKClusterCoordinator;
+import org.apache.drill.exec.memory.DirectBufferAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserProtos.QueryType;
import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
@@ -58,6 +59,7 @@ public class DrillClient implements Closeable{
private UserClient client;
private volatile ClusterCoordinator clusterCoordinator;
private volatile boolean connected = false;
+ private final DirectBufferAllocator allocator = new DirectBufferAllocator();
public DrillClient() {
this(DrillConfig.create());
@@ -99,8 +101,7 @@ public class DrillClient implements Closeable{
checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
// just use the first endpoint for now
DrillbitEndpoint endpoint = endpoints.iterator().next();
- ByteBufAllocator bb = new PooledByteBufAllocatorL(true);
- this.client = new UserClient(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
+ this.client = new UserClient(allocator.getUnderlyingAllocator(), new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
try {
logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
FutureHandler f = new FutureHandler();
@@ -112,6 +113,12 @@ public class DrillClient implements Closeable{
}
}
+
+
+ public DirectBufferAllocator getAllocator() {
+ return allocator;
+ }
+
/**
* Closes this client's connection to the server
*
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index c4a7e43..b5eea03 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -13,6 +13,7 @@ import org.apache.drill.common.expression.FunctionDefinition;
import org.apache.drill.common.expression.NoArgValidator;
import org.apache.drill.common.expression.OutputTypeDeterminer;
import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.StorageEngineConfig;
import org.apache.drill.common.logical.data.Filter;
import org.apache.drill.common.logical.data.Project;
import org.apache.drill.common.logical.data.Scan;
@@ -27,7 +28,10 @@ import org.apache.drill.exec.exception.SetupException;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.*;
+import org.apache.drill.exec.physical.config.MockGroupScanPOP;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SelectionVectorRemover;
+import org.apache.drill.exec.store.StorageEngine;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -99,32 +103,15 @@ public class BasicOptimizer extends Optimizer{
@Override
public PhysicalOperator visitScan(Scan scan, Object obj) throws OptimizerException {
- List<MockGroupScanPOP.MockScanEntry> myObjects;
-
+ StorageEngineConfig config = logicalPlan.getStorageEngineConfig(scan.getStorageEngine());
+ if(config == null) throw new OptimizerException(String.format("Logical plan referenced the storage engine config %s but the logical plan didn't have that available as a config.", scan.getStorageEngine()));
+ StorageEngine engine;
try {
- if (scan.getStorageEngine().equals("parquet")) {
- return context.getStorageEngine(logicalPlan.getStorageEngineConfig(scan.getStorageEngine())).getPhysicalScan(scan);
- }
- if (scan.getStorageEngine().equals("local-logs")) {
- myObjects = scan.getSelection().getListWith(config,
- new TypeReference<ArrayList<MockGroupScanPOP.MockScanEntry>>() {
- });
- } else {
- myObjects = new ArrayList<>();
- MockGroupScanPOP.MockColumn[] cols = {
- new MockGroupScanPOP.MockColumn("blah", MinorType.INT, DataMode.REQUIRED, 4, 4, 4),
- new MockGroupScanPOP.MockColumn("blah_2", MinorType.INT, DataMode.REQUIRED, 4, 4, 4) };
- myObjects.add(new MockGroupScanPOP.MockScanEntry(50, cols));
- }
- } catch (IOException e) {
- throw new OptimizerException(
- "Error reading selection attribute of GroupScan node in Logical to Physical plan conversion.", e);
- } catch (SetupException e) {
- throw new OptimizerException(
- "Storage engine not found: " + scan.getStorageEngine(), e);
+ engine = context.getStorageEngine(config);
+ return engine.getPhysicalScan(scan);
+ } catch (SetupException | IOException e) {
+ throw new OptimizerException("Failure while attempting to retrieve storage engine.", e);
}
-
- return new MockGroupScanPOP("http://apache.org", myObjects);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
deleted file mode 100644
index 6348686..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.config;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.store.AbstractStorageEngine;
-import org.apache.drill.exec.store.RecordReader;
-
-import com.google.common.collect.ListMultimap;
-
-public class MockStorageEngine extends AbstractStorageEngine{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
-
- @Override
- public boolean supportsRead() {
- return true;
- }
-
- @Override
- public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries) {
- return null;
- }
-
- @Override
- public RecordReader getReader(FragmentContext context, ReadEntry readEntry) throws IOException {
- return null;
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 4227450..ea98c29 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -162,6 +162,9 @@ public class ScanBatch implements RecordBatch {
@Override
public void removeAllFields() {
+ for(VectorWrapper<?> vw : holder){
+ vw.release();
+ }
holder.clear();
fieldVectorMap.clear();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 5c5e2e5..9b31407 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -17,6 +17,8 @@
******************************************************************************/
package org.apache.drill.exec.physical.impl;
+import io.netty.buffer.ByteBuf;
+
import java.util.List;
import org.apache.drill.exec.ops.FragmentContext;
@@ -76,16 +78,16 @@ public class ScreenCreator implements RootCreator<Screen>{
logger.debug("Screen Outcome {}", outcome);
switch(outcome){
case STOP: {
- QueryResult header1 = QueryResult.newBuilder() //
+ QueryResult header = QueryResult.newBuilder() //
.setQueryId(context.getHandle().getQueryId()) //
.setRowCount(0) //
.addError(ErrorHelper.logAndConvertError(context.getIdentity(), "Screen received stop request sent.", context.getFailureCause(), logger))
.setDef(RecordBatchDef.getDefaultInstance()) //
.setIsLastChunk(true) //
.build();
- QueryWritableBatch batch1 = new QueryWritableBatch(header1);
+ QueryWritableBatch batch = new QueryWritableBatch(header);
+ connection.sendResult(listener, batch);
- connection.sendResult(listener, batch1);
return false;
}
case NONE: {
@@ -93,16 +95,18 @@ public class ScreenCreator implements RootCreator<Screen>{
// receive no results.
context.batchesCompleted.inc(1);
context.recordsCompleted.inc(incoming.getRecordCount());
- QueryResult header2 = QueryResult.newBuilder() //
+ QueryResult header = QueryResult.newBuilder() //
.setQueryId(context.getHandle().getQueryId()) //
.setRowCount(0) //
.setDef(RecordBatchDef.getDefaultInstance()) //
.setIsLastChunk(true) //
.build();
- QueryWritableBatch batch2 = new QueryWritableBatch(header2);
- connection.sendResult(listener, batch2);
+ QueryWritableBatch batch = new QueryWritableBatch(header);
+ connection.sendResult(listener, batch);
+
}else{
- connection.sendResult(listener, materializer.convertNext(true));
+ QueryWritableBatch batch = materializer.convertNext(true);
+ connection.sendResult(listener, batch);
}
return false;
}
@@ -112,7 +116,8 @@ public class ScreenCreator implements RootCreator<Screen>{
case OK:
context.batchesCompleted.inc(1);
context.recordsCompleted.inc(incoming.getRecordCount());
- connection.sendResult(listener, materializer.convertNext(false));
+ QueryWritableBatch batch = materializer.convertNext(false);
+ connection.sendResult(listener, batch);
return true;
default:
throw new UnsupportedOperationException();
@@ -128,6 +133,8 @@ public class ScreenCreator implements RootCreator<Screen>{
private class SendListener extends BaseRpcOutcomeListener<Ack>{
+
+
@Override
public void failed(RpcException ex) {
logger.error("Failure while sending data to user.", ex);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index a40031e..69455a1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -75,14 +75,12 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
case NONE:
FragmentWritableBatch b2 = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
tunnel.sendRecordBatch(new RecordSendFailure(), context, b2);
- b2.release();
return false;
case OK_NEW_SCHEMA:
case OK:
FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
tunnel.sendRecordBatch(new RecordSendFailure(), context, batch);
- batch.release();
return true;
case NOT_YET:
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index 93f643d..c128504 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -105,6 +105,7 @@ public class WireRecordBatch implements RecordBatch{
RecordBatchDef rbd = batch.getHeader().getDef();
boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
+ batch.release();
if(schemaChanged){
this.schema = batchLoader.getSchema();
return IterOutcome.OK_NEW_SCHEMA;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
index 964ef5c..3c25204 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -55,11 +55,7 @@ public class FragmentWritableBatch{
return header;
}
- public void release(){
- for(ByteBuf b : buffers){
- b.release();
- }
- }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
index 164bf59..217d34d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
@@ -46,5 +46,9 @@ public class RawFragmentBatch {
public String toString() {
return "RawFragmentBatch [header=" + header + ", body=" + body + "]";
}
+
+ public void release(){
+ if(body != null) body.release();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 923fbd5..57aad79 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -70,6 +70,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>> {
for (Iterator<VectorWrapper<?>> iter = wrappers.iterator(); iter.hasNext();) {
VectorWrapper<?> w = iter.next();
if (!w.isHyper() && v == w.getValueVector()) {
+ w.release();
iter.remove();
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index cac042b..eb9c2c7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -79,7 +79,6 @@ public class WritableBatch {
for (ByteBuf b : vv.getBuffers()) {
buffers.add(b);
- b.retain();
}
// remove vv access to buffers.
vv.clear();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
index 112b537..e5f2cc3 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
@@ -51,6 +51,8 @@ public abstract class AbstractHandshakeHandler<T extends MessageLite> extends Me
T msg = parser.parseFrom(inbound.getProtobufBodyAsIS());
consumeHandshake(ctx, msg);
+ inbound.pBody.release();
+ if(inbound.dBody != null) inbound.dBody.release();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
index f36530f..4b7d611 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -52,9 +52,9 @@ public class CoordinationQueue {
}
}
- public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V> handler, Class<V> clazz){
+ public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V> handler, Class<V> clazz, RemoteConnection connection){
int i = circularInt.getNext();
- RpcListener<V> future = new RpcListener<V>(handler, clazz, i);
+ RpcListener<V> future = new RpcListener<V>(handler, clazz, i, connection);
Object old = map.put(i, future);
if (old != null)
throw new IllegalStateException(
@@ -66,17 +66,19 @@ public class CoordinationQueue {
final RpcOutcomeListener<T> handler;
final Class<T> clazz;
final int coordinationId;
+ final RemoteConnection connection;
- public RpcListener(RpcOutcomeListener<T> handler, Class<T> clazz, int coordinationId) {
+ public RpcListener(RpcOutcomeListener<T> handler, Class<T> clazz, int coordinationId, RemoteConnection connection) {
super();
this.handler = handler;
this.clazz = clazz;
this.coordinationId = coordinationId;
+ this.connection = connection;
}
@Override
public void operationComplete(ChannelFuture future) throws Exception {
-
+ connection.releasePermit();
if(!future.isSuccess()){
removeFromMap(coordinationId);
future.get();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
index 3b2452c..6a9d8c7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
@@ -67,7 +67,6 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
@Override
public void success(V value, ByteBuf buffer) {
this.buffer = buffer;
- if(buffer != null) buffer.retain();
( (InnerFuture<V>)delegate()).setValue(value);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index cedba10..35b5938 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -19,12 +19,32 @@ package org.apache.drill.exec.rpc;
import io.netty.channel.Channel;
+import java.util.concurrent.Semaphore;
+
public class RemoteConnection{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
private final Channel channel;
- public RemoteConnection(Channel channel) {
+ final Semaphore throttle;
+
+ public void acquirePermit() throws InterruptedException{
+ if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Acquiring send permit.");
+ this.throttle.acquire();
+ if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Send permit acquired.");
+ }
+
+ public void releasePermit() {
+ throttle.release();
+ }
+
+ public RemoteConnection(Channel channel, int maxOutstanding) {
super();
this.channel = channel;
+ this.throttle = new Semaphore(maxOutstanding);
+ }
+
+ public RemoteConnection(Channel channel) {
+ this(channel, 100);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index b08aa96..4e672b7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -82,11 +82,15 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
try {
Preconditions.checkNotNull(protobufBody);
- ChannelListenerWithCoordinationId futureListener = queue.get(listener, clazz);
+ ChannelListenerWithCoordinationId futureListener = queue.get(listener, clazz, connection);
OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, futureListener.getCoordinationId(), protobufBody, dataBodies);
+ connection.acquirePermit();
ChannelFuture channelFuture = connection.getChannel().writeAndFlush(m);
channelFuture.addListener(futureListener);
completed = true;
+ } catch (InterruptedException e) {
+ completed = true;
+ listener.failed(new RpcException("Interrupted while attempting to acquire outbound queue.", e));
} finally {
if (!completed) {
if (pBuffer != null) pBuffer.release();
@@ -136,8 +140,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
case REQUEST:
// handle message and ack.
Response r = handle(connection, msg.rpcType, msg.pBody, msg.dBody);
- msg.pBody.release();
- if(msg.dBody != null) msg.dBody.release(); // we release our ownership. Handle could have taken over ownership.
+ msg.release(); // we release our ownership. Handle could have taken over ownership.
assert rpcConfig.checkResponseSend(r.rpcType, r.pBody.getClass());
OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType, msg.coordinationId,
r.pBody, r.dBodies);
@@ -152,9 +155,8 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
Parser<?> parser = m.getParserForType();
Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
- msg.pBody.release();
rpcFuture.set(value, msg.dBody);
- if(msg.dBody != null) msg.dBody.release();
+ msg.release(); // we release our ownership. Handle could have taken over ownership.
if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
}catch(Exception ex){
logger.error("Failure while handling response.", ex);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
index 48b0dae..c3aad22 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
@@ -31,7 +31,7 @@ public class QueryResultBatch {
// logger.debug("New Result Batch with header {} and data {}", header, data);
this.header = header;
this.data = data;
- if(data != null) data.retain();
+ if(this.data != null) data.retain();
}
public QueryResult getHeader() {
@@ -47,6 +47,10 @@ public class QueryResultBatch {
return data != null;
}
+ public void release(){
+ if(data != null) data.release();
+ }
+
@Override
public String toString() {
return "QueryResultBatch [header=" + header + ", data=" + data + "]";
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
index 9c48052..d854789 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
@@ -31,9 +31,13 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimap;
-public class AbstractStorageEngine implements StorageEngine{
+public abstract class AbstractStorageEngine implements StorageEngine{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStorageEngine.class);
+ protected AbstractStorageEngine(){
+ }
+
+
@Override
public boolean supportsRead() {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
index 26504a2..4551c1f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
@@ -51,7 +51,7 @@ public class StorageEngineRegistry {
for(Constructor<?> c : engine.getConstructors()){
Class<?>[] params = c.getParameterTypes();
if(params.length != 2 || params[1] != DrillbitContext.class || !StorageEngineConfig.class.isAssignableFrom(params[0])){
- logger.debug("Skipping StorageEngine constructor {} for engine class {} since it doesn't implement a [constructor(StorageEngineConfig, DrillbitContext)]", c, engine);
+ logger.info("Skipping StorageEngine constructor {} for engine class {} since it doesn't implement a [constructor(StorageEngineConfig, DrillbitContext)]", c, engine);
continue;
}
availableEngines.put(params[0], (Constructor<? extends StorageEngine>) c);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
new file mode 100644
index 0000000..23ac2b8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
@@ -0,0 +1,51 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.config.MockGroupScanPOP;
+import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockScanEntry;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStorageEngine;
+import org.apache.drill.storage.MockStorageEngineConfig;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class MockStorageEngine extends AbstractStorageEngine {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
+
+ public MockStorageEngine(MockStorageEngineConfig configuration, DrillbitContext context) {
+
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(Scan scan) throws IOException {
+
+ ArrayList<MockScanEntry> readEntries = scan.getSelection().getListWith(new ObjectMapper(),
+ new TypeReference<ArrayList<MockScanEntry>>() {
+ });
+
+ return new MockGroupScanPOP(null, readEntries);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 2ad7b44..2d36a08 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -60,8 +60,8 @@ public class ParquetRecordReader implements RecordReader {
private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb
// TODO - should probably find a smarter way to set this, currently 2 megabytes
- private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 2;
- public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 5;
+ private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 1;
+ public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 1;
private static final String SEPERATOR = System.getProperty("file.separator");
@@ -398,6 +398,7 @@ public class ParquetRecordReader implements RecordReader {
@Override
public void cleanup() {
columnStatuses.clear();
- bufferWithAllData.clear();
+ this.varLengthReader.columns.clear();
+ bufferWithAllData.release();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
index f070f0f..cf5b5d8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
@@ -24,21 +24,24 @@ import java.util.Collection;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.data.Scan;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.ReadEntry;
import org.apache.drill.exec.physical.ReadEntryWithPath;
-import org.apache.drill.exec.physical.config.MockStorageEngine;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStorageEngine;
import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.mock.MockStorageEngine;
import com.google.common.collect.ListMultimap;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
+
import parquet.format.converter.ParquetMetadataConverter;
import parquet.hadoop.CodecFactoryExposer;
import parquet.hadoop.ParquetFileReader;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
index 93f1af7..4ecbd0e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
@@ -38,7 +38,7 @@ public class RemoteFragmentRunnerListener extends AbstractFragmentRunnerListener
@Override
protected void statusChange(FragmentHandle handle, FragmentStatus status) {
- logger.debug("Sending remote failure.");
+ logger.debug("Sending remote status message. {}", status);
tunnel.sendFragmentStatus(status);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 1170a1e..fa51b09 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -63,7 +63,7 @@ public class WorkManager implements Closeable{
private final BitComHandler bitComWorker;
private final UserWorker userWorker;
private final WorkerBee bee;
- private Executor executor = Executors.newFixedThreadPool(4, new NamedThreadFactory("Working Thread - "));
+ private Executor executor = Executors.newFixedThreadPool(4, new NamedThreadFactory("WorkManager-"));
private final EventThread eventThread;
public WorkManager(BootStrapContext context){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
index dd55377..9e420c1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
@@ -49,7 +49,7 @@ public abstract class AbstractFragmentCollector implements BatchCollector{
this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId();
this.buffers = new RawBatchBuffer[minInputsRequired];
for(int i = 0; i < buffers.length; i++){
- buffers[i] = new UnlmitedRawBatchBuffer();
+ buffers[i] = new UnlimitedRawBatchBuffer();
}
if (receiver.supportsOutOfOrderExchange()) {
this.remainingRequired = new AtomicInteger(1);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
new file mode 100644
index 0000000..faff6c7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+
+import com.google.common.collect.Queues;
+
+/** {@link RawBatchBuffer} backed by an unbounded deque; enqueued batches are never refused or throttled. */
+public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnlimitedRawBatchBuffer.class);
+
+ private final LinkedBlockingDeque<RawFragmentBatch> buffer = Queues.newLinkedBlockingDeque();
+ private volatile boolean finished = false;
+
+ /** Appends the batch to the queue; the throttle argument is ignored by this implementation. */
+ @Override
+ public void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch) {
+ buffer.add(batch);
+ }
+
+ /** Currently a no-op: queued batches are left untouched. */
+ @Override
+ public void kill(FragmentContext context) {
+ // TODO: Pass back or kill handler?
+ }
+
+ /** Sets the finished flag, after which {@link #getNext()} returns null instead of waiting on an empty queue. */
+ @Override
+ public void finished() {
+ finished = true;
+ }
+
+ /**
+ * Returns the next queued batch, waiting for one if the queue is empty and {@link #finished()} was not yet called.
+ * @return the next batch, or null when the buffer is finished and drained or the wait is interrupted
+ */
+ @Override
+ public RawFragmentBatch getNext(){
+ RawFragmentBatch b = buffer.poll();
+ if(b == null && !finished){
+ try {
+ // NOTE(review): a finished() call that arrives while this thread is parked in take()
+ // does not wake it; confirm that callers always enqueue a final batch first.
+ return buffer.take();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the flag so the caller can observe the interrupt
+ return null;
+ }
+ }
+ return b;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
deleted file mode 100644
index 71ae576..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.work.batch;
-
-import java.util.Iterator;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RawFragmentBatch;
-import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
-
-import com.google.common.collect.Queues;
-
-public class UnlmitedRawBatchBuffer implements RawBatchBuffer{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnlmitedRawBatchBuffer.class);
-
- private final LinkedBlockingDeque<RawFragmentBatch> buffer = Queues.newLinkedBlockingDeque();
- private volatile boolean finished = false;
-
- @Override
- public void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch) {
- buffer.add(batch);
- }
-
-// @Override
-// public RawFragmentBatch dequeue() {
-// return buffer.poll();
-// }
-
- @Override
- public void kill(FragmentContext context) {
- // TODO: Pass back or kill handler?
- }
-
-
- @Override
- public void finished() {
- finished = true;
- }
-
- @Override
- public RawFragmentBatch getNext(){
-
- RawFragmentBatch b = buffer.poll();
- if(b == null && !finished){
- try {
- return buffer.take();
- } catch (InterruptedException e) {
- return null;
- }
- }
-
- return b;
-
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
index db6c437..6587237 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
@@ -62,7 +62,7 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/physical_test2.json"), Charsets.UTF_8));
// look at records
- RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
+ RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator());
int recordCount = 0;
for (QueryResultBatch batch : results) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockScantTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockScantTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockScantTest.java
deleted file mode 100644
index f9a1ecb..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockScantTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.store;
-
-import com.beust.jcommander.internal.Lists;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.util.FileUtils;
-import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.proto.UserProtos;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
-import org.apache.drill.exec.rpc.user.UserResultsListener;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.server.RemoteServiceSet;
-import org.apache.drill.exec.store.parquet.ParquetStorageEngine;
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-import parquet.bytes.BytesInput;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.Page;
-import parquet.column.page.PageReadStore;
-import parquet.column.page.PageReader;
-import parquet.hadoop.Footer;
-import parquet.hadoop.ParquetFileWriter;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.*;
-import static parquet.column.Encoding.PLAIN;
-
-
-public class MockScantTest {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StorageEngineRegistry.class);
-
- private boolean VERBOSE_DEBUG = false;
-
- private class ParquetResultListener implements UserResultsListener {
-
- CountDownLatch latch = new CountDownLatch(1);
- @Override
- public void submissionFailed(RpcException ex) {
- latch.countDown();
- }
-
- @Override
- public void resultArrived(QueryResultBatch result) {
- if(result.getHeader().getIsLastChunk()) latch.countDown();
- result.getData().release(1);
- }
-
- public void await() throws Exception {
- latch.await();
- }
- }
-
-
- @Test
- public void testMockScanFullEngine() throws Exception{
- RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
-
- DrillConfig config = DrillConfig.create();
-
-// try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator());){
- try(DrillClient client = new DrillClient(config)){
- long A = System.nanoTime();
-// bit1.run();
- long B = System.nanoTime();
- client.connect();
- long C = System.nanoTime();
- ParquetResultListener listener = new ParquetResultListener();
- client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/mock-scan.json"), Charsets.UTF_8), listener);
- listener.await();
- long D = System.nanoTime();
- System.out.println(String.format("Took %f s to run query", (float)(D-C) / 1E9));
- }
- }
-}