You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:51 UTC

[13/27] git commit: fixes for memory management and rpc throttling

fixes for memory management and rpc throttling


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/402be7e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/402be7e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/402be7e0

Branch: refs/heads/master
Commit: 402be7e04e744004beb16e9222cf649f2da6fc93
Parents: 0a2f997
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu Aug 15 08:45:20 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Aug 15 16:59:09 2013 -0700

----------------------------------------------------------------------
 .../drill/common/logical/LogicalPlan.java       |   6 +-
 sandbox/prototype/exec/bufferl/pom.xml          |  37 +-
 .../main/java/io/netty/buffer/PoolArenaL.java   |   3 +-
 .../java/io/netty/buffer/PooledByteBufL.java    |   3 +-
 .../java/io/netty/buffer/PooledHeapBufferL.java | 282 +++++++++
 .../buffer/PooledUnsafeDirectByteBufL.java      |   3 +-
 sandbox/prototype/exec/java-exec/pom.xml        |   6 +-
 .../templates/FixedValueVectors.java            |   1 -
 .../templates/NullableValueVectors.java         |   6 +-
 .../templates/RepeatedValueVectors.java         |   8 +-
 .../templates/VariableLengthVectors.java        |   9 +-
 .../apache/drill/exec/client/DrillClient.java   |  11 +-
 .../apache/drill/exec/opt/BasicOptimizer.java   |  37 +-
 .../exec/physical/config/MockStorageEngine.java |  52 --
 .../drill/exec/physical/impl/ScanBatch.java     |   3 +
 .../drill/exec/physical/impl/ScreenCreator.java |  23 +-
 .../exec/physical/impl/SingleSenderCreator.java |   2 -
 .../exec/physical/impl/WireRecordBatch.java     |   1 +
 .../exec/record/FragmentWritableBatch.java      |   6 +-
 .../drill/exec/record/RawFragmentBatch.java     |   4 +
 .../drill/exec/record/VectorContainer.java      |   1 +
 .../apache/drill/exec/record/WritableBatch.java |   1 -
 .../exec/rpc/AbstractHandshakeHandler.java      |   2 +
 .../drill/exec/rpc/CoordinationQueue.java       |  10 +-
 .../drill/exec/rpc/DrillRpcFutureImpl.java      |   1 -
 .../apache/drill/exec/rpc/RemoteConnection.java |  22 +-
 .../java/org/apache/drill/exec/rpc/RpcBus.java  |  12 +-
 .../drill/exec/rpc/user/QueryResultBatch.java   |   6 +-
 .../drill/exec/store/AbstractStorageEngine.java |   6 +-
 .../drill/exec/store/StorageEngineRegistry.java |   2 +-
 .../exec/store/mock/MockStorageEngine.java      |  51 ++
 .../exec/store/parquet/ParquetRecordReader.java |   7 +-
 .../store/parquet/ParquetStorageEngine.java     |   5 +-
 .../exec/work/RemoteFragmentRunnerListener.java |   2 +-
 .../org/apache/drill/exec/work/WorkManager.java |   2 +-
 .../work/batch/AbstractFragmentCollector.java   |   2 +-
 .../work/batch/UnlimitedRawBatchBuffer.java     |  73 +++
 .../exec/work/batch/UnlmitedRawBatchBuffer.java |  73 ---
 .../physical/impl/TestSimpleFragmentRun.java    |   2 +-
 .../apache/drill/exec/store/MockScantTest.java  | 115 ----
 .../exec/store/ParquetRecordReaderTest.java     | 594 -------------------
 .../store/parquet/ParquetRecordReaderTest.java  | 347 +++++++++++
 .../exec/store/parquet/TestFileGenerator.java   | 210 +++++++
 .../src/test/resources/scan_screen_logical.json |   8 +-
 .../exec/java-exec/src/test/sh/logback.xml      |  35 ++
 .../prototype/exec/java-exec/src/test/sh/runbit |   4 +-
 sandbox/prototype/pom.xml                       |  10 +
 47 files changed, 1168 insertions(+), 938 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
index 742001a..6692661 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/logical/LogicalPlan.java
@@ -17,8 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.common.logical;
 
