You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/04/04 04:37:29 UTC

[3/9] drill git commit: DRILL-2498: Separate QueryResult into two messages QueryResult and QueryData

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 9bc0552..64033a5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -39,14 +39,14 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.testing.ExceptionInjectionUtil;
 import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOption;
 import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOptions;
@@ -184,11 +184,11 @@ public class TestDrillbitResilience extends ExecTest {
           private final RecordBatchLoader loader = new RecordBatchLoader(bufferAllocator);
 
           @Override
-          public void rowArrived(final QueryResultBatch queryResultBatch) {
+          public void rowArrived(final QueryDataBatch queryResultBatch) {
             // load the single record
-            final QueryResult queryResult = queryResultBatch.getHeader();
+            final QueryData queryData = queryResultBatch.getHeader();
             try {
-              loader.load(queryResult.getDef(), queryResultBatch.getData());
+              loader.load(queryData.getDef(), queryResultBatch.getData());
             } catch(SchemaChangeException e) {
               fail(e.toString());
             }

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 9999be0..acfb522 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -49,7 +49,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserServer;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.CachedSingleFileSystem;
@@ -145,11 +145,11 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
   @Test
   public void testNullableAgg() throws Exception {
 
-    List<QueryResultBatch> result = testSqlWithResults("select sum(a) as total_sum from dfs.`/tmp/parquet_with_nulls_should_sum_100000_nulls_first.parquet`");
+    List<QueryDataBatch> result = testSqlWithResults("select sum(a) as total_sum from dfs.`/tmp/parquet_with_nulls_should_sum_100000_nulls_first.parquet`");
     assertEquals("Only expected one batch with data, and then the empty finishing batch.", 2, result.size());
     RecordBatchLoader loader = new RecordBatchLoader(getDrillbitContext().getAllocator());
 
-    QueryResultBatch b = result.get(0);
+    QueryDataBatch b = result.get(0);
     loader.load(b.getHeader().getDef(), b.getData());
 
     VectorWrapper vw = loader.getValueAccessorById(
@@ -163,11 +163,11 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
 
   @Test
   public void testNullableFilter() throws Exception {
-    List<QueryResultBatch> result = testSqlWithResults("select count(wr_return_quantity) as row_count from dfs.`/tmp/web_returns` where wr_return_quantity = 1");
+    List<QueryDataBatch> result = testSqlWithResults("select count(wr_return_quantity) as row_count from dfs.`/tmp/web_returns` where wr_return_quantity = 1");
     assertEquals("Only expected one batch with data, and then the empty finishing batch.", 2, result.size());
     RecordBatchLoader loader = new RecordBatchLoader(getDrillbitContext().getAllocator());
 
-    QueryResultBatch b = result.get(0);
+    QueryDataBatch b = result.get(0);
     loader.load(b.getHeader().getDef(), b.getData());
 
     VectorWrapper vw = loader.getValueAccessorById(

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
index 52d5086..55f0d75 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
@@ -31,7 +31,7 @@ import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.vector.ValueVector;
 
@@ -39,7 +39,7 @@ import com.google.common.base.Strings;
 import com.google.common.util.concurrent.SettableFuture;
 
 public class ParquetResultListener implements UserResultsListener {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetResultListener.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetResultListener.class);
 
   private SettableFuture<Void> future = SettableFuture.create();
   int count = 0;
@@ -65,6 +65,10 @@ public class ParquetResultListener implements UserResultsListener {
     future.setException(ex);
   }
 
+  @Override
+  public void queryCompleted() {
+    checkLastChunk();
+  }
 
   private <T> void assertField(ValueVector valueVector, int index, TypeProtos.MinorType expectedMinorType, Object value, String name) {
     assertField(valueVector, index, expectedMinorType, value, name, 0);
@@ -94,11 +98,8 @@ public class ParquetResultListener implements UserResultsListener {
   }
 
   @Override
-  synchronized public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
+  synchronized public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
     logger.debug("result arrived in test batch listener.");
-    if(result.getHeader().getIsLastChunk()){
-      future.set(null);
-    }
     int columnValCounter = 0;
     FieldInfo currentField;
     count += result.getHeader().getRowCount();
@@ -147,15 +148,12 @@ public class ParquetResultListener implements UserResultsListener {
       printRowMajor(batchLoader);
     }
     batchCounter++;
-    if(result.getHeader().getIsLastChunk()){
-      checkLastChunk(batchLoader, result);
-    }
 
     batchLoader.clear();
     result.release();
   }
 
-  public void checkLastChunk(RecordBatchLoader batchLoader, QueryResultBatch result) {
+  private void checkLastChunk() {
     int recordsInBatch = -1;
     // ensure the right number of columns was returned, especially important to ensure selective column read is working
     if (testValues) {
@@ -173,8 +171,6 @@ public class ParquetResultListener implements UserResultsListener {
     }
 
     assert valuesChecked.keySet().size() > 0;
-    batchLoader.clear();
-    result.release();
     future.set(null);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
index 6cb412c..882cdbd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
@@ -30,7 +30,7 @@ import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.RemoteServiceSet;
@@ -43,7 +43,7 @@ import com.google.common.base.Stopwatch;
 import com.google.common.io.Resources;
 
 public class TestParquetPhysicalPlan extends ExecTest {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestParquetPhysicalPlan.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestParquetPhysicalPlan.class);
 
   public String fileName = "parquet/parquet_scan_filter_union_screen_physical.json";
 
@@ -56,10 +56,10 @@ public class TestParquetPhysicalPlan extends ExecTest {
     try (Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator());) {
       bit1.run();
       client.connect();
-      List<QueryResultBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8));
+      List<QueryDataBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8));
       RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
       int count = 0;
-      for (QueryResultBatch b : results) {
+      for (QueryDataBatch b : results) {
         System.out.println(String.format("Got %d results", b.getHeader().getRowCount()));
         count += b.getHeader().getRowCount();
         loader.load(b.getHeader().getDef(), b.getData());
@@ -96,13 +96,15 @@ public class TestParquetPhysicalPlan extends ExecTest {
     }
 
     @Override
-    public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
+    public void queryCompleted() {
+      latch.countDown();
+    }
+
+    @Override
+    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
       int rows = result.getHeader().getRowCount();
       System.out.println(String.format("Result batch arrived. Number of records: %d", rows));
       count.addAndGet(rows);
-      if (result.getHeader().getIsLastChunk()) {
-        latch.countDown();
-      }
       result.release();
     }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
index e5a2c94..3c1a38a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
@@ -27,7 +27,7 @@ import org.apache.drill.BaseTestQuery;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.vector.ValueVector;
 import org.junit.Test;
 
@@ -41,7 +41,7 @@ public class TestTextColumn extends BaseTestQuery{
 
   @Test
   public void testDefaultDelimiterColumnSelection() throws Exception {
-    List<QueryResultBatch> batches = testSqlWithResults("SELECT columns[0] as entire_row " +
+    List<QueryDataBatch> batches = testSqlWithResults("SELECT columns[0] as entire_row " +
       "from dfs_test.`[WORKING_PATH]/src/test/resources/store/text/data/letters.txt`");
 
     List<List<String>> expectedOutput = Arrays.asList(
@@ -55,7 +55,7 @@ public class TestTextColumn extends BaseTestQuery{
 
   @Test
   public void testCsvColumnSelectionCommasInsideQuotes() throws Exception {
-    List<QueryResultBatch> batches = testSqlWithResults("SELECT columns[0] as col1, columns[1] as col2, columns[2] as col3," +
+    List<QueryDataBatch> batches = testSqlWithResults("SELECT columns[0] as col1, columns[1] as col2, columns[2] as col3," +
       "columns[3] as col4 from dfs_test.`[WORKING_PATH]/src/test/resources/store/text/data/letters.csv`");
 
     List<List<String>> expectedOutput = Arrays.asList(
@@ -67,11 +67,11 @@ public class TestTextColumn extends BaseTestQuery{
     validateOutput(expectedOutput, actualOutput);
   }
 
-  private List<List<String>> getOutput(List<QueryResultBatch> batches) throws SchemaChangeException {
+  private List<List<String>> getOutput(List<QueryDataBatch> batches) throws SchemaChangeException {
     List<List<String>> output = new ArrayList<>();
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
     int last = 0;
-    for(QueryResultBatch batch : batches) {
+    for(QueryDataBatch batch : batches) {
       int rows = batch.getHeader().getRowCount();
       if(batch.getData() != null) {
         loader.load(batch.getHeader().getDef(), batch.getData());

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
index fa43d55..5e781d2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
@@ -25,7 +25,7 @@ import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.pop.PopUnitTestBase;
 import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.util.VectorUtil;
@@ -45,13 +45,13 @@ public class TextRecordReaderTest extends PopUnitTestBase {
 
       bit1.run();
       client.connect();
-      List<QueryResultBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL,
+      List<QueryDataBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL,
               Files.toString(
                       FileUtils.getResourceAsFile("/store/text/test.json"), Charsets.UTF_8)
                       .replace("#{DATA_FILE}", FileUtils.getResourceAsFile("/store/text/data/regions.csv").toURI().toString()));
       int count = 0;
       RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
-      for(QueryResultBatch b : results) {
+      for(QueryDataBatch b : results) {
         if (b.getHeader().getRowCount() != 0) {
           count += b.getHeader().getRowCount();
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
index 437bbb5..d674d47 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
@@ -23,7 +23,7 @@ import java.util.Objects;
 
 import org.apache.drill.BaseTestQuery;
 import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.util.JsonStringArrayList;
 import org.apache.drill.exec.util.JsonStringHashMap;
 import org.apache.drill.exec.vector.ValueVector;
@@ -82,15 +82,15 @@ public class TestJsonReaderWithSparseFiles extends BaseTestQuery {
   }
 
   protected void query(final String query, final Function<RecordBatchLoader> testBody) throws Exception {
-    List<QueryResultBatch> batches = testSqlWithResults(query);
+    List<QueryDataBatch> batches = testSqlWithResults(query);
     RecordBatchLoader loader = new RecordBatchLoader(client.getAllocator());
     try {
       // first batch at index 0 is empty and used for fast schema return. Load the second one for the tests
-      QueryResultBatch batch = batches.get(0);
+      QueryDataBatch batch = batches.get(0);
       loader.load(batch.getHeader().getDef(), batch.getData());
       testBody.apply(loader);
     } finally {
-      for (QueryResultBatch batch:batches) {
+      for (QueryDataBatch batch:batches) {
         batch.release();
       }
       loader.clear();

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexToJson.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexToJson.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexToJson.java
index eedbf6f..656429e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexToJson.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexToJson.java
@@ -27,7 +27,7 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.junit.Test;
 
 public class TestComplexToJson extends BaseTestQuery {
@@ -36,7 +36,7 @@ public class TestComplexToJson extends BaseTestQuery {
   public void test() throws Exception {
     DrillClient parent_client = client;
 
-    List<QueryResultBatch> results;
+    List<QueryDataBatch> results;
     RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
 
     client = new DrillClient(config, serviceSet.getCoordinator());
@@ -50,7 +50,7 @@ public class TestComplexToJson extends BaseTestQuery {
     // with setSupportComplexTypes == false, the column mode should be REQUIRED
     assertTrue(def.getField(0).getMajorType().getMode() == DataMode.REQUIRED);
     loader.clear();
-    for(QueryResultBatch result : results) {
+    for(QueryDataBatch result : results) {
       result.release();
     }
     client.close();
@@ -66,7 +66,7 @@ public class TestComplexToJson extends BaseTestQuery {
     // with setSupportComplexTypes == true, the column mode should be REPEATED
     assertTrue(def.getField(0).getMajorType().getMode() == DataMode.REPEATED);
     loader.clear();
-    for(QueryResultBatch result : results) {
+    for(QueryDataBatch result : results) {
       result.release();
     }
     client.close();

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index fe86192..3f69fd0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -36,7 +36,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.RepeatedBigIntVector;
 import org.junit.Ignore;
@@ -48,7 +48,7 @@ import com.google.common.io.Files;
 import org.junit.rules.TemporaryFolder;
 
 public class TestJsonReader extends BaseTestQuery {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonReader.class);
+//  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonReader.class);
 
   private static final boolean VERBOSE_DEBUG = false;
 
@@ -196,15 +196,15 @@ public class TestJsonReader extends BaseTestQuery {
 
   @Test
   public void readComplexWithStar() throws Exception {
-    List<QueryResultBatch> results = testSqlWithResults("select * from cp.`/store/json/test_complex_read_with_star.json`");
-    assertEquals(2, results.size());
+    List<QueryDataBatch> results = testSqlWithResults("select * from cp.`/store/json/test_complex_read_with_star.json`");
+    assertEquals(1, results.size());
 
     RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
-    QueryResultBatch batch = results.get(0);
+    QueryDataBatch batch = results.get(0);
 
     assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
     assertEquals(3, batchLoader.getSchema().getFieldCount());
-    testExistentColumns(batchLoader, batch);
+    testExistentColumns(batchLoader);
 
     batch.release();
     batchLoader.clear();
@@ -249,24 +249,24 @@ public class TestJsonReader extends BaseTestQuery {
     test("alter system set `store.json.all_text_mode` = false");
     runTestsOnFile(filename, UserBitShared.QueryType.PHYSICAL, queries, rowCounts);
 
-    List<QueryResultBatch> results = testPhysicalWithResults(queries[0]);
-    assertEquals(2, results.size());
+    List<QueryDataBatch> results = testPhysicalWithResults(queries[0]);
+    assertEquals(1, results.size());
     // "`field_1`", "`field_3`.`inner_1`", "`field_3`.`inner_2`", "`field_4`.`inner_1`"
 
     RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
-    QueryResultBatch batch = results.get(0);
+    QueryDataBatch batch = results.get(0);
     assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
 
     // this used to be five.  It is now three.  This is because the plan doesn't have a project.
     // Scanners are not responsible for projecting non-existent columns (as long as they project one column)
     assertEquals(3, batchLoader.getSchema().getFieldCount());
-    testExistentColumns(batchLoader, batch);
+    testExistentColumns(batchLoader);
 
     batch.release();
     batchLoader.clear();
   }
 
-  private void testExistentColumns(RecordBatchLoader batchLoader, QueryResultBatch batch) throws SchemaChangeException {
+  private void testExistentColumns(RecordBatchLoader batchLoader) throws SchemaChangeException {
     VectorWrapper<?> vw = batchLoader.getValueAccessorById(
         RepeatedBigIntVector.class, //
         batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_1")).getFieldIds() //

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
index b3c653f..dcea9bb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
@@ -25,7 +25,7 @@ import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.junit.Test;
@@ -46,11 +46,11 @@ public class TestSpoolingBuffer extends ExecTest {
 
       bit1.run();
       client.connect();
-      List<QueryResultBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL,
+      List<QueryDataBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL,
               Files.toString(FileUtils.getResourceAsFile("/work/batch/multiple_exchange.json"),
                       Charsets.UTF_8));
       int count = 0;
-      for(QueryResultBatch b : results) {
+      for(QueryDataBatch b : results) {
         if (b.getHeader().getRowCount() != 0) {
           count += b.getHeader().getRowCount();
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
index cddd999..3b38a09 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
@@ -29,7 +29,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 
 public class DrillCursor implements Cursor {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillCursor.class);
@@ -113,7 +113,7 @@ public class DrillCursor implements Cursor {
       // Next index is not in current batch (including initial empty batch--
       // (try to) get next batch.
       try {
-        QueryResultBatch qrb = resultsListener.getNext();
+        QueryDataBatch qrb = resultsListener.getNext();
         recordBatchCount++;
         while (qrb != null && qrb.getHeader().getRowCount() == 0 && !first) {
           qrb.release();

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java
index 0ce33f4..fb27d2d 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java
@@ -19,8 +19,6 @@ package org.apache.drill.jdbc;
 
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.TimeZone;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -30,24 +28,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import net.hydromatic.avatica.AvaticaPrepareResult;
 import net.hydromatic.avatica.AvaticaResultSet;
 import net.hydromatic.avatica.AvaticaStatement;
-import net.hydromatic.avatica.Cursor;
-import net.hydromatic.avatica.Cursor.Accessor;
 
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
 
 import com.google.common.collect.Queues;
 
 public class DrillResultSet extends AvaticaResultSet {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillResultSet.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillResultSet.class);
 
   SchemaChangeListener changeListener;
   final ResultsListener resultslistener = new ResultsListener();
@@ -107,8 +102,7 @@ public class DrillResultSet extends AvaticaResultSet {
     // don't return with metadata until we've achieved at least one return message.
     try {
       resultslistener.latch.await();
-      boolean notAtEnd = cursor.next();
-      assert notAtEnd;
+      cursor.next();
     } catch (InterruptedException e) {
      // TODO:  Check:  Should this call Thread.currentThread.interrupt()?   If
      // not, at least document why this is empty.
@@ -137,7 +131,7 @@ public class DrillResultSet extends AvaticaResultSet {
 
 
 
-    final LinkedBlockingDeque<QueryResultBatch> queue = Queues.newLinkedBlockingDeque();
+    final LinkedBlockingDeque<QueryDataBatch> queue = Queues.newLinkedBlockingDeque();
 
     // TODO:  Doc.:  Release what if what is first relative to what?
     private boolean releaseIfFirst() {
@@ -151,7 +145,6 @@ public class DrillResultSet extends AvaticaResultSet {
 
     @Override
     public void submissionFailed(RpcException ex) {
-      releaseIfFirst();
       this.ex = ex;
       completed = true;
       close();
@@ -159,13 +152,14 @@ public class DrillResultSet extends AvaticaResultSet {
     }
 
     @Override
-    public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
-      logger.debug("Result arrived {}", result);
+    public void queryCompleted() {
+      releaseIfFirst();
+      completed = true;
+    }
 
-      if (result.getHeader().hasQueryState() && result.getHeader().getQueryState() == QueryState.COMPLETED && result.getHeader().getRowCount() == 0) {
-        result.release();
-        return;
-      }
+    @Override
+    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+      logger.debug("Result arrived {}", result);
 
       // If we're in a closed state, just release the message.
       if (closed) {
@@ -182,20 +176,11 @@ public class DrillResultSet extends AvaticaResultSet {
         autoread = false;
       }
 
-      if (result.getHeader().getIsLastChunk()) {
-        completed = true;
-      }
-
-      if (result.getHeader().getErrorCount() > 0) {
-        submissionFailed(new RpcException(String.format("%s", result.getHeader().getErrorList())));
-      }
-
       releaseIfFirst();
-
     }
 
     // TODO:  Doc.:  Specify whether result can be null and what that means.
-    public QueryResultBatch getNext() throws RpcException, InterruptedException {
+    public QueryDataBatch getNext() throws RpcException, InterruptedException {
       while (true) {
         if (ex != null) {
           throw ex;
@@ -203,7 +188,7 @@ public class DrillResultSet extends AvaticaResultSet {
         if (completed && queue.isEmpty()) {
           return null;
         } else {
-          QueryResultBatch q = queue.poll(50, TimeUnit.MILLISECONDS);
+          QueryDataBatch q = queue.poll(50, TimeUnit.MILLISECONDS);
           if (q != null) {
             if (!autoread && queue.size() < MAX / 2) {
               autoread = true;
@@ -219,7 +204,7 @@ public class DrillResultSet extends AvaticaResultSet {
     void close() {
       closed = true;
       while (!queue.isEmpty()) {
-        QueryResultBatch qrb = queue.poll();
+        QueryDataBatch qrb = queue.poll();
         if (qrb != null && qrb.getData() != null) {
           qrb.getData().release();
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
index 68e86db..f72d5e1 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
@@ -1359,27 +1359,9 @@ public final class SchemaUserBitShared
                 if(message.hasQueryId())
                     output.writeObject(2, message.getQueryId(), org.apache.drill.exec.proto.SchemaUserBitShared.QueryId.WRITE, false);
 
-                if(message.hasIsLastChunk())
-                    output.writeBool(3, message.getIsLastChunk(), false);
-                if(message.hasRowCount())
-                    output.writeInt32(4, message.getRowCount(), false);
-                if(message.hasRecordsScan())
-                    output.writeInt64(5, message.getRecordsScan(), false);
-                if(message.hasRecordsError())
-                    output.writeInt64(6, message.getRecordsError(), false);
-                if(message.hasSubmissionTime())
-                    output.writeInt64(7, message.getSubmissionTime(), false);
-                for(org.apache.drill.exec.proto.UserBitShared.NodeStatus nodeStatus : message.getNodeStatusList())
-                    output.writeObject(8, nodeStatus, org.apache.drill.exec.proto.SchemaUserBitShared.NodeStatus.WRITE, true);
-
                 for(org.apache.drill.exec.proto.UserBitShared.DrillPBError error : message.getErrorList())
-                    output.writeObject(9, error, org.apache.drill.exec.proto.SchemaUserBitShared.DrillPBError.WRITE, true);
-
-                if(message.hasDef())
-                    output.writeObject(10, message.getDef(), org.apache.drill.exec.proto.SchemaUserBitShared.RecordBatchDef.WRITE, false);
+                    output.writeObject(3, error, org.apache.drill.exec.proto.SchemaUserBitShared.DrillPBError.WRITE, true);
 
-                if(message.hasSchemaChanged())
-                    output.writeBool(11, message.getSchemaChanged(), false);
             }
             public boolean isInitialized(org.apache.drill.exec.proto.UserBitShared.QueryResult message)
             {
@@ -1427,35 +1409,9 @@ public final class SchemaUserBitShared
 
                             break;
                         case 3:
-                            builder.setIsLastChunk(input.readBool());
-                            break;
-                        case 4:
-                            builder.setRowCount(input.readInt32());
-                            break;
-                        case 5:
-                            builder.setRecordsScan(input.readInt64());
-                            break;
-                        case 6:
-                            builder.setRecordsError(input.readInt64());
-                            break;
-                        case 7:
-                            builder.setSubmissionTime(input.readInt64());
-                            break;
-                        case 8:
-                            builder.addNodeStatus(input.mergeObject(org.apache.drill.exec.proto.UserBitShared.NodeStatus.newBuilder(), org.apache.drill.exec.proto.SchemaUserBitShared.NodeStatus.MERGE));
-
-                            break;
-                        case 9:
                             builder.addError(input.mergeObject(org.apache.drill.exec.proto.UserBitShared.DrillPBError.newBuilder(), org.apache.drill.exec.proto.SchemaUserBitShared.DrillPBError.MERGE));
 
                             break;
-                        case 10:
-                            builder.setDef(input.mergeObject(org.apache.drill.exec.proto.UserBitShared.RecordBatchDef.newBuilder(), org.apache.drill.exec.proto.SchemaUserBitShared.RecordBatchDef.MERGE));
-
-                            break;
-                        case 11:
-                            builder.setSchemaChanged(input.readBool());
-                            break;
                         default:
                             input.handleUnknownField(number, this);
                     }
@@ -1498,15 +1454,7 @@ public final class SchemaUserBitShared
             {
                 case 1: return "queryState";
                 case 2: return "queryId";
-                case 3: return "isLastChunk";
-                case 4: return "rowCount";
-                case 5: return "recordsScan";
-                case 6: return "recordsError";
-                case 7: return "submissionTime";
-                case 8: return "nodeStatus";
-                case 9: return "error";
-                case 10: return "def";
-                case 11: return "schemaChanged";
+                case 3: return "error";
                 default: return null;
             }
         }
@@ -1520,15 +1468,136 @@ public final class SchemaUserBitShared
         {
             fieldMap.put("queryState", 1);
             fieldMap.put("queryId", 2);
-            fieldMap.put("isLastChunk", 3);
-            fieldMap.put("rowCount", 4);
-            fieldMap.put("recordsScan", 5);
-            fieldMap.put("recordsError", 6);
-            fieldMap.put("submissionTime", 7);
-            fieldMap.put("nodeStatus", 8);
-            fieldMap.put("error", 9);
-            fieldMap.put("def", 10);
-            fieldMap.put("schemaChanged", 11);
+            fieldMap.put("error", 3);
+        }
+    }
+
+    public static final class QueryData
+    {
+        public static final org.apache.drill.exec.proto.SchemaUserBitShared.QueryData.MessageSchema WRITE =
+            new org.apache.drill.exec.proto.SchemaUserBitShared.QueryData.MessageSchema();
+        public static final org.apache.drill.exec.proto.SchemaUserBitShared.QueryData.BuilderSchema MERGE =
+            new org.apache.drill.exec.proto.SchemaUserBitShared.QueryData.BuilderSchema();
+        
+        public static class MessageSchema implements com.dyuproject.protostuff.Schema<org.apache.drill.exec.proto.UserBitShared.QueryData>
+        {
+            public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.exec.proto.UserBitShared.QueryData message) throws java.io.IOException
+            {
+                if(message.hasQueryId())
+                    output.writeObject(1, message.getQueryId(), org.apache.drill.exec.proto.SchemaUserBitShared.QueryId.WRITE, false);
+
+                if(message.hasRowCount())
+                    output.writeInt32(2, message.getRowCount(), false);
+                if(message.hasDef())
+                    output.writeObject(3, message.getDef(), org.apache.drill.exec.proto.SchemaUserBitShared.RecordBatchDef.WRITE, false);
+
+            }
+            public boolean isInitialized(org.apache.drill.exec.proto.UserBitShared.QueryData message)
+            {
+                return message.isInitialized();
+            }
+            public java.lang.String getFieldName(int number)
+            {
+                return org.apache.drill.exec.proto.SchemaUserBitShared.QueryData.getFieldName(number);
+            }
+            public int getFieldNumber(java.lang.String name)
+            {
+                return org.apache.drill.exec.proto.SchemaUserBitShared.QueryData.getFieldNumber(name);
+            }
+            public java.lang.Class<org.apache.drill.exec.proto.UserBitShared.QueryData> typeClass()
+            {
+                return org.apache.drill.exec.proto.UserBitShared.QueryData.class;
+            }
+            public java.lang.String messageName()
+            {
+                return org.apache.drill.exec.proto.UserBitShared.QueryData.class.getSimpleName();
+            }
+            public java.lang.String messageFullName()
+            {
+                return org.apache.drill.exec.proto.UserBitShared.QueryData.class.getName();
+            }
+            //unused
+            public void mergeFrom(com.dyuproject.protostuff.Input input, org.apache.drill.exec.proto.UserBitShared.QueryData message) throws java.io.IOException {}
+            public org.apache.drill.exec.proto.UserBitShared.QueryData newMessage() { return null; }
+        }
+        public static class BuilderSchema implements com.dyuproject.protostuff.Schema<org.apache.drill.exec.proto.UserBitShared.QueryData.Builder>
+        {
+            public void mergeFrom(com.dyuproject.protostuff.Input input, org.apache.drill.exec.proto.UserBitShared.QueryData.Builder builder) throws java.io.IOException
+            {
+                for(int number = input.readFieldNumber(this);; number = input.readFieldNumber(this))
+                {
+                    switch(number)
+                    {
+                        case 0:
+                            return;
+                        case 1:
+                            builder.setQueryId(input.mergeObject(org.apache.drill.exec.proto.UserBitShared.QueryId.newBuilder(), org.apache.drill.exec.proto.SchemaUserBitShared.QueryId.MERGE));
+
+                            break;
+                        case 2:
+                            builder.setRowCount(input.readInt32());
+                            break;
+                        case 3:
+                            builder.setDef(input.mergeObject(org.apache.drill.exec.proto.UserBitShared.RecordBatchDef.newBuilder(), org.apache.drill.exec.proto.SchemaUserBitShared.RecordBatchDef.MERGE));
+
+                            break;
+                        default:
+                            input.handleUnknownField(number, this);
+                    }
+                }
+            }
+            public boolean isInitialized(org.apache.drill.exec.proto.UserBitShared.QueryData.Builder builder)
+            {
+                return builder.isInitialized();
+            }
+            public org.apache.drill.exec.proto.UserBitShared.QueryData.Builder newMessage()
+            {
+                return org.apache.drill.exec.proto.UserBitShared.QueryData.newBuilder();
+            }
+            public java.lang.String getFieldName(int number)
+            {
+                return org.apache.drill.exec.proto.SchemaUserBitShared.QueryData.getFieldName(number);
+            }
+            public int getFieldNumber(java.lang.String name)
+            {
+                return org.apache.drill.exec.proto.SchemaUserBitShared.QueryData.getFieldNumber(name);
+            }
+            public java.lang.Class<org.apache.drill.exec.proto.UserBitShared.QueryData.Builder> typeClass()
+            {
+                return org.apache.drill.exec.proto.UserBitShared.QueryData.Builder.class;
+            }
+            public java.lang.String messageName()
+            {
+                return org.apache.drill.exec.proto.UserBitShared.QueryData.class.getSimpleName();
+            }
+            public java.lang.String messageFullName()
+            {
+                return org.apache.drill.exec.proto.UserBitShared.QueryData.class.getName();
+            }
+            //unused
+            public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.exec.proto.UserBitShared.QueryData.Builder builder) throws java.io.IOException {}
+        }
+        public static java.lang.String getFieldName(int number)
+        {
+            switch(number)
+            {
+                case 1: return "queryId";
+                case 2: return "rowCount";
+                case 3: return "def";
+                default: return null;
+            }
+        }
+        public static int getFieldNumber(java.lang.String name)
+        {
+            java.lang.Integer number = fieldMap.get(name);
+            return number == null ? 0 : number.intValue();
+        }
+        private static final java.util.HashMap<java.lang.String,java.lang.Integer> fieldMap = new java.util.HashMap<java.lang.String,java.lang.Integer>();
+        static
+        {
+            fieldMap.put("queryId", 1);
+            fieldMap.put("rowCount", 2);
+            fieldMap.put("def", 3);
         }
     }