You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2015/04/04 04:37:31 UTC
[5/9] drill git commit: DRILL-2498: Separate QueryResult into two messages QueryResult and QueryData
DRILL-2498: Separate QueryResult into two messages QueryResult and QueryData
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/1d9d82b0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/1d9d82b0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/1d9d82b0
Branch: refs/heads/master
Commit: 1d9d82b001810605e3f94ab3a5517dc0ed739715
Parents: 10be89f
Author: adeneche <ad...@gmail.com>
Authored: Fri Mar 20 12:39:15 2015 -0700
Committer: Parth Chandra <pc...@maprtech.com>
Committed: Fri Apr 3 18:27:50 2015 -0700
----------------------------------------------------------------------
.../org/apache/drill/hbase/BaseHBaseTest.java | 10 +-
.../drill/hbase/TestHBaseCFAsJSONString.java | 5 +-
.../apache/drill/exec/fn/hive/TestHiveUDFs.java | 12 +-
.../drill/exec/fn/hive/TestSampleHiveUDFs.java | 4 +-
.../apache/drill/exec/client/DrillClient.java | 34 +-
.../exec/client/PrintingResultsListener.java | 31 +-
.../drill/exec/physical/impl/ScreenCreator.java | 67 +-
.../impl/materialize/QueryWritableBatch.java | 8 +-
.../impl/materialize/RecordMaterializer.java | 2 +-
.../materialize/VectorRecordMaterializer.java | 8 +-
.../drill/exec/rpc/user/QueryDataBatch.java | 62 +
.../drill/exec/rpc/user/QueryResultBatch.java | 62 -
.../drill/exec/rpc/user/QueryResultHandler.java | 261 +--
.../apache/drill/exec/rpc/user/UserClient.java | 14 +-
.../exec/rpc/user/UserResultsListener.java | 28 +-
.../drill/exec/rpc/user/UserRpcConfig.java | 4 +-
.../apache/drill/exec/rpc/user/UserServer.java | 16 +-
.../drill/exec/server/rest/QueryWrapper.java | 15 +-
.../apache/drill/exec/work/foreman/Foreman.java | 4 +-
.../java/org/apache/drill/BaseTestQuery.java | 38 +-
.../java/org/apache/drill/DrillTestWrapper.java | 36 +-
.../java/org/apache/drill/PlanTestBase.java | 6 +-
.../java/org/apache/drill/QueryTestUtil.java | 8 +-
.../org/apache/drill/SingleRowListener.java | 43 +-
.../drill/exec/TestQueriesOnLargeFile.java | 10 +-
.../exec/client/DrillClientSystemTest.java | 10 +-
.../exec/fn/impl/TestAggregateFunction.java | 8 +-
.../drill/exec/fn/impl/TestDateFunctions.java | 8 +-
.../drill/exec/fn/impl/TestMultiInputAdd.java | 11 +-
.../exec/fn/impl/TestNewAggregateFunctions.java | 8 +-
.../physical/impl/TestBroadcastExchange.java | 10 +-
.../exec/physical/impl/TestCastFunctions.java | 8 +-
.../physical/impl/TestCastVarCharToBigInt.java | 10 +-
.../physical/impl/TestConvertFunctions.java | 11 +-
.../drill/exec/physical/impl/TestDecimal.java | 38 +-
.../impl/TestDistributedFragmentRun.java | 18 +-
.../physical/impl/TestExtractFunctions.java | 8 +-
.../physical/impl/TestHashToRandomExchange.java | 6 +-
.../exec/physical/impl/TestOptiqPlans.java | 22 +-
.../physical/impl/TestReverseImplicitCast.java | 8 +-
.../physical/impl/TestSimpleFragmentRun.java | 10 +-
.../exec/physical/impl/TestUnionExchange.java | 6 +-
.../exec/physical/impl/TopN/TestSimpleTopN.java | 8 +-
.../exec/physical/impl/join/TestHashJoin.java | 26 +-
.../exec/physical/impl/join/TestMergeJoin.java | 18 +-
.../impl/join/TestMergeJoinMulCondition.java | 14 +-
.../impl/mergereceiver/TestMergingReceiver.java | 14 +-
.../TestOrderedPartitionExchange.java | 6 +-
.../physical/impl/writer/TestParquetWriter.java | 2 +-
.../exec/physical/impl/writer/TestWriter.java | 12 +-
.../impl/xsort/TestSimpleExternalSort.java | 26 +-
.../drill/exec/record/vector/TestDateTypes.java | 40 +-
.../exec/server/TestDrillbitResilience.java | 10 +-
.../store/parquet/ParquetRecordReaderTest.java | 10 +-
.../store/parquet/ParquetResultListener.java | 20 +-
.../store/parquet/TestParquetPhysicalPlan.java | 18 +-
.../drill/exec/store/text/TestTextColumn.java | 10 +-
.../exec/store/text/TextRecordReaderTest.java | 6 +-
.../fn/TestJsonReaderWithSparseFiles.java | 8 +-
.../complex/writer/TestComplexToJson.java | 8 +-
.../vector/complex/writer/TestJsonReader.java | 22 +-
.../exec/work/batch/TestSpoolingBuffer.java | 6 +-
.../java/org/apache/drill/jdbc/DrillCursor.java | 4 +-
.../org/apache/drill/jdbc/DrillResultSet.java | 43 +-
.../drill/exec/proto/SchemaUserBitShared.java | 195 +-
.../apache/drill/exec/proto/UserBitShared.java | 1948 ++++++++----------
.../org/apache/drill/exec/proto/UserProtos.java | 47 +-
.../drill/exec/proto/beans/QueryData.java | 211 ++
.../drill/exec/proto/beans/QueryResult.java | 194 +-
.../apache/drill/exec/proto/beans/RpcType.java | 8 +-
protocol/src/main/protobuf/User.proto | 8 +-
protocol/src/main/protobuf/UserBitShared.proto | 22 +-
72 files changed, 1898 insertions(+), 2054 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
index b955d3b..df83c56 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/BaseHBaseTest.java
@@ -23,7 +23,7 @@ import java.util.List;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.hbase.HBaseStoragePlugin;
import org.apache.drill.exec.store.hbase.HBaseStoragePluginConfig;
@@ -76,22 +76,22 @@ public class BaseHBaseTest extends BaseTestQuery {
protected void runHBasePhysicalVerifyCount(String planFile, String tableName, int expectedRowCount) throws Exception{
String physicalPlan = getPlanText(planFile, tableName);
- List<QueryResultBatch> results = testPhysicalWithResults(physicalPlan);
+ List<QueryDataBatch> results = testPhysicalWithResults(physicalPlan);
printResultAndVerifyRowCount(results, expectedRowCount);
}
- protected List<QueryResultBatch> runHBaseSQLlWithResults(String sql) throws Exception {
+ protected List<QueryDataBatch> runHBaseSQLlWithResults(String sql) throws Exception {
sql = canonizeHBaseSQL(sql);
System.out.println("Running query:\n" + sql);
return testSqlWithResults(sql);
}
protected void runHBaseSQLVerifyCount(String sql, int expectedRowCount) throws Exception{
- List<QueryResultBatch> results = runHBaseSQLlWithResults(sql);
+ List<QueryDataBatch> results = runHBaseSQLlWithResults(sql);
printResultAndVerifyRowCount(results, expectedRowCount);
}
- private void printResultAndVerifyRowCount(List<QueryResultBatch> results, int expectedRowCount) throws SchemaChangeException {
+ private void printResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount) throws SchemaChangeException {
int rowCount = printResult(results);
if (expectedRowCount != -1) {
Assert.assertEquals(expectedRowCount, rowCount);
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
index 6fe1525..7873b80 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseCFAsJSONString.java
@@ -21,7 +21,8 @@ import java.io.IOException;
import java.util.List;
import org.apache.drill.exec.client.DrillClient;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -49,7 +50,7 @@ public class TestHBaseCFAsJSONString extends BaseHBaseTest {
@Test
public void testColumnFamiliesAsJSONString() throws Exception {
setColumnWidths(new int[] {112, 12});
- List<QueryResultBatch> resultList = runHBaseSQLlWithResults("SELECT f, f2 FROM hbase.`[TABLE_NAME]` tableName LIMIT 1");
+ List<QueryDataBatch> resultList = runHBaseSQLlWithResults("SELECT f, f2 FROM hbase.`[TABLE_NAME]` tableName LIMIT 1");
printResult(resultList);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestHiveUDFs.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestHiveUDFs.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestHiveUDFs.java
index e134aac..3ce9a6d 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestHiveUDFs.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestHiveUDFs.java
@@ -24,10 +24,8 @@ import java.util.List;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
-import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.vector.Float4Vector;
-import org.apache.drill.exec.vector.NullableBigIntVector;
import org.apache.drill.exec.vector.NullableFloat8Vector;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.drill.exec.vector.NullableVar16CharVector;
@@ -44,10 +42,10 @@ public class TestHiveUDFs extends BaseTestQuery {
int numRecords = 0;
String planString = Resources.toString(Resources.getResource("functions/hive/GenericUDF.json"), Charsets.UTF_8);
- List<QueryResultBatch> results = testPhysicalWithResults(planString);
+ List<QueryDataBatch> results = testPhysicalWithResults(planString);
RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
- for (QueryResultBatch result : results) {
+ for (QueryDataBatch result : results) {
batchLoader.load(result.getHeader().getDef(), result.getData());
if (batchLoader.getRecordCount() <= 0) {
result.release();
@@ -115,10 +113,10 @@ public class TestHiveUDFs extends BaseTestQuery {
public void testUDF() throws Throwable {
int numRecords = 0;
String planString = Resources.toString(Resources.getResource("functions/hive/UDF.json"), Charsets.UTF_8);
- List<QueryResultBatch> results = testPhysicalWithResults(planString);
+ List<QueryDataBatch> results = testPhysicalWithResults(planString);
RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
- for (QueryResultBatch result : results) {
+ for (QueryDataBatch result : results) {
batchLoader.load(result.getHeader().getDef(), result.getData());
if (batchLoader.getRecordCount() <= 0) {
result.release();
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java
index 9ef766f..f4b6351 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/fn/hive/TestSampleHiveUDFs.java
@@ -22,14 +22,14 @@ import static org.junit.Assert.assertTrue;
import java.util.List;
import org.apache.drill.exec.hive.HiveTestBase;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.junit.Ignore;
import org.junit.Test;
public class TestSampleHiveUDFs extends HiveTestBase {
private void helper(String query, String expected) throws Exception {
- List<QueryResultBatch> results = testSqlWithResults(query);
+ List<QueryDataBatch> results = testSqlWithResults(query);
String actual = getResultString(results, ",");
assertTrue(String.format("Result:\n%s\ndoes not match:\n%s", actual, expected), expected.equals(actual));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 6d4c86c..650a2eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -40,7 +40,6 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.proto.UserProtos.Property;
@@ -54,7 +53,7 @@ import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.TransportCheck;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserClient;
import org.apache.drill.exec.rpc.user.UserResultsListener;
@@ -66,7 +65,7 @@ import com.google.common.util.concurrent.SettableFuture;
* String into ByteBuf.
*/
public class DrillClient implements Closeable, ConnectionThrottle {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillClient.class);
DrillConfig config;
private UserClient client;
@@ -249,7 +248,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
* @return a handle for the query result
* @throws RpcException
*/
- public List<QueryResultBatch> runQuery(QueryType type, String plan) throws RpcException {
+ public List<QueryDataBatch> runQuery(QueryType type, String plan) throws RpcException {
UserProtos.RunQuery query = newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build();
ListHoldingResultsListener listener = new ListHoldingResultsListener(query);
client.submitQuery(listener, query);
@@ -294,8 +293,8 @@ public class DrillClient implements Closeable, ConnectionThrottle {
}
private class ListHoldingResultsListener implements UserResultsListener {
- private Vector<QueryResultBatch> results = new Vector<>();
- private SettableFuture<List<QueryResultBatch>> future = SettableFuture.create();
+ private Vector<QueryDataBatch> results = new Vector<>();
+ private SettableFuture<List<QueryDataBatch>> future = SettableFuture.create();
private UserProtos.RunQuery query ;
public ListHoldingResultsListener(UserProtos.RunQuery query) {
@@ -321,6 +320,11 @@ public class DrillClient implements Closeable, ConnectionThrottle {
}
}
+ @Override
+ public void queryCompleted() {
+ future.set(results);
+ }
+
private void fail(Exception ex) {
logger.debug("Submission failed.", ex);
future.setException(ex);
@@ -328,24 +332,12 @@ public class DrillClient implements Closeable, ConnectionThrottle {
}
@Override
- public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
- logger.debug(
- "Result arrived: Query state: {}. Is last chunk: {}. Result: {}",
- result.getHeader().getQueryState(),
- result.getHeader().getIsLastChunk(),
- result );
+ public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+ logger.debug("Result arrived: Result: {}", result );
results.add(result);
- if (result.getHeader().getIsLastChunk()) {
- future.set(results);
- }
- else {
- assert QueryState.PENDING == result.getHeader().getQueryState()
- : "For non-last chunk, expected query state of PENDING but got "
- + result.getHeader().getQueryState();
- }
}
- public List<QueryResultBatch> getResults() throws RpcException{
+ public List<QueryDataBatch> getResults() throws RpcException{
try {
return future.get();
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
index 926e703..98948af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
@@ -20,16 +20,18 @@ package org.apache.drill.exec.client;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
+import io.netty.buffer.DrillBuf;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.client.QuerySubmitter.Format;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.util.VectorUtil;
@@ -58,12 +60,21 @@ public class PrintingResultsListener implements UserResultsListener {
}
@Override
- public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
- int rows = result.getHeader().getRowCount();
- if (result.getData() != null) {
- count.addAndGet(rows);
+ public void queryCompleted() {
+ allocator.close();
+ latch.countDown();
+ System.out.println("Total rows returned: " + count.get());
+ }
+
+ @Override
+ public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+ final QueryData header = result.getHeader();
+ final DrillBuf data = result.getData();
+
+ if (data != null) {
+ count.addAndGet(header.getRowCount());
try {
- loader.load(result.getHeader().getDef(), result.getData());
+ loader.load(header.getDef(), data);
} catch (SchemaChangeException e) {
submissionFailed(new RpcException(e));
}
@@ -82,15 +93,7 @@ public class PrintingResultsListener implements UserResultsListener {
loader.clear();
}
- boolean isLastChunk = result.getHeader().getIsLastChunk();
result.release();
-
- if (isLastChunk) {
- allocator.close();
- latch.countDown();
- System.out.println("Total rows returned: " + count.get());
- }
-
}
public int await() throws Exception {
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 404c453..8038527 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -31,8 +31,7 @@ import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
@@ -44,7 +43,7 @@ import org.apache.drill.exec.work.ErrorHelper;
import com.google.common.base.Preconditions;
public class ScreenCreator implements RootCreator<Screen>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
@@ -67,6 +66,8 @@ public class ScreenCreator implements RootCreator<Screen>{
final UserClientConnection connection;
private RecordMaterializer materializer;
+ private boolean firstBatch = true;
+
public enum Metric implements MetricDef {
BYTES_SENT;
@@ -96,67 +97,45 @@ public class ScreenCreator implements RootCreator<Screen>{
IterOutcome outcome = next(incoming);
logger.trace("Screen Outcome {}", outcome);
switch (outcome) {
- case STOP: {
+ case STOP:
this.internalStop();
- boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
- QueryResult header = QueryResult.newBuilder() //
- .setQueryId(context.getHandle().getQueryId()) //
- .setRowCount(0) //
- .setQueryState(QueryState.FAILED)
- .addError(ErrorHelper.logAndConvertMessageError(context.getIdentity(), "Query stopped.",
- context.getFailureCause(), logger, verbose))
- .setDef(RecordBatchDef.getDefaultInstance()) //
- .setIsLastChunk(true) //
- .build();
- QueryWritableBatch batch = new QueryWritableBatch(header);
+ return false;
+ case NONE:
+ if (firstBatch) {
+ // this is the only data message sent to the client and may contain the schema
+ this.internalStop();
+ QueryWritableBatch batch;
+ QueryData header = QueryData.newBuilder() //
+ .setQueryId(context.getHandle().getQueryId()) //
+ .setRowCount(0) //
+ .setDef(RecordBatchDef.getDefaultInstance()) //
+ .build();
+ batch = new QueryWritableBatch(header);
+
stats.startWait();
try {
- connection.sendResult(listener, batch);
+ connection.sendData(listener, batch);
} finally {
stats.stopWait();
}
+ firstBatch = false; // we don't really need to set this. But who knows!
sendCount.increment();
-
- return false;
- }
- case NONE: {
- this.internalStop();
- QueryWritableBatch batch;
- //TODO: At some point we should make this the last message.
- //For the moment though, to detect memory leaks, we need to delay sending the
- //COMPLETED message until the Foreman calls cleanup.
- QueryResult header = QueryResult.newBuilder() //
- .setQueryId(context.getHandle().getQueryId()) //
- .setRowCount(0) //
- //.setQueryState(QueryState.COMPLETED) //
- .setDef(RecordBatchDef.getDefaultInstance()) //
- .setIsLastChunk(true) //
- .build();
- batch = new QueryWritableBatch(header);
- stats.startWait();
- try {
- connection.sendResult(listener, batch);
- } finally {
- stats.stopWait();
}
- sendCount.increment();
return false;
- }
case OK_NEW_SCHEMA:
materializer = new VectorRecordMaterializer(context, incoming);
//$FALL-THROUGH$
case OK:
-// context.getStats().batchesCompleted.inc(1);
-// context.getStats().recordsCompleted.inc(incoming.getRecordCount());
- QueryWritableBatch batch = materializer.convertNext(false);
+ QueryWritableBatch batch = materializer.convertNext();
updateStats(batch);
stats.startWait();
try {
- connection.sendResult(listener, batch);
+ connection.sendData(listener, batch);
} finally {
stats.stopWait();
}
+ firstBatch = false;
sendCount.increment();
return true;
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
index 2a59e22..44a3489 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
@@ -21,15 +21,15 @@ import io.netty.buffer.ByteBuf;
import java.util.Arrays;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
public class QueryWritableBatch {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class);
- private final QueryResult header;
+ private final QueryData header;
private final ByteBuf[] buffers;
- public QueryWritableBatch(QueryResult header, ByteBuf... buffers) {
+ public QueryWritableBatch(QueryData header, ByteBuf... buffers) {
this.header = header;
this.buffers = buffers;
}
@@ -46,7 +46,7 @@ public class QueryWritableBatch {
return n;
}
- public QueryResult getHeader() {
+ public QueryData getHeader() {
return header;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
index 221fc34..acb17ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
@@ -20,6 +20,6 @@ package org.apache.drill.exec.physical.impl.materialize;
public interface RecordMaterializer {
- public QueryWritableBatch convertNext(boolean isLast);
+ public QueryWritableBatch convertNext();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
index cc1b3bf..3933ddd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.physical.impl.materialize;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.WritableBatch;
@@ -41,14 +41,14 @@ public class VectorRecordMaterializer implements RecordMaterializer{
// }
}
- public QueryWritableBatch convertNext(boolean isLast) {
+ public QueryWritableBatch convertNext() {
//batch.getWritableBatch().getDef().getRecordCount()
WritableBatch w = batch.getWritableBatch();
- QueryResult header = QueryResult.newBuilder() //
+ QueryData header = QueryData.newBuilder() //
.setQueryId(queryId) //
.setRowCount(batch.getRecordCount()) //
- .setDef(w.getDef()).setIsLastChunk(isLast).build();
+ .setDef(w.getDef()).build();
QueryWritableBatch batch = new QueryWritableBatch(header, w.getBuffers());
return batch;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryDataBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryDataBatch.java
new file mode 100644
index 0000000..914bd00
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryDataBatch.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.user;
+
+import io.netty.buffer.DrillBuf;
+
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
+
+public class QueryDataBatch {
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryResultBatch.class);
+
+ private final QueryData header;
+ private final DrillBuf data;
+
+ public QueryDataBatch(QueryData header, DrillBuf data) {
+// logger.debug("New Result Batch with header {} and data {}", header, data);
+ this.header = header;
+ this.data = data;
+ if (this.data != null) {
+ data.retain();
+ }
+ }
+
+ public QueryData getHeader() {
+ return header;
+ }
+
+ public DrillBuf getData() {
+ return data;
+ }
+
+ public boolean hasData() {
+ return data != null;
+ }
+
+ public void release() {
+ if (data != null) {
+ data.release();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "QueryResultBatch [header=" + header + ", data=" + data + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
deleted file mode 100644
index ab4c9ef..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc.user;
-
-import io.netty.buffer.DrillBuf;
-
-import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-
-public class QueryResultBatch {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryResultBatch.class);
-
- private final QueryResult header;
- private final DrillBuf data;
-
- public QueryResultBatch(QueryResult header, DrillBuf data) {
-// logger.debug("New Result Batch with header {} and data {}", header, data);
- this.header = header;
- this.data = data;
- if (this.data != null) {
- data.retain();
- }
- }
-
- public QueryResult getHeader() {
- return header;
- }
-
- public DrillBuf getData() {
- return data;
- }
-
- public boolean hasData() {
- return data != null;
- }
-
- public void release() {
- if (data != null) {
- data.release();
- }
- }
-
- @Override
- public String toString() {
- return "QueryResultBatch [header=" + header + ", data=" + data + "]";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index c05b127..a1be83b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -20,17 +20,13 @@ package org.apache.drill.exec.rpc.user;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
-import javax.annotation.Nullable;
-
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcBus;
@@ -65,179 +61,137 @@ public class QueryResultHandler {
private final ConcurrentMap<QueryId, UserResultsListener> queryIdToResultsListenersMap =
Maps.newConcurrentMap();
- /**
- * Any is-last-chunk batch being deferred until the next batch
- * (normally one with COMPLETED) arrives, per active query.
- * <ul>
- * <li>Last-chunk batch is added (and not passed on) when it arrives.</li>
- * <li>Last-chunk batch is removed (and passed on) when next batch arrives
- * and has state {@link QueryState.COMPLETED}.</li>
- * <li>Last-chunk batch is removed (and not passed on) when next batch
- * arrives and has state {@link QueryState.CANCELED} or
- * {@link QueryState.FAILED}.</li>
- * </ul>
- */
- private final Map<QueryId, QueryResultBatch> queryIdToDeferredLastChunkBatchesMap =
- new ConcurrentHashMap<>();
-
-
public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener resultsListener) {
return new SubmissionListener(resultsListener);
}
/**
- * Maps internal low-level API protocol to {@link UserResultsListener}-level
- * API protocol, deferring sending is-last-chunk batches until (internal)
- * COMPLETED batch.
+ * Maps internal low-level API protocol to {@link UserResultsListener}-level API protocol.
+ * Handles query result (query state) messages.
*/
- public void batchArrived( ConnectionThrottle throttle,
- ByteBuf pBody, ByteBuf dBody ) throws RpcException {
+ public void resultArrived( ByteBuf pBody ) throws RpcException {
final QueryResult queryResult = RpcBus.get( pBody, QueryResult.PARSER );
- // Current batch coming in. (Not necessarily passed along now or ever.)
- final QueryResultBatch inputBatch = new QueryResultBatch( queryResult,
- (DrillBuf) dBody );
final QueryId queryId = queryResult.getQueryId();
- final QueryState queryState = inputBatch.getHeader().getQueryState();
+ final QueryState queryState = queryResult.getQueryState();
+
+ logger.debug( "resultArrived: queryState: {}, queryId = {}", queryState, queryId );
- logger.debug( "batchArrived: isLastChunk: {}, queryState: {}, queryId = {}",
- inputBatch.getHeader().getIsLastChunk(), queryState, queryId );
- logger.trace( "batchArrived: currentBatch = {}", inputBatch );
+ assert queryResult.hasQueryState() : "received query result without QueryState";
- final boolean isFailureBatch = QueryState.FAILED == queryState;
- final boolean isCompletionBatch = QueryState.COMPLETED == queryState;
- final boolean isLastChunkBatchToDelay =
- inputBatch.getHeader().getIsLastChunk() && QueryState.PENDING == queryState;
- final boolean isTerminalBatch;
+ final boolean isFailureResult = QueryState.FAILED == queryState;
+ // CANCELED queries are handled the same way as COMPLETED
+ final boolean isTerminalResult;
switch ( queryState ) {
case PENDING:
- isTerminalBatch = false;
- break;
+ isTerminalResult = false;
+ break;
case FAILED:
case CANCELED:
case COMPLETED:
- isTerminalBatch = true;
+ isTerminalResult = true;
break;
default:
logger.error( "Unexpected/unhandled QueryState " + queryState
- + " (for query " + queryId + ")" );
- isTerminalBatch = false;
+ + " (for query " + queryId + ")" );
+ isTerminalResult = false;
break;
}
- assert isFailureBatch || inputBatch.getHeader().getErrorCount() == 0
- : "Error count for the query batch is non-zero but QueryState != FAILED";
- UserResultsListener resultsListener = queryIdToResultsListenersMap.get( queryId );
- logger.trace( "For QueryId [{}], retrieved results listener {}", queryId,
- resultsListener );
- if ( null == resultsListener ) {
- // WHO?? didn't get query ID response and set submission listener yet,
- // so install a buffering listener for now
+ assert isFailureResult || queryResult.getErrorCount() == 0
+ : "Error count for the query batch is non-zero but QueryState != FAILED";
- BufferingResultsListener bl = new BufferingResultsListener();
- resultsListener = queryIdToResultsListenersMap.putIfAbsent( queryId, bl );
- // If we had a successful insertion, use that reference. Otherwise, just
- // throw away the new buffering listener.
- if ( null == resultsListener ) {
- resultsListener = bl;
- }
- // TODO: Is there a more direct way to detect a Query ID in whatever
- // state this string comparison detects?
- if ( queryId.toString().equals( "" ) ) {
- failAll();
- }
- }
+ UserResultsListener resultsListener = newUserResultsListener(queryId);
try {
- if (isFailureBatch) {
+ if (isFailureResult) {
// Failure case--pass on via submissionFailed(...).
+ String message = buildErrorMessage(queryResult);
+ resultsListener.submissionFailed(new RpcException(message));
+ // Note: Listener is removed in finally below.
+ } else if (isTerminalResult) {
+ // A successful completion/cancellation case--pass on via queryCompleted
+
try {
- String message = buildErrorMessage(inputBatch);
- resultsListener.submissionFailed(new RpcException(message));
+ resultsListener.queryCompleted();
+ } catch ( Exception e ) {
+ resultsListener.submissionFailed(new RpcException(e));
}
- finally {
- inputBatch.release();
- }
- // Note: Listener and any delayed batch are removed in finally below.
} else {
- // A successful (data, completion, or cancelation) case--pass on via
- // resultArrived, delaying any last-chunk batches until following
- // COMPLETED batch and omitting COMPLETED batch.
-
- // If is last-chunk batch, save until next batch for query (normally a
- // COMPLETED batch) comes in:
- if ( isLastChunkBatchToDelay ) {
- // We have a (non-failure) is-last-chunk batch--defer it until we get
- // the query's COMPLETED batch.
-
- QueryResultBatch expectNone;
- assert null == ( expectNone =
- queryIdToDeferredLastChunkBatchesMap.get( queryId ) )
- : "Already have pending last-batch QueryResultBatch " + expectNone
- + " (at receiving last-batch QueryResultBatch " + inputBatch
- + ") for query " + queryId;
- queryIdToDeferredLastChunkBatchesMap.put( queryId, inputBatch );
- // Can't release batch now; will release at terminal batch in
- // finally below.
- } else {
- // We have a batch triggering sending out a batch (maybe same one,
- // maybe deferred one.
-
- // Batch to send out in response to current batch.
- final QueryResultBatch outputBatch;
- if ( isCompletionBatch ) {
- // We have a COMPLETED batch--we should have a saved is-last-chunk
- // batch, and we must pass that on now (that we've seen COMPLETED).
-
- outputBatch = queryIdToDeferredLastChunkBatchesMap.get( queryId );
- assert null != outputBatch
- : "No pending last-batch QueryResultsBatch saved, at COMPLETED"
- + " QueryResultsBatch " + inputBatch + " for query " + queryId;
- } else {
- // We have a non--last-chunk PENDING batch or a CANCELED
- // batch--pass it on.
- outputBatch = inputBatch;
- }
- // Note to release input batch if it's not the batch we're sending out.
- final boolean releaseInputBatch = outputBatch != inputBatch;
-
- try {
- resultsListener.resultArrived( outputBatch, throttle );
- // That releases outputBatch if successful.
- } catch ( Exception e ) {
- outputBatch.release();
- resultsListener.submissionFailed(new RpcException(e));
- }
- finally {
- if ( releaseInputBatch ) {
- inputBatch.release();
- }
- }
- }
+ logger.warn("queryState {} was ignored", queryState);
}
} finally {
- if ( isTerminalBatch ) {
- // Remove and release any deferred is-last-chunk batch:
- QueryResultBatch anyUnsentLastChunkBatch =
- queryIdToDeferredLastChunkBatchesMap.remove( queryId );
- if ( null != anyUnsentLastChunkBatch ) {
- anyUnsentLastChunkBatch.release();
- }
-
- // TODO: What exactly are we checking for? How should we really check
+ if ( isTerminalResult ) {
+ // TODO: What exactly are we checking for? How should we really check
// for it?
if ( (! ( resultsListener instanceof BufferingResultsListener )
- || ((BufferingResultsListener) resultsListener).output != null ) ) {
+ || ((BufferingResultsListener) resultsListener).output != null ) ) {
queryIdToResultsListenersMap.remove( queryId, resultsListener );
}
}
}
}
- protected String buildErrorMessage(QueryResultBatch batch) {
+ /**
+ * Maps internal low-level API protocol to {@link UserResultsListener}-level API protocol.
+ * Handles query data messages.
+ */
+ public void batchArrived( ConnectionThrottle throttle,
+ ByteBuf pBody, ByteBuf dBody ) throws RpcException {
+ final QueryData queryData = RpcBus.get( pBody, QueryData.PARSER );
+ // Current batch coming in.
+ final QueryDataBatch batch = new QueryDataBatch( queryData, (DrillBuf) dBody );
+
+ final QueryId queryId = queryData.getQueryId();
+
+ logger.debug( "batchArrived: queryId = {}", queryId );
+ logger.trace( "batchArrived: batch = {}", batch );
+
+ UserResultsListener resultsListener = newUserResultsListener(queryId);
+
+ // A data case--pass on via dataArrived
+
+ try {
+ resultsListener.dataArrived(batch, throttle);
+ // That releases batch if successful.
+ } catch ( Exception e ) {
+ batch.release();
+ resultsListener.submissionFailed(new RpcException(e));
+ }
+ }
+
+ /**
+ * Return {@link UserResultsListener} associated with queryId. Will create a new {@link BufferingResultsListener}
+ * if no listener found.
+ * @param queryId queryId we are getting the listener for
+ * @return {@link UserResultsListener} associated with queryId
+ */
+ private UserResultsListener newUserResultsListener(QueryId queryId) {
+ UserResultsListener resultsListener = queryIdToResultsListenersMap.get( queryId );
+ logger.trace( "For QueryId [{}], retrieved results listener {}", queryId, resultsListener );
+ if ( null == resultsListener ) {
+ // WHO?? didn't get query ID response and set submission listener yet,
+ // so install a buffering listener for now
+
+ BufferingResultsListener bl = new BufferingResultsListener();
+ resultsListener = queryIdToResultsListenersMap.putIfAbsent( queryId, bl );
+ // If we had a successful insertion, use that reference. Otherwise, just
+ // throw away the new buffering listener.
+ if ( null == resultsListener ) {
+ resultsListener = bl;
+ }
+ // TODO: Is there a more direct way to detect a Query ID in whatever state this string comparison detects?
+ if ( queryId.toString().isEmpty() ) {
+ failAll();
+ }
+ }
+ return resultsListener;
+ }
+
+ protected String buildErrorMessage(QueryResult result) {
StringBuilder sb = new StringBuilder();
- for (UserBitShared.DrillPBError error : batch.getHeader().getErrorList()) {
+ for (UserBitShared.DrillPBError error : result.getErrorList()) {
sb.append(error.getMessage());
sb.append("\n");
}
@@ -252,7 +206,7 @@ public class QueryResultHandler {
private static class BufferingResultsListener implements UserResultsListener {
- private ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue();
+ private ConcurrentLinkedQueue<QueryDataBatch> results = Queues.newConcurrentLinkedQueue();
private volatile boolean finished = false;
private volatile RpcException ex;
private volatile UserResultsListener output;
@@ -261,31 +215,39 @@ public class QueryResultHandler {
public boolean transferTo(UserResultsListener l) {
synchronized (this) {
output = l;
- boolean last = false;
- for (QueryResultBatch r : results) {
- l.resultArrived(r, throttle);
- last = r.getHeader().getIsLastChunk();
+ for (QueryDataBatch r : results) {
+ l.dataArrived(r, throttle);
}
if (ex != null) {
l.submissionFailed(ex);
return true;
+ } else if (finished) {
+ l.queryCompleted();
}
- return last;
+
+ return finished;
}
}
@Override
- public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
- this.throttle = throttle;
- if (result.getHeader().getIsLastChunk()) {
- finished = true;
+ public void queryCompleted() {
+ finished = true;
+ synchronized (this) {
+ if (output != null) {
+ output.queryCompleted();
+ }
}
+ }
+
+ @Override
+ public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
+ this.throttle = throttle;
synchronized (this) {
if (output == null) {
this.results.add(result);
} else {
- output.resultArrived(result, throttle);
+ output.dataArrived(result, throttle);
}
}
}
@@ -340,11 +302,10 @@ public class QueryResultHandler {
if (oldListener != null) {
logger.debug("Unable to place user results listener, buffering listener was already in place.");
if (oldListener instanceof BufferingResultsListener) {
- queryIdToResultsListenersMap.remove(oldListener);
boolean all = ((BufferingResultsListener) oldListener).transferTo(this.resultsListener);
// simply remove the buffering listener if we already have the last response.
if (all) {
- queryIdToResultsListenersMap.remove(oldListener);
+ queryIdToResultsListenersMap.remove(queryId);
} else {
boolean replaced = queryIdToResultsListenersMap.replace(queryId, oldListener, resultsListener);
if (!replaced) {
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 925154d..fd82699 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
@@ -41,7 +42,7 @@ import org.apache.drill.exec.rpc.RpcException;
import com.google.protobuf.MessageLite;
public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHandshake, BitToUserHandshake> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class);
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class);
private final QueryResultHandler queryResultHandler = new QueryResultHandler();
@@ -80,8 +81,10 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
return BitToUserHandshake.getDefaultInstance();
case RpcType.QUERY_HANDLE_VALUE:
return QueryId.getDefaultInstance();
- case RpcType.QUERY_RESULT_VALUE:
- return QueryResult.getDefaultInstance();
+ case RpcType.QUERY_RESULT_VALUE:
+ return QueryResult.getDefaultInstance();
+ case RpcType.QUERY_DATA_VALUE:
+ return QueryData.getDefaultInstance();
}
throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
}
@@ -89,9 +92,12 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
@Override
protected Response handleReponse(ConnectionThrottle throttle, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
switch (rpcType) {
- case RpcType.QUERY_RESULT_VALUE:
+ case RpcType.QUERY_DATA_VALUE:
queryResultHandler.batchArrived(throttle, pBody, dBody);
return new Response(RpcType.ACK, Ack.getDefaultInstance());
+ case RpcType.QUERY_RESULT_VALUE:
+ queryResultHandler.resultArrived(pBody);
+ return new Response(RpcType.ACK, Ack.getDefaultInstance());
default:
throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
index 9f83a4f..934a094 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java
@@ -22,8 +22,30 @@ import org.apache.drill.exec.rpc.RpcException;
public interface UserResultsListener {
- public abstract void queryIdArrived(QueryId queryId);
- public abstract void submissionFailed(RpcException ex);
- public abstract void resultArrived(QueryResultBatch result, ConnectionThrottle throttle);
+ /**
+ * QueryId is available. Called when a query is successfully submitted to the server.
+ * @param queryId sent by the server along with {@link org.apache.drill.exec.rpc.Acks.OK Acks.OK}
+ */
+ void queryIdArrived(QueryId queryId);
+
+ /**
+ * The query has failed. Most likely called when the server returns a FAILED query state. Can also be called if
+ * {@link #dataArrived(QueryDataBatch, ConnectionThrottle) dataArrived()} throws an exception
+ * @param ex exception describing the cause of the failure
+ */
+ void submissionFailed(RpcException ex);
+
+ /**
+ * The query has completed (successful completion or cancellation). The listener will not receive any other
+ * data or result message. Called when the server returns a terminal, non-failing state (COMPLETED or CANCELED)
+ */
+ void queryCompleted();
+
+ /**
+ * A {@link org.apache.drill.exec.proto.beans.QueryData QueryData} message was received
+ * @param result data batch received
+ * @param throttle connection throttle
+ */
+ void dataArrived(QueryDataBatch result, ConnectionThrottle throttle);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index 908d304..9fb8bdb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.rpc.user;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
@@ -33,8 +34,9 @@ public class UserRpcConfig {
.add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) //user to bit.
.add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) //user to bit
.add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) //user to bit
+ .add(RpcType.QUERY_DATA, QueryData.class, RpcType.ACK, Ack.class) //bit to user
.add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) //bit to user
.build();
- public static int RPC_VERSION = 3;
+ public static int RPC_VERSION = 4;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index c76d324..5b2433b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
@@ -45,7 +46,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnection> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class);
final UserWorker worker;
final BufferAllocator alloc;
@@ -117,16 +118,19 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
return session;
}
- public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){
+ public void sendResult(RpcOutcomeListener<Ack> listener, QueryResult result, boolean allowInEventThread){
logger.trace("Sending result to client with {}", result);
- send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, false, result.getBuffers());
+ send(listener, this, RpcType.QUERY_RESULT, result, Ack.class, allowInEventThread);
}
- public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result, boolean allowInEventThread){
- logger.trace("Sending result to client with {}", result);
- send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, allowInEventThread, result.getBuffers());
+ public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){
+ sendData(listener, result, false);
}
+ public void sendData(RpcOutcomeListener<Ack> listener, QueryWritableBatch result, boolean allowInEventThread){
+ logger.trace("Sending data to client with {}", result);
+ send(listener, this, RpcType.QUERY_DATA, result.getHeader(), Ack.class, allowInEventThread, result.getBuffers());
+ }
@Override
public BufferAllocator getAllocator() {
return alloc;
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
index 8996a69..fbbf0b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/QueryWrapper.java
@@ -39,7 +39,7 @@ import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.vector.ValueVector;
@@ -49,8 +49,7 @@ import parquet.Preconditions;
@XmlRootElement
public class QueryWrapper {
-
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWrapper.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWrapper.class);
private String query;
private String queryType;
@@ -137,7 +136,12 @@ public class QueryWrapper {
}
@Override
- public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
+ public void queryCompleted() {
+ latch.countDown();
+ }
+
+ @Override
+ public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
try {
final int rows = result.getHeader().getRowCount();
if (result.hasData()) {
@@ -162,9 +166,6 @@ public class QueryWrapper {
throw new RuntimeException(e);
} finally {
result.release();
- if (result.getHeader().getIsLastChunk()) {
- latch.countDown();
- }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 285b75a..23ef0d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -46,7 +46,6 @@ import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.ExternalSort;
-import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
@@ -612,7 +611,6 @@ public class Foreman implements Runnable {
* Construct the response based on the latest resultState. The builder shouldn't fail.
*/
final QueryResult.Builder resultBuilder = QueryResult.newBuilder()
- .setIsLastChunk(resultState != QueryState.COMPLETED) // TODO(DRILL-2498) temporary
.setQueryId(queryId)
.setQueryState(resultState);
if (resultException != null) {
@@ -629,7 +627,7 @@ public class Foreman implements Runnable {
*/
try {
// send whatever result we ended up with
- initiatingClient.sendResult(responseListener, new QueryWritableBatch(resultBuilder.build()), true);
+ initiatingClient.sendResult(responseListener, resultBuilder.build(), true);
} catch(Exception e) {
addException(e);
logger.warn("Exception sending result to client", resultException);
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 64cf2ec..c602a01 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -38,7 +38,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
@@ -188,19 +188,19 @@ public class BaseTestQuery extends ExecTest {
listener.waitForCompletion();
}
- protected static List<QueryResultBatch> testSqlWithResults(String sql) throws Exception{
+ protected static List<QueryDataBatch> testSqlWithResults(String sql) throws Exception{
return testRunAndReturn(QueryType.SQL, sql);
}
- protected static List<QueryResultBatch> testLogicalWithResults(String logical) throws Exception{
+ protected static List<QueryDataBatch> testLogicalWithResults(String logical) throws Exception{
return testRunAndReturn(QueryType.LOGICAL, logical);
}
- protected static List<QueryResultBatch> testPhysicalWithResults(String physical) throws Exception{
+ protected static List<QueryDataBatch> testPhysicalWithResults(String physical) throws Exception{
return testRunAndReturn(QueryType.PHYSICAL, physical);
}
- public static List<QueryResultBatch> testRunAndReturn(QueryType type, String query) throws Exception{
+ public static List<QueryDataBatch> testRunAndReturn(QueryType type, String query) throws Exception{
query = QueryTestUtil.normalizeQuery(query);
return client.runQuery(type, query);
}
@@ -221,9 +221,9 @@ public class BaseTestQuery extends ExecTest {
query = String.format(query, args);
logger.debug("Running query:\n--------------\n"+query);
for (int i = 0; i < interation; i++) {
- List<QueryResultBatch> results = client.runQuery(QueryType.SQL, query);
- for (QueryResultBatch queryResultBatch : results) {
- queryResultBatch.release();
+ List<QueryDataBatch> results = client.runQuery(QueryType.SQL, query);
+ for (QueryDataBatch queryDataBatch : results) {
+ queryDataBatch.release();
}
}
}
@@ -252,7 +252,7 @@ public class BaseTestQuery extends ExecTest {
testPhysical(getFile(file));
}
- protected static List<QueryResultBatch> testPhysicalFromFileWithResults(String file) throws Exception {
+ protected static List<QueryDataBatch> testPhysicalFromFileWithResults(String file) throws Exception {
return testRunAndReturn(QueryType.PHYSICAL, getFile(file));
}
@@ -285,16 +285,18 @@ public class BaseTestQuery extends ExecTest {
}
@Override
- public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
+ public void queryCompleted() {
+ System.out.println("Query completed successfully with row count: " + count.get());
+ latch.countDown();
+ }
+
+ @Override
+ public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
int rows = result.getHeader().getRowCount();
if (result.getData() != null) {
count.addAndGet(rows);
}
result.release();
- if (result.getHeader().getIsLastChunk()) {
- System.out.println("Query completed successfully with row count: " + count.get());
- latch.countDown();
- }
}
@Override
@@ -317,10 +319,10 @@ public class BaseTestQuery extends ExecTest {
this.columnWidths = columnWidths;
}
- protected int printResult(List<QueryResultBatch> results) throws SchemaChangeException {
+ protected int printResult(List<QueryDataBatch> results) throws SchemaChangeException {
int rowCount = 0;
RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
- for(QueryResultBatch result : results) {
+ for(QueryDataBatch result : results) {
rowCount += result.getHeader().getRowCount();
loader.load(result.getHeader().getDef(), result.getData());
if (loader.getRecordCount() <= 0) {
@@ -334,12 +336,12 @@ public class BaseTestQuery extends ExecTest {
return rowCount;
}
- protected static String getResultString(List<QueryResultBatch> results, String delimiter)
+ protected static String getResultString(List<QueryDataBatch> results, String delimiter)
throws SchemaChangeException {
StringBuilder formattedResults = new StringBuilder();
boolean includeHeader = true;
RecordBatchLoader loader = new RecordBatchLoader(getAllocator());
- for(QueryResultBatch result : results) {
+ for(QueryDataBatch result : results) {
loader.load(result.getHeader().getDef(), result.getData());
if (loader.getRecordCount() <= 0) {
continue;
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
index 75a91b3..d05c896 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/DrillTestWrapper.java
@@ -29,7 +29,7 @@ import org.apache.drill.exec.record.HyperVectorWrapper;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.io.Text;
@@ -166,13 +166,13 @@ public class DrillTestWrapper {
}
}
- private Map<String, HyperVectorValueIterator> addToHyperVectorMap(List<QueryResultBatch> records, RecordBatchLoader loader,
+ private Map<String, HyperVectorValueIterator> addToHyperVectorMap(List<QueryDataBatch> records, RecordBatchLoader loader,
BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
// TODO - this does not handle schema changes
Map<String, HyperVectorValueIterator> combinedVectors = new HashMap();
long totalRecords = 0;
- QueryResultBatch batch;
+ QueryDataBatch batch;
int size = records.size();
for (int i = 0; i < size; i++) {
batch = records.get(i);
@@ -213,13 +213,13 @@ public class DrillTestWrapper {
* @throws SchemaChangeException
* @throws UnsupportedEncodingException
*/
- private Map<String, List> addToCombinedVectorResults(List<QueryResultBatch> records, RecordBatchLoader loader,
+ private Map<String, List> addToCombinedVectorResults(List<QueryDataBatch> records, RecordBatchLoader loader,
BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
// TODO - this does not handle schema changes
Map<String, List> combinedVectors = new HashMap();
long totalRecords = 0;
- QueryResultBatch batch;
+ QueryDataBatch batch;
int size = records.size();
for (int i = 0; i < size; i++) {
batch = records.get(0);
@@ -268,14 +268,14 @@ public class DrillTestWrapper {
BatchSchema schema = null;
BaseTestQuery.test(testOptionSettingQueries);
- List<QueryResultBatch> expected = BaseTestQuery.testRunAndReturn(queryType, query);
+ List<QueryDataBatch> expected = BaseTestQuery.testRunAndReturn(queryType, query);
addTypeInfoIfMissing(expected.get(0), testBuilder);
List<Map> expectedRecords = new ArrayList<>();
addToMaterializedResults(expectedRecords, expected, loader, schema);
- List<QueryResultBatch> results = new ArrayList();
+ List<QueryDataBatch> results = new ArrayList();
List<Map> actualRecords = new ArrayList<>();
// If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
// the cases where the baseline is stored in a file.
@@ -313,13 +313,13 @@ public class DrillTestWrapper {
BatchSchema schema = null;
BaseTestQuery.test(testOptionSettingQueries);
- List<QueryResultBatch> results = BaseTestQuery.testRunAndReturn(queryType, query);
+ List<QueryDataBatch> results = BaseTestQuery.testRunAndReturn(queryType, query);
// To avoid extra work for test writers, types can optionally be inferred from the test query
addTypeInfoIfMissing(results.get(0), testBuilder);
Map<String, List> actualSuperVectors = addToCombinedVectorResults(results, loader, schema);
- List<QueryResultBatch> expected = null;
+ List<QueryDataBatch> expected = null;
Map<String, List> expectedSuperVectors = null;
// If baseline data was not provided to the test builder directly, we must run a query for the baseline, this includes
@@ -353,14 +353,14 @@ public class DrillTestWrapper {
BatchSchema schema = null;
BaseTestQuery.test(testOptionSettingQueries);
- List<QueryResultBatch> results = BaseTestQuery.testRunAndReturn(queryType, query);
+ List<QueryDataBatch> results = BaseTestQuery.testRunAndReturn(queryType, query);
// To avoid extra work for test writers, types can optionally be inferred from the test query
addTypeInfoIfMissing(results.get(0), testBuilder);
Map<String, HyperVectorValueIterator> actualSuperVectors = addToHyperVectorMap(results, loader, schema);
BaseTestQuery.test(baselineOptionSettingQueries);
- List<QueryResultBatch> expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
+ List<QueryDataBatch> expected = BaseTestQuery.testRunAndReturn(baselineQueryType, testBuilder.getValidationQuery());
Map<String, HyperVectorValueIterator> expectedSuperVectors = addToHyperVectorMap(expected, loader, schema);
@@ -368,7 +368,7 @@ public class DrillTestWrapper {
cleanupBatches(results, expected);
}
- private void addTypeInfoIfMissing(QueryResultBatch batch, TestBuilder testBuilder) {
+ private void addTypeInfoIfMissing(QueryDataBatch batch, TestBuilder testBuilder) {
if (! testBuilder.typeInfoSet()) {
Map<SchemaPath, TypeProtos.MajorType> typeMap = getTypeMapFromBatch(batch);
testBuilder.baselineTypes(typeMap);
@@ -376,7 +376,7 @@ public class DrillTestWrapper {
}
- private Map<SchemaPath, TypeProtos.MajorType> getTypeMapFromBatch(QueryResultBatch batch) {
+ private Map<SchemaPath, TypeProtos.MajorType> getTypeMapFromBatch(QueryDataBatch batch) {
Map<SchemaPath, TypeProtos.MajorType> typeMap = new HashMap();
for (int i = 0; i < batch.getHeader().getDef().getFieldCount(); i++) {
typeMap.put(MaterializedField.create(batch.getHeader().getDef().getField(i)).getPath(),
@@ -385,18 +385,18 @@ public class DrillTestWrapper {
return typeMap;
}
- private void cleanupBatches(List<QueryResultBatch>... results) {
- for (List<QueryResultBatch> resultList : results ) {
- for (QueryResultBatch result : resultList) {
+ private void cleanupBatches(List<QueryDataBatch>... results) {
+ for (List<QueryDataBatch> resultList : results ) {
+ for (QueryDataBatch result : resultList) {
result.release();
}
}
}
- protected void addToMaterializedResults(List<Map> materializedRecords, List<QueryResultBatch> records, RecordBatchLoader loader,
+ protected void addToMaterializedResults(List<Map> materializedRecords, List<QueryDataBatch> records, RecordBatchLoader loader,
BatchSchema schema) throws SchemaChangeException, UnsupportedEncodingException {
long totalRecords = 0;
- QueryResultBatch batch;
+ QueryDataBatch batch;
int size = records.size();
for (int i = 0; i < size; i++) {
batch = records.get(0);
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
index 80b4d13..4744978 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
@@ -28,7 +28,7 @@ import java.util.regex.Pattern;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
import org.eigenbase.sql.SqlExplain.Depth;
@@ -288,12 +288,12 @@ public class PlanTestBase extends BaseTestQuery {
*/
protected static String getPlanInString(String sql, String columnName)
throws Exception {
- List<QueryResultBatch> results = testSqlWithResults(sql);
+ List<QueryDataBatch> results = testSqlWithResults(sql);
RecordBatchLoader loader = new RecordBatchLoader(getDrillbitContext().getAllocator());
StringBuilder builder = new StringBuilder();
- for (QueryResultBatch b : results) {
+ for (QueryDataBatch b : results) {
if (!b.hasData()) {
continue;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java
index 3d19229..82f1752 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java
@@ -29,7 +29,7 @@ import org.apache.drill.exec.client.PrintingResultsListener;
import org.apache.drill.exec.client.QuerySubmitter.Format;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.util.VectorUtil;
@@ -59,11 +59,11 @@ public class QueryTestUtil {
final DrillClient drillClient = new DrillClient(drillConfig, remoteServiceSet.getCoordinator());
drillClient.connect();
- final List<QueryResultBatch> results = drillClient.runQuery(
+ final List<QueryDataBatch> results = drillClient.runQuery(
QueryType.SQL, String.format("alter session set `%s` = %d",
ExecConstants.MAX_WIDTH_PER_NODE_KEY, maxWidth));
- for (QueryResultBatch queryResultBatch : results) {
- queryResultBatch.release();
+ for (QueryDataBatch queryDataBatch : results) {
+ queryDataBatch.release();
}
return drillClient;
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
index 07cb833..5703bf9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java
@@ -24,17 +24,17 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.ConnectionThrottle;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.rpc.user.UserResultsListener;
/**
* Result listener that is set up to receive a single row. Useful for queries
- * such with a count(*) or limit 1. The abstract method {@link #rowArrived(QueryResultBatch)} provides
+ * such with a count(*) or limit 1. The abstract method {@link #rowArrived(QueryDataBatch)} provides
* the means for a derived class to get the expected record's data.
*/
public abstract class SingleRowListener implements UserResultsListener {
@@ -51,14 +51,26 @@ public abstract class SingleRowListener implements UserResultsListener {
@Override
public void submissionFailed(final RpcException ex) {
exception = ex;
+ synchronized(errorList) {
+ errorList.add(ex.getRemoteError());
+ }
latch.countDown();
}
@Override
- public void resultArrived(final QueryResultBatch result, final ConnectionThrottle throttle) {
- final QueryResult queryResult = result.getHeader();
+ public void queryCompleted() {
+ try {
+ cleanup();
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ @Override
+ public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+ final QueryData queryData = result.getHeader();
if (result.hasData()) {
- final int nRows = this.nRows.addAndGet(queryResult.getRowCount());
+ final int nRows = this.nRows.addAndGet(queryData.getRowCount());
if (nRows > 1) {
throw new IllegalStateException("Expected exactly one row, but got " + nRows);
}
@@ -66,22 +78,7 @@ public abstract class SingleRowListener implements UserResultsListener {
rowArrived(result);
}
- // TODO this appears to never be set
- if (queryResult.hasQueryState()) {
- queryState = queryResult.getQueryState();
- }
-
- synchronized(errorList) {
- errorList.addAll(queryResult.getErrorList());
- }
-
- final boolean isLastChunk = queryResult.getIsLastChunk();
result.release();
-
- if (isLastChunk) {
- cleanup();
- latch.countDown();
- }
}
/**
@@ -110,9 +107,9 @@ public abstract class SingleRowListener implements UserResultsListener {
* <p>Derived classes provide whatever implementation they require here to access
* the record's data.
*
- * @param queryResultBatch result batch holding the row
+ * @param queryDataBatch result batch holding the row
*/
- protected abstract void rowArrived(QueryResultBatch queryResultBatch);
+ protected abstract void rowArrived(QueryDataBatch queryDataBatch);
/**
* Wait for the completion of this query; receiving a record or an error will both cause the
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java
index 67b102d..0a24073 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestQueriesOnLargeFile.java
@@ -27,7 +27,7 @@ import java.util.List;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.drill.exec.vector.BigIntVector;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -72,12 +72,12 @@ public class TestQueriesOnLargeFile extends BaseTestQuery {
@Test
public void testRead() throws Exception {
- List<QueryResultBatch> results = testSqlWithResults(
+ List<QueryDataBatch> results = testSqlWithResults(
String.format("SELECT count(*) FROM dfs_test.`default`.`%s`", dataFile.getPath()));
RecordBatchLoader batchLoader = new RecordBatchLoader(getAllocator());
- for(QueryResultBatch batch : results) {
+ for(QueryDataBatch batch : results) {
batchLoader.load(batch.getHeader().getDef(), batch.getData());
if (batchLoader.getRecordCount() <= 0) {
@@ -96,10 +96,10 @@ public class TestQueriesOnLargeFile extends BaseTestQuery {
public void testMergingReceiver() throws Exception {
String plan = Files.toString(FileUtils.getResourceAsFile("/largefiles/merging_receiver_large_data.json"),
Charsets.UTF_8).replace("#{TEST_FILE}", escapeJsonString(dataFile.getPath()));
- List<QueryResultBatch> results = testPhysicalWithResults(plan);
+ List<QueryDataBatch> results = testPhysicalWithResults(plan);
int recordsInOutput = 0;
- for(QueryResultBatch batch : results) {
+ for(QueryDataBatch batch : results) {
recordsInOutput += batch.getHeader().getDef().getRecordCount();
batch.release();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/1d9d82b0/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
index 98919ec..df03c7d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
@@ -21,7 +21,7 @@ import java.util.List;
import org.apache.drill.exec.DrillSystemTestBase;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
-import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -53,8 +53,8 @@ public class DrillClientSystemTest extends DrillSystemTestBase {
startCluster(1);
DrillClient client = new DrillClient();
client.connect();
- List<QueryResultBatch> results = client.runQuery(QueryType.LOGICAL, plan);
- for (QueryResultBatch result : results) {
+ List<QueryDataBatch> results = client.runQuery(QueryType.LOGICAL, plan);
+ for (QueryDataBatch result : results) {
System.out.println(result);
result.release();
}
@@ -66,8 +66,8 @@ public class DrillClientSystemTest extends DrillSystemTestBase {
startCluster(2);
DrillClient client = new DrillClient();
client.connect();
- List<QueryResultBatch> results = client.runQuery(QueryType.LOGICAL, plan);
- for (QueryResultBatch result : results) {
+ List<QueryDataBatch> results = client.runQuery(QueryType.LOGICAL, plan);
+ for (QueryDataBatch result : results) {
System.out.println(result);
result.release();
}