You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/23 07:10:27 UTC

[1/2] git commit: Update RPC framework to use Netty 4.0.3.Final. Correct changes in RPC framework based on Netty API changes. Fixed buffer reuse issues. Update buffer reference counting (clearly going to be problematic).

Updated Branches:
  refs/heads/master a0ed8fc03 -> 93ddf2660


Update RPC framework to use Netty 4.0.3.Final.
Correct changes in RPC framework based on Netty API changes.
Fixed buffer reuse issues. Update buffer reference counting (clearly going to be problematic).


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/430e0c0d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/430e0c0d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/430e0c0d

Branch: refs/heads/master
Commit: 430e0c0d7b4e41f60a274b6fb533cfa621d2cc3f
Parents: a0ed8fc
Author: Jacques Nadeau <ja...@apache.org>
Authored: Mon Jul 22 11:59:22 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jul 22 22:00:47 2013 -0700

----------------------------------------------------------------------
 .../common/expression/FunctionRegistry.java     |   2 +-
 sandbox/prototype/exec/java-exec/pom.xml        |   2 +-
 .../apache/drill/exec/client/DrillClient.java   |   6 +
 .../exec/physical/impl/SingleSenderCreator.java |   4 +-
 .../org/apache/drill/exec/record/DeadBuf.java   | 118 +++++++++----------
 .../drill/exec/record/RawFragmentBatch.java     |   1 +
 .../exec/rpc/AbstractHandshakeHandler.java      |  17 +--
 .../drill/exec/rpc/BaseRpcOutcomeListener.java  |   4 +-
 .../org/apache/drill/exec/rpc/BasicClient.java  |  12 +-
 .../org/apache/drill/exec/rpc/BasicServer.java  |  12 +-
 .../drill/exec/rpc/CoordinationQueue.java       |  31 ++---
 .../apache/drill/exec/rpc/DrillRpcFuture.java   |   4 +
 .../drill/exec/rpc/DrillRpcFutureImpl.java      |  21 +++-
 .../drill/exec/rpc/PositiveAtomicInteger.java   |   2 +-
 .../java/org/apache/drill/exec/rpc/RpcBus.java  |  28 +++--
 .../apache/drill/exec/rpc/RpcCheckedFuture.java |  14 +++
 .../org/apache/drill/exec/rpc/RpcConstants.java |   1 +
 .../org/apache/drill/exec/rpc/RpcDecoder.java   |  14 +--
 .../org/apache/drill/exec/rpc/RpcEncoder.java   |  37 +++---
 .../org/apache/drill/exec/rpc/RpcOutcome.java   |   4 +-
 .../drill/exec/rpc/RpcOutcomeListener.java      |   4 +-
 .../exec/rpc/ZeroCopyProtobufLengthDecoder.java |  73 ++++++------
 .../drill/exec/rpc/bit/FutureBitCommand.java    |   6 +-
 .../drill/exec/rpc/bit/ListeningBitCommand.java |  11 +-
 .../drill/exec/rpc/user/QueryResultBatch.java   |   1 +
 .../drill/exec/rpc/user/QueryResultHandler.java |   2 +-
 .../drill/exec/vector/BaseDataValueVector.java  |   5 +-
 .../work/foreman/RunningFragmentManager.java    |   4 +-
 28 files changed, 249 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FunctionRegistry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FunctionRegistry.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FunctionRegistry.java