-import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -38,8 +38,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
 
 @JsonPropertyOrder({ "head", "storage", "query" })
 public class LogicalPlan {
@@ -53,7 +51,7 @@ public class LogicalPlan {
   public LogicalPlan(@JsonProperty("head") PlanProperties head,
       @JsonProperty("storage") Map<String, StorageEngineConfig> storageEngineMap,
       @JsonProperty("query") List<LogicalOperator> operators) {
-    this.storageEngineMap = storageEngineMap;
+    this.storageEngineMap = storageEngineMap != null ? storageEngineMap : new HashMap<String, StorageEngineConfig>();
     this.properties = head;
     this.graph = Graph.newGraph(operators, SinkOperator.class, SourceOperator.class);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/bufferl/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/pom.xml b/sandbox/prototype/exec/bufferl/pom.xml
index baa2c3d..11eef91 100644
--- a/sandbox/prototype/exec/bufferl/pom.xml
+++ b/sandbox/prototype/exec/bufferl/pom.xml
@@ -1,25 +1,26 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Copyright 2012 The Netty Project
-  ~
-  ~ The Netty Project licenses this file to you under the Apache License,
-  ~ version 2.0 (the "License"); you may not use this file except in compliance
-  ~ with the License. You may obtain a copy of the License at:
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-  ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-  ~ License for the specific language governing permissions and limitations
-  ~ under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<!-- ~ Copyright 2012 The Netty Project ~ ~ The Netty Project licenses this 
+  file to you under the Apache License, ~ version 2.0 (the "License"); you 
+  may not use this file except in compliance ~ with the License. You may obtain 
+  a copy of the License at: ~ ~ http://www.apache.org/licenses/LICENSE-2.0 
+  ~ ~ Unless required by applicable law or agreed to in writing, software ~ 
+  distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
+  ~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
+  ~ License for the specific language governing permissions and limitations 
+  ~ under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
   <modelVersion>4.0.0</modelVersion>
 
+  <parent>
+    <groupId>org.apache.drill.exec</groupId>
+    <artifactId>exec-parent</artifactId>
+    <version>1.0-SNAPSHOT</version>
+  </parent>
+
   <groupId>org.apache.drill.exec</groupId>
-  <version>4.0.3.Final</version>
+  <version>4.0.7.Final</version>
   <artifactId>netty-bufferl</artifactId>
 
   <name>Netty/Drill/Buffer</name>
@@ -31,5 +32,5 @@
       <version>${project.version}</version>
     </dependency>
   </dependencies>
-  
+
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
index db9818d..12fd1ae 100644
--- a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
+++ b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PoolArenaL.java
@@ -354,8 +354,7 @@ abstract class PoolArenaL<T> {
 
         @Override
         protected PooledByteBufL<byte[]> newByteBuf(int maxCapacity) {
-          throw new UnsupportedOperationException();
-//            return PooledHeapByteBufL.newInstance(maxCapacity);
+            return PooledHeapByteBufL.newInstance(maxCapacity);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
index c25c2e9..ded7c62 100644
--- a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
+++ b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledByteBufL.java
@@ -148,7 +148,7 @@ abstract class PooledByteBufL<T> extends AbstractReferenceCountedByteBuf {
             this.handle = -1;
             memory = null;
             chunk.arena.free(chunk, handle);
-            if (ResourceLeakDetector.ENABLED) {
+            if (leak != null) {
                 leak.close();
             } else {
                 recycle();
@@ -160,7 +160,6 @@ abstract class PooledByteBufL<T> extends AbstractReferenceCountedByteBuf {
     private void recycle() {
         Recycler.Handle recyclerHandle = this.recyclerHandle;
         if (recyclerHandle != null) {
-            setRefCnt(1);
             ((Recycler<Object>) recycler()).recycle(this, recyclerHandle);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledHeapBufferL.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledHeapBufferL.java b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledHeapBufferL.java
new file mode 100644
index 0000000..70b517c
--- /dev/null
+++ b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledHeapBufferL.java
@@ -0,0 +1,282 @@
+package io.netty.buffer;
+
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file tothe License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import io.netty.util.Recycler;
+import io.netty.util.internal.PlatformDependent;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+final class PooledHeapByteBufL extends PooledByteBufL<byte[]> {
+
+    private static final Recycler<PooledHeapByteBufL> RECYCLER = new Recycler<PooledHeapByteBufL>() {
+        @Override
+        protected PooledHeapByteBufL newObject(Handle handle) {
+            return new PooledHeapByteBufL(handle, 0);
+        }
+    };
+
+    static PooledHeapByteBufL newInstance(int maxCapacity) {
+        PooledHeapByteBufL buf = RECYCLER.get();
+        buf.setRefCnt(1);
+        buf.maxCapacity(maxCapacity);
+        return buf;
+    }
+
+    private PooledHeapByteBufL(Recycler.Handle recyclerHandle, int maxCapacity) {
+        super(recyclerHandle, maxCapacity);
+    }
+
+    @Override
+    public boolean isDirect() {
+        return false;
+    }
+
+    @Override
+    protected byte _getByte(int index) {
+        return memory[idx(index)];
+    }
+
+    @Override
+    protected short _getShort(int index) {
+        index = idx(index);
+        return (short) (memory[index] << 8 | memory[index + 1] & 0xFF);
+    }
+
+    @Override
+    protected int _getUnsignedMedium(int index) {
+        index = idx(index);
+        return (memory[index]     & 0xff) << 16 |
+               (memory[index + 1] & 0xff) <<  8 |
+                memory[index + 2] & 0xff;
+    }
+
+    @Override
+    protected int _getInt(int index) {
+        index = idx(index);
+        return (memory[index]     & 0xff) << 24 |
+               (memory[index + 1] & 0xff) << 16 |
+               (memory[index + 2] & 0xff) <<  8 |
+                memory[index + 3] & 0xff;
+    }
+
+    @Override
+    protected long _getLong(int index) {
+        index = idx(index);
+        return ((long) memory[index]     & 0xff) << 56 |
+               ((long) memory[index + 1] & 0xff) << 48 |
+               ((long) memory[index + 2] & 0xff) << 40 |
+               ((long) memory[index + 3] & 0xff) << 32 |
+               ((long) memory[index + 4] & 0xff) << 24 |
+               ((long) memory[index + 5] & 0xff) << 16 |
+               ((long) memory[index + 6] & 0xff) <<  8 |
+                (long) memory[index + 7] & 0xff;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+        checkDstIndex(index, length, dstIndex, dst.capacity());
+        if (dst.hasMemoryAddress()) {
+            PlatformDependent.copyMemory(memory, idx(index), dst.memoryAddress() + dstIndex, length);
+        } else if (dst.hasArray()) {
+            getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
+        } else {
+            dst.setBytes(dstIndex, memory, idx(index), length);
+        }
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+        checkDstIndex(index, length, dstIndex, dst.length);
+        System.arraycopy(memory, idx(index), dst, dstIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, ByteBuffer dst) {
+        checkIndex(index);
+        dst.put(memory, idx(index), Math.min(capacity() - index, dst.remaining()));
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+        checkIndex(index, length);
+        out.write(memory, idx(index), length);
+        return this;
+    }
+
+    @Override
+    public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
+        checkIndex(index, length);
+        index = idx(index);
+        return out.write((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length));
+    }
+
+    @Override
+    protected void _setByte(int index, int value) {
+        memory[idx(index)] = (byte) value;
+    }
+
+    @Override
+    protected void _setShort(int index, int value) {
+        index = idx(index);
+        memory[index]     = (byte) (value >>> 8);
+        memory[index + 1] = (byte) value;
+    }
+
+    @Override
+    protected void _setMedium(int index, int   value) {
+        index = idx(index);
+        memory[index]     = (byte) (value >>> 16);
+        memory[index + 1] = (byte) (value >>> 8);
+        memory[index + 2] = (byte) value;
+    }
+
+    @Override
+    protected void _setInt(int index, int   value) {
+        index = idx(index);
+        memory[index]     = (byte) (value >>> 24);
+        memory[index + 1] = (byte) (value >>> 16);
+        memory[index + 2] = (byte) (value >>> 8);
+        memory[index + 3] = (byte) value;
+    }
+
+    @Override
+    protected void _setLong(int index, long  value) {
+        index = idx(index);
+        memory[index]     = (byte) (value >>> 56);
+        memory[index + 1] = (byte) (value >>> 48);
+        memory[index + 2] = (byte) (value >>> 40);
+        memory[index + 3] = (byte) (value >>> 32);
+        memory[index + 4] = (byte) (value >>> 24);
+        memory[index + 5] = (byte) (value >>> 16);
+        memory[index + 6] = (byte) (value >>> 8);
+        memory[index + 7] = (byte) value;
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+        checkSrcIndex(index, length, srcIndex, src.capacity());
+        if (src.hasMemoryAddress()) {
+            PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, memory, idx(index), length);
+        } else if (src.hasArray()) {
+            setBytes(index, src.array(), src.arrayOffset() + srcIndex, length);
+        } else {
+            src.getBytes(srcIndex, memory, idx(index), length);
+        }
+        return this;
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+        checkSrcIndex(index, length, srcIndex, src.length);
+        System.arraycopy(src, srcIndex, memory, idx(index), length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, ByteBuffer src) {
+        int length = src.remaining();
+        checkIndex(index, length);
+        src.get(memory, idx(index), length);
+        return this;
+    }
+
+    @Override
+    public int setBytes(int index, InputStream in, int length) throws IOException {
+        checkIndex(index, length);
+        return in.read(memory, idx(index), length);
+    }
+
+    @Override
+    public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
+        checkIndex(index, length);
+        index = idx(index);
+        try {
+            return in.read((ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length));
+        } catch (ClosedChannelException e) {
+            return -1;
+        }
+    }
+
+    @Override
+    public ByteBuf copy(int index, int length) {
+        checkIndex(index, length);
+        ByteBuf copy = alloc().heapBuffer(length, maxCapacity());
+        copy.writeBytes(memory, idx(index), length);
+        return copy;
+    }
+
+    @Override
+    public int nioBufferCount() {
+        return 1;
+    }
+
+    @Override
+    public ByteBuffer[] nioBuffers(int index, int length) {
+        return new ByteBuffer[] { nioBuffer(index, length) };
+    }
+
+    @Override
+    public ByteBuffer internalNioBuffer(int index, int length) {
+        checkIndex(index, length);
+        index = idx(index);
+        return (ByteBuffer) internalNioBuffer().clear().position(index).limit(index + length);
+    }
+
+    @Override
+    public boolean hasArray() {
+        return true;
+    }
+
+    @Override
+    public byte[] array() {
+        return memory;
+    }
+
+    @Override
+    public int arrayOffset() {
+        return offset;
+    }
+
+    @Override
+    public boolean hasMemoryAddress() {
+        return false;
+    }
+
+    @Override
+    public long memoryAddress() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected ByteBuffer newInternalNioBuffer(byte[] memory) {
+        return ByteBuffer.wrap(memory);
+    }
+
+    @Override
+    protected Recycler<?> recycler() {
+        return RECYCLER;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
index 949f9fb..99daf62 100644
--- a/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
+++ b/sandbox/prototype/exec/bufferl/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBufL.java
@@ -30,7 +30,7 @@ import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ScatteringByteChannel;
 
 final class PooledUnsafeDirectByteBufL extends PooledByteBufL<ByteBuffer> {
-
+  
     private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
 
     private static final Recycler<PooledUnsafeDirectByteBufL> RECYCLER = new Recycler<PooledUnsafeDirectByteBufL>() {
@@ -42,6 +42,7 @@ final class PooledUnsafeDirectByteBufL extends PooledByteBufL<ByteBuffer> {
 
     static PooledUnsafeDirectByteBufL newInstance(int maxCapacity) {
         PooledUnsafeDirectByteBufL buf = RECYCLER.get();
+        buf.setRefCnt(1);
         buf.maxCapacity(maxCapacity);
         return buf;
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/pom.xml b/sandbox/prototype/exec/java-exec/pom.xml
index cd9bc9a..a2e8501 100644
--- a/sandbox/prototype/exec/java-exec/pom.xml
+++ b/sandbox/prototype/exec/java-exec/pom.xml
@@ -65,7 +65,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.drill.exec</groupId>
-      <version>4.0.3.Final</version>
+      <version>4.0.7.Final</version>
       <artifactId>netty-bufferl</artifactId>
     </dependency>
     <dependency>
@@ -166,8 +166,8 @@
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>
-      <artifactId>netty-all</artifactId>
-      <version>4.0.3.Final</version>
+      <artifactId>netty-handler</artifactId>
+      <version>4.0.7.Final</version>
     </dependency>
     <dependency>
       <groupId>com.google.protobuf</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
index 147762e..311e715 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/FixedValueVectors.java
@@ -61,7 +61,6 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
   public void allocateNew(int valueCount) {
     clear();
     this.data = allocator.buffer(valueCount * ${type.width});
-    this.data.retain();
     this.data.readerIndex(0);
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
index 483166b..ec7af46 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/NullableValueVectors.java
@@ -10,6 +10,7 @@ import java.lang.UnsupportedOperationException;
 
 package org.apache.drill.exec.vector;
 
+
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import io.netty.buffer.ByteBuf;
@@ -30,6 +31,7 @@ import org.apache.drill.exec.vector.UInt2Vector;
 import org.apache.drill.exec.vector.UInt4Vector;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.ObjectArrays;
 
 /**
  * Nullable${minor.class} implements a vector of values which could be null.  Elements in the vector
@@ -59,7 +61,9 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
   
   @Override
   public ByteBuf[] getBuffers() {
-    return ArrayUtils.addAll(bits.getBuffers(), values.getBuffers());
+    ByteBuf[] buffers = ObjectArrays.concat(bits.getBuffers(), values.getBuffers(), ByteBuf.class);
+    clear();
+    return buffers;
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
index c1660e8..44d036b 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/RepeatedValueVectors.java
@@ -24,8 +24,11 @@ import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.record.DeadBuf;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
+
 import java.util.List;
+
 import com.google.common.collect.Lists;
+import com.google.common.collect.ObjectArrays;
 
 @SuppressWarnings("unused")
 /**
@@ -176,8 +179,11 @@ import com.google.common.collect.Lists;
   }
   </#if>
 
+  @Override
   public ByteBuf[] getBuffers() {
-    return ArrayUtils.addAll(offsets.getBuffers(), values.getBuffers());
+    ByteBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(), values.getBuffers(), ByteBuf.class);
+    clear();
+    return buffers;
   }
 
   public void clear(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
index 7ceafe4..3be6dc2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
+++ b/sandbox/prototype/exec/java-exec/src/main/codegen/ValueVectors/templates/VariableLengthVectors.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.TransferPair;
 
 import com.google.common.base.Charsets;
+import com.google.common.collect.ObjectArrays;
 
 
 /**
@@ -74,7 +75,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   
   @Override
   public FieldMetadata getMetadata() {
-    int len = valueCount * ${type.width} + getVarByteLength();
+    int len = (valueCount+1) * ${type.width} + getVarByteLength();
     return FieldMetadata.newBuilder()
              .setDef(getField().getDef())
              .setValueCount(valueCount)
@@ -104,9 +105,12 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     offsetVector.clear();
   }
 
+  
   @Override
   public ByteBuf[] getBuffers() {
-    return ArrayUtils.addAll(offsetVector.getBuffers(), super.getBuffers());
+    ByteBuf[] buffers = ObjectArrays.concat(offsetVector.getBuffers(), super.getBuffers(), ByteBuf.class);
+    clear();
+    return buffers;
   }
   
   public TransferPair getTransferPair(){
@@ -151,7 +155,6 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
     clear();
     assert totalBytes >= 0;
     data = allocator.buffer(totalBytes);
-    data.retain();
     data.readerIndex(0);
     offsetVector.allocateNew(valueCount+1);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 1b49d54..cf99abd 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -34,6 +34,7 @@ import java.util.Vector;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.ZKClusterCoordinator;
+import org.apache.drill.exec.memory.DirectBufferAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.UserProtos.QueryType;
 import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
@@ -58,6 +59,7 @@ public class DrillClient implements Closeable{
   private UserClient client;
   private volatile ClusterCoordinator clusterCoordinator;
   private volatile boolean connected = false;
+  private final DirectBufferAllocator allocator = new DirectBufferAllocator();
   
   public DrillClient() {
     this(DrillConfig.create());
@@ -99,8 +101,7 @@ public class DrillClient implements Closeable{
     checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
     // just use the first endpoint for now
     DrillbitEndpoint endpoint = endpoints.iterator().next();
-    ByteBufAllocator bb = new PooledByteBufAllocatorL(true);
-    this.client = new UserClient(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
+    this.client = new UserClient(allocator.getUnderlyingAllocator(), new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
     try {
       logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
       FutureHandler f = new FutureHandler();
@@ -112,6 +113,12 @@ public class DrillClient implements Closeable{
     }
   }
 
+  
+  
+  public DirectBufferAllocator getAllocator() {
+    return allocator;
+  }
+
   /**
    * Closes this client's connection to the server
    *

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index c4a7e43..b5eea03 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -13,6 +13,7 @@ import org.apache.drill.common.expression.FunctionDefinition;
 import org.apache.drill.common.expression.NoArgValidator;
 import org.apache.drill.common.expression.OutputTypeDeterminer;
 import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.logical.StorageEngineConfig;
 import org.apache.drill.common.logical.data.Filter;
 import org.apache.drill.common.logical.data.Project;
 import org.apache.drill.common.logical.data.Scan;
@@ -27,7 +28,10 @@ import org.apache.drill.exec.exception.SetupException;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.*;
+import org.apache.drill.exec.physical.config.MockGroupScanPOP;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SelectionVectorRemover;
+import org.apache.drill.exec.store.StorageEngine;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 
@@ -99,32 +103,15 @@ public class BasicOptimizer extends Optimizer{
 
     @Override
     public PhysicalOperator visitScan(Scan scan, Object obj) throws OptimizerException {
-      List<MockGroupScanPOP.MockScanEntry> myObjects;
-
+      StorageEngineConfig config = logicalPlan.getStorageEngineConfig(scan.getStorageEngine());
+      if(config == null) throw new OptimizerException(String.format("Logical plan referenced the storage engine config %s but the logical plan didn't have that available as a config.", scan.getStorageEngine()));
+      StorageEngine engine;
       try {
-        if (scan.getStorageEngine().equals("parquet")) {
-          return context.getStorageEngine(logicalPlan.getStorageEngineConfig(scan.getStorageEngine())).getPhysicalScan(scan);
-        }
-        if (scan.getStorageEngine().equals("local-logs")) {
-          myObjects = scan.getSelection().getListWith(config,
-              new TypeReference<ArrayList<MockGroupScanPOP.MockScanEntry>>() {
-              });
-        } else {
-          myObjects = new ArrayList<>();
-          MockGroupScanPOP.MockColumn[] cols = {
-              new MockGroupScanPOP.MockColumn("blah", MinorType.INT, DataMode.REQUIRED, 4, 4, 4),
-              new MockGroupScanPOP.MockColumn("blah_2", MinorType.INT, DataMode.REQUIRED, 4, 4, 4) };
-          myObjects.add(new MockGroupScanPOP.MockScanEntry(50, cols));
-        }
-      } catch (IOException e) {
-        throw new OptimizerException(
-            "Error reading selection attribute of GroupScan node in Logical to Physical plan conversion.", e);
-      } catch (SetupException e) {
-        throw new OptimizerException(
-            "Storage engine not found: " + scan.getStorageEngine(), e);
+        engine = context.getStorageEngine(config);
+        return engine.getPhysicalScan(scan);
+      } catch (SetupException | IOException e) {
+        throw new OptimizerException("Failure while attempting to retrieve storage engine.", e);
       }
-
-      return new MockGroupScanPOP("http://apache.org", myObjects);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
deleted file mode 100644
index 6348686..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.physical.config;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.physical.ReadEntry;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.store.AbstractStorageEngine;
-import org.apache.drill.exec.store.RecordReader;
-
-import com.google.common.collect.ListMultimap;
-
-public class MockStorageEngine extends AbstractStorageEngine{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
-
-  @Override
-  public boolean supportsRead() {
-    return true;
-  }
-
-  @Override
-  public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries) {
-    return null;
-  }
-
-  @Override
-  public RecordReader getReader(FragmentContext context, ReadEntry readEntry) throws IOException {
-    return null;
-  }
-
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 4227450..ea98c29 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -162,6 +162,9 @@ public class ScanBatch implements RecordBatch {
 
     @Override
     public void removeAllFields() {
+      for(VectorWrapper<?> vw : holder){
+        vw.release();
+      }
       holder.clear();
       fieldVectorMap.clear();
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 5c5e2e5..9b31407 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -17,6 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl;
 
+import io.netty.buffer.ByteBuf;
+
 import java.util.List;
 
 import org.apache.drill.exec.ops.FragmentContext;
@@ -76,16 +78,16 @@ public class ScreenCreator implements RootCreator<Screen>{
       logger.debug("Screen Outcome {}", outcome);
       switch(outcome){
       case STOP: {
-          QueryResult header1 = QueryResult.newBuilder() //
+          QueryResult header = QueryResult.newBuilder() //
               .setQueryId(context.getHandle().getQueryId()) //
               .setRowCount(0) //
               .addError(ErrorHelper.logAndConvertError(context.getIdentity(), "Screen received stop request sent.", context.getFailureCause(), logger))
               .setDef(RecordBatchDef.getDefaultInstance()) //
               .setIsLastChunk(true) //
               .build();
-          QueryWritableBatch batch1 = new QueryWritableBatch(header1);
+          QueryWritableBatch batch = new QueryWritableBatch(header);
+          connection.sendResult(listener, batch);
 
-          connection.sendResult(listener, batch1);
           return false;
       }
       case NONE: {
@@ -93,16 +95,18 @@ public class ScreenCreator implements RootCreator<Screen>{
           // receive no results.
           context.batchesCompleted.inc(1);
           context.recordsCompleted.inc(incoming.getRecordCount());
-          QueryResult header2 = QueryResult.newBuilder() //
+          QueryResult header = QueryResult.newBuilder() //
               .setQueryId(context.getHandle().getQueryId()) //
               .setRowCount(0) //
               .setDef(RecordBatchDef.getDefaultInstance()) //
               .setIsLastChunk(true) //
               .build();
-          QueryWritableBatch batch2 = new QueryWritableBatch(header2);
-          connection.sendResult(listener, batch2);
+          QueryWritableBatch batch = new QueryWritableBatch(header);
+          connection.sendResult(listener, batch);
+
         }else{
-          connection.sendResult(listener, materializer.convertNext(true));
+          QueryWritableBatch batch = materializer.convertNext(true);
+          connection.sendResult(listener, batch);
         }
         return false;
       }
@@ -112,7 +116,8 @@ public class ScreenCreator implements RootCreator<Screen>{
       case OK:
         context.batchesCompleted.inc(1);
         context.recordsCompleted.inc(incoming.getRecordCount());
-        connection.sendResult(listener, materializer.convertNext(false));
+        QueryWritableBatch batch = materializer.convertNext(false);
+        connection.sendResult(listener, batch);
         return true;
       default:
         throw new UnsupportedOperationException();
@@ -128,6 +133,8 @@ public class ScreenCreator implements RootCreator<Screen>{
     
     private class SendListener extends BaseRpcOutcomeListener<Ack>{
 
+
+
       @Override
       public void failed(RpcException ex) {
         logger.error("Failure while sending data to user.", ex);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index a40031e..69455a1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -75,14 +75,12 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       case NONE:
         FragmentWritableBatch b2 = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
         tunnel.sendRecordBatch(new RecordSendFailure(), context, b2);
-        b2.release();
         return false;
 
       case OK_NEW_SCHEMA:
       case OK:
         FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
         tunnel.sendRecordBatch(new RecordSendFailure(), context, batch);
-        batch.release();
         return true;
 
       case NOT_YET:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index 93f643d..c128504 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -105,6 +105,7 @@ public class WireRecordBatch implements RecordBatch{
 
       RecordBatchDef rbd = batch.getHeader().getDef();
       boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
+      batch.release();
       if(schemaChanged){
         this.schema = batchLoader.getSchema();
         return IterOutcome.OK_NEW_SCHEMA;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
index 964ef5c..3c25204 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -55,11 +55,7 @@ public class FragmentWritableBatch{
     return header;
   }
   
-  public void release(){
-    for(ByteBuf b : buffers){
-      b.release();
-    }
-  }
+
   
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
index 164bf59..217d34d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
@@ -46,5 +46,9 @@ public class RawFragmentBatch {
   public String toString() {
     return "RawFragmentBatch [header=" + header + ", body=" + body + "]";
   }
+  
+  public void release(){
+    if(body != null) body.release();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 923fbd5..57aad79 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -70,6 +70,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>> {
     for (Iterator<VectorWrapper<?>> iter = wrappers.iterator(); iter.hasNext();) {
       VectorWrapper<?> w = iter.next();
       if (!w.isHyper() && v == w.getValueVector()) {
+        w.release();
         iter.remove();
         return;
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index cac042b..eb9c2c7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -79,7 +79,6 @@ public class WritableBatch {
       
       for (ByteBuf b : vv.getBuffers()) {
         buffers.add(b);
-        b.retain();
       }
       // remove vv access to buffers.
       vv.clear();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
index 112b537..e5f2cc3 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
@@ -51,6 +51,8 @@ public abstract class AbstractHandshakeHandler<T extends MessageLite> extends Me
   
     T msg = parser.parseFrom(inbound.getProtobufBodyAsIS());
     consumeHandshake(ctx, msg);
+    inbound.pBody.release();
+    if(inbound.dBody != null) inbound.dBody.release();
     
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
index f36530f..4b7d611 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -52,9 +52,9 @@ public class CoordinationQueue {
     }
   }
 
-  public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V> handler, Class<V> clazz){
+  public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V> handler, Class<V> clazz, RemoteConnection connection){
     int i = circularInt.getNext();
-    RpcListener<V> future = new RpcListener<V>(handler, clazz, i);
+    RpcListener<V> future = new RpcListener<V>(handler, clazz, i, connection);
     Object old = map.put(i, future);
     if (old != null)
       throw new IllegalStateException(
@@ -66,17 +66,19 @@ public class CoordinationQueue {
     final RpcOutcomeListener<T> handler;
     final Class<T> clazz;
     final int coordinationId;
+    final RemoteConnection connection;
     
-    public RpcListener(RpcOutcomeListener<T> handler, Class<T> clazz, int coordinationId) {
+    public RpcListener(RpcOutcomeListener<T> handler, Class<T> clazz, int coordinationId, RemoteConnection connection) {
       super();
       this.handler = handler;
       this.clazz = clazz;
       this.coordinationId = coordinationId;
+      this.connection = connection;
     }
 
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
-      
+      connection.releasePermit();
       if(!future.isSuccess()){
         removeFromMap(coordinationId);
         future.get();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
index 3b2452c..6a9d8c7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
@@ -67,7 +67,6 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
   @Override
   public void success(V value, ByteBuf buffer) {
     this.buffer = buffer;
-    if(buffer != null) buffer.retain();
     ( (InnerFuture<V>)delegate()).setValue(value);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index cedba10..35b5938 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -19,12 +19,32 @@ package org.apache.drill.exec.rpc;
 
 import io.netty.channel.Channel;
 
+import java.util.concurrent.Semaphore;
+
 public class RemoteConnection{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
   private final Channel channel;
   
-  public RemoteConnection(Channel channel) {
+  final Semaphore throttle;
+  
+  public void acquirePermit() throws InterruptedException{
+    if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Acquiring send permit.");
+    this.throttle.acquire();
+    if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Send permit acquired.");
+  }
+  
+  public void releasePermit() {
+    throttle.release();
+  }
+  
+  public RemoteConnection(Channel channel, int maxOutstanding) {
     super();
     this.channel = channel;
+    this.throttle  = new Semaphore(maxOutstanding);
+  }
+  
+  public RemoteConnection(Channel channel) {
+    this(channel, 100);
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index b08aa96..4e672b7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -82,11 +82,15 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
 
     try {
       Preconditions.checkNotNull(protobufBody);
-      ChannelListenerWithCoordinationId futureListener = queue.get(listener, clazz);
+      ChannelListenerWithCoordinationId futureListener = queue.get(listener, clazz, connection);
       OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, futureListener.getCoordinationId(), protobufBody, dataBodies);
+      connection.acquirePermit();
       ChannelFuture channelFuture = connection.getChannel().writeAndFlush(m);
       channelFuture.addListener(futureListener);
       completed = true;
+    } catch (InterruptedException e) {
+      completed = true;
+      listener.failed(new RpcException("Interrupted while attempting to acquire outbound queue.", e));
     } finally {
       if (!completed) {
         if (pBuffer != null) pBuffer.release();
@@ -136,8 +140,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
       case REQUEST:
         // handle message and ack.
         Response r = handle(connection, msg.rpcType, msg.pBody, msg.dBody);
-        msg.pBody.release();
-        if(msg.dBody != null) msg.dBody.release(); // we release our ownership.  Handle could have taken over ownership.
+        msg.release();  // we release our ownership.  Handle could have taken over ownership.
         assert rpcConfig.checkResponseSend(r.rpcType, r.pBody.getClass());
         OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType, msg.coordinationId,
             r.pBody, r.dBodies);
@@ -152,9 +155,8 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
         RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
         Parser<?> parser = m.getParserForType();
         Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
-        msg.pBody.release();
         rpcFuture.set(value, msg.dBody);
-        if(msg.dBody != null) msg.dBody.release();
+        msg.release();  // we release our ownership.  Handle could have taken over ownership.
         if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
         }catch(Exception ex){
           logger.error("Failure while handling response.", ex);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
index 48b0dae..c3aad22 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
@@ -31,7 +31,7 @@ public class QueryResultBatch {
 //    logger.debug("New Result Batch with header {} and data {}", header, data);
     this.header = header;
     this.data = data;
-    if(data != null) data.retain();
+    if(this.data != null) data.retain();
   }
 
   public QueryResult getHeader() {
@@ -47,6 +47,10 @@ public class QueryResultBatch {
     return data != null;
   }
 
+  public void release(){
+    if(data != null) data.release();
+  }
+  
   @Override
   public String toString() {
     return "QueryResultBatch [header=" + header + ", data=" + data + "]";

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
index 9c48052..d854789 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStorageEngine.java
@@ -31,9 +31,13 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Multimap;
 
-public class AbstractStorageEngine implements StorageEngine{
+public abstract class AbstractStorageEngine implements StorageEngine{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStorageEngine.class);
 
+  protected AbstractStorageEngine(){
+  }
+  
+  
   @Override
   public boolean supportsRead() {
     return false;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
index 26504a2..4551c1f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/StorageEngineRegistry.java
@@ -51,7 +51,7 @@ public class StorageEngineRegistry {
       for(Constructor<?> c : engine.getConstructors()){
         Class<?>[] params = c.getParameterTypes();
         if(params.length != 2 || params[1] != DrillbitContext.class || !StorageEngineConfig.class.isAssignableFrom(params[0])){
-          logger.debug("Skipping StorageEngine constructor {} for engine class {} since it doesn't implement a [constructor(StorageEngineConfig, DrillbitContext)]", c, engine);
+          logger.info("Skipping StorageEngine constructor {} for engine class {} since it doesn't implement a [constructor(StorageEngineConfig, DrillbitContext)]", c, engine);
           continue;
         }
         availableEngines.put(params[0], (Constructor<? extends StorageEngine>) c);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
new file mode 100644
index 0000000..23ac2b8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
@@ -0,0 +1,51 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.mock;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.config.MockGroupScanPOP;
+import org.apache.drill.exec.physical.config.MockGroupScanPOP.MockScanEntry;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStorageEngine;
+import org.apache.drill.storage.MockStorageEngineConfig;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class MockStorageEngine extends AbstractStorageEngine {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
+
+  public MockStorageEngine(MockStorageEngineConfig configuration, DrillbitContext context) {
+
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(Scan scan) throws IOException {
+
+    ArrayList<MockScanEntry> readEntries = scan.getSelection().getListWith(new ObjectMapper(),
+        new TypeReference<ArrayList<MockScanEntry>>() {
+        });
+    
+    return new MockGroupScanPOP(null, readEntries);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 2ad7b44..2d36a08 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -60,8 +60,8 @@ public class ParquetRecordReader implements RecordReader {
   private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb
 
   // TODO - should probably find a smarter way to set this, currently 2 megabytes
-  private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 2;
-  public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 5;
+  private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 1;
+  public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 1;
   private static final String SEPERATOR = System.getProperty("file.separator");
 
 
@@ -398,6 +398,7 @@ public class ParquetRecordReader implements RecordReader {
   @Override
   public void cleanup() {
     columnStatuses.clear();
-    bufferWithAllData.clear();
+    this.varLengthReader.columns.clear();
+    bufferWithAllData.release();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
index f070f0f..cf5b5d8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
@@ -24,21 +24,24 @@ import java.util.Collection;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.data.Scan;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.ReadEntry;
 import org.apache.drill.exec.physical.ReadEntryWithPath;
-import org.apache.drill.exec.physical.config.MockStorageEngine;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStorageEngine;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.mock.MockStorageEngine;
 
 import com.google.common.collect.ListMultimap;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
+
 import parquet.format.converter.ParquetMetadataConverter;
 import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.ParquetFileReader;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
index 93f1af7..4ecbd0e 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
@@ -38,7 +38,7 @@ public class RemoteFragmentRunnerListener extends AbstractFragmentRunnerListener
   
   @Override
   protected void statusChange(FragmentHandle handle, FragmentStatus status) {
-    logger.debug("Sending remote failure.");
+    logger.debug("Sending remote status message. {}", status);
     tunnel.sendFragmentStatus(status);
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 1170a1e..fa51b09 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -63,7 +63,7 @@ public class WorkManager implements Closeable{
   private final BitComHandler bitComWorker;
   private final UserWorker userWorker;
   private final WorkerBee bee;
-  private Executor executor = Executors.newFixedThreadPool(4, new NamedThreadFactory("Working Thread - "));
+  private Executor executor = Executors.newFixedThreadPool(4, new NamedThreadFactory("WorkManager-"));
   private final EventThread eventThread;
   
   public WorkManager(BootStrapContext context){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
index dd55377..9e420c1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractFragmentCollector.java
@@ -49,7 +49,7 @@ public abstract class AbstractFragmentCollector implements BatchCollector{
     this.oppositeMajorFragmentId = receiver.getOppositeMajorFragmentId();
     this.buffers = new RawBatchBuffer[minInputsRequired];
     for(int i = 0; i < buffers.length; i++){
-      buffers[i] = new UnlmitedRawBatchBuffer();
+      buffers[i] = new UnlimitedRawBatchBuffer();
     }
     if (receiver.supportsOutOfOrderExchange()) {
       this.remainingRequired = new AtomicInteger(1);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
new file mode 100644
index 0000000..faff6c7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.batch;
+
+import java.util.Iterator;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+
+import com.google.common.collect.Queues;
+
+public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnlimitedRawBatchBuffer.class);
+
+  private final LinkedBlockingDeque<RawFragmentBatch> buffer = Queues.newLinkedBlockingDeque();
+  private volatile boolean finished = false;
+  
+  @Override
+  public void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch) {
+    buffer.add(batch);
+  }
+
+//  @Override
+//  public RawFragmentBatch dequeue() {
+//    return buffer.poll();
+//  }
+
+  @Override
+  public void kill(FragmentContext context) {
+    // TODO: Pass back or kill handler?
+  }
+
+  
+  @Override
+  public void finished() {
+    finished = true;
+  }
+
+  @Override
+  public RawFragmentBatch getNext(){
+    
+    RawFragmentBatch b = buffer.poll();
+    if(b == null && !finished){
+      try {
+        return buffer.take();
+      } catch (InterruptedException e) {
+        return null;
+      }
+    }
+    
+    return b;
+    
+  }
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
deleted file mode 100644
index 71ae576..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlmitedRawBatchBuffer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.work.batch;
-
-import java.util.Iterator;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.RawFragmentBatch;
-import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
-
-import com.google.common.collect.Queues;
-
-public class UnlmitedRawBatchBuffer implements RawBatchBuffer{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnlmitedRawBatchBuffer.class);
-
-  private final LinkedBlockingDeque<RawFragmentBatch> buffer = Queues.newLinkedBlockingDeque();
-  private volatile boolean finished = false;
-  
-  @Override
-  public void enqueue(ConnectionThrottle throttle, RawFragmentBatch batch) {
-    buffer.add(batch);
-  }
-
-//  @Override
-//  public RawFragmentBatch dequeue() {
-//    return buffer.poll();
-//  }
-
-  @Override
-  public void kill(FragmentContext context) {
-    // TODO: Pass back or kill handler?
-  }
-
-  
-  @Override
-  public void finished() {
-    finished = true;
-  }
-
-  @Override
-  public RawFragmentBatch getNext(){
-    
-    RawFragmentBatch b = buffer.poll();
-    if(b == null && !finished){
-      try {
-        return buffer.take();
-      } catch (InterruptedException e) {
-        return null;
-      }
-    }
-    
-    return b;
-    
-  }
-
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
index db6c437..6587237 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestSimpleFragmentRun.java
@@ -62,7 +62,7 @@ public class TestSimpleFragmentRun extends PopUnitTestBase {
     List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/physical_test2.json"), Charsets.UTF_8));
 
     // look at records
-    RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
+    RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator());
     int recordCount = 0;
     for (QueryResultBatch batch : results) {
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/402be7e0/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockScantTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockScantTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockScantTest.java
deleted file mode 100644
index f9a1ecb..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockScantTest.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.store;
-
-import com.beust.jcommander.internal.Lists;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.util.FileUtils;
-import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.proto.UserProtos;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
-import org.apache.drill.exec.rpc.user.UserResultsListener;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.server.RemoteServiceSet;
-import org.apache.drill.exec.store.parquet.ParquetStorageEngine;
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.junit.Test;
-import parquet.bytes.BytesInput;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.Page;
-import parquet.column.page.PageReadStore;
-import parquet.column.page.PageReader;
-import parquet.hadoop.Footer;
-import parquet.hadoop.ParquetFileWriter;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-
-import static org.junit.Assert.*;
-import static parquet.column.Encoding.PLAIN;
-
-
-public class MockScantTest {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StorageEngineRegistry.class);
-
-  private boolean VERBOSE_DEBUG = false;
-
-  private class ParquetResultListener implements UserResultsListener {
-
-    CountDownLatch latch = new CountDownLatch(1);
-    @Override
-    public void submissionFailed(RpcException ex) {
-      latch.countDown();
-    }
-
-    @Override
-    public void resultArrived(QueryResultBatch result) {
-      if(result.getHeader().getIsLastChunk()) latch.countDown();
-      result.getData().release(1);
-    }
-
-    public void await() throws Exception {
-      latch.await();
-    }
-  }
-
-
-  @Test
-  public void testMockScanFullEngine() throws Exception{
-    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
-
-    DrillConfig config = DrillConfig.create();
-
-//    try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator());){
-    try(DrillClient client = new DrillClient(config)){
-      long A = System.nanoTime();
-//      bit1.run();
-      long B = System.nanoTime();
-      client.connect();
-      long C = System.nanoTime();
-      ParquetResultListener listener = new ParquetResultListener();
-      client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/mock-scan.json"), Charsets.UTF_8), listener);
-      listener.await();
-      long D = System.nanoTime();
-      System.out.println(String.format("Took %f s to run query", (float)(D-C) / 1E9));
-    }
-  }
-}