You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/04/04 04:37:29 UTC
[3/9] drill git commit: DRILL-2498: Separate QueryResult into two
messages QueryResult and QueryData
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 9bc0552..64033a5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -39,14 +39,14 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.testing.ExceptionInjectionUtil;
import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOption;
import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOptions;
@@ -184,11 +184,11 @@ public class TestDrillbitResilience extends ExecTest {
private final RecordBatchLoader loader = new RecordBatchLoader(bufferAllocator);
@Override
- public void rowArrived(final QueryResultBatch queryResultBatch) {
+ public void rowArrived(final QueryDataBatch queryResultBatch) {
// load the single record
- final QueryResult queryResult = queryResultBatch.getHeader();
+ final QueryData queryData = queryResultBatch.getHeader();
try {
- loader.load(queryResult.getDef(), queryResultBatch.getData());
+ loader.load(queryData.getDef(), queryResultBatch.getData());
} catch(SchemaChangeException e) {
fail(e.toString());
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 9999be0..acfb522 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -49,7 +49,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserServer;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.CachedSingleFileSystem;
@@ -145,11 +145,11 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
@Test
public void testNullableAgg() throws Exception {
- List<QueryResultBatch> result = testSqlWithResults("select sum(a) as total_sum from dfs.`/tmp/parquet_with_nulls_should_sum_100000_nulls_first.parquet`");
+ List<QueryDataBatch> result = testSqlWithResults("select sum(a) as total_sum from dfs.`/tmp/parquet_with_nulls_should_sum_100000_nulls_first.parquet`");
assertEquals("Only expected one batch with data, and then the empty finishing batch.", 2, result.size());
RecordBatchLoader loader = new RecordBatchLoader(getDrillbitContext().getAllocator());
- QueryResultBatch b = result.get(0);
+ QueryDataBatch b = result.get(0);
loader.load(b.getHeader().getDef(), b.getData());
VectorWrapper vw = loader.getValueAccessorById(
@@ -163,11 +163,11 @@ public class ParquetRecordReaderTest extends BaseTestQuery{
@Test
public void testNullableFilter() throws Exception {
- List<QueryResultBatch> result = testSqlWithResults("select count(wr_return_quantity) as row_count from dfs.`/tmp/web_returns` where wr_return_quantity = 1");
+ List<QueryDataBatch> result = testSqlWithResults("select count(wr_return_quantity) as row_count from dfs.`/tmp/web_returns` where wr_return_quantity = 1");
assertEquals("Only expected one batch with data, and then the empty finishing batch.", 2, result.size());
RecordBatchLoader loader = new RecordBatchLoader(getDrillbitContext().getAllocator());
- QueryResultBatch b = result.get(0);
+ QueryDataBatch b = result.get(0);
loader.load(b.getHeader().getDef(), b.getData());
VectorWrapper vw = loader.getValueAccessorById(
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
index 52d5086..55f0d75 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
@@ -31,7 +31,7 @@ import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.vector.ValueVector;
@@ -39,7 +39,7 @@ import com.google.common.base.Strings;
import com.google.common.util.concurrent.SettableFuture;
public class ParquetResultListener implements UserResultsListener {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetResultListener.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetResultListener.class);
private SettableFuture<Void> future = SettableFuture.create();
int count = 0;
@@ -65,6 +65,10 @@ public class ParquetResultListener implements UserResultsListener {
future.setException(ex);
}
+ @Override
+ public void queryCompleted() {
+ checkLastChunk();
+ }
private <T> void assertField(ValueVector valueVector, int index, TypeProtos.MinorType expectedMinorType, Object value, String name) {
assertField(valueVector, index, expectedMinorType, value, name, 0);
@@ -94,11 +98,8 @@ public class ParquetResultListener implements UserResultsListener {
}
@Override
- synchronized public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
+ synchronized public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
logger.debug("result arrived in test batch listener.");
- if(result.getHeader().getIsLastChunk()){
- future.set(null);
- }
int columnValCounter = 0;
FieldInfo currentField;
count += result.getHeader().getRowCount();
@@ -147,15 +148,12 @@ public class ParquetResultListener implements UserResultsListener {
printRowMajor(batchLoader);
}
batchCounter++;
- if(result.getHeader().getIsLastChunk()){
- checkLastChunk(batchLoader, result);
- }
batchLoader.clear();
result.release();
}
- public void checkLastChunk(RecordBatchLoader batchLoader, QueryResultBatch result) {
+ private void checkLastChunk() {
int recordsInBatch = -1;
// ensure the right number of columns was returned, especially important to ensure selective column read is working
if (testValues) {
@@ -173,8 +171,6 @@ public class ParquetResultListener implements UserResultsListener {
}
assert valuesChecked.keySet().size() > 0;
- batchLoader.clear();
- result.release();
future.set(null);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
index 6cb412c..882cdbd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetPhysicalPlan.java
@@ -30,7 +30,7 @@ import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
@@ -43,7 +43,7 @@ import com.google.common.base.Stopwatch;
import com.google.common.io.Resources;
public class TestParquetPhysicalPlan extends ExecTest {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestParquetPhysicalPlan.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestParquetPhysicalPlan.class);
public String fileName = "parquet/parquet_scan_filter_union_screen_physical.json";
@@ -56,10 +56,10 @@ public class TestParquetPhysicalPlan extends ExecTest {
try (Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator());) {
bit1.run();
client.connect();
- List<QueryResultBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8));
+ List<QueryDataBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8));
RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
int count = 0;
- for (QueryResultBatch b : results) {
+ for (QueryDataBatch b : results) {
System.out.println(String.format("Got %d results", b.getHeader().getRowCount()));
count += b.getHeader().getRowCount();
loader.load(b.getHeader().getDef(), b.getData());
@@ -96,13 +96,15 @@ public class TestParquetPhysicalPlan extends ExecTest {
}
@Override
- public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
+ public void queryCompleted() {
+ latch.countDown();
+ }
+
+ @Override
+ public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
int rows = result.getHeader().getRowCount();
System.out.println(String.format("Result batch arrived. Number of records: %d", rows));
count.addAndGet(rows);
- if (result.getHeader().getIsLastChunk()) {
- latch.countDown();
- }
result.release();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
index e5a2c94..3c1a38a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TestTextColumn.java
@@ -27,7 +27,7 @@ import org.apache.drill.BaseTestQuery;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.vector.ValueVector;
import org.junit.Test;
@@ -41,7 +41,7 @@ public class TestTextColumn extends BaseTestQuery{
@Test
public void testDefaultDelimiterColumnSelection() throws Exception {
- List<QueryResultBatch> batches = testSqlWithResults("SELECT columns[0] as entire_row " +
+ List<QueryDataBatch> batches = testSqlWithResults("SELECT columns[0] as entire_row " +
"from dfs_test.`[WORKING_PATH]/src/test/resources/store/text/data/letters.txt`");
List<List<String>> expectedOutput = Arrays.asList(
@@ -55,7 +55,7 @@ public class TestTextColumn extends BaseTestQuery{
@Test
public void testCsvColumnSelectionCommasInsideQuotes() throws Exception {
- List<QueryResultBatch> batches = testSqlWithResults("SELECT columns[0] as col1, columns[1] as col2, columns[2] as col3," +
+ List<QueryDataBatch> batches = testSqlWithResults("SELECT columns[0] as col1, columns[1] as col2, columns[2] as col3," +
"columns[3] as col4 from dfs_test.`[WORKING_PATH]/src/test/resources/store/text/data/letters.csv`");
List<List<String>> expectedOutput = Arrays.asList(
@@ -67,11 +67,11 @@ public class TestTextColumn extends BaseTestQuery{
validateOutput(expectedOutput, actualOutput);
}
- private List<List<String>> getOutput(List<QueryResultBatch> batches) throws SchemaChangeException {
+ private List<List<String>> getOutput(List<QueryDataBatch> batches) throws SchemaChangeException {
List<List<String>> output = new ArrayList<>();
RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
int last = 0;
- for(QueryResultBatch batch : batches) {
+ for(QueryDataBatch batch : batches) {
int rows = batch.getHeader().getRowCount();
if(batch.getData() != null) {
loader.load(batch.getHeader().getDef(), batch.getData());
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
index fa43d55..5e781d2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/text/TextRecordReaderTest.java
@@ -25,7 +25,7 @@ import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.pop.PopUnitTestBase;
import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.util.VectorUtil;
@@ -45,13 +45,13 @@ public class TextRecordReaderTest extends PopUnitTestBase {
bit1.run();
client.connect();
- List<QueryResultBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL,
+ List<QueryDataBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL,
Files.toString(
FileUtils.getResourceAsFile("/store/text/test.json"), Charsets.UTF_8)
.replace("#{DATA_FILE}", FileUtils.getResourceAsFile("/store/text/data/regions.csv").toURI().toString()));
int count = 0;
RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
- for(QueryResultBatch b : results) {
+ for(QueryDataBatch b : results) {
if (b.getHeader().getRowCount() != 0) {
count += b.getHeader().getRowCount();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
index 437bbb5..d674d47 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/fn/TestJsonReaderWithSparseFiles.java
@@ -23,7 +23,7 @@ import java.util.Objects;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.util.JsonStringArrayList;
import org.apache.drill.exec.util.JsonStringHashMap;
import org.apache.drill.exec.vector.ValueVector;
@@ -82,15 +82,15 @@ public class TestJsonReaderWithSparseFiles extends BaseTestQuery {
}
protected void query(final String query, final Function<RecordBatchLoader> testBody) throws Exception {
- List<QueryResultBatch> batches = testSqlWithResults(query);
+ List<QueryDataBatch> batches = testSqlWithResults(query);
RecordBatchLoader loader = new RecordBatchLoader(client.getAllocator());
try {
// first batch at index 0 is empty and used for fast schema return. Load the second one for the tests
- QueryResultBatch batch = batches.get(0);
+ QueryDataBatch batch = batches.get(0);
loader.load(batch.getHeader().getDef(), batch.getData());
testBody.apply(loader);
} finally {
- for (QueryResultBatch batch:batches) {
+ for (QueryDataBatch batch:batches) {
batch.release();
}
loader.clear();
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexToJson.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexToJson.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexToJson.java
index eedbf6f..656429e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexToJson.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexToJson.java
@@ -27,7 +27,7 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.junit.Test;
public class TestComplexToJson extends BaseTestQuery {
@@ -36,7 +36,7 @@ public class TestComplexToJson extends BaseTestQuery {
public void test() throws Exception {
DrillClient parent_client = client;
- List<QueryResultBatch> results;
+ List<QueryDataBatch> results;
RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
client = new DrillClient(config, serviceSet.getCoordinator());
@@ -50,7 +50,7 @@ public class TestComplexToJson extends BaseTestQuery {
// with setSupportComplexTypes == false, the column mode should be REQUIRED
assertTrue(def.getField(0).getMajorType().getMode() == DataMode.REQUIRED);
loader.clear();
- for(QueryResultBatch result : results) {
+ for(QueryDataBatch result : results) {
result.release();
}
client.close();
@@ -66,7 +66,7 @@ public class TestComplexToJson extends BaseTestQuery {
// with setSupportComplexTypes == true, the column mode should be REPEATED
assertTrue(def.getField(0).getMajorType().getMode() == DataMode.REPEATED);
loader.clear();
- for(QueryResultBatch result : results) {
+ for(QueryDataBatch result : results) {
result.release();
}
client.close();
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index fe86192..3f69fd0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -36,7 +36,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.RepeatedBigIntVector;
import org.junit.Ignore;
@@ -48,7 +48,7 @@ import com.google.common.io.Files;
import org.junit.rules.TemporaryFolder;
public class TestJsonReader extends BaseTestQuery {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonReader.class);
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonReader.class);
private static final boolean VERBOSE_DEBUG = false;
@@ -196,15 +196,15 @@ public class TestJsonReader extends BaseTestQuery {
@Test
public void readComplexWithStar() throws Exception {
- List<QueryResultBatch> results = testSqlWithResults("select * from cp.`/store/json/test_complex_read_with_star.json`");
- assertEquals(2, results.size());
+ List<QueryDataBatch> results = testSqlWithResults("select * from cp.`/store/json/test_complex_read_with_star.json`");
+ assertEquals(1, results.size());
RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
- QueryResultBatch batch = results.get(0);
+ QueryDataBatch batch = results.get(0);
assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
assertEquals(3, batchLoader.getSchema().getFieldCount());
- testExistentColumns(batchLoader, batch);
+ testExistentColumns(batchLoader);
batch.release();
batchLoader.clear();
@@ -249,24 +249,24 @@ public class TestJsonReader extends BaseTestQuery {
test("alter system set `store.json.all_text_mode` = false");
runTestsOnFile(filename, UserBitShared.QueryType.PHYSICAL, queries, rowCounts);
- List<QueryResultBatch> results = testPhysicalWithResults(queries[0]);
- assertEquals(2, results.size());
+ List<QueryDataBatch> results = testPhysicalWithResults(queries[0]);
+ assertEquals(1, results.size());
// "`field_1`", "`field_3`.`inner_1`", "`field_3`.`inner_2`", "`field_4`.`inner_1`"
RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
- QueryResultBatch batch = results.get(0);
+ QueryDataBatch batch = results.get(0);
assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
// this used to be five. It is now three. This is because the plan doesn't have a project.
// Scanners are not responsible for projecting non-existent columns (as long as they project one column)
assertEquals(3, batchLoader.getSchema().getFieldCount());
- testExistentColumns(batchLoader, batch);
+ testExistentColumns(batchLoader);
batch.release();
batchLoader.clear();
}
- private void testExistentColumns(RecordBatchLoader batchLoader, QueryResultBatch batch) throws SchemaChangeException {
+ private void testExistentColumns(RecordBatchLoader batchLoader) throws SchemaChangeException {
VectorWrapper<?> vw = batchLoader.getValueAccessorById(
RepeatedBigIntVector.class, //
batchLoader.getValueVectorId(SchemaPath.getCompoundPath("field_1")).getFieldIds() //
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
index b3c653f..dcea9bb 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
@@ -25,7 +25,7 @@ import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.ExecTest;
import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.junit.Test;
@@ -46,11 +46,11 @@ public class TestSpoolingBuffer extends ExecTest {
bit1.run();
client.connect();
- List<QueryResultBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL,
+ List<QueryDataBatch> results = client.runQuery(org.apache.drill.exec.proto.UserBitShared.QueryType.PHYSICAL,
Files.toString(FileUtils.getResourceAsFile("/work/batch/multiple_exchange.json"),
Charsets.UTF_8));
int count = 0;
- for(QueryResultBatch b : results) {
+ for(QueryDataBatch b : results) {
if (b.getHeader().getRowCount() != 0) {
count += b.getHeader().getRowCount();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
index cddd999..3b38a09 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
@@ -29,7 +29,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
public class DrillCursor implements Cursor {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillCursor.class);
@@ -113,7 +113,7 @@ public class DrillCursor implements Cursor {
// Next index is not in current batch (including initial empty batch--
// (try to) get next batch.
try {
- QueryResultBatch qrb = resultsListener.getNext();
+ QueryDataBatch qrb = resultsListener.getNext();
recordBatchCount++;
while (qrb != null && qrb.getHeader().getRowCount() == 0 && !first) {
qrb.release();
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java
index 0ce33f4..fb27d2d 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java
@@ -19,8 +19,6 @@ package org.apache.drill.jdbc;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
@@ -30,24 +28,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
import net.hydromatic.avatica.AvaticaPrepareResult;
import net.hydromatic.avatica.AvaticaResultSet;
import net.hydromatic.avatica.AvaticaStatement;
-import net.hydromatic.avatica.Cursor;
-import net.hydromatic.avatica.Cursor.Accessor;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import com.google.common.collect.Queues;
public class DrillResultSet extends AvaticaResultSet {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillResultSet.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillResultSet.class);
SchemaChangeListener changeListener;
final ResultsListener resultslistener = new ResultsListener();
@@ -107,8 +102,7 @@ public class DrillResultSet extends AvaticaResultSet {
// don't return with metadata until we've achieved at least one return message.
try {
resultslistener.latch.await();
- boolean notAtEnd = cursor.next();
- assert notAtEnd;
+ cursor.next();
} catch (InterruptedException e) {
// TODO: Check: Should this call Thread.currentThread.interrupt()? If
// not, at least document why this is empty.
@@ -137,7 +131,7 @@ public class DrillResultSet extends AvaticaResultSet {
- final LinkedBlockingDeque<QueryResultBatch> queue = Queues.newLinkedBlockingDeque();
+ final LinkedBlockingDeque<QueryDataBatch> queue = Queues.newLinkedBlockingDeque();
// TODO: Doc.: Release what if what is first relative to what?
private boolean releaseIfFirst() {
@@ -151,7 +145,6 @@ public class DrillResultSet extends AvaticaResultSet {
@Override
public void submissionFailed(RpcException ex) {
- releaseIfFirst();
this.ex = ex;
completed = true;
close();
@@ -159,13 +152,14 @@ public class DrillResultSet extends AvaticaResultSet {
}
@Override
- public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
- logger.debug("Result arrived {}", result);
+ public void queryCompleted() {
+ releaseIfFirst();
+ completed = true;
+ }
- if (result.getHeader().hasQueryState() && result.getHeader().getQueryState() == QueryState.COMPLETED && result.getHeader().getRowCount() == 0) {
- result.release();
- return;
- }
+ @Override
+ public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+ logger.debug("Result arrived {}", result);
// If we're in a closed state, just release the message.
if (closed) {
@@ -182,20 +176,11 @@ public class DrillResultSet extends AvaticaResultSet {
autoread = false;
}
- if (result.getHeader().getIsLastChunk()) {
- completed = true;
- }
-
- if (result.getHeader().getErrorCount() > 0) {
- submissionFailed(new RpcException(String.format("%s", result.getHeader().getErrorList())));
- }
-
releaseIfFirst();
-
}
// TODO: Doc.: Specify whether result can be null and what that means.
- public QueryResultBatch getNext() throws RpcException, InterruptedException {
+ public QueryDataBatch getNext() throws RpcException, InterruptedException {
while (true) {
if (ex != null) {
throw ex;
@@ -203,7 +188,7 @@ public class DrillResultSet extends AvaticaResultSet {
if (completed && queue.isEmpty()) {
return null;
} else {
- QueryResultBatch q = queue.poll(50, TimeUnit.MILLISECONDS);
+ QueryDataBatch q = queue.poll(50, TimeUnit.MILLISECONDS);
if (q != null) {
if (!autoread && queue.size() < MAX / 2) {
autoread = true;
@@ -219,7 +204,7 @@ public class DrillResultSet extends AvaticaResultSet {
void close() {
closed = true;
while (!queue.isEmpty()) {
- QueryResultBatch qrb = queue.poll();
+ QueryDataBatch qrb = queue.poll();
if (qrb != null && qrb.getData() != null) {
qrb.getData().release();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
index 68e86db..f72d5e1 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaUserBitShared.java
@@ -1359,27 +1359,9 @@ public final class SchemaUserBitShared
if(message.hasQueryId())
output.writeObject(2, message.getQueryId(), org.apache.drill.exec.proto.SchemaUserBitShared.QueryId.WRITE, false);
- if(message.hasIsLastChunk())
- output.writeBool(3, message.getIsLastChunk(), false);
- if(message.hasRowCount())
- output.writeInt32(4, message.getRowCount(), false);
- if(message.hasRecordsScan())
- output.writeInt64(5, message.getRecordsScan(), false);
- if(message.hasRecordsError())
- output.writeInt64(6, message.getRecordsError(), false);
- if(message.hasSubmissionTime())
- output.writeInt64(7, message.getSubmissionTime(), false);
- for(org.apache.drill.exec.proto.UserBitShared.NodeStatus nodeStatus : message.getNodeStatusList())
- output.writeObject(8, nodeStatus, org.apache.drill.exec.proto.SchemaUserBitShared.NodeStatus.WRITE, true);
-
for(org.apache.drill.exec.proto.UserBitShared.DrillPBError error : message.getErrorList())
- output.writeObject(9, error, org.apache.drill.exec.proto.SchemaUserBitShared.DrillPBError.WRITE, true);
-
- if(message.hasDef())
- output.writeObject(10, message.getDef(), org.apache.drill.exec.proto.SchemaUserBitShared.RecordBatchDef.WRITE, false);
+ output.writeObject(3, error, org.apache.drill.exec.proto.SchemaUserBitShared.DrillPBError.WRITE, true);
- if(message.hasSchemaChanged())
- output.writeBool(11, message.getSchemaChanged(), false);
}
public boolean isInitialized(org.apache.drill.exec.proto.UserBitShared.QueryResult message)
{
@@ -1427,35 +1409,9 @@ public final class SchemaUserBitShared
break;
case 3:
- builder.setIsLastChunk(input.readBool());
- break;
- case 4:
- builder.setRowCount(input.readInt32());
- break;
- case 5:
- builder.setRecordsScan(input.readInt64());
- break;
- case 6:
- builder.setRecordsError(input.readInt64());
- break;
- case 7:
- builder.setSubmissionTime(input.readInt64());
- break;
- case 8:
- builder.addNodeStatus(input.mergeObject(org.apache.drill.exec.proto.UserBitShared.NodeStatus.newBuilder(), org.apache.drill.exec.proto.SchemaUserBitShared.NodeStatus.MERGE));
-
- break;
- case 9:
builder.addError(input.mergeObject(org.apache.drill.exec.proto.UserBitShared.DrillPBError.newBuilder(), org.apache.drill.exec.proto.SchemaUserBitShared.DrillPBError.MERGE));
break;
- case 10:
- builder.setDef(input.mergeObject(org.apache.drill.exec.proto.UserBitShared.RecordBatchDef.newBuilder(), org.apache.drill.exec.proto.SchemaUserBitShared.RecordBatchDef.MERGE));
-
- break;
- case 11:
- builder.setSchemaChanged(input.readBool());
- break;
default:
input.handleUnknownField(number, this);
}
@@ -1498,15 +1454,7 @@ public final class SchemaUserBitShared
{
case 1: return "queryState";
case 2: return "queryId";
- case 3: return "isLastChunk";
- case 4: return "rowCount";
- case 5: return "recordsScan";
- case 6: return "recordsError";
- case 7: return "submissionTime";
- case 8: return "nodeStatus";
- case 9: return "error";
- case 10: return "def";
- case 11: return "schemaChanged";
+ case 3: return "error";
default: return null;
}
}
@@ -1520,15 +1468,136 @@ public final class SchemaUserBitShared
{
fieldMap.put("queryState", 1);
fieldMap.put("queryId", 2);
- fieldMap.put("isLastChunk", 3);
- fieldMap.put("rowCount", 4);
- fieldMap.put("recordsScan", 5);
- fieldMap.put("recordsError", 6);
- fieldMap.put("submissionTime", 7);
- fieldMap.put("nodeStatus", 8);
- fieldMap.put("error", 9);
- fieldMap.put("def", 10);
- fieldMap.put("schemaChanged", 11);
+ fieldMap.put("error", 3);
+ }
+ }
+
+ public static final class QueryData
+ {
+ public static final org.apache.drill.exec.proto.SchemaUserBitShared.QueryData.MessageSchema WRITE =
+ new org.apache.drill.exec.proto.SchemaUserBitShared.QueryData.MessageSchema();
+ public static final org.apache.drill.exec.proto.SchemaUserBitShared.QueryData.BuilderSchema MERGE =
+ new org.apache.drill.exec.proto.SchemaUserBitShared.QueryData.BuilderSchema();
+
+ public static class MessageSchema implements com.dyuproject.protostuff.Schema<org.apache.drill.exec.proto.UserBitShared.QueryData>
+ {
+ public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.exec.proto.UserBitShared.QueryData message) throws java.io.IOException
+ {
+ if(message.hasQueryId())
+ output.writeObject(1, message.getQueryId(), org.apache.drill.exec.proto.SchemaUserBitShared.QueryId.WRITE, false);
+
+ if(message.hasRowCount())
+ output.writeInt32(2, message.getRowCount(), false);
+ if(message.hasDef())
+ output.writeObject(3, message.getDef(), org.apache.drill.exec.proto.SchemaUserBitShared.RecordBatchDef.WRITE, false);
+
+ }
+ public boolean isInitialized(org.apache.drill.exec.proto.UserBitShared.QueryData message)
+ {
+ return message.isInitialized();
+ }
+ public java.lang.String getFieldName(int number)
+ {
+ return org.apache.drill.exec.proto.SchemaUserBitShared.QueryData.getFieldName(number);
+ }
+ public int getFieldNumber(java.lang.String name)
+ {
+ return org.apache.drill.exec.proto.SchemaUserBitShared.QueryData.getFieldNumber(name);
+ }
+ public java.lang.Class<org.apache.drill.exec.proto.UserBitShared.QueryData> typeClass()
+ {
+ return org.apache.drill.exec.proto.UserBitShared.QueryData.class;
+ }
+ public java.lang.String messageName()
+ {
+ return org.apache.drill.exec.proto.UserBitShared.QueryData.class.getSimpleName();
+ }
+ public java.lang.String messageFullName()
+ {
+ return org.apache.drill.exec.proto.UserBitShared.QueryData.class.getName();
+ }
+ //unused
+ public void mergeFrom(com.dyuproject.protostuff.Input input, org.apache.drill.exec.proto.UserBitShared.QueryData message) throws java.io.IOException {}
+ public org.apache.drill.exec.proto.UserBitShared.QueryData newMessage() { return null; }
+ }
+ public static class BuilderSchema implements com.dyuproject.protostuff.Schema<org.apache.drill.exec.proto.UserBitShared.QueryData.Builder>
+ {
+ public void mergeFrom(com.dyuproject.protostuff.Input input, org.apache.drill.exec.proto.UserBitShared.QueryData.Builder builder) throws java.io.IOException
+ {
+ for(int number = input.readFieldNumber(this);; number = input.readFieldNumber(this))
+ {
+ switch(number)
+ {
+ case 0:
+ return;
+ case 1:
+ builder.setQueryId(input.mergeObject(org.apache.drill.exec.proto.UserBitShared.QueryId.newBuilder(), org.apache.drill.exec.proto.SchemaUserBitShared.QueryId.MERGE));
+
+ break;
+ case 2:
+ builder.setRowCount(input.readInt32());
+ break;
+ case 3:
+ builder.setDef(input.mergeObject(org.apache.drill.exec.proto.UserBitShared.RecordBatchDef.newBuilder(), org.apache.drill.exec.proto.SchemaUserBitShared.RecordBatchDef.MERGE));
+
+ break;
+ default:
+ input.handleUnknownField(number, this);
+ }
+ }
+ }
+ public boolean isInitialized(org.apache.drill.exec.proto.UserBitShared.QueryData.Builder builder)
+ {
+ return builder.isInitialized();
+ }
+ public org.apache.drill.exec.proto.UserBitShared.QueryData.Builder newMessage()
+ {
+ return org.apache.drill.exec.proto.UserBitShared.QueryData.newBuilder();
+ }
+ public java.lang.String getFieldName(int number)
+ {
+ return org.apache.drill.exec.proto.SchemaUserBitShared.QueryData.getFieldName(number);
+ }
+ public int getFieldNumber(java.lang.String name)
+ {
+ return org.apache.drill.exec.proto.SchemaUserBitShared.QueryData.getFieldNumber(name);
+ }
+ public java.lang.Class<org.apache.drill.exec.proto.UserBitShared.QueryData.Builder> typeClass()
+ {
+ return org.apache.drill.exec.proto.UserBitShared.QueryData.Builder.class;
+ }
+ public java.lang.String messageName()
+ {
+ return org.apache.drill.exec.proto.UserBitShared.QueryData.class.getSimpleName();
+ }
+ public java.lang.String messageFullName()
+ {
+ return org.apache.drill.exec.proto.UserBitShared.QueryData.class.getName();
+ }
+ //unused
+ public void writeTo(com.dyuproject.protostuff.Output output, org.apache.drill.exec.proto.UserBitShared.QueryData.Builder builder) throws java.io.IOException {}
+ }
+ public static java.lang.String getFieldName(int number)
+ {
+ switch(number)
+ {
+ case 1: return "queryId";
+ case 2: return "rowCount";
+ case 3: return "def";
+ default: return null;
+ }
+ }
+ public static int getFieldNumber(java.lang.String name)
+ {
+ java.lang.Integer number = fieldMap.get(name);
+ return number == null ? 0 : number.intValue();
+ }
+ private static final java.util.HashMap<java.lang.String,java.lang.Integer> fieldMap = new java.util.HashMap<java.lang.String,java.lang.Integer>();
+ static
+ {
+ fieldMap.put("queryId", 1);
+ fieldMap.put("rowCount", 2);
+ fieldMap.put("def", 3);
}
}