You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/20 22:24:55 UTC

[19/32] git commit: DRILL-884: Always return a schema, even when there are no records

DRILL-884: Always return a schema, even when there are no records


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/a314c824
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/a314c824
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/a314c824

Branch: refs/heads/master
Commit: a314c824ba99edf0c29b004c121904847bab2c15
Parents: a3bf05d
Author: Steven Phillips <sp...@maprtech.com>
Authored: Wed Jun 4 22:24:12 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Jun 19 20:30:55 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/impl/ScanBatch.java     | 18 ++++++++-
 .../drill/exec/physical/impl/ScreenCreator.java | 22 ++++++----
 .../exec/physical/impl/SingleSenderCreator.java |  6 ++-
 .../exec/physical/impl/TopN/TopNBatch.java      | 11 ++++-
 .../exec/physical/impl/WireRecordBatch.java     | 12 +++++-
 .../physical/impl/aggregate/HashAggBatch.java   | 25 ++++++++++--
 .../impl/aggregate/HashAggTemplate.java         |  2 +-
 .../impl/aggregate/StreamingAggBatch.java       | 27 ++++++++++---
 .../impl/aggregate/StreamingAggTemplate.java    |  9 ++++-
 .../exec/physical/impl/join/HashJoinBatch.java  |  5 ++-
 .../exec/physical/impl/join/MergeJoinBatch.java | 29 ++++++++------
 .../impl/materialize/QueryWritableBatch.java    | 24 ++++++++++-
 .../impl/mergereceiver/MergingRecordBatch.java  | 24 +++++++++++
 .../PartitionSenderRootExec.java                |  5 +--
 .../impl/project/ProjectRecordBatch.java        |  1 +
 .../impl/sort/SortRecordBatchBuilder.java       | 10 ++---
 .../IteratorValidatorBatchIterator.java         |  6 ++-
 .../physical/impl/xsort/ExternalSortBatch.java  | 40 ++++++++++++++-----
 .../impl/xsort/SingleBatchSorterTemplate.java   |  4 +-
 .../exec/record/AbstractSingleRecordBatch.java  | 11 ++++-
 .../apache/drill/exec/record/BatchSchema.java   |  9 +++++
 .../exec/record/FragmentWritableBatch.java      | 16 ++++++++
 .../drill/exec/record/RecordBatchLoader.java    |  4 +-
 .../apache/drill/exec/record/SchemaBuilder.java |  8 ++++
 .../org/apache/drill/TestExampleQueries.java    |  8 +++-
 .../test/resources/mergerecv/empty_batch.json   |  2 +-
 .../java/org/apache/drill/jdbc/DrillCursor.java | 17 +++++---
 .../drill/jdbc/test/TestJdbcDistQuery.java      | 42 +++++++++++++++-----
 .../resources/bootstrap-storage-plugins.json    |  3 ++
 29 files changed, 322 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 55d3f62..5f8bfb9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -80,6 +80,8 @@ public class ScanBatch implements RecordBatch {
   private List<ValueVector> partitionVectors;
   private List<Integer> selectedPartitionColumns;
   private String partitionColumnDesignator;
+  private boolean first = true;
+  private boolean done = false;
 
   public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
     this.context = context;
@@ -92,7 +94,7 @@ public class ScanBatch implements RecordBatch {
     this.partitionColumns = partitionColumns.iterator();
     this.partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
     this.selectedPartitionColumns = selectedPartitionColumns;
-    DrillConfig config = context.getConfig(); //This nonsense it is to not break all the stupid unit tests using SimpleRootExec
+    DrillConfig config = context.getConfig();
     this.partitionColumnDesignator = config == null ? "dir" : config.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
     addPartitionVectors();
   }
@@ -122,11 +124,14 @@ public class ScanBatch implements RecordBatch {
   }
 
   private void releaseAssets() {
-    container.clear();
+    container.zeroVectors();
   }
 
   @Override
   public IterOutcome next() {
+    if (done) {
+      return IterOutcome.NONE;
+    }
     oContext.getStats().startProcessing();
     try {
       mutator.allocate(MAX_RECORD_CNT);
@@ -135,6 +140,14 @@ public class ScanBatch implements RecordBatch {
           if (!readers.hasNext()) {
             currentReader.cleanup();
             releaseAssets();
+            if (first) {
+              first = false;
+              done = true;
+              populatePartitionVectors();
+              container.buildSchema(SelectionVectorMode.NONE);
+              schema = container.getSchema();
+              return IterOutcome.OK_NEW_SCHEMA;
+            }
             return IterOutcome.NONE;
           }
           oContext.getStats().startSetup();
@@ -154,6 +167,7 @@ public class ScanBatch implements RecordBatch {
           return IterOutcome.STOP;
         }
       }
+      first = false;
 
       populatePartitionVectors();
       if (mutator.isNewSchema()) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 86e77d8..9ad85af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -64,6 +64,7 @@ public class ScreenCreator implements RootCreator<Screen>{
     final FragmentContext context;
     final UserClientConnection connection;
     private RecordMaterializer materializer;
+    private boolean first = true;
 
     public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
       super(context, config);
@@ -108,13 +109,18 @@ public class ScreenCreator implements RootCreator<Screen>{
       case NONE: {
         sendCount.waitForSendComplete();
 //        context.getStats().batchesCompleted.inc(1);
-        QueryResult header = QueryResult.newBuilder() //
-            .setQueryId(context.getHandle().getQueryId()) //
-            .setRowCount(0) //
-            .setDef(RecordBatchDef.getDefaultInstance()) //
-            .setIsLastChunk(true) //
-            .build();
-        QueryWritableBatch batch = new QueryWritableBatch(header);
+        QueryWritableBatch batch;
+        if (!first) {
+          QueryResult header = QueryResult.newBuilder() //
+              .setQueryId(context.getHandle().getQueryId()) //
+              .setRowCount(0) //
+              .setDef(RecordBatchDef.getDefaultInstance()) //
+              .setIsLastChunk(true) //
+              .build();
+          batch = new QueryWritableBatch(header);
+        } else {
+          batch = QueryWritableBatch.getEmptyBatchWithSchema(context.getHandle().getQueryId(), 0, true, incoming.getSchema());
+        }
         stats.startWait();
         try {
           connection.sendResult(listener, batch);
@@ -140,6 +146,7 @@ public class ScreenCreator implements RootCreator<Screen>{
         }
         sendCount.increment();
 
+        first = false;
         return true;
       default:
         throw new UnsupportedOperationException();
@@ -181,6 +188,7 @@ public class ScreenCreator implements RootCreator<Screen>{
     }
 
 
+
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 9e91468..1b63112 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -80,7 +80,8 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
       switch(out){
       case STOP:
       case NONE:
-        FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0);
+        FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLastWithSchema(handle.getQueryId(), handle.getMajorFragmentId(),
+                handle.getMinorFragmentId(), recMajor, 0, incoming.getSchema());
         sendCount.increment();
         stats.startWait();
         try {
@@ -92,7 +93,8 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
 
       case OK_NEW_SCHEMA:
       case OK:
-        FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
+        FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(),
+                handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
         sendCount.increment();
         stats.startWait();
         try {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index c9cd2dd..4a5d368 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -46,7 +46,9 @@ import org.apache.drill.exec.physical.impl.svremover.Copier;
 import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorAccessible;
@@ -113,7 +115,14 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     return sv4;
   }
 
-
+  @Override
+  public BatchSchema getSchema() {
+    List<MaterializedField> fields = Lists.newArrayList();
+    for (MaterializedField field : incoming.getSchema()) {
+      fields.add(field);
+    }
+    return BatchSchema.newBuilder().addFields(fields).setSelectionVectorMode(SelectionVectorMode.FOUR_BYTE).build();
+  }
 
   @Override
   public void cleanup() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index 2a19ba7..1eae0c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -48,6 +48,7 @@ public class WireRecordBatch implements RecordBatch {
   private FragmentContext context;
   private BatchSchema schema;
   private OperatorStats stats;
+  private boolean first = true;
 
 
   public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider, RandomReceiver config) throws OutOfMemoryException {
@@ -114,13 +115,22 @@ public class WireRecordBatch implements RecordBatch {
         batch = fragProvider.getNext();
 
         // skip over empty batches. we do this since these are basically control messages.
-        while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0){
+        while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && !first){
+          if (first) {
+            first = false;
+            RecordBatchDef rbd = batch.getHeader().getDef();
+            batchLoader.load(rbd, batch.getBody());
+            batch.release();
+            schema = batchLoader.getSchema().clone();
+            batchLoader.clear();
+          }
           batch = fragProvider.getNext();
         }
       } finally {
         stats.stopWait();
       }
 
+      first = false;
 
       if (batch == null){
         batchLoader.clear();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 6adc304..3609c02 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -40,9 +40,11 @@ import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -65,6 +67,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
   private LogicalExpression[] aggrExprs;
   private TypedFieldId[] groupByOutFieldIds ;
   private TypedFieldId[] aggrOutFieldIds ;      // field ids for the outgoing batch
+  private boolean first = true;
 
   private final GeneratorMapping UPDATE_AGGR_INSIDE =
     GeneratorMapping.create("setupInterior" /* setup method */, "updateAggrValuesInternal" /* eval method */,
@@ -90,12 +93,16 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
   @Override
   public IterOutcome innerNext() {
+    if (done) {
+      return IterOutcome.NONE;
+    }
     // this is only called on the first batch. Beyond this, the aggregator manages batches.
     if (aggregator == null) {
       IterOutcome outcome = next(incoming);
       logger.debug("Next outcome of {}", outcome);
       switch (outcome) {
       case NONE:
+        throw new UnsupportedOperationException("Received NONE on first batch");
       case NOT_YET:
       case STOP:
         return outcome;
@@ -118,7 +125,13 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
   if (aggregator.buildComplete() && ! aggregator.allFlushed()) {
     // aggregation is complete and not all records have been output yet
-    return aggregator.outputCurrentBatch();
+    IterOutcome outcome = aggregator.outputCurrentBatch();
+    if (outcome == IterOutcome.NONE && first) {
+      first = false;
+      done = true;
+      return IterOutcome.OK_NEW_SCHEMA;
+    }
+    return outcome;
   }
 
   logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());
@@ -128,11 +141,17 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
       logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
       switch(out){
       case CLEANUP_AND_RETURN:
-        container.clear();
+        container.zeroVectors();
         aggregator.cleanup();
         done = true;
-        return aggregator.getOutcome();
+        // fall through
       case RETURN_OUTCOME:
+        IterOutcome outcome = aggregator.getOutcome();
+        if (outcome == IterOutcome.NONE && first) {
+          first = false;
+          done = true;
+          return IterOutcome.OK_NEW_SCHEMA;
+        }
         return aggregator.getOutcome();
       case UPDATE_AGGREGATOR:
         aggregator = null;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 5069a2d..935bbb3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -429,7 +429,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     // get the number of groups in the batch holder corresponding to this batch index
     int batchOutputRecords = batchHolders.get(outBatchIndex).getNumGroups();
     
-    if (batchOutputRecords == 0) {
+    if (!first && batchOutputRecords == 0) {
       this.outcome = IterOutcome.NONE;
       return outcome;
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index ec12de9..367d2c7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -45,6 +45,7 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.impl.aggregate.StreamingAggregator.AggOutcome;
 import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
@@ -65,6 +66,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   private StreamingAggregator aggregator;
   private final RecordBatch incoming;
   private boolean done = false;
+  private boolean first = true;
 
   public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
     super(popConfig, context);
@@ -74,12 +76,17 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   @Override
   public int getRecordCount() {
     if(done) return 0;
+    if (aggregator == null) return 0;
     return aggregator.getOutputCount();
   }
 
   @Override
   public IterOutcome innerNext() {
-    // this is only called on the first batch. Beyond this, the aggregator manages batches.
+    if (done) {
+      container.zeroVectors();
+      return IterOutcome.NONE;
+    }
+      // this is only called on the first batch. Beyond this, the aggregator manages batches.
     if (aggregator == null) {
       IterOutcome outcome = next(incoming);
       logger.debug("Next outcome of {}", outcome);
@@ -106,17 +113,25 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
       switch(out){
       case CLEANUP_AND_RETURN:
-        container.clear();
+        if (!first) container.zeroVectors();
         done = true;
-        return aggregator.getOutcome();
+        // fall through
       case RETURN_OUTCOME:
-        return aggregator.getOutcome();
+        IterOutcome outcome = aggregator.getOutcome();
+        if (outcome == IterOutcome.NONE && first) {
+          first = false;
+          done = true;
+          return IterOutcome.OK_NEW_SCHEMA;
+        }
+        first = false;
+        return outcome;
       case UPDATE_AGGREGATOR:
+        first = false;
         aggregator = null;
         if(!createAggregator()){
           return IterOutcome.STOP;
-        }
-        continue;
+      }
+      continue;
       default:
         throw new IllegalStateException(String.format("Unknown state %s.", out));
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index e73f21b..3bd861d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -58,7 +58,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     this.allocators = allocators;
     this.outgoing = outgoing;
     setupInterior(incoming, outgoing);
-    this.currentIndex = this.getVectorIndex(underlyingIndex);
+    this.currentIndex = incoming.getRecordCount() == 0 ? 0 : this.getVectorIndex(underlyingIndex);
   }
 
 
@@ -158,13 +158,18 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
 
         try{
           while(true){
+            if (previous != null) {
+              previous.clear();
+            }
             previous = new InternalBatch(incoming);
             IterOutcome out = outgoing.next(0, incoming);
             if(EXTRA_DEBUG) logger.debug("Received IterOutcome of {}", out);
             switch(out){
             case NONE:
               lastOutcome = out;
-              if(addedRecordCount > 0){
+              if (first && addedRecordCount == 0) {
+                return setOkAndReturn();
+              } else if(addedRecordCount > 0){
                 if( !outputToBatchPrev( previous, previousIndex, outputCount) ) remainderBatch = previous;
                 if(EXTRA_DEBUG) logger.debug("Received no more batches, returning.");
                 return setOkAndReturn();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 11368e3..1c028d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -109,6 +109,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
     // Schema of the build side
     private BatchSchema rightSchema = null;
 
+    private boolean first = true;
+
     // Generator mapping for the build side
     private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
                                                                                   "projectBuildRecord" /* eval method */,
@@ -187,7 +189,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                  * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
                  * Either case build the output container's schema and return
                  */
-                if (outputRecords > 0) {
+                if (outputRecords > 0 || first) {
+                  first = false;
 
                   // Build the container schema and set the counts
                   container.buildSchema(BatchSchema.SelectionVectorMode.NONE);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index e32b653..6943d1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -110,6 +110,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
   private final JoinRelType joinType;
   private JoinWorker worker;
   public MergeJoinBatchBuilder batchBuilder;
+  private boolean done = false;
 
   protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
     super(popConfig, context);
@@ -136,6 +137,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
 
   @Override
   public IterOutcome innerNext() {
+    if (done) {
+      return IterOutcome.NONE;
+    }
     // we do this in the here instead of the constructor because don't necessary want to start consuming on construction.
     status.ensureInitial();
 
@@ -190,9 +194,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
         kill();
         return IterOutcome.STOP;
       case NO_MORE_DATA:
-        logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : "NONE"));
+        logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : (first ? "OK_NEW_SCHEMA" :"NONE")));
         setRecordCountInContainer();
-        return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): IterOutcome.NONE;
+        done = true;
+        return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.NONE);
       case SCHEMA_CHANGED:
         worker = null;
         if(status.getOutPosition() > 0){
@@ -349,7 +354,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     //////////////////////
     cg.setMappingSet(copyLeftMapping);
     int vectorId = 0;
-    if (status.isLeftPositionAllowed()) {
+    if (worker == null || status.isLeftPositionAllowed()) {
       for (VectorWrapper<?> vw : left) {
         MajorType inputType = vw.getField().getType();
         MajorType outputType;
@@ -379,7 +384,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     cg.setMappingSet(copyRightMappping);
 
     int rightVectorBase = vectorId;
-    if (status.isRightPositionAllowed()) {
+    if (worker == null || status.isRightPositionAllowed()) {
       for (VectorWrapper<?> vw : right) {
         MajorType inputType = vw.getField().getType();
         MajorType outputType;
@@ -414,12 +419,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     container.clear();
 
     //estimation of joinBatchSize : max of left/right size, expanded by a factor of 16, which is then bounded by MAX_BATCH_SIZE.
-    int leftCount = status.isLeftPositionAllowed() ? left.getRecordCount() : 0;
-    int rightCount = status.isRightPositionAllowed() ? right.getRecordCount() : 0;
+    int leftCount = worker == null ? left.getRecordCount() : (status.isLeftPositionAllowed() ? left.getRecordCount() : 0);
+    int rightCount = worker == null ? left.getRecordCount() : (status.isRightPositionAllowed() ? right.getRecordCount() : 0);
     int joinBatchSize = Math.min(Math.max(leftCount, rightCount) * 16, MAX_BATCH_SIZE);
 
     // add fields from both batches
-    if (leftCount > 0) {
+    if (worker == null || leftCount > 0) {
 
       for (VectorWrapper<?> w : left) {
         MajorType inputType = w.getField().getType();
@@ -430,12 +435,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
           outputType = inputType;
         }
         ValueVector outgoingVector = TypeHelper.getNewVector(MaterializedField.create(w.getField().getPath(), outputType), oContext.getAllocator());
-        VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / left.getRecordCount())).alloc(joinBatchSize);
+        VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / Math.max(1, left.getRecordCount()))).alloc(joinBatchSize);
         container.add(outgoingVector);
       }
     }
 
-    if (rightCount > 0) {
+    if (worker == null || rightCount > 0) {
       for (VectorWrapper<?> w : right) {
         MajorType inputType = w.getField().getType();
         MajorType outputType;
@@ -445,7 +450,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
           outputType = inputType;
         }
         ValueVector outgoingVector = TypeHelper.getNewVector(MaterializedField.create(w.getField().getPath(), outputType), oContext.getAllocator());
-        VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / right.getRecordCount())).alloc(joinBatchSize);
+        VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / Math.max(1, right.getRecordCount()))).alloc(joinBatchSize);
         container.add(outgoingVector);
       }
     }
@@ -465,7 +470,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
 
       // materialize value vector readers from join expression
       LogicalExpression materializedLeftExpr;
-      if (status.isLeftPositionAllowed()) {
+      if (worker == null || status.isLeftPositionAllowed()) {
         materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, context.getFunctionRegistry());
       } else {
         materializedLeftExpr = new TypedNullConstant(Types.optional(MinorType.INT));
@@ -475,7 +480,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
             "Failure while trying to materialize incoming left field.  Errors:\n %s.", collector.toErrorString()));
 
       LogicalExpression materializedRightExpr;
-      if (status.isRightPositionAllowed()) {
+      if (worker == null || status.isRightPositionAllowed()) {
         materializedRightExpr = ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector, context.getFunctionRegistry());
       } else {
         materializedRightExpr = new TypedNullConstant(Types.optional(MinorType.INT));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
index aba7370..c219cce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
@@ -18,10 +18,18 @@
 package org.apache.drill.exec.physical.impl.materialize;
 
 import java.util.Arrays;
+import java.util.List;
 
+import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.WritableBatch;
 
 public class QueryWritableBatch {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class);
@@ -48,6 +56,20 @@ public class QueryWritableBatch {
   public String toString() {
     return "QueryWritableBatch [header=" + header + ", buffers=" + Arrays.toString(buffers) + "]";
   }
-  
+
+  public static QueryWritableBatch getEmptyBatchWithSchema(QueryId queryId, int rowCount, boolean isLastChunk, BatchSchema schema) {
+    List<SerializedField> fields = Lists.newArrayList();
+    for (MaterializedField field : schema) {
+      fields.add(field.getAsBuilder().build());
+    }
+    RecordBatchDef def = RecordBatchDef.newBuilder().addAllField(fields).build();
+    QueryResult header = QueryResult.newBuilder() //
+            .setQueryId(queryId) //
+            .setRowCount(rowCount) //
+            .setDef(def) //
+            .setIsLastChunk(isLastChunk) //
+            .build();
+    return new QueryWritableBatch(header);
+  }
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index a5d80b0..9351844 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -46,6 +46,7 @@ import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
 import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.record.RawFragmentBatchProvider;
@@ -97,6 +98,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   private int[] batchOffsets;
   private PriorityQueue <Node> pqueue;
   private List<VectorAllocator> allocators;
+  private RawFragmentBatch emptyBatch = null;
+  private boolean done = false;
 
   public static enum Metric implements MetricDef{
     NEXT_WAIT_NANOS;
@@ -134,6 +137,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   @Override
   public IterOutcome innerNext() {
     if (fragProviders.length == 0) return IterOutcome.NONE;
+    if (done) return IterOutcome.NONE;
     boolean schemaChanged = false;
 
     if (prevBatchWasFull) {
@@ -155,6 +159,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
       // set up each (non-empty) incoming record batch
       List<RawFragmentBatch> rawBatches = Lists.newArrayList();
+      boolean firstBatch = true;
       for (RawFragmentBatchProvider provider : fragProviders) {
         RawFragmentBatch rawBatch = null;
         try {
@@ -165,12 +170,31 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         }
         if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
           rawBatches.add(rawBatch);
+        } else if (emptyBatch == null) {
+          emptyBatch = rawBatch;
+        }
+        if (firstBatch) {
+          schema = BatchSchema.newBuilder().addSerializedFields(rawBatch.getHeader().getDef().getFieldList()).build();
         }
       }
 
       // allocate the incoming record batch loaders
       senderCount = rawBatches.size();
       if (senderCount == 0) {
+        if (firstBatch) {
+          RecordBatchLoader loader = new RecordBatchLoader(oContext.getAllocator());
+          try {
+            loader.load(emptyBatch.getHeader().getDef(), emptyBatch.getBody());
+          } catch (SchemaChangeException e) {
+            throw new RuntimeException(e);
+          }
+          for (VectorWrapper w : loader) {
+            outgoingContainer.add(w.getValueVector());
+          }
+          outgoingContainer.buildSchema(SelectionVectorMode.NONE);
+          done = true;
+          return IterOutcome.OK_NEW_SCHEMA;
+        }
         return IterOutcome.NONE;
       }
       incomingBatches = new RawFragmentBatch[senderCount];

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 23296fb..c4844d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -208,18 +208,17 @@ public class PartitionSenderRootExec extends BaseRootExec {
   public void sendEmptyBatch() {
     FragmentHandle handle = context.getHandle();
     int fieldId = 0;
-    VectorContainer container = new VectorContainer();
     StatusHandler statusHandler = new StatusHandler(sendCount, context);
     for (DrillbitEndpoint endpoint : popConfig.getDestinations()) {
       FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
       DataTunnel tunnel = context.getDataTunnel(endpoint, opposite);
-      FragmentWritableBatch writableBatch = new FragmentWritableBatch(true,
+      FragmentWritableBatch writableBatch = FragmentWritableBatch.getEmptyLastWithSchema(
               handle.getQueryId(),
               handle.getMajorFragmentId(),
               handle.getMinorFragmentId(),
               operator.getOppositeMajorFragmentId(),
               fieldId,
-              WritableBatch.getBatchNoHVWrap(0, container, false));
+              incoming.getSchema());
       stats.startWait();
       try {
         tunnel.sendRecordBatch(statusHandler, writableBatch);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 5ee01f1..e6ddf90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -51,6 +51,7 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.Project;
 import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index bf9db9a..ba200f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -69,11 +69,12 @@ public class SortRecordBatchBuilder {
    */
   public boolean add(VectorAccessible batch){
     if(batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE) throw new UnsupportedOperationException("A sort cannot currently work against a sv4 batch.");
-    if (batch.getRecordCount() == 0)
+    if (batch.getRecordCount() == 0 && batches.size() > 0) {
       return true; // skip over empty record batches.
+    }
 
     long batchBytes = getSize(batch);
-    if (batchBytes == 0) {
+    if (batchBytes == 0 && batches.size() > 0) {
       return true;
     }
     if(batchBytes + runningBytes > maxBytes) return false; // enough data memory.
@@ -81,7 +82,6 @@ public class SortRecordBatchBuilder {
     if(!svAllocator.preAllocate(batch.getRecordCount()*4)) return false;  // sv allocation available.
 
 
-    if (batch.getRecordCount() == 0) return true;
     RecordBatchData bd = new RecordBatchData(batch);
     runningBytes += batchBytes;
     batches.put(batch.getSchema(), bd);
@@ -91,7 +91,7 @@ public class SortRecordBatchBuilder {
 
   public boolean add(RecordBatchData rbd) {
     long batchBytes = getSize(rbd.getContainer());
-    if (batchBytes == 0) {
+    if (batchBytes == 0 && batches.size() > 0) {
       return true;
     }
     if(batchBytes + runningBytes > maxBytes) {
@@ -105,7 +105,7 @@ public class SortRecordBatchBuilder {
     }
 
 
-    if (rbd.getRecordCount() == 0) return true;
+    if (rbd.getRecordCount() == 0 && batches.size() > 0) return true;
     runningBytes += batchBytes;
     batches.put(rbd.getContainer().getSchema(), rbd);
     recordCount += rbd.getRecordCount();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 19f6497..ee8f37a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -35,6 +35,7 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
 
   private IterOutcome state = IterOutcome.NOT_YET;
   private final RecordBatch incoming;
+  private boolean first = true;
 
   public IteratorValidatorBatchIterator(RecordBatch incoming) {
     this.incoming = incoming;
@@ -67,7 +68,6 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
 
   @Override
   public BatchSchema getSchema() {
-    validateReadState();
     return incoming.getSchema();
   }
 
@@ -110,6 +110,10 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
   public IterOutcome next() {
     if(state == IterOutcome.NONE ) throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again.");
     state = incoming.next();
+    if (first && state == IterOutcome.NONE) {
+      throw new IllegalStateException("The incoming iterator returned a state of NONE on the first batch. There should always be at least one batch output before returning NONE");
+    }
+    if (first) first = !first;
 
     if(state == IterOutcome.OK || state == IterOutcome.OK_NEW_SCHEMA) {
       BatchSchema schema = incoming.getSchema();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 02b9ba0..237a631 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -102,6 +102,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private int spillCount = 0;
   private int batchesSinceLastSpill = 0;
   private long uid;//used for spill files to ensure multiple sorts within same fragment don't clobber each others' files
+  private boolean useIncomingSchema = false;
+  private boolean first = true;
 
   public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context);
@@ -144,6 +146,17 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     return this.sv4;
   }
 
+  @Override
+  public BatchSchema getSchema() {
+    if (useIncomingSchema) {
+      List<MaterializedField> fields = Lists.newArrayList();
+      for (MaterializedField field : incoming.getSchema()) {
+        fields.add(field);
+      }
+      return BatchSchema.newBuilder().addFields(fields).setSelectionVectorMode(SelectionVectorMode.FOUR_BYTE).build();
+    }
+    return super.getSchema();
+  }
 
 
   @Override
@@ -205,6 +218,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 //        logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS));
         switch (upstream) {
         case NONE:
+          assert !first;
           break outer;
         case NOT_YET:
           throw new UnsupportedOperationException();
@@ -212,6 +226,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           return upstream;
         case OK_NEW_SCHEMA:
           // only change in the case that the schema truly changes.  Artificial schema changes are ignored.
+          first = false;
           if(!incoming.getSchema().equals(schema)){
             if (schema != null) throw new UnsupportedOperationException("Sort doesn't currently support sorts with changing schemas.");
             this.schema = incoming.getSchema();
@@ -220,6 +235,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           // fall through.
         case OK:
           SelectionVector2 sv2;
+//          if (incoming.getRecordCount() == 0) {
+//            break outer;
+//          }
           if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
             sv2 = incoming.getSelectionVector2();
           } else {
@@ -231,9 +249,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
           }
           int count = sv2.getCount();
           totalcount += count;
-          if (count == 0) {
-            break outer;
-          }
+//          if (count == 0) {
+//            break outer;
+//          }
           sorter.setup(context, sv2, incoming);
           Stopwatch w = new Stopwatch();
           w.start();
@@ -261,18 +279,20 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         }
       }
 
-      if (schema == null || totalcount == 0){
-        // builder may be null at this point if the first incoming batch is empty
-        return IterOutcome.NONE;
-      }
+//      if (schema == null || totalcount == 0){
+//        builder may be null at this point if the first incoming batch is empty
+//        useIncomingSchema = true;
+//        return IterOutcome.NONE;
+//      }
 
       if (spillCount == 0) {
         Stopwatch watch = new Stopwatch();
         watch.start();
-        if (schema == null){
+//        if (schema == null){
           // builder may be null at this point if the first incoming batch is empty
-          return IterOutcome.NONE;
-        }
+//          useIncomingSchema = true;
+//          return IterOutcome.NONE;
+//        }
 
         builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
index 0ba84f9..3cb7641 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
@@ -48,7 +48,9 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In
     QuickSort qs = new QuickSort();
     Stopwatch watch = new Stopwatch();
     watch.start();
-    qs.sort(this, 0, vector2.getCount());
+    if (vector2.getCount() > 0) {
+      qs.sort(this, 0, vector2.getCount());
+    }
     logger.debug("Took {} us to sort {} records", watch.elapsed(TimeUnit.MICROSECONDS), vector2.getCount());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 9473945..721755d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -43,7 +43,6 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
   @Override
   public IterOutcome innerNext() {
     IterOutcome upstream = next(incoming);
-    if(first && upstream == IterOutcome.OK) upstream = IterOutcome.OK_NEW_SCHEMA;
     if (!first && upstream == IterOutcome.OK && incoming.getRecordCount() == 0) {
       do {
         for (VectorWrapper w : incoming) {
@@ -51,15 +50,17 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
         }
       } while ((upstream = next(incoming)) == IterOutcome.OK && incoming.getRecordCount() == 0);
     }
-    first = false;
+    if(first && upstream == IterOutcome.OK) upstream = IterOutcome.OK_NEW_SCHEMA;
     switch(upstream){
     case NONE:
+      assert !first;
     case NOT_YET:
     case STOP:
       return upstream;
     case OUT_OF_MEMORY:
       return upstream;
     case OK_NEW_SCHEMA:
+      first = false;
       try{
         stats.startSetup();
         setupNewSchema();
@@ -73,6 +74,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
       }
       // fall through.
     case OK:
+      assert !first : "First batch should be OK_NEW_SCHEMA";
       doWork();
       if (outOfMemory) {
         outOfMemory = false;
@@ -91,6 +93,11 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
     incoming.cleanup();
   }
 
+  @Override
+  public BatchSchema getSchema() {
+    return container.getSchema();
+  }
+
   protected abstract void setupNewSchema() throws SchemaChangeException;
   protected abstract void doWork();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index b4da6e0..5af3fb8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -17,6 +17,9 @@
  */
 package org.apache.drill.exec.record;
 
+
+import com.google.common.collect.Lists;
+
 import java.util.Iterator;
 import java.util.List;
 
@@ -54,6 +57,12 @@ public class BatchSchema implements Iterable<MaterializedField> {
     return selectionVectorMode;
   }
 
+  public BatchSchema clone() {
+    List<MaterializedField> newFields = Lists.newArrayList();
+    newFields.addAll(fields);
+    return new BatchSchema(selectionVectorMode, newFields);
+  }
+
   @Override
   public String toString() {
     return "BatchSchema [fields=" + fields + ", selectionVector=" + selectionVectorMode + "]";

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
index 33fd5a2..ef7b5f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -17,12 +17,17 @@
  */
 package org.apache.drill.exec.record;
 
+import com.google.common.collect.Lists;
 import io.netty.buffer.ByteBuf;
 
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDefOrBuilder;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+
+import java.util.List;
 
 public class FragmentWritableBatch{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentWritableBatch.class);
@@ -59,6 +64,17 @@ public class FragmentWritableBatch{
     return new FragmentWritableBatch(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentId, EMPTY_DEF);
   }
 
+  public static FragmentWritableBatch getEmptyLastWithSchema(QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId,
+                                                             int receiveMajorFragmentId, int receiveMinorFragmentId, BatchSchema schema){
+
+    List<SerializedField> fields = Lists.newArrayList();
+    for (MaterializedField field : schema) {
+      fields.add(field.getAsBuilder().build());
+    }
+    RecordBatchDef def = RecordBatchDef.newBuilder().addAllField(fields).build();
+    return new FragmentWritableBatch(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentId, def);
+  }
+
   public ByteBuf[] getBuffers(){
     return buffers;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index cac610c..33bcb3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import javax.jdo.metadata.FieldMetadata;
 
+import io.netty.buffer.EmptyByteBuf;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
@@ -85,7 +86,8 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
         v = TypeHelper.getNewVector(fieldDef, allocator);
       }
       if (fmd.getValueCount() == 0){
-        v.clear();
+//        v.clear();
+        v.load(fmd, new EmptyByteBuf(allocator.getUnderlyingAllocator()));
       } else {
         v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
index c954354..f405585 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaBuilder.java
@@ -21,6 +21,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 
 import com.google.common.collect.Lists;
@@ -64,6 +65,13 @@ public class SchemaBuilder {
     }
     return this;
   }
+
+  public SchemaBuilder addSerializedFields(Iterable<SerializedField> fields) {
+    for (SerializedField f : fields) {
+      addField(MaterializedField.create(f));
+    }
+    return this;
+  }
   
 //  private void setTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass)
 //      throws SchemaChangeException {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 1d6ca33..308db3d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -23,6 +23,11 @@ import org.junit.Test;
 
 public class TestExampleQueries extends BaseTestQuery{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestExampleQueries.class);
+
+  @Test
+  public void testQ() throws Exception {
+    test("select * from cp.`customer.json` where 0 = 1");
+  }
   
   @Test // see DRILL-553
   public void testQueryWithNullValues() throws Exception {
@@ -102,6 +107,7 @@ public class TestExampleQueries extends BaseTestQuery{
 
   @Test
   public void testJoin() throws Exception{
+    test("alter session set `planner.enable_hashjoin` = false");
     test("SELECT\n" +
         "  nations.N_NAME,\n" +
         "  regions.R_NAME\n" +
@@ -109,7 +115,7 @@ public class TestExampleQueries extends BaseTestQuery{
         "  dfs.`[WORKING_PATH]/../../sample-data/nation.parquet` nations\n" +
         "JOIN\n" +
         "  dfs.`[WORKING_PATH]/../../sample-data/region.parquet` regions\n" +
-        "  on nations.N_REGIONKEY = regions.R_REGIONKEY");
+        "  on nations.N_REGIONKEY = regions.R_REGIONKEY where 1 = 0");
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/java-exec/src/test/resources/mergerecv/empty_batch.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/mergerecv/empty_batch.json b/exec/java-exec/src/test/resources/mergerecv/empty_batch.json
index 361c4af..55b3f7d 100644
--- a/exec/java-exec/src/test/resources/mergerecv/empty_batch.json
+++ b/exec/java-exec/src/test/resources/mergerecv/empty_batch.json
@@ -27,7 +27,7 @@
     {
       @id: 2,
       child: 1,
-      pop: "sort",
+      pop: "external-sort",
       orderings: [ {expr: "blue", order:"DESC"},  {expr: "red", order:"DESC"} ]
     },
     {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
index 1145b84..c2c9dd8 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java
@@ -20,16 +20,15 @@ package org.apache.drill.jdbc;
 import java.sql.SQLException;
 import java.util.Calendar;
 import java.util.List;
-import java.util.concurrent.LinkedBlockingDeque;
 
 import net.hydromatic.avatica.ColumnMetaData;
 import net.hydromatic.avatica.Cursor;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
-import org.apache.drill.jdbc.DrillResultSet.Listener;
 
 public class DrillCursor implements Cursor{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillCursor.class);
@@ -41,8 +40,10 @@ public class DrillCursor implements Cursor{
   private final RecordBatchLoader currentBatch;
   private final DrillResultSet.Listener listener;
   private boolean redoFirstNext = false;
+  private boolean first = true;
   
   private DrillColumnMetaDataList columnMetaDataList;
+  private BatchSchema schema;
   
   final DrillResultSet results;
   int currentRecord = 0;
@@ -82,11 +83,13 @@ public class DrillCursor implements Cursor{
       try {
         QueryResultBatch qrb = listener.getNext();
         recordBatchCount++;
-        while(qrb != null && qrb.getHeader().getRowCount() == 0 ){
+        while(qrb != null && qrb.getHeader().getRowCount() == 0 && !first){
           qrb.release();
           qrb = listener.getNext();
           recordBatchCount++;
         }
+
+        first = false;
         
         if(qrb == null){
           finished = true;
@@ -94,7 +97,11 @@ public class DrillCursor implements Cursor{
         }else{
           currentRecord = 0;
           boolean changed = currentBatch.load(qrb.getHeader().getDef(), qrb.getData());
+          schema = currentBatch.getSchema();
           if(changed) updateColumns();
+          if (redoFirstNext && currentBatch.getRecordCount() == 0) {
+            redoFirstNext = false;
+          }
           return true;
         }
       } catch (RpcException | InterruptedException | SchemaChangeException e) {
@@ -106,8 +113,8 @@ public class DrillCursor implements Cursor{
   
   void updateColumns(){
     accessors.generateAccessors(this, currentBatch);
-    columnMetaDataList.updateColumnMetaData(UNKNOWN, UNKNOWN, UNKNOWN, currentBatch.getSchema());
-    if(results.changeListener != null) results.changeListener.schemaChanged(currentBatch.getSchema());
+    columnMetaDataList.updateColumnMetaData(UNKNOWN, UNKNOWN, UNKNOWN, schema);
+    if(results.changeListener != null) results.changeListener.schemaChanged(schema);
   }
   
   public long getRecordBatchCount(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java
index 30a7144..93cfce3 100644
--- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java
+++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java
@@ -23,12 +23,15 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.Statement;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Lists;
 import org.apache.drill.common.util.TestTools;
 import org.apache.drill.exec.store.hive.HiveTestDataGenerator;
 import org.apache.drill.jdbc.Driver;
 import org.apache.drill.jdbc.JdbcTest;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -182,16 +185,17 @@ public class TestJdbcDistQuery extends JdbcTest{
         Statement s = c.createStatement();
         ResultSet r = s.executeQuery(sql);
         boolean first = true;
-        while (r.next()) {
-          ResultSetMetaData md = r.getMetaData();
-          if (first == true) {
-            for (int i = 1; i <= md.getColumnCount(); i++) {
-              System.out.print(md.getColumnName(i));
-              System.out.print('\t');
-            }
-            System.out.println();
-            first = false;
+        ResultSetMetaData md = r.getMetaData();
+        if (first == true) {
+          for (int i = 1; i <= md.getColumnCount(); i++) {
+            System.out.print(md.getColumnName(i));
+            System.out.print('\t');
           }
+          System.out.println();
+          first = false;
+        }
+        while(r.next()){
+          md = r.getMetaData();
 
           for (int i = 1; i <= md.getColumnCount(); i++) {
             System.out.print(r.getObject(i));
@@ -211,4 +215,24 @@ public class TestJdbcDistQuery extends JdbcTest{
 
 
   }
+
+  @Test
+  public void testSchemaForEmptyResultSet() throws Exception {
+    String query = "select fullname, occupation, postal_code from cp.`customer.json` where 0 = 1";
+    try (Connection c = DriverManager.getConnection("jdbc:drill:zk=local", null);) {
+      Statement s = c.createStatement();
+      ResultSet r = s.executeQuery(query);
+      ResultSetMetaData md = r.getMetaData();
+      List<String> columns = Lists.newArrayList();
+      for (int i = 1; i <= md.getColumnCount(); i++) {
+        System.out.print(md.getColumnName(i));
+        System.out.print('\t');
+        columns.add(md.getColumnName(i));
+      }
+      String[] expected = {"fullname", "occupation", "postal_code"};
+      Assert.assertEquals(3, md.getColumnCount());
+      Assert.assertArrayEquals(expected, columns.toArray());
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/a314c824/exec/jdbc/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/jdbc/src/test/resources/bootstrap-storage-plugins.json b/exec/jdbc/src/test/resources/bootstrap-storage-plugins.json
index 3861317..53600f3 100644
--- a/exec/jdbc/src/test/resources/bootstrap-storage-plugins.json
+++ b/exec/jdbc/src/test/resources/bootstrap-storage-plugins.json
@@ -32,6 +32,9 @@
         },
         "parquet" : {
           type: "parquet"
+        },
+        "json" : {
+          type: "json"
         }
       }
     },