index 28fa2db..2808356 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FunctionRegistry.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/FunctionRegistry.java
@@ -47,7 +47,7 @@ public class FunctionRegistry {
           for(String rn : d.getRegisteredNames()){
             
             FunctionDefinition d2 = funcs.put(rn, d);
-            logger.debug("Registering function {}", d);
+//            logger.debug("Registering function {}", d);
             if(d2 != null){
               throw new ExceptionInInitializerError(String.format("Failure while registering functions.  The function %s tried to register with the name %s but the function %s already registered with that name.", d.getName(), rn, d2.getName()) );
             }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/pom.xml b/sandbox/prototype/exec/java-exec/pom.xml
index a2baa32..8893044 100644
--- a/sandbox/prototype/exec/java-exec/pom.xml
+++ b/sandbox/prototype/exec/java-exec/pom.xml
@@ -116,7 +116,7 @@
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty-all</artifactId>
-      <version>4.0.0.CR2</version>
+      <version>4.0.3.Final</version>
     </dependency>
     <dependency>
       <groupId>com.google.protobuf</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 5d80ee4..b2503c1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Iterables.get;
 import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL;
 import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder;
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.nio.NioEventLoopGroup;
@@ -178,6 +179,11 @@ public class DrillClient implements Closeable{
     protected RpcException mapException(Exception e) {
       return RpcException.mapException(e);
     }
+
+    @Override
+    public ByteBuf getBuffer() {
+      return null;
+    }
     
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index a2b9865..231bc6c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -17,6 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.physical.impl;
 
+import io.netty.buffer.ByteBuf;
+
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -101,7 +103,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       }
 
       @Override
-      public void success(Ack value) {
+      public void success(Ack value, ByteBuf buf) {
         if(value.getOk()) return;
 
         logger.error("Downstream fragment was not accepted.  Stopping future sends.");

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
index 0c2adcf..5e81e6a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
@@ -17,6 +17,10 @@
  ******************************************************************************/
 package org.apache.drill.exec.record;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufProcessor;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -26,12 +30,7 @@ import java.nio.channels.GatheringByteChannel;
 import java.nio.channels.ScatteringByteChannel;
 import java.nio.charset.Charset;
 
-import io.netty.buffer.BufType;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.ByteBufIndexFinder;
-
-public class DeadBuf implements ByteBuf {
+public class DeadBuf extends ByteBuf {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DeadBuf.class);
   
   private static final String ERROR_MESSAGE = "Attemped to access a DeadBuf. This would happen if you attempted to interact with a buffer that has been moved or not yet initialized.";
@@ -39,12 +38,8 @@ public class DeadBuf implements ByteBuf {
   public static final DeadBuf DEAD_BUFFER = new DeadBuf();
 
   private DeadBuf(){}
-  
-  @Override
-  public BufType type() {
-    throw new UnsupportedOperationException(ERROR_MESSAGE);
-  }
 
+  
   @Override
   public boolean isReadable(int size) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
@@ -156,23 +151,11 @@ public class DeadBuf implements ByteBuf {
   }
 
   @Override
-  @Deprecated
-  public boolean readable() {
-    throw new UnsupportedOperationException(ERROR_MESSAGE);
-  }
-
-  @Override
   public boolean isWritable() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
-  @Deprecated
-  public boolean writable() {
-    throw new UnsupportedOperationException(ERROR_MESSAGE);
-  }
-
-  @Override
   public ByteBuf clear() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
@@ -213,12 +196,6 @@ public class DeadBuf implements ByteBuf {
   }
 
   @Override
-  @Deprecated
-  public ByteBuf ensureWritableBytes(int minWritableBytes) {
-    throw new UnsupportedOperationException(ERROR_MESSAGE);
-  }
-
-  @Override
   public int ensureWritable(int minWritableBytes, boolean force) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
@@ -677,48 +654,27 @@ public class DeadBuf implements ByteBuf {
 
   }
 
-  @Override
-  public int indexOf(int fromIndex, int toIndex, ByteBufIndexFinder indexFinder) {
-    throw new UnsupportedOperationException(ERROR_MESSAGE);
-
-  }
-
+  
   @Override
   public int bytesBefore(byte value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
 
   }
 
-  @Override
-  public int bytesBefore(ByteBufIndexFinder indexFinder) {
-    throw new UnsupportedOperationException(ERROR_MESSAGE);
-
-  }
-
-  @Override
+    @Override
   public int bytesBefore(int length, byte value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
 
   }
 
-  @Override
-  public int bytesBefore(int length, ByteBufIndexFinder indexFinder) {
-    throw new UnsupportedOperationException(ERROR_MESSAGE);
-
-  }
-
+  
   @Override
   public int bytesBefore(int index, int length, byte value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
 
   }
 
-  @Override
-  public int bytesBefore(int index, int length, ByteBufIndexFinder indexFinder) {
-    throw new UnsupportedOperationException(ERROR_MESSAGE);
-
-  }
-
+  
   @Override
   public ByteBuf copy() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
@@ -822,27 +778,67 @@ public class DeadBuf implements ByteBuf {
   }
 
   @Override
-  public ByteBuf suspendIntermediaryDeallocations() {
+  public int compareTo(ByteBuf buffer) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
-  public ByteBuf resumeIntermediaryDeallocations() {
+  public ByteBuf retain(int increment) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
   @Override
-  public int compareTo(ByteBuf buffer) {
+  public ByteBuf retain() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
   }
 
+
   @Override
-  public ByteBuf retain(int increment) {
-    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  public boolean equals(Object arg0) {
+    return false;
   }
 
+
   @Override
-  public ByteBuf retain() {
-    throw new UnsupportedOperationException(ERROR_MESSAGE);
+  public int forEachByte(ByteBufProcessor arg0) {
+    return 0;
+  }
+
+
+  @Override
+  public int forEachByte(int arg0, int arg1, ByteBufProcessor arg2) {
+    return 0;
   }
+
+
+  @Override
+  public int forEachByteDesc(ByteBufProcessor arg0) {
+    return 0;
+  }
+
+
+  @Override
+  public int forEachByteDesc(int arg0, int arg1, ByteBufProcessor arg2) {
+    return 0;
+  }
+
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+
+  @Override
+  public ByteBuffer internalNioBuffer(int arg0, int arg1) {
+    return null;
+  }
+
+
+  @Override
+  public String toString() {
+    return null;
+  }
+  
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
index 4f87224..164bf59 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
@@ -31,6 +31,7 @@ public class RawFragmentBatch {
     super();
     this.header = header;
     this.body = body;
+    if(body != null) body.retain();
   }
 
   public FragmentRecordBatch getHeader() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
index ea591da..112b537 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
@@ -17,20 +17,21 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
-import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundMessageHandlerAdapter;
+import io.netty.handler.codec.MessageToMessageDecoder;
+
+import java.util.List;
 
 import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
-public abstract class AbstractHandshakeHandler<T extends MessageLite> extends ChannelInboundMessageHandlerAdapter<InboundRpcMessage> {
+public abstract class AbstractHandshakeHandler<T extends MessageLite> extends MessageToMessageDecoder<InboundRpcMessage> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractHandshakeHandler.class);
 
   protected final EnumLite handshakeType;
   protected final Parser<T> parser;
-  protected int coordinationId;
+  protected volatile int coordinationId;
 
   public AbstractHandshakeHandler(EnumLite handshakeType, Parser<T> parser) {
     super();
@@ -38,8 +39,10 @@ public abstract class AbstractHandshakeHandler<T extends MessageLite> extends Ch
     this.parser = parser;
   }
 
+  
   @Override
-  public final void messageReceived(ChannelHandlerContext ctx, InboundRpcMessage inbound) throws Exception {
+  protected void decode(ChannelHandlerContext ctx, InboundRpcMessage inbound, List<Object> outputs) throws Exception {
+    if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Received handshake {}", inbound);
     this.coordinationId = inbound.coordinationId;
     ctx.channel().pipeline().remove(this);
     if (inbound.rpcType != handshakeType.getNumber())
@@ -47,10 +50,10 @@ public abstract class AbstractHandshakeHandler<T extends MessageLite> extends Ch
           handshakeType, handshakeType.getNumber(), inbound.rpcType));
   
     T msg = parser.parseFrom(inbound.getProtobufBodyAsIS());
-    consumeHandshake(ctx.channel(), msg);
+    consumeHandshake(ctx, msg);
     
   }
 
-  protected abstract void consumeHandshake(Channel c, T msg) throws Exception;
+  protected abstract void consumeHandshake(ChannelHandlerContext ctx, T msg) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
index 1dab1c7..da2d13c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BaseRpcOutcomeListener.java
@@ -17,6 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
+import io.netty.buffer.ByteBuf;
+
 public class BaseRpcOutcomeListener<T> implements RpcOutcomeListener<T> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRpcOutcomeListener.class);
 
@@ -25,7 +27,7 @@ public class BaseRpcOutcomeListener<T> implements RpcOutcomeListener<T> {
   }
 
   @Override
-  public void success(T value) {
+  public void success(T value, ByteBuf buffer) {
   }
   
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index 0afc5d0..b4ac993 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
@@ -31,7 +32,6 @@ import io.netty.util.concurrent.GenericFutureListener;
 
 import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
 
-import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.Internal.EnumLite;
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
@@ -70,8 +70,8 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
             
             ch.pipeline().addLast( //
                 new ZeroCopyProtobufLengthDecoder(), //
-                new RpcDecoder(rpcConfig.getName()), //
-                new RpcEncoder(rpcConfig.getName()), //
+                new RpcDecoder("c-" + rpcConfig.getName()), //
+                new RpcEncoder("c-" + rpcConfig.getName()), //
                 new ClientHandshakeHandler(), //
                 new InboundHandler(connection), //
                 new RpcExceptionHandler() //
@@ -165,7 +165,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
       }
 
       @Override
-      public void success(HANDSHAKE_RESPONSE value) {
+      public void success(HANDSHAKE_RESPONSE value, ByteBuf buffer) {
 //        logger.debug("Handshake received. {}", value);
         try {
           BasicClient.this.validateHandshake(value);
@@ -189,11 +189,11 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
     }
 
     @Override
-    protected final void consumeHandshake(Channel c, HANDSHAKE_RESPONSE msg) throws Exception {
+    protected final void consumeHandshake(ChannelHandlerContext ctx, HANDSHAKE_RESPONSE msg) throws Exception {
       // remove the handshake information from the queue so it doesn't sit there forever.
       RpcOutcome<HANDSHAKE_RESPONSE> response = queue.getFuture(handshakeType.getNumber(), coordinationId,
           responseClass);
-      response.set(msg);
+      response.set(msg, null);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index af5d9c9..5643f47 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.rpc;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
@@ -67,11 +67,11 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
             
             C connection = initRemoteConnection(ch);
             ch.closeFuture().addListener(getCloseHandler(connection));
-
+            
             ch.pipeline().addLast( //
                 new ZeroCopyProtobufLengthDecoder(), //
-                new RpcDecoder(rpcConfig.getName()), //
-                new RpcEncoder(rpcConfig.getName()), //
+                new RpcDecoder("s-" + rpcConfig.getName()), //
+                new RpcEncoder("s-" + rpcConfig.getName()), //
                 getHandshakeHandler(connection),
                 new InboundHandler(connection), //
                 new RpcExceptionHandler() //
@@ -97,9 +97,9 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
     }
 
     @Override
-    protected final void consumeHandshake(Channel c, T inbound) throws Exception {
+    protected final void consumeHandshake(ChannelHandlerContext ctx, T inbound) throws Exception {
       OutboundRpcMessage msg = new OutboundRpcMessage(RpcMode.RESPONSE, this.handshakeType, coordinationId, getHandshakeResponse(inbound));
-      c.write(msg);
+      ctx.writeAndFlush(msg);
     }
     
     public abstract MessageLite getHandshakeResponse(T inbound) throws Exception;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
index 9edbe11..f36530f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -17,12 +17,11 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelFuture;
-import io.netty.util.concurrent.GenericFutureListener;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure;
 
@@ -77,16 +76,18 @@ public class CoordinationQueue {
 
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
+      
       if(!future.isSuccess()){
         removeFromMap(coordinationId);
+        future.get();
       }
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public void set(Object value) {
+    public void set(Object value, ByteBuf buffer) {
       assert clazz.isAssignableFrom(value.getClass());
-      handler.success( (T) value);
+      handler.success( (T) value, buffer);
     }
 
     @Override
@@ -106,22 +107,10 @@ public class CoordinationQueue {
     
     
   }
-//  
-//  public <V> DrillRpcFutureImpl<V> getNewFuture(Class<V> clazz) {
-//    int i = circularInt.getNext();
-//    DrillRpcFutureImpl<V> future = DrillRpcFutureImpl.getNewFuture(i, clazz);
-//    // logger.debug("Writing to map coord {}, future {}", i, future);
-//    Object old = map.put(i, future);
-//    if (old != null)
-//      throw new IllegalStateException(
-//          "You attempted to reuse a coordination id when the previous coordination id has not been removed.  This is likely rpc future callback memory leak.");
-//    return future;
-//  }
 
   private RpcOutcome<?> removeFromMap(int coordinationId) {
     RpcOutcome<?> rpc = map.remove(coordinationId);
     if (rpc == null) {
-      logger.error("Rpc is null.");
       throw new IllegalStateException(
           "Attempting to retrieve an rpc that wasn't first stored in the rpc coordination queue.  This would most likely happen if you're opposite endpoint sent multiple messages on the same coordination id.");
     }
@@ -129,7 +118,7 @@ public class CoordinationQueue {
   }
 
   public <V> RpcOutcome<V> getFuture(int rpcType, int coordinationId, Class<V> clazz) {
-    // logger.debug("Getting future for coordinationId {} and class {}", coordinationId, clazz);
+
     RpcOutcome<?> rpc = removeFromMap(coordinationId);
     // logger.debug("Got rpc from map {}", rpc);
     Class<?> outcomeClass = rpc.getOutcomeType();
@@ -154,7 +143,11 @@ public class CoordinationQueue {
 
   public void updateFailedFuture(int coordinationId, RpcFailure failure) {
     // logger.debug("Updating failed future.");
-    RpcOutcome<?> rpc = removeFromMap(coordinationId);
-    rpc.setException(new RemoteRpcException(failure));
+    try{
+      RpcOutcome<?> rpc = removeFromMap(coordinationId);
+      rpc.setException(new RemoteRpcException(failure));
+    }catch(Exception ex){
+      logger.warn("Failed to remove from map.  Not a problem since we were updating on failed future.", ex);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
index 9033ea1..2918d29 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFuture.java
@@ -17,8 +17,12 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
+import io.netty.buffer.ByteBuf;
+
 import com.google.common.util.concurrent.CheckedFuture;
 
 public interface DrillRpcFuture<T> extends CheckedFuture<T,RpcException> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFuture.class);
+  
+  public ByteBuf getBuffer();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
index d5d3a9c..3b2452c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DrillRpcFutureImpl.java
@@ -17,15 +17,16 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
-import java.util.concurrent.ExecutionException;
+import io.netty.buffer.ByteBuf;
 
 import com.google.common.util.concurrent.AbstractCheckedFuture;
 import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.ListenableFuture;
 
 class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> implements DrillRpcFuture<V>, RpcOutcomeListener<V>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRpcFutureImpl.class);
-
+  
+  private volatile ByteBuf buffer;
+  
   public DrillRpcFutureImpl() {
     super(new InnerFuture<V>());
   }
@@ -46,7 +47,7 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
     return RpcException.mapException(ex);
   }
 
-  public static class InnerFuture<T> extends AbstractFuture<T> {
+  private static class InnerFuture<T> extends AbstractFuture<T> {
     // we rewrite these so that the parent can see them
 
     void setValue(T value) {
@@ -64,10 +65,20 @@ class DrillRpcFutureImpl<V> extends AbstractCheckedFuture<V, RpcException> imple
   }
 
   @Override
-  public void success(V value) {
+  public void success(V value, ByteBuf buffer) {
+    this.buffer = buffer;
+    if(buffer != null) buffer.retain();
     ( (InnerFuture<V>)delegate()).setValue(value);
   }
 
+  @Override
+  public ByteBuf getBuffer() {
+    return buffer;
+  }
+
+  public void release(){
+    if(buffer != null) buffer.release();
+  }
 
   
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java
index 7408516..b34cba7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/PositiveAtomicInteger.java
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class PositiveAtomicInteger {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PositiveAtomicInteger.class);
   
-  private final AtomicInteger internal = new AtomicInteger(Integer.MIN_VALUE);
+  private final AtomicInteger internal = new AtomicInteger(0);
   
   public int getNext(){
     int i = internal.addAndGet(1);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 9ce3f3c..b08aa96 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -22,16 +22,15 @@ import io.netty.buffer.ByteBufInputStream;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundMessageHandlerAdapter;
+import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.Closeable;
 import java.util.Arrays;
-import java.util.concurrent.CancellationException;
+import java.util.List;
 
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcFailure;
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
-import org.slf4j.Logger;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.Internal.EnumLite;
@@ -85,8 +84,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
       Preconditions.checkNotNull(protobufBody);
       ChannelListenerWithCoordinationId futureListener = queue.get(listener, clazz);
       OutboundRpcMessage m = new OutboundRpcMessage(RpcMode.REQUEST, rpcType, futureListener.getCoordinationId(), protobufBody, dataBodies);
-      logger.debug("Outbound RPC message: {}.    DATA BODIES: {}", m, dataBodies);
-      ChannelFuture channelFuture = connection.getChannel().write(m);
+      ChannelFuture channelFuture = connection.getChannel().writeAndFlush(m);
       channelFuture.addListener(futureListener);
       completed = true;
     } finally {
@@ -115,14 +113,14 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
   }
 
   protected void closeQueueDueToChannelClose() {
-    if (this.isClient()) queue.channelClosed(new ChannelClosedException());
+    if (this.isClient()) queue.channelClosed(new ChannelClosedException("Queue closed due to channel closure."));
   }
 
   protected GenericFutureListener<ChannelFuture> getCloseHandler(C clientConnection) {
     return new ChannelClosedHandler();
   }
 
-  protected class InboundHandler extends ChannelInboundMessageHandlerAdapter<InboundRpcMessage> {
+  protected class InboundHandler extends MessageToMessageDecoder<InboundRpcMessage> {
 
     private final C connection;
     public InboundHandler(C connection) {
@@ -131,29 +129,37 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
     }
 
     @Override
-    public void messageReceived(ChannelHandlerContext ctx, InboundRpcMessage msg) throws Exception {
+    protected void decode(ChannelHandlerContext ctx, InboundRpcMessage msg, List<Object> output) throws Exception {
       if (!ctx.channel().isOpen()) return;
       if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Received message {}", msg);
       switch (msg.mode) {
       case REQUEST:
         // handle message and ack.
         Response r = handle(connection, msg.rpcType, msg.pBody, msg.dBody);
+        msg.pBody.release();
+        if(msg.dBody != null) msg.dBody.release(); // we release our ownership.  Handle could have taken over ownership.
         assert rpcConfig.checkResponseSend(r.rpcType, r.pBody.getClass());
         OutboundRpcMessage outMessage = new OutboundRpcMessage(RpcMode.RESPONSE, r.rpcType, msg.coordinationId,
             r.pBody, r.dBodies);
         if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Adding message to outbound buffer. {}", outMessage);
-        ctx.write(outMessage);
+        ctx.writeAndFlush(outMessage);
         break;
 
       case RESPONSE:
+        try{
         MessageLite m = getResponseDefaultInstance(msg.rpcType);
         assert rpcConfig.checkReceive(msg.rpcType, m.getClass());
         RpcOutcome<?> rpcFuture = queue.getFuture(msg.rpcType, msg.coordinationId, m.getClass());
         Parser<?> parser = m.getParserForType();
         Object value = parser.parseFrom(new ByteBufInputStream(msg.pBody, msg.pBody.readableBytes()));
-        rpcFuture.set(value);
+        msg.pBody.release();
+        rpcFuture.set(value, msg.dBody);
+        if(msg.dBody != null) msg.dBody.release();
         if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Updated rpc future {} with value {}", rpcFuture, value);
-
+        }catch(Exception ex){
+          logger.error("Failure while handling response.", ex);
+          throw ex;
+        }
         break;
 
       case RESPONSE_FAILURE:

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
index 7c300d3..8466c1c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcCheckedFuture.java
@@ -17,17 +17,31 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
+import io.netty.buffer.ByteBuf;
+
 import com.google.common.util.concurrent.AbstractCheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 
 public class RpcCheckedFuture<T> extends AbstractCheckedFuture<T, RpcException> implements DrillRpcFuture<T>{
+  
+  volatile ByteBuf buffer;
+  
   public RpcCheckedFuture(ListenableFuture<T> delegate) {
     super(delegate);
   }
 
+  public void set(T obj, ByteBuf buffer){
+    this.buffer = buffer;
+  }
+  
   @Override
   protected RpcException mapException(Exception e) {
     return RpcException.mapException(e);
   }
 
+  @Override
+  public ByteBuf getBuffer() {
+    return buffer; // hand back the buffer recorded by set(T, ByteBuf); may be null if none was set
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
index 7753e07..b5b8df5 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
@@ -22,5 +22,6 @@ public class RpcConstants {
   
   private RpcConstants(){}
   
+  public static final boolean SOME_DEBUGGING = false;
   public static final boolean EXTRA_DEBUGGING = false;
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
index 4e9714b..b7c2e83 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcDecoder.java
@@ -19,12 +19,12 @@ package org.apache.drill.exec.rpc;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.MessageBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.CorruptedFrameException;
 import io.netty.handler.codec.MessageToMessageDecoder;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcHeader;
@@ -38,12 +38,12 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> {
   private final AtomicLong messageCounter = new AtomicLong();
   
   public RpcDecoder(String name){
-    this.logger = org.slf4j.LoggerFactory.getLogger(RpcDecoder.class.getCanonicalName() + "." + name);
+    this.logger = org.slf4j.LoggerFactory.getLogger(RpcDecoder.class.getCanonicalName() + "-" + name);
   }
 
   
   @Override
-  protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, MessageBuf<Object> out) throws Exception {
+  protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
     if(!ctx.channel().isOpen()){
       return;
     }
@@ -58,7 +58,6 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> {
     // read the rpc header, saved in delimited format.
     checkTag(is, RpcEncoder.HEADER_TAG);
     final RpcHeader header = RpcHeader.parseDelimitedFrom(is);
-    if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Read header. {}", header);
 
     if(RpcConstants.EXTRA_DEBUGGING) logger.debug(" post header read index {}", buffer.readerIndex());
     
@@ -67,7 +66,7 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> {
     final int pBodyLength = readRawVarint32(is);
     final ByteBuf pBody = buffer.slice(buffer.readerIndex(), pBodyLength);
     buffer.skipBytes(pBodyLength);
-    buffer.retain();
+    pBody.retain();
     if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Read protobuf body of length {} into buffer {}.", pBodyLength, pBody);
 
     if(RpcConstants.EXTRA_DEBUGGING) logger.debug("post protobufbody read index {}", buffer.readerIndex());
@@ -80,11 +79,10 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> {
       
       if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Reading raw body, buffer has {} bytes available, is available {}.", buffer.readableBytes(), is.available());
       checkTag(is, RpcEncoder.RAW_BODY_TAG);
-      if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Reading length.");
       dBodyLength = readRawVarint32(is);
       if(buffer.readableBytes() != dBodyLength) throw new CorruptedFrameException(String.format("Expected to receive a raw body of %d bytes but received a buffer with %d bytes.", dBodyLength, buffer.readableBytes()));
       dBody = buffer.slice();
-      buffer.retain();
+      dBody.retain();
       if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Read raw body of {}", dBody);
       
     }else{
@@ -99,7 +97,7 @@ class RpcDecoder extends MessageToMessageDecoder<ByteBuf> {
     // move the reader index forward so the next rpc call won't try to work with it.
     buffer.skipBytes(dBodyLength);
     messageCounter.incrementAndGet();
-    if (RpcConstants.EXTRA_DEBUGGING) logger.trace("Inbound Rpc Message Decoded {}.", m);
+    if (RpcConstants.SOME_DEBUGGING) logger.debug("Inbound Rpc Message Decoded {}.", m);
     out.add(m);
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
index f76d648..6c0158a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcEncoder.java
@@ -20,9 +20,10 @@ package org.apache.drill.exec.rpc;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufOutputStream;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundMessageHandlerAdapter;
+import io.netty.handler.codec.MessageToMessageEncoder;
 
 import java.io.OutputStream;
+import java.util.List;
 
 import org.apache.drill.exec.proto.GeneralRPCProtos.CompleteRpcMessage;
 import org.apache.drill.exec.proto.GeneralRPCProtos.RpcHeader;
@@ -33,7 +34,7 @@ import com.google.protobuf.WireFormat;
 /**
  * Converts an RPCMessage into wire format.
  */
-class RpcEncoder extends ChannelOutboundMessageHandlerAdapter<OutboundRpcMessage>{
+class RpcEncoder extends MessageToMessageEncoder<OutboundRpcMessage>{
   final org.slf4j.Logger logger;
   
   static final int HEADER_TAG = makeTag(CompleteRpcMessage.HEADER_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
@@ -44,12 +45,16 @@ class RpcEncoder extends ChannelOutboundMessageHandlerAdapter<OutboundRpcMessage
   static final int RAW_BODY_TAG_LENGTH = getRawVarintSize(RAW_BODY_TAG);
   
   public RpcEncoder(String name){
-    this.logger = org.slf4j.LoggerFactory.getLogger(RpcEncoder.class.getCanonicalName() + "." + name);
+    this.logger = org.slf4j.LoggerFactory.getLogger(RpcEncoder.class.getCanonicalName() + "-" + name);
   }
   
   @Override
-  public void flush(ChannelHandlerContext ctx, OutboundRpcMessage msg) throws Exception {
+  protected void encode(ChannelHandlerContext ctx, OutboundRpcMessage msg, List<Object> out) throws Exception {
+    if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Rpc Encoder called with msg {}", msg);
+    
     if(!ctx.channel().isOpen()){
+      //output.add(ctx.alloc().buffer(0));
+      logger.debug("Channel closed, skipping encode.");
       return;
     }
     
@@ -73,8 +78,7 @@ class RpcEncoder extends ChannelOutboundMessageHandlerAdapter<OutboundRpcMessage
         fullLength += (RAW_BODY_TAG_LENGTH + getRawVarintSize(rawBodyLength) + rawBodyLength);
       }
 
-      // set up buffers.
-      ByteBuf buf = ctx.nextOutboundByteBuffer();
+      ByteBuf buf = ctx.alloc().buffer();
       OutputStream os = new ByteBufOutputStream(buf);
       CodedOutputStream cos = CodedOutputStream.newInstance(os);
 
@@ -90,26 +94,31 @@ class RpcEncoder extends ChannelOutboundMessageHandlerAdapter<OutboundRpcMessage
       cos.writeRawVarint32(PROTOBUF_BODY_TAG);
       cos.writeRawVarint32(protoBodyLength);
       msg.pBody.writeTo(cos);
-
+      
       // if exists, write data body and tag.
-      // TODO: is it possible to avoid this copy, i think so...
       if(msg.getRawBodySize() > 0){
         if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Writing raw body of size {}", msg.getRawBodySize());
+        
         cos.writeRawVarint32(RAW_BODY_TAG);
         cos.writeRawVarint32(rawBodyLength);
         cos.flush(); // need to flush so that dbody goes after if cos is caching.
-        for(int i =0; i < msg.dBodies.length; i++){
-          buf.writeBytes(msg.dBodies[i]);  
+        
+        out.add(buf);
+        for(ByteBuf b : msg.dBodies){
+          out.add(b);
         }
+        
       }else{
         cos.flush();
+        out.add(buf);
       }
-      if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Wrote message with length header of {} bytes and body of {} bytes.", getRawVarintSize(fullLength), fullLength);
+      
+      if(RpcConstants.SOME_DEBUGGING) logger.debug("Wrote message length {}:{} bytes (head:body).  Message: " + msg, getRawVarintSize(fullLength), fullLength);
       if(RpcConstants.EXTRA_DEBUGGING) logger.debug("Sent message.  Ending writer index was {}.", buf.writerIndex());
-    
+      
     }finally{
-      // make sure to release Rpc Messages unerlying byte buffers.
-      msg.release();
+      // make sure to release Rpc Messages underlying byte buffers.
+      //msg.release();
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
index a25e5e7..112924d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcome.java
@@ -17,10 +17,12 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
+import io.netty.buffer.ByteBuf;
+
 public interface RpcOutcome<T> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcOutcome.class);
   
-  public void set(Object value);
+  public void set(Object value, ByteBuf buffer);
   public void setException(Throwable t);
   public Class<T> getOutcomeType();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
index 771edcf..7633316 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcOutcomeListener.java
@@ -17,10 +17,12 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc;
 
+import io.netty.buffer.ByteBuf;
+
 public interface RpcOutcomeListener<V> {
   
   public void failed(RpcException ex);
-  public void success(V value);
+  public void success(V value, ByteBuf buffer);
   
   
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
index 318abb1..6186056 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ZeroCopyProtobufLengthDecoder.java
@@ -18,11 +18,12 @@
 package org.apache.drill.exec.rpc;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.MessageBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.CorruptedFrameException;
 
+import java.util.List;
+
 import com.google.protobuf.CodedInputStream;
 
 /**
@@ -31,54 +32,58 @@ import com.google.protobuf.CodedInputStream;
 public class ZeroCopyProtobufLengthDecoder extends ByteToMessageDecoder {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZeroCopyProtobufLengthDecoder.class);
 
-  
   @Override
-  protected void decode(ChannelHandlerContext ctx, ByteBuf in, MessageBuf<Object> out) throws Exception {
+  protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
 
-    if(!ctx.channel().isOpen()){
-      if(in.readableBytes() > 0) logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", in.readableBytes());
+    if (!ctx.channel().isOpen()) {
+      if (in.readableBytes() > 0)
+        logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", in.readableBytes());
       in.skipBytes(in.readableBytes());
       return;
     }
-    
+
     in.markReaderIndex();
     final byte[] buf = new byte[5];
-    for (int i = 0; i < buf.length; i ++) {
-        if (!in.isReadable()) {
-            in.resetReaderIndex();
-            return;
+    for (int i = 0; i < buf.length; i++) {
+      if (!in.isReadable()) {
+        in.resetReaderIndex();
+        return;
+      }
+
+      buf[i] = in.readByte();
+      if (buf[i] >= 0) {
+
+        int length = CodedInputStream.newInstance(buf, 0, i + 1).readRawVarint32();
+
+        if (length < 0) {
+          throw new CorruptedFrameException("negative length: " + length);
+        }
+        if (length == 0) {
+          throw new CorruptedFrameException("Received a message of length 0.");
         }
 
-        buf[i] = in.readByte();
-        if (buf[i] >= 0) {
+        if (in.readableBytes() < length) {
+          in.resetReaderIndex();
+          return;
+        } else {
+          // need to make buffer copy, otherwise netty will try to refill this buffer if we move the readerIndex forward...
+          // TODO: Can we avoid this copy?
+          ByteBuf outBuf = in.copy(in.readerIndex(), length);
+          in.skipBytes(length);
           
-            int length = CodedInputStream.newInstance(buf, 0, i + 1).readRawVarint32();
-            
-            if (length < 0) {
-                throw new CorruptedFrameException("negative length: " + length);
-            }
-            if (length == 0){
-                throw new CorruptedFrameException("Received a message of length 0.");
-            }
-
-            if (in.readableBytes() < length) {
-                in.resetReaderIndex();
-                return;
-            } else {
-                ByteBuf outBuf = in.slice(in.readerIndex(), length);
-                in.retain();
-                in.skipBytes(length);
-                if(RpcConstants.EXTRA_DEBUGGING) logger.debug(String.format("ReaderIndex is %d after length header of %d bytes and frame body of length %d bytes.", in.readerIndex(), i+1, length));
-                out.add(outBuf);
-                return;
-            }
+          if (RpcConstants.EXTRA_DEBUGGING)
+            logger.debug(String.format(
+                "ReaderIndex is %d after length header of %d bytes and frame body of length %d bytes.",
+                in.readerIndex(), i + 1, length));
+          out.add(outBuf);
+          return;
         }
+      }
     }
 
     // Couldn't find the byte whose MSB is off.
     throw new CorruptedFrameException("length wider than 32-bit");
-    
-  }
 
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
index fa3b518..7914fc2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/FutureBitCommand.java
@@ -17,13 +17,13 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc.bit;
 
+import io.netty.buffer.ByteBuf;
+
 import org.apache.drill.exec.rpc.DrillRpcFuture;
 import org.apache.drill.exec.rpc.RpcCheckedFuture;
-import org.apache.drill.exec.rpc.RpcConnectionHandler;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
-import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.MessageLite;
 
@@ -59,7 +59,7 @@ public abstract class FutureBitCommand<T extends MessageLite> implements BitComm
     }
 
     @Override
-    public void success(T value) {
+    public void success(T value, ByteBuf buf) {
       settableFuture.set(value);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
index 90db6a6..b322105 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/ListeningBitCommand.java
@@ -17,14 +17,11 @@
  ******************************************************************************/
 package org.apache.drill.exec.rpc.bit;
 
-import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.RpcCheckedFuture;
-import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import io.netty.buffer.ByteBuf;
+
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.MessageLite;
 
 public abstract class ListeningBitCommand<T extends MessageLite> implements BitCommand<T> {
@@ -57,8 +54,8 @@ public abstract class ListeningBitCommand<T extends MessageLite> implements BitC
     }
 
     @Override
-    public void success(T value) {
-      listener.success(value);
+    public void success(T value, ByteBuf buf) {
+      listener.success(value, buf);
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
index 779085c..48b0dae 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
@@ -31,6 +31,7 @@ public class QueryResultBatch {
 //    logger.debug("New Result Batch with header {} and data {}", header, data);
     this.header = header;
     this.data = data;
+    if(data != null) data.retain();
   }
 
   public QueryResult getHeader() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 0aa7c86..36fd199 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -123,7 +123,7 @@ public class QueryResultHandler {
     }
 
     @Override
-    public void success(QueryId queryId) {
+    public void success(QueryId queryId, ByteBuf buf) {
       logger.debug("Received QueryId {} succesfully.  Adding listener {}", queryId, listener);
       UserResultsListener oldListener = resultsListener.putIfAbsent(queryId, listener);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index 646640a..ad54a07 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -31,7 +31,10 @@ abstract class BaseDataValueVector extends BaseValueVector{
   
   @Override
   public ByteBuf[] getBuffers(){
-    return new ByteBuf[]{data};
+    ByteBuf[] out = new ByteBuf[]{data};
+    data.readerIndex(0);
+    clear();
+    return out;
   }
   
   public int getBufferSize() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/430e0c0d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
index 1a4bc6c..65fc8c7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java
@@ -17,6 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.work.foreman;
 
+import io.netty.buffer.ByteBuf;
+
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -185,7 +187,7 @@ class RunningFragmentManager implements FragmentStatusListener{
     }
 
     @Override
-    public void success(Ack value) {
+    public void success(Ack value, ByteBuf buf) {
       if(!value.getOk()){
         logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value);
       }


[2/2] git commit: Improve projection, filter and svremover tests. Fix bugs introduced by addition of accessors.

Posted by ja...@apache.org.
Improve projection, filter and svremover tests.
Fix bugs introduced by addition of accessors.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/93ddf266
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/93ddf266
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/93ddf266

Branch: refs/heads/master
Commit: 93ddf2660a8bfe1e6cf938580fb5053de1547f44
Parents: 430e0c0
Author: Jacques Nadeau <ja...@apache.org>
Authored: Mon Jul 22 21:55:15 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Jul 22 22:01:48 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/drill/exec/expr/EvaluationVisitor.java  | 8 ++++----
 .../drill/exec/physical/impl/project/ProjectorTemplate.java | 4 ++--
 .../drill/exec/physical/impl/filter/TestSimpleFilter.java   | 8 +++++++-
 .../exec/physical/impl/project/TestSimpleProjection.java    | 9 ++++++++-
 .../drill/exec/physical/impl/svremover/TestSVRemover.java   | 9 ++++++++-
 5 files changed, 29 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/93ddf266/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index c9e3c22..664940d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -153,11 +153,11 @@ public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, Cod
     generator.getSetupBlock().assign(vv, JExpr.cast(vvType, obj));
     
     if(hc.isOptional()){
-      vv.invoke("set").arg(JExpr.direct("outIndex"));
+      vv.invoke("getMutator").invoke("set").arg(JExpr.direct("outIndex"));
       JConditional jc = block._if(hc.getIsSet().eq(JExpr.lit(0)).not());
       block = jc._then();
     }
-    block.add(vv.invoke("set").arg(JExpr.direct("outIndex")).arg(hc.getValue()));
+    block.add(vv.invoke("getMutator").invoke("set").arg(JExpr.direct("outIndex")).arg(hc.getValue()));
 
     return null;
   }
@@ -185,13 +185,13 @@ public class EvaluationVisitor extends AbstractExprVisitor<HoldingContainer, Cod
       blk.assign(out.getIsSet(), vv1.invoke("isSet").arg(JExpr.direct("inIndex")));
       JConditional jc = blk._if(out.getIsSet());
       jc._then() //
-        .assign(out.getValue(), vv1.invoke("get").arg(JExpr.direct("inIndex"))); //
+        .assign(out.getValue(), vv1.invoke("getAccessor").invoke("get").arg(JExpr.direct("inIndex"))); //
         //.assign(out.getIsSet(), JExpr.lit(1));
       //jc._else()
         //.assign(out.getIsSet(), JExpr.lit(0));
       
     }else{
-      generator.getBlock().assign(out.getValue(), vv1.invoke("get").arg(JExpr.direct("inIndex")));
+      generator.getBlock().assign(out.getValue(), vv1.invoke("getAccessor").invoke("get").arg(JExpr.direct("inIndex")));
     }
     return out;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/93ddf266/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index 735d355..646e6d1 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -68,10 +68,10 @@ public abstract class ProjectorTemplate implements Projector {
       break;
     }
     this.transfers = ImmutableList.copyOf(transfers);
-    setupEval(context, incoming, outgoing);
+    doSetup(context, incoming, outgoing);
   }
 
-  protected abstract void setupEval(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
+  protected abstract void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing) throws SchemaChangeException;
   protected abstract void doEval(int inIndex, int outIndex);
 
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/93ddf266/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
index 96a6139..a905a85 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
@@ -31,7 +31,7 @@ public class TestSimpleFilter {
   
   
   @Test
-  public void testFilter(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Exception{
+  public void testFilter(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable{
 //    System.out.println(System.getProperty("java.class.path"));
 
     
@@ -49,6 +49,12 @@ public class TestSimpleFilter {
     while(exec.next()){
       assertEquals(50, exec.getSelectionVector2().getCount());
     }
+    
+    if(context.getFailureCause() != null){
+      throw context.getFailureCause();
+    }
+    assertTrue(!context.isFailed());
+
   }
   
   @After

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/93ddf266/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
index f4900e1..79218d1 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/project/TestSimpleProjection.java
@@ -1,6 +1,7 @@
 package org.apache.drill.exec.physical.impl.project;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import mockit.Injectable;
 import mockit.NonStrictExpectations;
 
@@ -34,7 +35,7 @@ public class TestSimpleProjection {
   
   
   @Test
-  public void project(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Exception{
+  public void project(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable{
 
 
     new NonStrictExpectations(){{
@@ -48,6 +49,7 @@ public class TestSimpleProjection {
     FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
     FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
     SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
     while(exec.next()){
       BigIntVector c1 = exec.getValueVectorById(new SchemaPath("col1", ExpressionPosition.UNKNOWN), BigIntVector.class);
       BigIntVector c2 = exec.getValueVectorById(new SchemaPath("col2", ExpressionPosition.UNKNOWN), BigIntVector.class);
@@ -63,6 +65,11 @@ public class TestSimpleProjection {
       
       System.out.println(x);
     }
+
+    if(context.getFailureCause() != null){
+      throw context.getFailureCause();
+    }
+    assertTrue(!context.isFailed());
   }
   
   @After

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/93ddf266/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
index f417b91..2dafd0a 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/svremover/TestSVRemover.java
@@ -1,6 +1,7 @@
 package org.apache.drill.exec.physical.impl.svremover;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import mockit.Injectable;
 import mockit.NonStrictExpectations;
 
@@ -32,7 +33,7 @@ public class TestSVRemover {
   
   
   @Test
-  public void testSelectionVectorRemoval(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Exception{
+  public void testSelectionVectorRemoval(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable{
 //    System.out.println(System.getProperty("java.class.path"));
 
 
@@ -54,6 +55,12 @@ public class TestSVRemover {
         assertEquals(count, a.getValueCount());
       }
     }
+    
+    if(context.getFailureCause() != null){
+      throw context.getFailureCause();
+    }
+    assertTrue(!context.isFailed());
+
   }
   
   @After