You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/04/04 04:37:31 UTC

[5/9] drill git commit: DRILL-2498: Separate QueryResult into two messages QueryResult and QueryData

DRILL-2498: Separate QueryResult into two messages QueryResult and QueryData


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1d9d82b0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1d9d82b0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1d9d82b0

Branch: refs/heads/master
Commit: 1d9d82b001810605e3f94ab3a5517dc0ed739715
Parents: 10be89f
Author: adeneche <ad...@gmail.com>
Authored: Fri Mar 20 12:39:15 2015 -0700
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Fri Apr 3 18:27:50 2015 -0700

----------------------------------------------------------------------
 .../org/apache/drill/hbase/BaseHBaseTest.java   |   10 +-
 .../drill/hbase/TestHBaseCFAsJSONString.java    |    5 +-
 .../apache/drill/exec/fn/hive/TestHiveUDFs.java |   12 +-
 .../drill/exec/fn/hive/TestSampleHiveUDFs.java  |    4 +-
 .../apache/drill/exec/client/DrillClient.java   |   34 +-
 .../exec/client/PrintingResultsListener.java    |   31 +-
 .../drill/exec/physical/impl/ScreenCreator.java |   67 +-
 .../impl/materialize/QueryWritableBatch.java    |    8 +-
 .../impl/materialize/RecordMaterializer.java    |    2 +-
 .../materialize/VectorRecordMaterializer.java   |    8 +-
 .../drill/exec/rpc/user/QueryDataBatch.java     |   62 +
 .../drill/exec/rpc/user/QueryResultBatch.java   |   62 -
 .../drill/exec/rpc/user/QueryResultHandler.java |  261 +--
 .../apache/drill/exec/rpc/user/UserClient.java  |   14 +-
 .../exec/rpc/user/UserResultsListener.java      |   28 +-
 .../drill/exec/rpc/user/UserRpcConfig.java      |    4 +-
 .../apache/drill/exec/rpc/user/UserServer.java  |   16 +-
 .../drill/exec/server/rest/QueryWrapper.java    |   15 +-
 .../apache/drill/exec/work/foreman/Foreman.java |    4 +-
 .../java/org/apache/drill/BaseTestQuery.java    |   38 +-
 .../java/org/apache/drill/DrillTestWrapper.java |   36 +-
 .../java/org/apache/drill/PlanTestBase.java     |    6 +-
 .../java/org/apache/drill/QueryTestUtil.java    |    8 +-
 .../org/apache/drill/SingleRowListener.java     |   43 +-
 .../drill/exec/TestQueriesOnLargeFile.java      |   10 +-
 .../exec/client/DrillClientSystemTest.java      |   10 +-
 .../exec/fn/impl/TestAggregateFunction.java     |    8 +-
 .../drill/exec/fn/impl/TestDateFunctions.java   |    8 +-
 .../drill/exec/fn/impl/TestMultiInputAdd.java   |   11 +-
 .../exec/fn/impl/TestNewAggregateFunctions.java |    8 +-
 .../physical/impl/TestBroadcastExchange.java    |   10 +-
 .../exec/physical/impl/TestCastFunctions.java   |    8 +-
 .../physical/impl/TestCastVarCharToBigInt.java  |   10 +-
 .../physical/impl/TestConvertFunctions.java     |   11 +-
 .../drill/exec/physical/impl/TestDecimal.java   |   38 +-
 .../impl/TestDistributedFragmentRun.java        |   18 +-
 .../physical/impl/TestExtractFunctions.java     |    8 +-
 .../physical/impl/TestHashToRandomExchange.java |    6 +-
 .../exec/physical/impl/TestOptiqPlans.java      |   22 +-
 .../physical/impl/TestReverseImplicitCast.java  |    8 +-
 .../physical/impl/TestSimpleFragmentRun.java    |   10 +-
 .../exec/physical/impl/TestUnionExchange.java   |    6 +-
 .../exec/physical/impl/TopN/TestSimpleTopN.java |    8 +-
 .../exec/physical/impl/join/TestHashJoin.java   |   26 +-
 .../exec/physical/impl/join/TestMergeJoin.java  |   18 +-
 .../impl/join/TestMergeJoinMulCondition.java    |   14 +-
 .../impl/mergereceiver/TestMergingReceiver.java |   14 +-
 .../TestOrderedPartitionExchange.java           |    6 +-
 .../physical/impl/writer/TestParquetWriter.java |    2 +-
 .../exec/physical/impl/writer/TestWriter.java   |   12 +-
 .../impl/xsort/TestSimpleExternalSort.java      |   26 +-
 .../drill/exec/record/vector/TestDateTypes.java |   40 +-
 .../exec/server/TestDrillbitResilience.java     |   10 +-
 .../store/parquet/ParquetRecordReaderTest.java  |   10 +-
 .../store/parquet/ParquetResultListener.java    |   20 +-
 .../store/parquet/TestParquetPhysicalPlan.java  |   18 +-
 .../drill/exec/store/text/TestTextColumn.java   |   10 +-
 .../exec/store/text/TextRecordReaderTest.java   |    6 +-
 .../fn/TestJsonReaderWithSparseFiles.java       |    8 +-
 .../complex/writer/TestComplexToJson.java       |    8 +-
 .../vector/complex/writer/TestJsonReader.java   |   22 +-
 .../exec/work/batch/TestSpoolingBuffer.java     |    6 +-
 .../java/org/apache/drill/jdbc/DrillCursor.java |    4 +-
 .../org/apache/drill/jdbc/DrillResultSet.java   |   43 +-
 .../drill/exec/proto/SchemaUserBitShared.java   |  195 +-
 .../apache/drill/exec/proto/UserBitShared.java  | 1948 ++++++++----------
 .../org/apache/drill/exec/proto/UserProtos.java |   47 +-
 .../drill/exec/proto/beans/QueryData.java       |  211 ++
 .../drill/exec/proto/beans/QueryResult.java     |  194 +-
 .../apache/drill/exec/proto/beans/RpcType.java  |    8 +-
 protocol/src/main/protobuf/User.proto           |    8 +-
 protocol/src/main/protobuf/UserBitShared.proto  |   22 +-
 72 files changed, 1898 insertions(+), 2054 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index b955d3b..df83c56 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -23,7 +23,7 @@ import java.util.List;
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.hbase.HBaseStoragePlugin;
 import org.apache.drill.exec.store.hbase.HBaseStoragePluginConfig;
