You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/20 22:24:55 UTC
[19/32] git commit: DRILL-884: Always return a schema, even when there are no records
DRILL-884: Always return a schema, even when there are no records
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/a314c824
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/a314c824
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/a314c824
Branch: refs/heads/master
Commit: a314c824ba99edf0c29b004c121904847bab2c15
Parents: a3bf05d
Author: Steven Phillips <sp...@maprtech.com>
Authored: Wed Jun 4 22:24:12 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Jun 19 20:30:55 2014 -0700
----------------------------------------------------------------------
.../drill/exec/physical/impl/ScanBatch.java | 18 ++++++++-
.../drill/exec/physical/impl/ScreenCreator.java | 22 ++++++----
.../exec/physical/impl/SingleSenderCreator.java | 6 ++-
.../exec/physical/impl/TopN/TopNBatch.java | 11 ++++-
.../exec/physical/impl/WireRecordBatch.java | 12 +++++-
.../physical/impl/aggregate/HashAggBatch.java | 25 ++++++++++--
.../impl/aggregate/HashAggTemplate.java | 2 +-
.../impl/aggregate/StreamingAggBatch.java | 27 ++++++++++---
.../impl/aggregate/StreamingAggTemplate.java | 9 ++++-
.../exec/physical/impl/join/HashJoinBatch.java | 5 ++-
.../exec/physical/impl/join/MergeJoinBatch.java | 29 ++++++++------
.../impl/materialize/QueryWritableBatch.java | 24 ++++++++++-
.../impl/mergereceiver/MergingRecordBatch.java | 24 +++++++++++
.../PartitionSenderRootExec.java | 5 +--
.../impl/project/ProjectRecordBatch.java | 1 +
.../impl/sort/SortRecordBatchBuilder.java | 10 ++---
.../IteratorValidatorBatchIterator.java | 6 ++-
.../physical/impl/xsort/ExternalSortBatch.java | 40 ++++++++++++++-----
.../impl/xsort/SingleBatchSorterTemplate.java | 4 +-
.../exec/record/AbstractSingleRecordBatch.java | 11 ++++-
.../apache/drill/exec/record/BatchSchema.java | 9 +++++
.../exec/record/FragmentWritableBatch.java | 16 ++++++++
.../drill/exec/record/RecordBatchLoader.java | 4 +-
.../apache/drill/exec/record/SchemaBuilder.java | 8 ++++
.../org/apache/drill/TestExampleQueries.java | 8 +++-
.../test/resources/mergerecv/empty_batch.json | 2 +-
.../java/org/apache/drill/jdbc/DrillCursor.java | 17 +++++---
.../drill/jdbc/test/TestJdbcDistQuery.java | 42 +++++++++++++++-----
.../resources/bootstrap-storage-plugins.json | 3 ++
29 files changed, 322 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 55d3f62..5f8bfb9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -80,6 +80,8 @@ public class ScanBatch implements RecordBatch {
private List<ValueVector> partitionVectors;
private List<Integer> selectedPartitionColumns;
private String partitionColumnDesignator;
+ private boolean first = true;
+ private boolean done = false;
public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
this.context = context;
@@ -92,7 +94,7 @@ public class ScanBatch implements RecordBatch {
this.partitionColumns = partitionColumns.iterator();
this.partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
this.selectedPartitionColumns = selectedPartitionColumns;
- DrillConfig config = context.getConfig(); //This nonsense it is to not break all the stupid unit tests using SimpleRootExec
+ DrillConfig config = context.getConfig();
this.partitionColumnDesignator = config == null ? "dir" : config.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
addPartitionVectors();
}
@@ -122,11 +124,14 @@ public class ScanBatch implements RecordBatch {
}
private void releaseAssets() {
- container.clear();
+ container.zeroVectors();
}
@Override
public IterOutcome next() {
+ if (done) {
+ return IterOutcome.NONE;
+ }
oContext.getStats().startProcessing();
try {
mutator.allocate(MAX_RECORD_CNT);
@@ -135,6 +140,14 @@ public class ScanBatch implements RecordBatch {
if (!readers.hasNext()) {
currentReader.cleanup();
releaseAssets();
+ if (first) {
+ first = false;
+ done = true;
+ populatePartitionVectors();
+ container.buildSchema(SelectionVectorMode.NONE);
+ schema = container.getSchema();
+ return IterOutcome.OK_NEW_SCHEMA;
+ }
return IterOutcome.NONE;
}
oContext.getStats().startSetup();
@@ -154,6 +167,7 @@ public class ScanBatch implements RecordBatch {
return IterOutcome.STOP;
}
}
+ first = false;
populatePartitionVectors();
if (mutator.isNewSchema()) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 86e77d8..9ad85af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -64,6 +64,7 @@ public class ScreenCreator implements RootCreator<Screen>{
final FragmentContext context;
final UserClientConnection connection;
private RecordMaterializer materializer;
+ private boolean first = true;
public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
super(context, config);
@@ -108,13 +109,18 @@ public class ScreenCreator implements RootCreator<Screen>{
case NONE: {
sendCount.waitForSendComplete();
// context.getStats().batchesCompleted.inc(1);
- QueryResult header = QueryResult.newBuilder() //
- .setQueryId(context.getHandle().getQueryId()) //
- .setRowCount(0) //
- .setDef(RecordBatchDef.getDefaultInstance()) //
- .setIsLastChunk(true) //
- .build();
- QueryWritableBatch batch = new QueryWritableBatch(header);
+ QueryWritableBatch batch;
+ if (!first) {
+ QueryResult header = QueryResult.newBuilder() //
+ .setQueryId(context.getHandle().getQueryId()) //
+ .setRowCount(0) //
+ .setDef(RecordBatchDef.getDefaultInstance()) //
+ .setIsLastChunk(true) //
+ .build();
+ batch = new QueryWritableBatch(header);
+ } else {
+ batch = QueryWritableBatch.getEmptyBatchWithSchema(context.getHandle().getQueryId(), 0, true, incoming.getSchema());
+ }
stats.startWait();
try {
connection.sendResult(listener, batch);
@@ -140,6 +146,7 @@ public class ScreenCreator implements RootCreator<Screen>{
}
sendCount.increment();
+ first = false;
return true;
default:
throw new UnsupportedOperationException();
@@ -181,6 +188,7 @@ public class ScreenCreator implements RootCreator<Screen>{
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 9e91468..1b63112 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -80,7 +80,8 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
switch(out){
case STOP:
case NONE:
- FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0);
+ FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(), handle.getMajorFragmentId(),
+ handle.getMinorFragmentId(), recMajor, 0, incoming.getSchema());
sendCount.increment();
stats.startWait();
try {
@@ -92,7 +93,8 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
case OK_NEW_SCHEMA:
case OK:
- FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
+ FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(),
+ handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
sendCount.increment();
stats.startWait();
try {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index c9cd2dd..4a5d368 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -46,7 +46,9 @@ import org.apache.drill.exec.physical.impl.svremover.Copier;
import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.ExpandableHyperContainer;
+import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
@@ -113,7 +115,14 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
return sv4;
}
-
+ @Override
+ public BatchSchema getSchema() {
+ List<MaterializedField> fields = Lists.newArrayList();
+ for (MaterializedField field : incoming.getSchema()) {
+ fields.add(field);
+ }
+ return BatchSchema.newBuilder().addFields(fields).setSelectionVectorMode(SelectionVectorMode.FOUR_BYTE).build();
+ }
@Override
public void cleanup() {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index 2a19ba7..1eae0c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -48,6 +48,7 @@ public class WireRecordBatch implements RecordBatch {
private FragmentContext context;
private BatchSchema schema;
private OperatorStats stats;
+ private boolean first = true;
public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider, RandomReceiver config) throws OutOfMemoryException {
@@ -114,13 +115,22 @@ public class WireRecordBatch implements RecordBatch {
batch = fragProvider.getNext();
// skip over empty batches. we do this since these are basically control messages.
- while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0){
+ while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && !first){
+ if (first) {
+ first = false;
+ RecordBatchDef rbd = batch.getHeader().getDef();
+ batchLoader.load(rbd, batch.getBody());
+ batch.release();
+ schema = batchLoader.getSchema().clone();
+ batchLoader.clear();
+ }
batch = fragProvider.getNext();
}
} finally {
stats.stopWait();
}
+ first = false;
if (batch == null){
batchLoader.clear();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 6adc304..3609c02 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -40,9 +40,11 @@ import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -65,6 +67,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
private LogicalExpression[] aggrExprs;
private TypedFieldId[] groupByOutFieldIds ;
private TypedFieldId[] aggrOutFieldIds ; // field ids for the outgoing batch
+ private boolean first = true;
private final GeneratorMapping UPDATE_AGGR_INSIDE =
GeneratorMapping.create("setupInterior" /* setup method */, "updateAggrValuesInternal" /* eval method */,
@@ -90,12 +93,16 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
@Override
public IterOutcome innerNext() {
+ if (done) {
+ return IterOutcome.NONE;
+ }
// this is only called on the first batch. Beyond this, the aggregator manages batches.
if (aggregator == null) {
IterOutcome outcome = next(incoming);
logger.debug("Next outcome of {}", outcome);
switch (outcome) {
case NONE:
+ throw new UnsupportedOperationException("Received NONE on first batch");
case NOT_YET:
case STOP:
return outcome;
@@ -118,7 +125,13 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
if (aggregator.buildComplete() && ! aggregator.allFlushed()) {
// aggregation is complete and not all records have been output yet
- return aggregator.outputCurrentBatch();
+ IterOutcome outcome = aggregator.outputCurrentBatch();
+ if (outcome == IterOutcome.NONE && first) {
+ first = false;
+ done = true;
+ return IterOutcome.OK_NEW_SCHEMA;
+ }
+ return outcome;
}
logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());
@@ -128,11 +141,17 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
switch(out){
case CLEANUP_AND_RETURN:
- container.clear();
+ container.zeroVectors();
aggregator.cleanup();
done = true;
- return aggregator.getOutcome();
+ // fall through
case RETURN_OUTCOME:
+ IterOutcome outcome = aggregator.getOutcome();
+ if (outcome == IterOutcome.NONE && first) {
+ first = false;
+ done = true;
+ return IterOutcome.OK_NEW_SCHEMA;
+ }
return aggregator.getOutcome();
case UPDATE_AGGREGATOR:
aggregator = null;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 5069a2d..935bbb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -429,7 +429,7 @@ public abstract class HashAggTemplate implements HashAggregator {
// get the number of groups in the batch holder corresponding to this batch index
int batchOutputRecords = batchHolders.get(outBatchIndex).getNumGroups();
- if (batchOutputRecords == 0) {
+ if (!first && batchOutputRecords == 0) {
this.outcome = IterOutcome.NONE;
return outcome;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index ec12de9..367d2c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -45,6 +45,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.impl.aggregate.StreamingAggregator.AggOutcome;
import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
@@ -65,6 +66,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
private StreamingAggregator aggregator;
private final RecordBatch incoming;
private boolean done = false;
+ private boolean first = true;
public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
super(popConfig, context);
@@ -74,12 +76,17 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
@Override
public int getRecordCount() {
if(done) return 0;
+ if (aggregator == null) return 0;
return aggregator.getOutputCount();
}
@Override
public IterOutcome innerNext() {
- // this is only called on the first batch. Beyond this, the aggregator manages batches.
+ if (done) {
+ container.zeroVectors();
+ return IterOutcome.NONE;
+ }
+ // this is only called on the first batch. Beyond this, the aggregator manages batches.
if (aggregator == null) {
IterOutcome outcome = next(incoming);
logger.debug("Next outcome of {}", outcome);
@@ -106,17 +113,25 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
switch(out){
case CLEANUP_AND_RETURN:
- container.clear();
+ if (!first) container.zeroVectors();
done = true;
- return aggregator.getOutcome();
+ // fall through
case RETURN_OUTCOME:
- return aggregator.getOutcome();
+ IterOutcome outcome = aggregator.getOutcome();
+ if (outcome == IterOutcome.NONE && first) {
+ first = false;
+ done = true;
+ return IterOutcome.OK_NEW_SCHEMA;
+ }
+ first = false;
+ return outcome;
case UPDATE_AGGREGATOR:
+ first = false;
aggregator = null;
if(!createAggregator()){
return IterOutcome.STOP;
- }
- continue;
+ }
+ continue;
default:
throw new IllegalStateException(String.format("Unknown state %s.", out));
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index e73f21b..3bd861d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -58,7 +58,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
this.allocators = allocators;
this.outgoing = outgoing;
setupInterior(incoming, outgoing);
- this.currentIndex = this.getVectorIndex(underlyingIndex);
+ this.currentIndex = incoming.getRecordCount() == 0 ? 0 : this.getVectorIndex(underlyingIndex);
}
@@ -158,13 +158,18 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
try{
while(true){
+ if (previous != null) {
+ previous.clear();
+ }
previous = new InternalBatch(incoming);
IterOutcome out = outgoing.next(0, incoming);
if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out);
switch(out){
case NONE:
lastOutcome = out;
- if(addedRecordCount > 0){
+ if (first && addedRecordCount == 0) {
+ return setOkAndReturn();
+ } else if(addedRecordCount > 0){
if( !outputToBatchPrev( previous, previousIndex, outputCount) ) remainderBatch = previous;
if(EXTRA_DEBUG) logger.debug("Received no more batches, returning.");
return setOkAndReturn();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 11368e3..1c028d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -109,6 +109,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
// Schema of the build side
private BatchSchema rightSchema = null;
+ private boolean first = true;
+
// Generator mapping for the build side
private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
"projectBuildRecord" /* eval method */,
@@ -187,7 +189,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
* 2. We've filled up the outgoing batch to the maximum and we need to return upstream
* Either case build the output container's schema and return
*/
- if (outputRecords > 0) {
+ if (outputRecords > 0 || first) {
+ first = false;
// Build the container schema and set the counts
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index e32b653..6943d1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -110,6 +110,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
private final JoinRelType joinType;
private JoinWorker worker;
public MergeJoinBatchBuilder batchBuilder;
+ private boolean done = false;
protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
super(popConfig, context);
@@ -136,6 +137,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
@Override
public IterOutcome innerNext() {
+ if (done) {
+ return IterOutcome.NONE;
+ }
// we do this in the here instead of the constructor because don't necessary want to start consuming on construction.
status.ensureInitial();
@@ -190,9 +194,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
kill();
return IterOutcome.STOP;
case NO_MORE_DATA:
- logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : "NONE"));
+ logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : (first ? "OK_NEW_SCHEMA" :"NONE")));
setRecordCountInContainer();
- return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): IterOutcome.NONE;
+ done = true;
+ return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.NONE);
case SCHEMA_CHANGED:
worker = null;
if(status.getOutPosition() > 0){
@@ -349,7 +354,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
//////////////////////
cg.setMappingSet(copyLeftMapping);
int vectorId = 0;
- if (status.isLeftPositionAllowed()) {
+ if (worker == null || status.isLeftPositionAllowed()) {
for (VectorWrapper<?> vw : left) {
MajorType inputType = vw.getField().getType();
MajorType outputType;
@@ -379,7 +384,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
cg.setMappingSet(copyRightMappping);
int rightVectorBase = vectorId;
- if (status.isRightPositionAllowed()) {
+ if (worker == null || status.isRightPositionAllowed()) {
for (VectorWrapper<?> vw : right) {
MajorType inputType = vw.getField().getType();
MajorType outputType;
@@ -414,12 +419,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
container.clear();
//estimation of joinBatchSize : max of left/right size, expanded by a factor of 16, which is then bounded by MAX_BATCH_SIZE.
- int leftCount = status.isLeftPositionAllowed() ? left.getRecordCount() : 0;
- int rightCount = status.isRightPositionAllowed() ? right.getRecordCount() : 0;
+ int leftCount = worker == null ? left.getRecordCount() : (status.isLeftPositionAllowed() ? left.getRecordCount() : 0);
+ int rightCount = worker == null ? left.getRecordCount() : (status.isRightPositionAllowed() ? right.getRecordCount() : 0);
int joinBatchSize = Math.min(Math.max(leftCount, rightCount) * 16, MAX_BATCH_SIZE);
// add fields from both batches
- if (leftCount > 0) {
+ if (worker == null || leftCount > 0) {
for (VectorWrapper<?> w : left) {
MajorType inputType = w.getField().getType();
@@ -430,12 +435,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
outputType = inputType;
}
ValueVector outgoingVector = TypeHelper.getNewVector(MaterializedField.create(w.getField().getPath(), outputType), oContext.getAllocator());
- VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / left.getRecordCount())).alloc(joinBatchSize);
+ VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / Math.max(1, left.getRecordCount()))).alloc(joinBatchSize);
container.add(outgoingVector);
}
}
- if (rightCount > 0) {
+ if (worker == null || rightCount > 0) {
for (VectorWrapper<?> w : right) {
MajorType inputType = w.getField().getType();
MajorType outputType;
@@ -445,7 +450,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
outputType = inputType;
}
ValueVector outgoingVector = TypeHelper.getNewVector(MaterializedField.create(w.getField().getPath(), outputType), oContext.getAllocator());
- VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / right.getRecordCount())).alloc(joinBatchSize);
+ VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / Math.max(1, right.getRecordCount()))).alloc(joinBatchSize);
container.add(outgoingVector);
}
}
@@ -465,7 +470,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
// materialize value vector readers from join expression
LogicalExpression materializedLeftExpr;
- if (status.isLeftPositionAllowed()) {
+ if (worker == null || status.isLeftPositionAllowed()) {
materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, context.getFunctionRegistry());
} else {
materializedLeftExpr = new TypedNullConstant(Types.optional(MinorType.INT));
@@ -475,7 +480,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
"Failure while trying to materialize incoming left field. Errors:\n %s.", collector.toErrorString()));
LogicalExpression materializedRightExpr;
- if (status.isRightPositionAllowed()) {
+ if (worker == null || status.isRightPositionAllowed()) {
materializedRightExpr = ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector, context.getFunctionRegistry());
} else {
materializedRightExpr = new TypedNullConstant(Types.optional(MinorType.INT));
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
index aba7370..c219cce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
@@ -18,10 +18,18 @@
package org.apache.drill.exec.physical.impl.materialize;
import java.util.Arrays;
+import java.util.List;
+import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.WritableBatch;
public class QueryWritableBatch {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class);
@@ -48,6 +56,20 @@ public class QueryWritableBatch {
public String toString() {
return "QueryWritableBatch [header=" + header + ", buffers=" + Arrays.toString(buffers) + "]";
}
-
+
+ public static QueryWritableBatch getEmptyBatchWithSchema(QueryId queryId, int rowCount, boolean isLastChunk, BatchSchema schema) {
+ List<SerializedField> fields = Lists.newArrayList();
+ for (MaterializedField field : schema) {
+ fields.add(field.getAsBuilder().build());
+ }
+ RecordBatchDef def = RecordBatchDef.newBuilder().addAllField(fields).build();
+ QueryResult header = QueryResult.newBuilder() //
+ .setQueryId(queryId) //
+ .setRowCount(rowCount) //
+ .setDef(def) //
+ .setIsLastChunk(isLastChunk) //
+ .build();
+ return new QueryWritableBatch(header);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index a5d80b0..9351844 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -46,6 +46,7 @@ import org.apache.drill.exec.physical.config.MergingReceiverPOP;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.record.RawFragmentBatchProvider;
@@ -97,6 +98,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private int[] batchOffsets;
private PriorityQueue <Node> pqueue;
private List<VectorAllocator> allocators;
+ private RawFragmentBatch emptyBatch = null;
+ private boolean done = false;
public static enum Metric implements MetricDef{
NEXT_WAIT_NANOS;
@@ -134,6 +137,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
@Override
public IterOutcome innerNext() {
if (fragProviders.length == 0) return IterOutcome.NONE;
+ if (done) return IterOutcome.NONE;
boolean schemaChanged = false;
if (prevBatchWasFull) {
@@ -155,6 +159,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// set up each (non-empty) incoming record batch
List<RawFragmentBatch> rawBatches = Lists.newArrayList();
+ boolean firstBatch = true;
for (RawFragmentBatchProvider provider : fragProviders) {
RawFragmentBatch rawBatch = null;
try {
@@ -165,12 +170,31 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
rawBatches.add(rawBatch);
+ } else if (emptyBatch == null) {
+ emptyBatch = rawBatch;
+ }
+ if (firstBatch) {
+ schema = BatchSchema.newBuilder().addSerializedFields(rawBatch.getHeader().getDef().getFieldList()).build();
}
}
// allocate the incoming record batch loaders
senderCount = rawBatches.size();
if (senderCount == 0) {
+ if (firstBatch) {
+ RecordBatchLoader loader = new RecordBatchLoader(oContext.getAllocator());
+ try {
+ loader.load(emptyBatch.getHeader().getDef(), emptyBatch.getBody());
+ } catch (SchemaChangeException e) {
+ throw new RuntimeException(e);
+ }
+ for (VectorWrapper w : loader) {
+ outgoingContainer.add(w.getValueVector());
+ }
+ outgoingContainer.buildSchema(SelectionVectorMode.NONE);
+ done = true;
+ return IterOutcome.OK_NEW_SCHEMA;
+ }
return IterOutcome.NONE;
}
incomingBatches = new RawFragmentBatch[senderCount];
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 23296fb..c4844d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -208,18 +208,17 @@ public class PartitionSenderRootExec extends BaseRootExec {
public void sendEmptyBatch() {
FragmentHandle handle = context.getHandle();
int fieldId = 0;
- VectorContainer container = new VectorContainer();
StatusHandler statusHandler = new StatusHandler(sendCount, context);
for (DrillbitEndpoint endpoint : popConfig.getDestinations()) {
FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
DataTunnel tunnel = context.getDataTunnel(endpoint, opposite);
- FragmentWritableBatch writableBatch = new FragmentWritableBatch(true,
+ FragmentWritableBatch writableBatch = FragmentWritableBatch.getEmptyLastWithSchema(
handle.getQueryId(),
handle.getMajorFragmentId(),
handle.getMinorFragmentId(),
operator.getOppositeMajorFragmentId(),
fieldId,
- WritableBatch.getBatchNoHVWrap(0, container, false));
+ incoming.getSchema());
stats.startWait();
try {
tunnel.sendRecordBatch(statusHandler, writableBatch);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 5ee01f1..e6ddf90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -51,6 +51,7 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index bf9db9a..ba200f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -69,11 +69,12 @@ public class SortRecordBatchBuilder {
*/
public boolean add(VectorAccessible batch){
if(batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch.");
- if (batch.getRecordCount() == 0)
+ if (batch.getRecordCount() == 0 && batches.size() > 0) {
return true; // skip over empty record batches.
+ }
long batchBytes = getSize(batch);
- if (batchBytes == 0) {
+ if (batchBytes == 0 && batches.size() > 0) {
return true;
}
if(batchBytes + runningBytes > maxBytes) return false; // enough data memory.
@@ -81,7 +82,6 @@ public class SortRecordBatchBuilder {
if(!svAllocator.preAllocate(batch.getRecordCount()*4)) return false; // sv allocation available.
- if (batch.getRecordCount() == 0) return true;
RecordBatchData bd = new RecordBatchData(batch);
runningBytes += batchBytes;
batches.put(batch.getSchema(), bd);
@@ -91,7 +91,7 @@ public class SortRecordBatchBuilder {
public boolean add(RecordBatchData rbd) {
long batchBytes = getSize(rbd.getContainer());
- if (batchBytes == 0) {
+ if (batchBytes == 0 && batches.size() > 0) {
return true;
}
if(batchBytes + runningBytes > maxBytes) {
@@ -105,7 +105,7 @@ public class SortRecordBatchBuilder {
}
- if (rbd.getRecordCount() == 0) return true;
+ if (rbd.getRecordCount() == 0 && batches.size() > 0) return true;
runningBytes += batchBytes;
batches.put(rbd.getContainer().getSchema(), rbd);
recordCount += rbd.getRecordCount();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 19f6497..ee8f37a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -35,6 +35,7 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
private IterOutcome state = IterOutcome.NOT_YET;
private final RecordBatch incoming;
+ private boolean first = true;
public IteratorValidatorBatchIterator(RecordBatch incoming) {
this.incoming = incoming;
@@ -67,7 +68,6 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
@Override
public BatchSchema getSchema() {
- validateReadState();
return incoming.getSchema();
}
@@ -110,6 +110,10 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
public IterOutcome next() {
if(state == IterOutcome.NONE ) throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again.");
state = incoming.next();
+ if (first && state == IterOutcome.NONE) {
+ throw new IllegalStateException("The incoming iterator returned a state of NONE on the first batch. There should always be at least one batch output before returning NONE");
+ }
+ if (first) first = !first;
if(state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) {
BatchSchema schema = incoming.getSchema();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 02b9ba0..237a631 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -102,6 +102,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private int spillCount = 0;
private int batchesSinceLastSpill = 0;
private long uid;//used for spill files to ensure multiple sorts within same fragment don't clobber each others' files
+ private boolean useIncomingSchema = false;
+ private boolean first = true;
public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context);
@@ -144,6 +146,17 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return this.sv4;
}
+ @Override
+ public BatchSchema getSchema() {
+ if (useIncomingSchema) {
+ List<MaterializedField> fields = Lists.newArrayList();
+ for (MaterializedField field : incoming.getSchema()) {
+ fields.add(field);
+ }
+ return BatchSchema.newBuilder().addFields(fields).setSelectionVectorMode(SelectionVectorMode.FOUR_BYTE).build();
+ }
+ return super.getSchema();
+ }
@Override
@@ -205,6 +218,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS));
switch (upstream) {
case NONE:
+ assert !first;
break outer;
case NOT_YET:
throw new UnsupportedOperationException();
@@ -212,6 +226,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return upstream;
case OK_NEW_SCHEMA:
// only change in the case that the schema truly changes. Artificial schema changes are ignored.
+ first = false;
if(!incoming.getSchema().equals(schema)){
if (schema != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
this.schema = incoming.getSchema();
@@ -220,6 +235,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// fall through.
case OK:
SelectionVector2 sv2;
+// if (incoming.getRecordCount() == 0) {
+// break outer;
+// }
if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
sv2 = incoming.getSelectionVector2();
} else {
@@ -231,9 +249,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
int count = sv2.getCount();
totalcount += count;
- if (count == 0) {
- break outer;
- }
+// if (count == 0) {
+// break outer;
+// }
sorter.setup(context, sv2, incoming);
Stopwatch w = new Stopwatch();
w.start();
@@ -261,18 +279,20 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
}
- if (schema == null || totalcount == 0){
- // builder may be null at this point if the first incoming batch is empty
- return IterOutcome.NONE;
- }
+// if (schema == null || totalcount == 0){
+// builder may be null at this point if the first incoming batch is empty
+// useIncomingSchema = true;
+// return IterOutcome.NONE;
+// }
if (spillCount == 0) {
Stopwatch watch = new Stopwatch();
watch.start();
- if (schema == null){
+// if (schema == null){
// builder may be null at this point if the first incoming batch is empty
- return IterOutcome.NONE;
- }
+// useIncomingSchema = true;
+// return IterOutcome.NONE;
+// }
builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
index 0ba84f9..3cb7641 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
@@ -48,7 +48,9 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In
QuickSort qs = new QuickSort();
Stopwatch watch = new Stopwatch();
watch.start();
- qs.sort(this, 0, vector2.getCount());
+ if (vector2.getCount() > 0) {
+ qs.sort(this, 0, vector2.getCount());
+ }
logger.debug("Took {} us to sort {} records", watch.elapsed(TimeUnit.MICROSECONDS), vector2.getCount());
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 9473945..721755d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -43,7 +43,6 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
@Override
public IterOutcome innerNext() {
IterOutcome upstream = next(incoming);
- if(first && upstream == IterOutcome.OK) upstream = IterOutcome.OK_NEW_SCHEMA;
if (!first && upstream == IterOutcome.OK && incoming.getRecordCount() == 0) {
do {
for (VectorWrapper w : incoming) {
@@ -51,15 +50,17 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
}
} while ((upstream = next(incoming)) == IterOutcome.OK && incoming.getRecordCount() == 0);
}
- first = false;
+ if(first && upstream == IterOutcome.OK) upstream = IterOutcome.OK_NEW_SCHEMA;
switch(upstream){
case NONE:
+ assert !first;
case NOT_YET:
case STOP:
return upstream;
case OUT_OF_MEMORY:
return upstream;
case OK_NEW_SCHEMA:
+ first = false;
try{
stats.startSetup();
setupNewSchema();
@@ -73,6 +74,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
}
// fall through.
case OK:
+ assert !first : "First batch should be OK_NEW_SCHEMA";
doWork();
if (outOfMemory) {
outOfMemory = false;
@@ -91,6 +93,11 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
incoming.cleanup();
}
+ @Override
+ public BatchSchema getSchema() {
+ return container.getSchema();
+ }
+
protected abstract void setupNewSchema() throws SchemaChangeException;
protected abstract void doWork();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index b4da6e0..5af3fb8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -17,6 +17,9 @@
*/
package org.apache.drill.exec.record;
+
+import com.google.common.collect.Lists;
+
import java.util.Iterator;
import java.util.List;
@@ -54,6 +57,12 @@ public class BatchSchema implements Iterable<MaterializedField> {
return selectionVectorMode;
}
+ public BatchSchema clone() {
+ List<MaterializedField> newFields = Lists.newArrayList();
+ newFields.addAll(fields);
+ return new BatchSchema(selectionVectorMode, newFields);
+ }
+
@Override
public String toString() {
return "BatchSchema [fields=" + fields + ", selectionVector=" + selectionVectorMode + "]";
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
index 33fd5a2..ef7b5f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -17,12 +17,17 @@
*/
package org.apache.drill.exec.record;
+import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDefOrBuilder;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+
+import java.util.List;
public class FragmentWritableBatch{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentWritableBatch.class);
@@ -59,6 +64,17 @@ public class FragmentWritableBatch{
return new FragmentWritableBatch(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentId, EMPTY_DEF);
}
+ public static FragmentWritableBatch getEmptyLastWithSchema(QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId,
+ int receiveMajorFragmentId, int receiveMinorFragmentId, BatchSchema schema){
+
+ List<SerializedField> fields = Lists.newArrayList();
+ for (MaterializedField field : schema) {
+ fields.add(field.getAsBuilder().build());
+ }
+ RecordBatchDef def = RecordBatchDef.newBuilder().addAllField(fields).build();
+ return new FragmentWritableBatch(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentId, def);
+ }
+
public ByteBuf[] getBuffers(){
return buffers;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index cac610c..33bcb3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -25,6 +25,7 @@ import java.util.Map;
import javax.jdo.metadata.FieldMetadata;
+import io.netty.buffer.EmptyByteBuf;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
@@ -85,7 +86,8 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
v = TypeHelper.getNewVector(fieldDef, allocator);
}
if (fmd.getValueCount() == 0){
- v.clear();
+// v.clear();
+ v.load(fmd, new EmptyByteBuf(allocator.getUnderlyingAllocator()));
} else {
v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
index c954354..f405585 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
@@ -21,6 +21,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import com.google.common.collect.Lists;
@@ -64,6 +65,13 @@ public class SchemaBuilder {
}
return this;
}
+
+ public SchemaBuilder addSerializedFields(Iterable<SerializedField> fields) {
+ for (SerializedField f : fields) {
+ addField(MaterializedField.create(f));
+ }
+ return this;
+ }
// private void setTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass)
// throws SchemaChangeException {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 1d6ca33..308db3d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -23,6 +23,11 @@ import org.junit.Test;
public class TestExampleQueries extends BaseTestQuery{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestExampleQueries.class);
+
+ @Test
+ public void testQ() throws Exception {
+ test("select * from cp.`customer.json` where 0 = 1");
+ }
@Test // see DRILL-553
public void testQueryWithNullValues() throws Exception {
@@ -102,6 +107,7 @@ public class TestExampleQueries extends BaseTestQuery{
@Test
public void testJoin() throws Exception{
+ test("alter session set `planner.enable_hashjoin` = false");
test("SELECT\n" +
" nations.N_NAME,\n" +
" regions.R_NAME\n" +
@@ -109,7 +115,7 @@ public class TestExampleQueries extends BaseTestQuery{
" dfs.`[WORKING_PATH]/../../sample-data/nation.parquet` nations\n" +
"JOIN\n" +
" dfs.`[WORKING_PATH]/../../sample-data/region.parquet` regions\n" +
- " on nations.N_REGIONKEY = regions.R_REGIONKEY");
+ " on nations.N_REGIONKEY = regions.R_REGIONKEY where 1 = 0");
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/test/resources/mergerecv/empty_batch.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/mergerecv/empty_batch.json b/exec/java-exec/src/test/resources/mergerecv/empty_batch.json
index 361c4af..55b3f7d 100644
--- a/exec/java-exec/src/test/resources/mergerecv/empty_batch.json
+++ b/exec/java-exec/src/test/resources/mergerecv/empty_batch.json
@@ -27,7 +27,7 @@
{
@id: 2,
child: 1,
- pop: "sort",
+ pop: "external-sort",
orderings: [ {expr: "blue", order:"DESC"}, {expr: "red", order:"DESC"} ]
},
{
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
index 1145b84..c2c9dd8 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
@@ -20,16 +20,15 @@ package org.apache.drill.jdbc;
import java.sql.SQLException;
import java.util.Calendar;
import java.util.List;
-import java.util.concurrent.LinkedBlockingDeque;
import net.hydromatic.avatica.ColumnMetaData;
import net.hydromatic.avatica.Cursor;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.user.QueryResultBatch;
-import org.apache.drill.jdbc.DrillResultSet.Listener;
public class DrillCursor implements Cursor{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillCursor.class);
@@ -41,8 +40,10 @@ public class DrillCursor implements Cursor{
private final RecordBatchLoader currentBatch;
private final DrillResultSet.Listener listener;
private boolean redoFirstNext = false;
+ private boolean first = true;
private DrillColumnMetaDataList columnMetaDataList;
+ private BatchSchema schema;
final DrillResultSet results;
int currentRecord = 0;
@@ -82,11 +83,13 @@ public class DrillCursor implements Cursor{
try {
QueryResultBatch qrb = listener.getNext();
recordBatchCount++;
- while(qrb != null && qrb.getHeader().getRowCount() == 0 ){
+ while(qrb != null && qrb.getHeader().getRowCount() == 0 && !first){
qrb.release();
qrb = listener.getNext();
recordBatchCount++;
}
+
+ first = false;
if(qrb == null){
finished = true;
@@ -94,7 +97,11 @@ public class DrillCursor implements Cursor{
}else{
currentRecord = 0;
boolean changed = currentBatch.load(qrb.getHeader().getDef(), qrb.getData());
+ schema = currentBatch.getSchema();
if(changed) updateColumns();
+ if (redoFirstNext && currentBatch.getRecordCount() == 0) {
+ redoFirstNext = false;
+ }
return true;
}
} catch (RpcException | InterruptedException | SchemaChangeException e) {
@@ -106,8 +113,8 @@ public class DrillCursor implements Cursor{
void updateColumns(){
accessors.generateAccessors(this, currentBatch);
- columnMetaDataList.updateColumnMetaData(UNKNOWN, UNKNOWN, UNKNOWN, currentBatch.getSchema());
- if(results.changeListener != null) results.changeListener.schemaChanged(currentBatch.getSchema());
+ columnMetaDataList.updateColumnMetaData(UNKNOWN, UNKNOWN, UNKNOWN, schema);
+ if(results.changeListener != null) results.changeListener.schemaChanged(schema);
}
public long getRecordBatchCount(){
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java
index 30a7144..93cfce3 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java
@@ -23,12 +23,15 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
+import java.util.List;
import java.util.concurrent.TimeUnit;
+import com.google.common.collect.Lists;
import org.apache.drill.common.util.TestTools;
import org.apache.drill.exec.store.hive.HiveTestDataGenerator;
import org.apache.drill.jdbc.Driver;
import org.apache.drill.jdbc.JdbcTest;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -182,16 +185,17 @@ public class TestJdbcDistQuery extends JdbcTest{
Statement s = c.createStatement();
ResultSet r = s.executeQuery(sql);
boolean first = true;
- while (r.next()) {
- ResultSetMetaData md = r.getMetaData();
- if (first == true) {
- for (int i = 1; i <= md.getColumnCount(); i++) {
- System.out.print(md.getColumnName(i));
- System.out.print('\t');
- }
- System.out.println();
- first = false;
+ ResultSetMetaData md = r.getMetaData();
+ if (first == true) {
+ for (int i = 1; i <= md.getColumnCount(); i++) {
+ System.out.print(md.getColumnName(i));
+ System.out.print('\t');
}
+ System.out.println();
+ first = false;
+ }
+ while(r.next()){
+ md = r.getMetaData();
for (int i = 1; i <= md.getColumnCount(); i++) {
System.out.print(r.getObject(i));
@@ -211,4 +215,24 @@ public class TestJdbcDistQuery extends JdbcTest{
}
+
+ @Test
+ public void testSchemaForEmptyResultSet() throws Exception {
+ String query = "select fullname, occupation, postal_code from cp.`customer.json` where 0 = 1";
+ try (Connection c = DriverManager.getConnection("jdbc:drill:zk=local", null);) {
+ Statement s = c.createStatement();
+ ResultSet r = s.executeQuery(query);
+ ResultSetMetaData md = r.getMetaData();
+ List<String> columns = Lists.newArrayList();
+ for (int i = 1; i <= md.getColumnCount(); i++) {
+ System.out.print(md.getColumnName(i));
+ System.out.print('\t');
+ columns.add(md.getColumnName(i));
+ }
+ String[] expected = {"fullname", "occupation", "postal_code"};
+ Assert.assertEquals(3, md.getColumnCount());
+ Assert.assertArrayEquals(expected, columns.toArray());
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/jdbc/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/resources/bootstrap-storage-plugins.json b/exec/jdbc/src/test/resources/bootstrap-storage-plugins.json
index 3861317..53600f3 100644
--- a/exec/jdbc/src/test/resources/bootstrap-storage-plugins.json
+++ b/exec/jdbc/src/test/resources/bootstrap-storage-plugins.json
@@ -32,6 +32,9 @@
},
"parquet" : {
type: "parquet"
+ },
+ "json" : {
+ type: "json"
}
}
},