@@ -76,22 +76,22 @@ public class BaseHBaseTest extends BaseTestQuery {
 
   protected void runHBasePhysicalVerifyCount(String planFile, String tableName, int expectedRowCount) throws Exception{
     String physicalPlan = getPlanText(planFile, tableName);
-    List<QueryResultBatch> results = testPhysicalWithResults(physicalPlan);
+    List<QueryDataBatch> results = testPhysicalWithResults(physicalPlan);
     printResultAndVerifyRowCount(results, expectedRowCount);
   }
 
-  protected List<QueryResultBatch> runHBaseSQLlWithResults(String sql) throws Exception {
+  protected List<QueryDataBatch> runHBaseSQLlWithResults(String sql) throws Exception {
     sql = canonizeHBaseSQL(sql);
     System.out.println("Running query:\n" + sql);
     return testSqlWithResults(sql);
   }
 
   protected void runHBaseSQLVerifyCount(String sql, int expectedRowCount) throws Exception{
-    List<QueryResultBatch> results = runHBaseSQLlWithResults(sql);
+    List<QueryDataBatch> results = runHBaseSQLlWithResults(sql);
     printResultAndVerifyRowCount(results, expectedRowCount);
   }
 
-  private void printResultAndVerifyRowCount(List<QueryResultBatch> results, int expectedRowCount) throws SchemaChangeException {
+  private void printResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount) throws SchemaChangeException {
     int rowCount = printResult(results);
     if (expectedRowCount != -1) {
       Assert.assertEquals(expectedRowCount, rowCount);

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
index 6fe1525..7873b80 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
@@ -21,7 +21,8 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -49,7 +50,7 @@ public class TestHBaseCFAsJSONString extends BaseHBaseTest {
   @Test
   public void testColumnFamiliesAsJSONString() throws Exception {
     setColumnWidths(new int[] {112, 12});
-    List<QueryResultBatch> resultList = runHBaseSQLlWithResults("SELECT f, f2 FROM hbase.`[TABLE_NAME]` tableName LIMIT 1");
+    List<QueryDataBatch> resultList = runHBaseSQLlWithResults("SELECT f, f2 FROM hbase.`[TABLE_NAME]` tableName LIMIT 1");
     printResult(resultList);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestHiveUDFs.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestHiveUDFs.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestHiveUDFs.java
index e134aac..3ce9a6d 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestHiveUDFs.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestHiveUDFs.java
@@ -24,10 +24,8 @@ import java.util.List;
 
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
-import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.vector.Float4Vector;
-import org.apache.drill.exec.vector.NullableBigIntVector;
 import org.apache.drill.exec.vector.NullableFloat8Vector;
 import org.apache.drill.exec.vector.NullableIntVector;
 import org.apache.drill.exec.vector.NullableVar16CharVector;
@@ -44,10 +42,10 @@ public class TestHiveUDFs extends BaseTestQuery {
 
     int numRecords = 0;
     String planString = Resources.toString(Resources.getResource("functions/hive/GenericUDF.json"), Charsets.UTF_8);
-    List<QueryResultBatch> results = testPhysicalWithResults(planString);
+    List<QueryDataBatch> results = testPhysicalWithResults(planString);
 
     RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
-    for (QueryResultBatch result : results) {
+    for (QueryDataBatch result : results) {
       batchLoader.load(result.getHeader().getDef(), result.getData());
       if (batchLoader.getRecordCount() <= 0) {
         result.release();
@@ -115,10 +113,10 @@ public class TestHiveUDFs extends BaseTestQuery {
   public void testUDF() throws Throwable {
     int numRecords = 0;
     String planString = Resources.toString(Resources.getResource("functions/hive/UDF.json"), Charsets.UTF_8);
-    List<QueryResultBatch> results = testPhysicalWithResults(planString);
+    List<QueryDataBatch> results = testPhysicalWithResults(planString);
 
     RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
-    for (QueryResultBatch result : results) {
+    for (QueryDataBatch result : results) {
       batchLoader.load(result.getHeader().getDef(), result.getData());
       if (batchLoader.getRecordCount() <= 0) {
         result.release();

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java
index 9ef766f..f4b6351 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java
@@ -22,14 +22,14 @@ import static org.junit.Assert.assertTrue;
 import java.util.List;
 
 import org.apache.drill.exec.hive.HiveTestBase;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestSampleHiveUDFs extends HiveTestBase {
 
   private void helper(String query, String expected) throws Exception {
-    List<QueryResultBatch> results = testSqlWithResults(query);
+    List<QueryDataBatch> results = testSqlWithResults(query);
     String actual = getResultString(results, ",");
     assertTrue(String.format("Result:\n%s\ndoes not match:\n%s", actual, expected), expected.equals(actual));
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 6d4c86c..650a2eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -40,7 +40,6 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.UserProtos;
 import org.apache.drill.exec.proto.UserProtos.Property;
@@ -54,7 +53,7 @@ import org.apache.drill.exec.rpc.RpcConnectionHandler;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.TransportCheck;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserClient;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 
@@ -66,7 +65,7 @@ import com.google.common.util.concurrent.SettableFuture;
  * String into ByteBuf.
  */
 public class DrillClient implements Closeable, ConnectionThrottle {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class);
 
   DrillConfig config;
   private UserClient client;
@@ -249,7 +248,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
    * @return a handle for the query result
    * @throws RpcException
    */
-  public List<QueryResultBatch> runQuery(QueryType type, String plan) throws RpcException {
+  public List<QueryDataBatch> runQuery(QueryType type, String plan) throws RpcException {
     UserProtos.RunQuery query = newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build();
     ListHoldingResultsListener listener = new ListHoldingResultsListener(query);
     client.submitQuery(listener, query);
@@ -294,8 +293,8 @@ public class DrillClient implements Closeable, ConnectionThrottle {
   }
 
   private class ListHoldingResultsListener implements UserResultsListener {
-    private Vector<QueryResultBatch> results = new Vector<>();
-    private SettableFuture<List<QueryResultBatch>> future = SettableFuture.create();
+    private Vector<QueryDataBatch> results = new Vector<>();
+    private SettableFuture<List<QueryDataBatch>> future = SettableFuture.create();
     private UserProtos.RunQuery query ;
 
     public ListHoldingResultsListener(UserProtos.RunQuery query) {
@@ -321,6 +320,11 @@ public class DrillClient implements Closeable, ConnectionThrottle {
       }
     }
 
+    @Override
+    public void queryCompleted() {
+      future.set(results);
+    }
+
     private void fail(Exception ex) {
       logger.debug("Submission failed.", ex);
       future.setException(ex);
@@ -328,24 +332,12 @@ public class DrillClient implements Closeable, ConnectionThrottle {
     }
 
     @Override
-    public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
-      logger.debug(
-          "Result arrived:  Query state: {}.  Is last chunk: {}.  Result: {}",
-          result.getHeader().getQueryState(),
-          result.getHeader().getIsLastChunk(),
-          result );
+    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+      logger.debug("Result arrived:  Result: {}", result );
       results.add(result);
-      if (result.getHeader().getIsLastChunk()) {
-        future.set(results);
-      }
-      else {
-        assert QueryState.PENDING == result.getHeader().getQueryState()
-            : "For non-last chunk, expected query state of PENDING but got "
-              + result.getHeader().getQueryState();
-      }
     }
 
-    public List<QueryResultBatch> getResults() throws RpcException{
+    public List<QueryDataBatch> getResults() throws RpcException{
       try {
         return future.get();
       } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
index 926e703..98948af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
@@ -20,16 +20,18 @@ package org.apache.drill.exec.client;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.client.QuerySubmitter.Format;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.util.VectorUtil;
 
@@ -58,12 +60,21 @@ public class PrintingResultsListener implements UserResultsListener {
   }
 
   @Override
-  public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
-    int rows = result.getHeader().getRowCount();
-    if (result.getData() != null) {
-      count.addAndGet(rows);
+  public void queryCompleted() {
+    allocator.close();
+    latch.countDown();
+    System.out.println("Total rows returned: " + count.get());
+  }
+
+  @Override
+  public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+    final QueryData header = result.getHeader();
+    final DrillBuf data = result.getData();
+
+    if (data != null) {
+      count.addAndGet(header.getRowCount());
       try {
-        loader.load(result.getHeader().getDef(), result.getData());
+        loader.load(header.getDef(), data);
       } catch (SchemaChangeException e) {
         submissionFailed(new RpcException(e));
       }
@@ -82,15 +93,7 @@ public class PrintingResultsListener implements UserResultsListener {
       loader.clear();
     }
 
-    boolean isLastChunk = result.getHeader().getIsLastChunk();
     result.release();
-
-    if (isLastChunk) {
-      allocator.close();
-      latch.countDown();
-      System.out.println("Total rows returned: " + count.get());
-    }
-
   }
 
   public int await() throws Exception {

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 404c453..8038527 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -31,8 +31,7 @@ import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
 import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
@@ -44,7 +43,7 @@ import org.apache.drill.exec.work.ErrorHelper;
 import com.google.common.base.Preconditions;
 
 public class ScreenCreator implements RootCreator<Screen>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
 
 
 
@@ -67,6 +66,8 @@ public class ScreenCreator implements RootCreator<Screen>{
     final UserClientConnection connection;
     private RecordMaterializer materializer;
 
+    private boolean firstBatch = true;
+
     public enum Metric implements MetricDef {
       BYTES_SENT;
 
@@ -96,67 +97,45 @@ public class ScreenCreator implements RootCreator<Screen>{
       IterOutcome outcome = next(incoming);
       logger.trace("Screen Outcome {}", outcome);
       switch (outcome) {
-      case STOP: {
+      case STOP:
         this.internalStop();
-        boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
-        QueryResult header = QueryResult.newBuilder() //
-              .setQueryId(context.getHandle().getQueryId()) //
-              .setRowCount(0) //
-              .setQueryState(QueryState.FAILED)
-              .addError(ErrorHelper.logAndConvertMessageError(context.getIdentity(), "Query stopped.",
-                context.getFailureCause(), logger, verbose))
-              .setDef(RecordBatchDef.getDefaultInstance()) //
-              .setIsLastChunk(true) //
-              .build();
-          QueryWritableBatch batch = new QueryWritableBatch(header);
+        return false;
+      case NONE:
+        if (firstBatch) {
+          // this is the only data message sent to the client and may contain the schema
+          this.internalStop();
+          QueryWritableBatch batch;
+          QueryData header = QueryData.newBuilder() //
+            .setQueryId(context.getHandle().getQueryId()) //
+            .setRowCount(0) //
+            .setDef(RecordBatchDef.getDefaultInstance()) //
+            .build();
+          batch = new QueryWritableBatch(header);
+
           stats.startWait();
           try {
-            connection.sendResult(listener, batch);
+            connection.sendData(listener, batch);
           } finally {
             stats.stopWait();
           }
+          firstBatch = false; // we don't really need to set this. But who knows!
           sendCount.increment();
-
-          return false;
-      }
-      case NONE: {
-        this.internalStop();
-        QueryWritableBatch batch;
-        //TODO: At some point we should make this the last message.
-        //For the moment though, to detect memory leaks, we need to delay sending the
-        //COMPLETED message until the Foreman calls cleanup.
-        QueryResult header = QueryResult.newBuilder() //
-            .setQueryId(context.getHandle().getQueryId()) //
-            .setRowCount(0) //
-            //.setQueryState(QueryState.COMPLETED) //
-            .setDef(RecordBatchDef.getDefaultInstance()) //
-            .setIsLastChunk(true) //
-            .build();
-        batch = new QueryWritableBatch(header);
-        stats.startWait();
-        try {
-          connection.sendResult(listener, batch);
-        } finally {
-          stats.stopWait();
         }
-        sendCount.increment();
 
         return false;
-      }
       case OK_NEW_SCHEMA:
         materializer = new VectorRecordMaterializer(context, incoming);
         //$FALL-THROUGH$
       case OK:
-//        context.getStats().batchesCompleted.inc(1);
-//        context.getStats().recordsCompleted.inc(incoming.getRecordCount());
-        QueryWritableBatch batch = materializer.convertNext(false);
+        QueryWritableBatch batch = materializer.convertNext();
         updateStats(batch);
         stats.startWait();
         try {
-          connection.sendResult(listener, batch);
+          connection.sendData(listener, batch);
         } finally {
           stats.stopWait();
         }
+        firstBatch = false;
         sendCount.increment();
 
         return true;

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
index 2a59e22..44a3489 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
@@ -21,15 +21,15 @@ import io.netty.buffer.ByteBuf;
 
 import java.util.Arrays;
 
-import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
 
 public class QueryWritableBatch {
 //  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class);
 
-  private final QueryResult header;
+  private final QueryData header;
   private final ByteBuf[] buffers;
 
-  public QueryWritableBatch(QueryResult header, ByteBuf... buffers) {
+  public QueryWritableBatch(QueryData header, ByteBuf... buffers) {
     this.header = header;
     this.buffers = buffers;
   }
@@ -46,7 +46,7 @@ public class QueryWritableBatch {
     return n;
   }
 
-  public QueryResult getHeader() {
+  public QueryData getHeader() {
     return header;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
index 221fc34..acb17ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
@@ -20,6 +20,6 @@ package org.apache.drill.exec.physical.impl.materialize;
 
 public interface RecordMaterializer {
 
-  public QueryWritableBatch convertNext(boolean isLast);
+  public QueryWritableBatch convertNext();
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
index cc1b3bf..3933ddd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.materialize;
 
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.WritableBatch;
@@ -41,14 +41,14 @@ public class VectorRecordMaterializer implements RecordMaterializer{
 //    }
   }
 
-  public QueryWritableBatch convertNext(boolean isLast) {
+  public QueryWritableBatch convertNext() {
     //batch.getWritableBatch().getDef().getRecordCount()
     WritableBatch w = batch.getWritableBatch();
 
-    QueryResult header = QueryResult.newBuilder() //
+    QueryData header = QueryData.newBuilder() //
         .setQueryId(queryId) //
         .setRowCount(batch.getRecordCount()) //
-        .setDef(w.getDef()).setIsLastChunk(isLast).build();
+        .setDef(w.getDef()).build();
     QueryWritableBatch batch = new QueryWritableBatch(header, w.getBuffers());
     return batch;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryDataBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryDataBatch.java
new file mode 100644
index 0000000..914bd00
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryDataBatch.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.user;
+
+import io.netty.buffer.DrillBuf;
+
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
+
+/** Pairs a {@link QueryData} header with its (possibly null) data buffer, retained until release(). */
+public class QueryDataBatch {
+
+  private final QueryData header;
+  private final DrillBuf data;
+
+  public QueryDataBatch(QueryData header, DrillBuf data) {
+    // Take a reference on the buffer (when present); release() gives that reference back.
+    this.header = header;
+    this.data = data;
+    if (this.data != null) {
+      data.retain();
+    }
+  }
+
+  public QueryData getHeader() {
+    return header;
+  }
+
+  public DrillBuf getData() {
+    return data;
+  }
+
+  public boolean hasData() {
+    return data != null;
+  }
+
+  public void release() {
+    if (data != null) {
+      data.release();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "QueryDataBatch [header=" + header + ", data=" + data + "]";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
deleted file mode 100644
index ab4c9ef..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc.user;
-
-import io.netty.buffer.DrillBuf;
-
-import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-
-public class QueryResultBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryResultBatch.class);
-
-  private final QueryResult header;
-  private final DrillBuf data;
-
-  public QueryResultBatch(QueryResult header, DrillBuf data) {
-//    logger.debug("New Result Batch with header {} and data {}", header, data);
-    this.header = header;
-    this.data = data;
-    if (this.data != null) {
-      data.retain();
-    }
-  }
-
-  public QueryResult getHeader() {
-    return header;
-  }
-
-  public DrillBuf getData() {
-    return data;
-  }
-
-  public boolean hasData() {
-    return data != null;
-  }
-
-  public void release() {
-    if (data != null) {
-      data.release();
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "QueryResultBatch [header=" + header + ", data=" + data + "]";
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index c05b127..a1be83b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -20,17 +20,13 @@ package org.apache.drill.exec.rpc.user;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.DrillBuf;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 
-import javax.annotation.Nullable;
-
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
 import org.apache.drill.exec.rpc.RpcBus;
@@ -65,179 +61,137 @@ public class QueryResultHandler {
   private final ConcurrentMap<QueryId, UserResultsListener> queryIdToResultsListenersMap =
       Maps.newConcurrentMap();
 
-  /**
-   * Any is-last-chunk batch being deferred until the next batch
-   * (normally one with COMPLETED) arrives, per active query.
-   * <ul>
-   *   <li>Last-chunk batch is added (and not passed on) when it arrives.</li>
-   *   <li>Last-chunk batch is removed (and passed on) when next batch arrives
-   *       and has state {@link QueryState.COMPLETED}.</li>
-   *   <li>Last-chunk batch is removed (and not passed on) when next batch
-   *       arrives and has state {@link QueryState.CANCELED} or
-   *       {@link QueryState.FAILED}.</li>
-   * </ul>
-   */
-  private final Map<QueryId, QueryResultBatch> queryIdToDeferredLastChunkBatchesMap =
-      new ConcurrentHashMap<>();
-
-
   public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener resultsListener) {
     return new SubmissionListener(resultsListener);
   }
 
   /**
-   * Maps internal low-level API protocol to {@link UserResultsListener}-level
-   * API protocol, deferring sending is-last-chunk batches until (internal)
-   * COMPLETED batch.
+   * Maps internal low-level API protocol to {@link UserResultsListener}-level API protocol.
+   * Handles query result (query state) messages.
    */
-  public void batchArrived( ConnectionThrottle throttle,
-                            ByteBuf pBody, ByteBuf dBody ) throws RpcException {
+  public void resultArrived( ByteBuf pBody ) throws RpcException {
     final QueryResult queryResult = RpcBus.get( pBody, QueryResult.PARSER );
-    // Current batch coming in.  (Not necessarily passed along now or ever.)
-    final QueryResultBatch inputBatch = new QueryResultBatch( queryResult,
-                                                              (DrillBuf) dBody );
 
     final QueryId queryId = queryResult.getQueryId();
-    final QueryState queryState = inputBatch.getHeader().getQueryState();
+    final QueryState queryState = queryResult.getQueryState();
+
+    logger.debug( "resultArrived: queryState: {}, queryId = {}", queryState, queryId );
 
-    logger.debug( "batchArrived: isLastChunk: {}, queryState: {}, queryId = {}",
-                  inputBatch.getHeader().getIsLastChunk(), queryState, queryId );
-    logger.trace( "batchArrived: currentBatch = {}", inputBatch );
+    assert queryResult.hasQueryState() : "received query result without QueryState";
 
-    final boolean isFailureBatch    = QueryState.FAILED    == queryState;
-    final boolean isCompletionBatch = QueryState.COMPLETED == queryState;
-    final boolean isLastChunkBatchToDelay =
-        inputBatch.getHeader().getIsLastChunk() && QueryState.PENDING == queryState;
-    final boolean isTerminalBatch;
+    final boolean isFailureResult    = QueryState.FAILED    == queryState;
+    // CANCELED queries are handled the same way as COMPLETED
+    final boolean isTerminalResult;
     switch ( queryState ) {
       case PENDING:
-         isTerminalBatch = false;
-         break;
+        isTerminalResult = false;
+        break;
       case FAILED:
       case CANCELED:
       case COMPLETED:
-        isTerminalBatch = true;
+        isTerminalResult = true;
         break;
       default:
         logger.error( "Unexpected/unhandled QueryState " + queryState
-                      + " (for query " + queryId +  ")" );
-        isTerminalBatch = false;
+          + " (for query " + queryId +  ")" );
+        isTerminalResult = false;
         break;
     }
-    assert isFailureBatch || inputBatch.getHeader().getErrorCount() == 0
-        : "Error count for the query batch is non-zero but QueryState != FAILED";
 
-    UserResultsListener resultsListener = queryIdToResultsListenersMap.get( queryId );
-    logger.trace( "For QueryId [{}], retrieved results listener {}", queryId,
-                  resultsListener );
-    if ( null == resultsListener ) {
-      // WHO?? didn't get query ID response and set submission listener yet,
-      // so install a buffering listener for now
+    assert isFailureResult || queryResult.getErrorCount() == 0
+      : "Error count for the query batch is non-zero but QueryState != FAILED";
 
-      BufferingResultsListener bl = new BufferingResultsListener();
-      resultsListener = queryIdToResultsListenersMap.putIfAbsent( queryId, bl );
-      // If we had a successful insertion, use that reference.  Otherwise, just
-      // throw away the new buffering listener.
-      if ( null == resultsListener ) {
-        resultsListener = bl;
-      }
-      // TODO:  Is there a more direct way to detect a Query ID in whatever
-      // state this string comparison detects?
-      if ( queryId.toString().equals( "" ) ) {
-        failAll();
-      }
-    }
+    UserResultsListener resultsListener = newUserResultsListener(queryId);
 
     try {
-      if (isFailureBatch) {
+      if (isFailureResult) {
         // Failure case--pass on via submissionFailed(...).
 
+        String message = buildErrorMessage(queryResult);
+        resultsListener.submissionFailed(new RpcException(message));
+        // Note: Listener is removed in finally below.
+      } else if (isTerminalResult) {
+        // A successful completion/canceled case--pass on via queryCompleted
+
         try {
-          String message = buildErrorMessage(inputBatch);
-          resultsListener.submissionFailed(new RpcException(message));
+          resultsListener.queryCompleted();
+        } catch ( Exception e ) {
+          resultsListener.submissionFailed(new RpcException(e));
         }
-        finally {
-          inputBatch.release();
-        }
-        // Note: Listener and any delayed batch are removed in finally below.
       } else {
-        // A successful (data, completion, or cancelation) case--pass on via
-        // resultArrived, delaying any last-chunk batches until following
-        // COMPLETED batch and omitting COMPLETED batch.
-
-        // If is last-chunk batch, save until next batch for query (normally a
-        // COMPLETED batch) comes in:
-        if ( isLastChunkBatchToDelay ) {
-          // We have a (non-failure) is-last-chunk batch--defer it until we get
-          // the query's COMPLETED batch.
-
-          QueryResultBatch expectNone;
-          assert null == ( expectNone =
-                           queryIdToDeferredLastChunkBatchesMap.get( queryId ) )
-              : "Already have pending last-batch QueryResultBatch " + expectNone
-                + " (at receiving last-batch QueryResultBatch " + inputBatch
-                + ") for query " + queryId;
-          queryIdToDeferredLastChunkBatchesMap.put( queryId, inputBatch );
-          // Can't release batch now; will release at terminal batch in
-          // finally below.
-        } else {
-          // We have a batch triggering sending out a batch (maybe same one,
-          // maybe deferred one.
-
-          // Batch to send out in response to current batch.
-          final QueryResultBatch outputBatch;
-          if ( isCompletionBatch ) {
-            // We have a COMPLETED batch--we should have a saved is-last-chunk
-            // batch, and we must pass that on now (that we've seen COMPLETED).
-
-            outputBatch = queryIdToDeferredLastChunkBatchesMap.get( queryId );
-            assert null != outputBatch
-                : "No pending last-batch QueryResultsBatch saved, at COMPLETED"
-                + " QueryResultsBatch " + inputBatch + " for query " + queryId;
-          } else {
-            // We have a non--last-chunk PENDING batch or a CANCELED
-            // batch--pass it on.
-            outputBatch = inputBatch;
-          }
-          // Note to release input batch if it's not the batch we're sending out.
-          final boolean releaseInputBatch = outputBatch != inputBatch;
-
-          try {
-            resultsListener.resultArrived( outputBatch, throttle );
-            // That releases outputBatch if successful.
-          } catch ( Exception e ) {
-            outputBatch.release();
-            resultsListener.submissionFailed(new RpcException(e));
-          }
-          finally {
-            if ( releaseInputBatch ) {
-              inputBatch.release();
-            }
-          }
-        }
+        logger.warn("queryState {} was ignored", queryState);
       }
     } finally {
-      if ( isTerminalBatch ) {
-        // Remove and release any deferred is-last-chunk batch:
-        QueryResultBatch anyUnsentLastChunkBatch =
-             queryIdToDeferredLastChunkBatchesMap.remove( queryId );
-        if ( null != anyUnsentLastChunkBatch ) {
-          anyUnsentLastChunkBatch.release();
-        }
-
-       // TODO:  What exactly are we checking for?  How should we really check
+      if ( isTerminalResult ) {
+        // TODO:  What exactly are we checking for?  How should we really check
         // for it?
         if ( (! ( resultsListener instanceof BufferingResultsListener )
-             || ((BufferingResultsListener) resultsListener).output != null ) ) {
+          || ((BufferingResultsListener) resultsListener).output != null ) ) {
           queryIdToResultsListenersMap.remove( queryId, resultsListener );
         }
       }
     }
   }
 
-  protected String buildErrorMessage(QueryResultBatch batch) {
+  /**
+   * Handles a query data message: wraps the parsed {@link QueryData} header and the data buffer in a
+   * {@link QueryDataBatch} and passes it to the query's {@link UserResultsListener} via dataArrived.
+   */
+  public void batchArrived( ConnectionThrottle throttle,
+                            ByteBuf pBody, ByteBuf dBody ) throws RpcException {
+    final QueryData queryData = RpcBus.get( pBody, QueryData.PARSER );
+    // Wrap header and data buffer; the QueryDataBatch constructor retains a non-null buffer.
+    final QueryDataBatch batch = new QueryDataBatch( queryData, (DrillBuf) dBody );
+
+    final QueryId queryId = queryData.getQueryId();
+
+    logger.debug( "batchArrived: queryId = {}", queryId );
+    logger.trace( "batchArrived: batch = {}", batch );
+
+    UserResultsListener resultsListener = newUserResultsListener(queryId);
+
+    // Hand the batch to the listener; if the listener throws, release it here and report the failure.
+
+    try {
+      resultsListener.dataArrived(batch, throttle);
+      // The listener is expected to release the batch on success (TODO confirm for every implementation).
+    } catch ( Exception e ) {
+      batch.release();
+      resultsListener.submissionFailed(new RpcException(e));
+    }
+  }
+
+  /**
+   * Returns the {@link UserResultsListener} registered for queryId. Despite the name, a listener is only created
+   * when none is registered: a new {@link BufferingResultsListener} is then installed with putIfAbsent.
+   * @param queryId queryId we are getting the listener for
+   * @return {@link UserResultsListener} associated with queryId
+   */
+  private UserResultsListener newUserResultsListener(QueryId queryId) {
+    UserResultsListener resultsListener = queryIdToResultsListenersMap.get( queryId );
+    logger.trace( "For QueryId [{}], retrieved results listener {}", queryId, resultsListener );
+    if ( null == resultsListener ) {
+      // No listener is registered for this query id yet (presumably the query ID response has not been
+      // processed and the submission listener is not set -- TODO confirm), so install a buffering listener.
+
+      BufferingResultsListener bl = new BufferingResultsListener();
+      resultsListener = queryIdToResultsListenersMap.putIfAbsent( queryId, bl );
+      // putIfAbsent returns null when our listener was inserted; otherwise another thread registered a
+      // listener first, so use that one and throw away the new buffering listener.
+      if ( null == resultsListener ) {
+        resultsListener = bl;
+      }
+      // TODO:  Is there a more direct way to detect a Query ID in whatever state this string comparison detects?
+      if ( queryId.toString().isEmpty() ) {
+        failAll();
+      }
+    }
+    return resultsListener;
+  }
+
+  protected String buildErrorMessage(QueryResult result) {
     StringBuilder sb = new StringBuilder();
-    for (UserBitShared.DrillPBError error : batch.getHeader().getErrorList()) {
+    for (UserBitShared.DrillPBError error : result.getErrorList()) {
       sb.append(error.getMessage());
       sb.append("\n");
     }
@@ -252,7 +206,7 @@ public class QueryResultHandler {
 
   private static class BufferingResultsListener implements UserResultsListener {
 
-    private ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
+    private ConcurrentLinkedQueue<QueryDataBatch> results = Queues.newConcurrentLinkedQueue();
     private volatile boolean finished = false;
     private volatile RpcException ex;
     private volatile UserResultsListener output;
@@ -261,31 +215,39 @@ public class QueryResultHandler {
     public boolean transferTo(UserResultsListener l) {
       synchronized (this) {
         output = l;
-        boolean last = false;
-        for (QueryResultBatch r : results) {
-          l.resultArrived(r, throttle);
-          last = r.getHeader().getIsLastChunk();
+        for (QueryDataBatch r : results) {
+          l.dataArrived(r, throttle);
         }
         if (ex != null) {
           l.submissionFailed(ex);
           return true;
+        } else if (finished) {
+          l.queryCompleted();
         }
-        return last;
+
+        return finished;
       }
     }
 
     @Override
-    public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
-      this.throttle = throttle;
-      if (result.getHeader().getIsLastChunk()) {
-        finished = true;
+    public void queryCompleted() {
+      synchronized (this) { // lock first: transferTo() must not see the flag and also signal completion
+        finished = true;
+        if (output != null) {
+          output.queryCompleted();
+        }
+      }
+    }
+
+    @Override
+    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+      this.throttle = throttle;
 
       synchronized (this) {
         if (output == null) {
           this.results.add(result);
         } else {
-          output.resultArrived(result, throttle);
+          output.dataArrived(result, throttle);
         }
       }
     }
@@ -340,11 +302,10 @@ public class QueryResultHandler {
       if (oldListener != null) {
         logger.debug("Unable to place user results listener, buffering listener was already in place.");
         if (oldListener instanceof BufferingResultsListener) {
-          queryIdToResultsListenersMap.remove(oldListener);
           boolean all = ((BufferingResultsListener) oldListener).transferTo(this.resultsListener);
           // simply remove the buffering listener if we already have the last response.
           if (all) {
-            queryIdToResultsListenersMap.remove(oldListener);
+            queryIdToResultsListenersMap.remove(queryId);
           } else {
             boolean replaced = queryIdToResultsListenersMap.replace(queryId, oldListener, resultsListener);
             if (!replaced) {

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 925154d..fd82699 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
@@ -41,7 +42,7 @@ import org.apache.drill.exec.rpc.RpcException;
 import com.google.protobuf.MessageLite;
 
 public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHandshake, BitToUserHandshake> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class);
 
   private final QueryResultHandler queryResultHandler = new QueryResultHandler();
 
@@ -80,8 +81,10 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
       return BitToUserHandshake.getDefaultInstance();
     case RpcType.QUERY_HANDLE_VALUE:
       return QueryId.getDefaultInstance();
-    case RpcType.QUERY_RESULT_VALUE:
-      return QueryResult.getDefaultInstance();
+      case RpcType.QUERY_RESULT_VALUE:
+        return QueryResult.getDefaultInstance();
+    case RpcType.QUERY_DATA_VALUE:
+      return QueryData.getDefaultInstance();
     }
     throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
   }
@@ -89,9 +92,12 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
   @Override
   protected Response handleReponse(ConnectionThrottle throttle, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
     switch (rpcType) {
-    case RpcType.QUERY_RESULT_VALUE:
+    case RpcType.QUERY_DATA_VALUE:
       queryResultHandler.batchArrived(throttle, pBody, dBody);
       return new Response(RpcType.ACK, Ack.getDefaultInstance());
+    case RpcType.QUERY_RESULT_VALUE:
+      queryResultHandler.resultArrived(pBody);
+      return new Response(RpcType.ACK, Ack.getDefaultInstance());
     default:
       throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index 9f83a4f..934a094 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -22,8 +22,30 @@ import org.apache.drill.exec.rpc.RpcException;
 
 public interface UserResultsListener {
 
-  public abstract void queryIdArrived(QueryId queryId);
-  public abstract void submissionFailed(RpcException ex);
-  public abstract void resultArrived(QueryResultBatch result, ConnectionThrottle throttle);
+  /**
+   * QueryId is available. Called when a query is successfully submitted to the server.
+   * @param queryId sent by the server along {@link org.apache.drill.exec.rpc.Acks.OK Acks.OK}
+   */
+  void queryIdArrived(QueryId queryId);
+
+  /**
+   * The query has failed. Most likely called when the server returns a FAILED query state. Can also be called if
+   * {@link #dataArrived(QueryDataBatch, ConnectionThrottle) dataArrived()} throws an exception
+   * @param ex exception describing the cause of the failure
+   */
+  void submissionFailed(RpcException ex);
+
+  /**
+   * The query has completed (successful completion or cancellation). The listener will not receive any other
+   * data or result message. Called when the server returns a terminal, non-failing state (COMPLETED or CANCELED).
+   */
+  void queryCompleted();
+
+  /**
+   * A {@link org.apache.drill.exec.proto.beans.QueryData QueryData} message was received
+   * @param result data batch received
+   * @param throttle connection throttle
+   */
+  void dataArrived(QueryDataBatch result, ConnectionThrottle throttle);
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index 908d304..9fb8bdb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.rpc.user;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
@@ -33,8 +34,9 @@ public class UserRpcConfig {
       .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) //user to bit.
       .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) //user to bit
       .add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) //user to bit
+      .add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) //bit to user
       .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) //bit to user
       .build();
 
-  public static int RPC_VERSION = 3;
+  public static int RPC_VERSION = 4;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index c76d324..5b2433b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
 import org.apache.drill.exec.proto.UserProtos.RpcType;
 import org.apache.drill.exec.proto.UserProtos.RunQuery;
@@ -45,7 +46,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.MessageLite;
 
 public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnection> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class);
 
   final UserWorker worker;
   final BufferAllocator alloc;
@@ -117,16 +118,19 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
       return session;
     }
 
-    public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){
+    public void sendResult(RpcOutcomeListener<Ack> listener, QueryResult result, boolean allowInEventThread){
       logger.trace("Sending result to client with {}", result);
-      send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, false, result.getBuffers());
+      send(listener, this, RpcType.QUERY_RESULT, result, Ack.class, allowInEventThread);
     }
 
-    public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result, boolean allowInEventThread){
-      logger.trace("Sending result to client with {}", result);
-      send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, allowInEventThread, result.getBuffers());
+    public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){
+      sendData(listener, result, false);
     }
 
+    public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result, boolean allowInEventThread){
+      logger.trace("Sending data to client with {}", result);
+      send(listener, this, RpcType.QUERY_DATA, result.getHeader(), Ack.class, allowInEventThread, result.getBuffers());
+    }
     @Override
     public BufferAllocator getAllocator() {
       return alloc;

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
index 8996a69..fbbf0b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
@@ -39,7 +39,7 @@ import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -49,8 +49,7 @@ import parquet.Preconditions;
 
 @XmlRootElement
 public class QueryWrapper {
-
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWrapper.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWrapper.class);
 
   private String query;
   private String queryType;
@@ -137,7 +136,12 @@ public class QueryWrapper {
     }
 
     @Override
-    public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
+    public void queryCompleted() {
+      latch.countDown();
+    }
+
+    @Override
+    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
       try {
         final int rows = result.getHeader().getRowCount();
         if (result.hasData()) {
@@ -162,9 +166,6 @@ public class QueryWrapper {
         throw new RuntimeException(e);
       } finally {
         result.release();
-        if (result.getHeader().getIsLastChunk()) {
-          latch.countDown();
-        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 285b75a..23ef0d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -46,7 +46,6 @@ import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
 import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
@@ -612,7 +611,6 @@ public class Foreman implements Runnable {
        * Construct the response based on the latest resultState. The builder shouldn't fail.
        */
       final QueryResult.Builder resultBuilder = QueryResult.newBuilder()
-          .setIsLastChunk(resultState != QueryState.COMPLETED) // TODO(DRILL-2498) temporary
           .setQueryId(queryId)
           .setQueryState(resultState);
       if (resultException != null) {
@@ -629,7 +627,7 @@ public class Foreman implements Runnable {
        */
       try {
         // send whatever result we ended up with
-        initiatingClient.sendResult(responseListener, new QueryWritableBatch(resultBuilder.build()), true);
+        initiatingClient.sendResult(responseListener, resultBuilder.build(), true);
       } catch(Exception e) {
         addException(e);
         logger.warn("Exception sending result to client", resultException);

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 64cf2ec..c602a01 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -38,7 +38,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
@@ -188,19 +188,19 @@ public class BaseTestQuery extends ExecTest {
     listener.waitForCompletion();
   }
 
-  protected static List<QueryResultBatch> testSqlWithResults(String sql) throws Exception{
+  protected static List<QueryDataBatch> testSqlWithResults(String sql) throws Exception{
     return testRunAndReturn(QueryType.SQL, sql);
   }
 
-  protected static List<QueryResultBatch> testLogicalWithResults(String logical) throws Exception{
+  protected static List<QueryDataBatch> testLogicalWithResults(String logical) throws Exception{
     return testRunAndReturn(QueryType.LOGICAL, logical);
   }
 
-  protected static List<QueryResultBatch> testPhysicalWithResults(String physical) throws Exception{
+  protected static List<QueryDataBatch> testPhysicalWithResults(String physical) throws Exception{
     return testRunAndReturn(QueryType.PHYSICAL, physical);
   }
 
-  public static List<QueryResultBatch>  testRunAndReturn(QueryType type, String query) throws Exception{
+  public static List<QueryDataBatch>  testRunAndReturn(QueryType type, String query) throws Exception{
     query = QueryTestUtil.normalizeQuery(query);
     return client.runQuery(type, query);
   }
@@ -221,9 +221,9 @@ public class BaseTestQuery extends ExecTest {
     query = String.format(query, args);
     logger.debug("Running query:\n--------------\n"+query);
     for (int i = 0; i < interation; i++) {
-      List<QueryResultBatch> results = client.runQuery(QueryType.SQL, query);
-      for (QueryResultBatch queryResultBatch : results) {
-        queryResultBatch.release();
+      List<QueryDataBatch> results = client.runQuery(QueryType.SQL, query);
+      for (QueryDataBatch queryDataBatch : results) {
+        queryDataBatch.release();
       }
     }
   }
@@ -252,7 +252,7 @@ public class BaseTestQuery extends ExecTest {
     testPhysical(getFile(file));
   }
 
-  protected static List<QueryResultBatch> testPhysicalFromFileWithResults(String file) throws Exception {
+  protected static List<QueryDataBatch> testPhysicalFromFileWithResults(String file) throws Exception {
     return testRunAndReturn(QueryType.PHYSICAL, getFile(file));
   }
 
@@ -285,16 +285,18 @@ public class BaseTestQuery extends ExecTest {
     }
 
     @Override
-    public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
+    public void queryCompleted() {
+      System.out.println("Query completed successfully with row count: " + count.get());
+      latch.countDown();
+    }
+
+    @Override
+    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
       int rows = result.getHeader().getRowCount();
       if (result.getData() != null) {
         count.addAndGet(rows);
       }
       result.release();
-      if (result.getHeader().getIsLastChunk()) {
-        System.out.println("Query completed successfully with row count: " + count.get());
-        latch.countDown();
-      }
     }
 
     @Override
@@ -317,10 +319,10 @@ public class BaseTestQuery extends ExecTest {
     this.columnWidths = columnWidths;
   }
 
-  protected int printResult(List<QueryResultBatch> results) throws SchemaChangeException {
+  protected int printResult(List<QueryDataBatch> results) throws SchemaChangeException {
     int rowCount = 0;
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
-    for(QueryResultBatch result : results) {
+    for(QueryDataBatch result : results) {
       rowCount += result.getHeader().getRowCount();
       loader.load(result.getHeader().getDef(), result.getData());
       if (loader.getRecordCount() <= 0) {
@@ -334,12 +336,12 @@ public class BaseTestQuery extends ExecTest {
     return rowCount;
   }
 
-  protected static String getResultString(List<QueryResultBatch> results, String delimiter)
+  protected static String getResultString(List<QueryDataBatch> results, String delimiter)
       throws SchemaChangeException {
     StringBuilder formattedResults = new StringBuilder();
     boolean includeHeader = true;
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
-    for(QueryResultBatch result : results) {
+    for(QueryDataBatch result : results) {
       loader.load(result.getHeader().getDef(), result.getData());
       if (loader.getRecordCount() <= 0) {
         continue;

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index 75a91b3..d05c896 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -29,7 +29,7 @@ import org.apache.drill.exec.record.HyperVectorWrapper;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.io.Text;
 
@@ -166,13 +166,13 @@ public class DrillTestWrapper {
     }
   }
 
-  private Map<String, HyperVectorValueIterator> addToHyperVectorMap(List<QueryResultBatch> records, RecordBatchLoader loader,
+  private Map<String, HyperVectorValueIterator> addToHyperVectorMap(List<QueryDataBatch> records, RecordBatchLoader loader,
                                                                       BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
     // TODO - this does not handle schema changes
     Map<String, HyperVectorValueIterator> combinedVectors = new HashMap();
 
     long totalRecords = 0;
-    QueryResultBatch batch;
+    QueryDataBatch batch;
     int size = records.size();
     for (int i = 0; i < size; i++) {
       batch = records.get(i);
@@ -213,13 +213,13 @@ public class DrillTestWrapper {
    * @throws SchemaChangeException
    * @throws UnsupportedEncodingException
    */
-   private Map<String, List> addToCombinedVectorResults(List<QueryResultBatch> records, RecordBatchLoader loader,
+   private Map<String, List> addToCombinedVectorResults(List<QueryDataBatch> records, RecordBatchLoader loader,
                                                          BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
     // TODO - this does not handle schema changes
     Map<String, List> combinedVectors = new HashMap();
 
     long totalRecords = 0;
-    QueryResultBatch batch;
+    QueryDataBatch batch;
     int size = records.size();
     for (int i = 0; i < size; i++) {
       batch = records.get(0);
@@ -268,14 +268,14 @@ public class DrillTestWrapper {
     BatchSchema schema = null;
 
     BaseTestQuery.test(testOptionSettingQueries);
-    List<QueryResultBatch> expected = BaseTestQuery.testRunAndReturn(queryType, query);
+    List<QueryDataBatch> expected = BaseTestQuery.testRunAndReturn(queryType, query);
 
     addTypeInfoIfMissing(expected.get(0), testBuilder);
 
     List<Map> expectedRecords = new ArrayList<>();
     addToMaterializedResults(expectedRecords, expected, loader, schema);
 
-    List<QueryResultBatch> results = new ArrayList();
+    List<QueryDataBatch> results = new ArrayList();
     List<Map> actualRecords = new ArrayList<>();
     // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
     // the cases where the baseline is stored in a file.
@@ -313,13 +313,13 @@ public class DrillTestWrapper {
     BatchSchema schema = null;
 
     BaseTestQuery.test(testOptionSettingQueries);
-    List<QueryResultBatch> results = BaseTestQuery.testRunAndReturn(queryType, query);
+    List<QueryDataBatch> results = BaseTestQuery.testRunAndReturn(queryType, query);
     // To avoid extra work for test writers, types can optionally be inferred from the test query
     addTypeInfoIfMissing(results.get(0), testBuilder);
 
     Map<String, List> actualSuperVectors = addToCombinedVectorResults(results, loader, schema);
 
-    List<QueryResultBatch> expected = null;
+    List<QueryDataBatch> expected = null;
     Map<String, List> expectedSuperVectors = null;
 
     // If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
@@ -353,14 +353,14 @@ public class DrillTestWrapper {
     BatchSchema schema = null;
 
     BaseTestQuery.test(testOptionSettingQueries);
-    List<QueryResultBatch> results = BaseTestQuery.testRunAndReturn(queryType, query);
+    List<QueryDataBatch> results = BaseTestQuery.testRunAndReturn(queryType, query);
     // To avoid extra work for test writers, types can optionally be inferred from the test query
     addTypeInfoIfMissing(results.get(0), testBuilder);
 
     Map<String, HyperVectorValueIterator> actualSuperVectors = addToHyperVectorMap(results, loader, schema);
 
     BaseTestQuery.test(baselineOptionSettingQueries);
-    List<QueryResultBatch> expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
+    List<QueryDataBatch> expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
 
     Map<String, HyperVectorValueIterator> expectedSuperVectors = addToHyperVectorMap(expected, loader, schema);
 
@@ -368,7 +368,7 @@ public class DrillTestWrapper {
     cleanupBatches(results, expected);
   }
 
-  private void addTypeInfoIfMissing(QueryResultBatch batch, TestBuilder testBuilder) {
+  private void addTypeInfoIfMissing(QueryDataBatch batch, TestBuilder testBuilder) {
     if (! testBuilder.typeInfoSet()) {
       Map<SchemaPath, TypeProtos.MajorType> typeMap = getTypeMapFromBatch(batch);
       testBuilder.baselineTypes(typeMap);
@@ -376,7 +376,7 @@ public class DrillTestWrapper {
 
   }
 
-  private Map<SchemaPath, TypeProtos.MajorType> getTypeMapFromBatch(QueryResultBatch batch) {
+  private Map<SchemaPath, TypeProtos.MajorType> getTypeMapFromBatch(QueryDataBatch batch) {
     Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap();
     for (int i = 0; i < batch.getHeader().getDef().getFieldCount(); i++) {
       typeMap.put(MaterializedField.create(batch.getHeader().getDef().getField(i)).getPath(),
@@ -385,18 +385,18 @@ public class DrillTestWrapper {
     return typeMap;
   }
 
-  private void cleanupBatches(List<QueryResultBatch>... results) {
-    for (List<QueryResultBatch> resultList : results ) {
-      for (QueryResultBatch result : resultList) {
+  private void cleanupBatches(List<QueryDataBatch>... results) {
+    for (List<QueryDataBatch> resultList : results ) {
+      for (QueryDataBatch result : resultList) {
         result.release();
       }
     }
   }
 
-  protected void addToMaterializedResults(List<Map> materializedRecords,  List<QueryResultBatch> records, RecordBatchLoader loader,
+  protected void addToMaterializedResults(List<Map> materializedRecords,  List<QueryDataBatch> records, RecordBatchLoader loader,
                                           BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
     long totalRecords = 0;
-    QueryResultBatch batch;
+    QueryDataBatch batch;
     int size = records.size();
     for (int i = 0; i < size; i++) {
       batch = records.get(0);

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
index 80b4d13..4744978 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
@@ -28,7 +28,7 @@ import java.util.regex.Pattern;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.eigenbase.sql.SqlExplain.Depth;
@@ -288,12 +288,12 @@ public class PlanTestBase extends BaseTestQuery {
    */
   protected static String getPlanInString(String sql, String columnName)
       throws Exception {
-    List<QueryResultBatch> results = testSqlWithResults(sql);
+    List<QueryDataBatch> results = testSqlWithResults(sql);
 
     RecordBatchLoader loader = new RecordBatchLoader(getDrillbitContext().getAllocator());
     StringBuilder builder = new StringBuilder();
 
-    for (QueryResultBatch b : results) {
+    for (QueryDataBatch b : results) {
       if (!b.hasData()) {
         continue;
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java
index 3d19229..82f1752 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java
@@ -29,7 +29,7 @@ import org.apache.drill.exec.client.PrintingResultsListener;
 import org.apache.drill.exec.client.QuerySubmitter.Format;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.util.VectorUtil;
@@ -59,11 +59,11 @@ public class QueryTestUtil {
     final DrillClient drillClient = new DrillClient(drillConfig, remoteServiceSet.getCoordinator());
     drillClient.connect();
 
-    final List<QueryResultBatch> results = drillClient.runQuery(
+    final List<QueryDataBatch> results = drillClient.runQuery(
         QueryType.SQL, String.format("alter session set `%s` = %d",
             ExecConstants.MAX_WIDTH_PER_NODE_KEY, maxWidth));
-    for (QueryResultBatch queryResultBatch : results) {
-      queryResultBatch.release();
+    for (QueryDataBatch queryDataBatch : results) {
+      queryDataBatch.release();
     }
 
     return drillClient;

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
index 07cb833..5703bf9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
@@ -24,17 +24,17 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 
 /**
  * Result listener that is set up to receive a single row. Useful for queries
- * such with a count(*) or limit 1. The abstract method {@link #rowArrived(QueryResultBatch)} provides
+ * such as those with a count(*) or limit 1. The abstract method {@link #rowArrived(QueryDataBatch)} provides
  * the means for a derived class to get the expected record's data.
  */
 public abstract class SingleRowListener implements UserResultsListener {
@@ -51,14 +51,26 @@ public abstract class SingleRowListener implements UserResultsListener {
   @Override
   public void submissionFailed(final RpcException ex) {
     exception = ex;
+    synchronized(errorList) {
+      errorList.add(ex.getRemoteError());
+    }
     latch.countDown();
   }
 
   @Override
-  public void resultArrived(final QueryResultBatch result, final ConnectionThrottle throttle) {
-    final QueryResult queryResult = result.getHeader();
+  public void queryCompleted() {
+    try {
+      cleanup();
+    } finally {
+      latch.countDown();
+    }
+  }
+
+  @Override
+  public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+    final QueryData queryData = result.getHeader();
     if (result.hasData()) {
-      final int nRows = this.nRows.addAndGet(queryResult.getRowCount());
+      final int nRows = this.nRows.addAndGet(queryData.getRowCount());
       if (nRows > 1) {
         throw new IllegalStateException("Expected exactly one row, but got " + nRows);
       }
@@ -66,22 +78,7 @@ public abstract class SingleRowListener implements UserResultsListener {
       rowArrived(result);
     }
 
-    // TODO this appears to never be set
-    if (queryResult.hasQueryState()) {
-      queryState = queryResult.getQueryState();
-    }
-
-    synchronized(errorList) {
-      errorList.addAll(queryResult.getErrorList());
-    }
-
-    final boolean isLastChunk = queryResult.getIsLastChunk();
     result.release();
-
-    if (isLastChunk) {
-      cleanup();
-      latch.countDown();
-    }
   }
 
   /**
@@ -110,9 +107,9 @@ public abstract class SingleRowListener implements UserResultsListener {
    * <p>Derived classes provide whatever implementation they require here to access
    * the record's data.
    *
-   * @param queryResultBatch result batch holding the row
+   * @param queryDataBatch result batch holding the row
    */
-  protected abstract void rowArrived(QueryResultBatch queryResultBatch);
+  protected abstract void rowArrived(QueryDataBatch queryDataBatch);
 
   /**
    * Wait for the completion of this query; receiving a record or an error will both cause the

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java
index 67b102d..0a24073 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java
@@ -27,7 +27,7 @@ import java.util.List;
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.vector.BigIntVector;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -72,12 +72,12 @@ public class TestQueriesOnLargeFile extends BaseTestQuery {
 
   @Test
   public void testRead() throws Exception {
-    List<QueryResultBatch> results = testSqlWithResults(
+    List<QueryDataBatch> results = testSqlWithResults(
         String.format("SELECT count(*) FROM dfs_test.`default`.`%s`", dataFile.getPath()));
 
     RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
 
-    for(QueryResultBatch batch : results) {
+    for(QueryDataBatch batch : results) {
       batchLoader.load(batch.getHeader().getDef(), batch.getData());
 
       if (batchLoader.getRecordCount() <= 0) {
@@ -96,10 +96,10 @@ public class TestQueriesOnLargeFile extends BaseTestQuery {
   public void testMergingReceiver() throws Exception {
     String plan = Files.toString(FileUtils.getResourceAsFile("/largefiles/merging_receiver_large_data.json"),
         Charsets.UTF_8).replace("#{TEST_FILE}", escapeJsonString(dataFile.getPath()));
-    List<QueryResultBatch> results = testPhysicalWithResults(plan);
+    List<QueryDataBatch> results = testPhysicalWithResults(plan);
 
     int recordsInOutput = 0;
-    for(QueryResultBatch batch : results) {
+    for(QueryDataBatch batch : results) {
       recordsInOutput += batch.getHeader().getDef().getRecordCount();
       batch.release();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
index 98919ec..df03c7d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 import org.apache.drill.exec.DrillSystemTestBase;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -53,8 +53,8 @@ public class DrillClientSystemTest extends DrillSystemTestBase {
     startCluster(1);
     DrillClient client = new DrillClient();
     client.connect();
-    List<QueryResultBatch> results = client.runQuery(QueryType.LOGICAL, plan);
-    for (QueryResultBatch result : results) {
+    List<QueryDataBatch> results = client.runQuery(QueryType.LOGICAL, plan);
+    for (QueryDataBatch result : results) {
       System.out.println(result);
       result.release();
     }
@@ -66,8 +66,8 @@ public class DrillClientSystemTest extends DrillSystemTestBase {
     startCluster(2);
     DrillClient client = new DrillClient();
     client.connect();
-    List<QueryResultBatch> results = client.runQuery(QueryType.LOGICAL, plan);
-    for (QueryResultBatch result : results) {
+    List<QueryDataBatch> results = client.runQuery(QueryType.LOGICAL, plan);
+    for (QueryDataBatch result : results) {
       System.out.println(result);
       result.release();
     }