Posted to commits@drill.apache.org by so...@apache.org on 2018/10/10 20:47:41 UTC

[drill] 02/05: DRILL-6766: Lateral Unnest query : IllegalStateException - rowId in right batch of lateral is smaller than rowId in left batch being processed Note: The issue was in StreamingAgg: when the output from one or more input batches was split into multiple output batches, the remaining input records were discarded after the first output batch was produced closes #1490

This is an automated email from the ASF dual-hosted git repository.

sorabh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit d5146c43986f09f132f4e96966082732a3740181
Author: Sorabh Hamirwasia <so...@apache.org>
AuthorDate: Mon Oct 1 14:15:33 2018 -0700

    DRILL-6766: Lateral Unnest query : IllegalStateException - rowId in right batch of lateral is smaller than rowId in left batch being processed
    Note: The issue was in StreamingAgg: when the output from one or more input batches was split into multiple
    output batches, the remaining input records were discarded after the first output batch was produced
    closes #1490
---
 .../physical/impl/aggregate/StreamingAggBatch.java |  74 +++--
 .../impl/aggregate/StreamingAggTemplate.java       |  56 ++--
 .../impl/aggregate/StreamingAggregator.java        |  19 +-
 .../exec/physical/impl/join/LateralJoinBatch.java  |   4 +-
 .../exec/physical/impl/unnest/UnnestImpl.java      |  12 +-
 .../drill/exec/record/AbstractRecordBatch.java     |  10 +-
 .../impl/agg/TestStreamingAggEmitOutcome.java      | 315 ++++++++++++++++++++-
 7 files changed, 423 insertions(+), 67 deletions(-)
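
The heart of the change: StreamingAgg previously emitted a single output batch per run of input batches and silently dropped whatever input rows remained once that batch filled. The patch makes the aggregator resumable by capping output rows (maxOutputRowCount, replacing a hardcoded 32K constant) and exposing previousBatchProcessed(), which the batch operator consults before pulling the next incoming batch. A minimal, self-contained Java sketch of the idea follows; the names are illustrative, not Drill's actual classes:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch: split one input across several capped output
    // batches, remembering the resume position so no rows are discarded.
    class ResumableSplitter {
      private final int maxOutputRows;             // analogous to maxOutputRowCount
      private int resumeIndex = Integer.MAX_VALUE; // sentinel meaning "input consumed"

      ResumableSplitter(int maxOutputRows) { this.maxOutputRows = maxOutputRows; }

      // Mirrors the new StreamingAggregator.previousBatchProcessed() contract.
      boolean previousBatchProcessed() { return resumeIndex == Integer.MAX_VALUE; }

      // Emits at most maxOutputRows rows per call; if input remains, records
      // where to resume instead of discarding the leftover rows.
      List<Integer> doWork(int[] input) {
        int i = previousBatchProcessed() ? 0 : resumeIndex;
        List<Integer> out = new ArrayList<>();
        for (; i < input.length && out.size() < maxOutputRows; i++) {
          out.add(input[i]);
        }
        resumeIndex = (i < input.length) ? i : Integer.MAX_VALUE;
        return out;
      }
    }

With a cap of 2 and a 5-row input, three doWork() calls yield [r0, r1], [r2, r3], [r4]; before the fix, the rows left over after the first full output batch were lost.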

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 2b9b317..ffcfa78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
@@ -71,6 +72,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP;
 
 public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatch.class);
@@ -104,7 +106,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
                                  // call to inner next is made.
   private boolean sendEmit = false; // In the case where we see an OK_NEW_SCHEMA along with the end of a data set
                                     // we send out a batch with OK_NEW_SCHEMA first, then in the next iteration,
-                                    // we send out an emopty batch with EMIT.
+                                    // we send out an empty batch with EMIT.
   private IterOutcome lastKnownOutcome = OK; // keep track of the outcome from the previous call to incoming.next
   private boolean firstBatchForSchema = true; // true if the current batch came in with an OK_NEW_SCHEMA
   private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set
@@ -127,7 +129,11 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   private boolean specialBatchSent = false;
   private static final int SPECIAL_BATCH_COUNT = 1;
 
-  public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
+  // TODO: Should adapt to batch sizing rather than a hardcoded constant value
+  private int maxOutputRowCount = ValueVector.MAX_ROW_COUNT;
+
+  public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context)
+    throws OutOfMemoryException {
     super(popConfig, context);
     this.incoming = incoming;
 
@@ -189,7 +195,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
     // if a special batch has been sent, we have no data in the incoming so exit early
     if (done || specialBatchSent) {
-      assert (sendEmit != true); // if special batch sent with emit then flag will not be set
+      assert (!sendEmit); // if special batch sent with emit then flag will not be set
       return NONE;
     }
 
@@ -199,6 +205,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       first = false; // first is set only in the case when we see a NONE after an empty first (and only) batch
       sendEmit = false;
       firstBatchForDataSet = true;
+      firstBatchForSchema = false;
       recordCount = 0;
       specialBatchSent = false;
       return EMIT;
@@ -239,18 +246,17 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
             done = true;
             return IterOutcome.STOP;
           }
+          firstBatchForSchema = true;
           break;
         case EMIT:
           // if we get an EMIT with an empty batch as the first (and therefore only) batch
           // we have to do the special handling
           if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && incoming.getRecordCount() == 0) {
             constructSpecialBatch();
-            firstBatchForDataSet = true; // reset on the next iteration
             // If outcome is NONE then we send the special batch in the first iteration and the NONE
             // outcome in the next iteration. If outcome is EMIT, we can send the special
             // batch and the EMIT outcome at the same time. (unless the finalOutcome is OK_NEW_SCHEMA)
-            IterOutcome finalOutcome =  getFinalOutcome();
-            return finalOutcome;
+            return getFinalOutcome();
           }
           // else fall thru
         case OK:
@@ -259,15 +265,18 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
           throw new IllegalStateException(String.format("unknown outcome %s", lastKnownOutcome));
       }
     } else {
-      if ( lastKnownOutcome != NONE && firstBatchForDataSet && !aggregator.isDone()) {
+      // Call next() on the upstream for a new batch only if the previous incoming batch was fully processed and
+      // neither an error condition nor a NONE outcome has been seen. Otherwise, keep processing the previous
+      // incoming batch.
+      if ( lastKnownOutcome != NONE && firstBatchForDataSet && !aggregator.isDone()
+        && aggregator.previousBatchProcessed()) {
         lastKnownOutcome = incoming.next();
         if (!first ) {
           //Setup needs to be called again. During setup, generated code saves a reference to the vectors
-          // pointed to by the incoming batch so that the dereferencing of the vector wrappers to get to
+          // pointed to by the incoming batch so that the de-referencing of the vector wrappers to get to
           // the vectors  does not have to be done at each call to eval. However, after an EMIT is seen,
           // the vectors are replaced and the reference to the old vectors is no longer valid
           try {
-            aggregator.setup(oContext, incoming, this);
+            aggregator.setup(oContext, incoming, this, maxOutputRowCount);
           } catch (SchemaChangeException e) {
             UserException.Builder exceptionBuilder = UserException.functionError(e)
                 .message("A Schema change exception occured in calling setup() in generated code.");
@@ -280,8 +289,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     recordCount = aggregator.getOutputCount();
     container.setRecordCount(recordCount);
     logger.debug("Aggregator response {}, records {}", aggOutcome, aggregator.getOutputCount());
-    // overwrite the outcome variable since we no longer need to remember the first batch outcome
-    lastKnownOutcome = aggregator.getOutcome();
+    // Get the returned IterOutcome from the aggregator and, based on both the AggOutcome and that IterOutcome,
+    // update lastKnownOutcome below. For example, if the AggOutcome is RETURN_AND_RESET, then lastKnownOutcome is
+    // always set to EMIT.
+    IterOutcome returnOutcome = aggregator.getOutcome();
     switch (aggOutcome) {
       case CLEANUP_AND_RETURN:
         if (!first) {
@@ -289,7 +300,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
         }
         done = true;
         ExternalSortBatch.releaseBatches(incoming);
-        return lastKnownOutcome;
+        return returnOutcome;
       case RETURN_AND_RESET:
        //We could have gotten a string of batches, all empty, until we hit an emit
         if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && recordCount == 0) {
@@ -298,28 +309,32 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
           // If outcome is NONE then we send the special batch in the first iteration and the NONE
           // outcome in the next iteration. If outcome is EMIT, we can send the special
           // batch and the EMIT outcome at the same time.
-
-          IterOutcome finalOutcome =  getFinalOutcome();
-          return finalOutcome;
+          return getFinalOutcome();
         }
         firstBatchForDataSet = true;
         firstBatchForSchema = false;
         if(first) {
           first = false;
         }
-        if(lastKnownOutcome == OK_NEW_SCHEMA) {
-          sendEmit = true;
+        // An AggOutcome of RETURN_AND_RESET with a returned IterOutcome of OK_NEW_SCHEMA means the aggregator saw a
+        // first batch with OK_NEW_SCHEMA and then a last batch with an EMIT outcome. In that case, if all of the
+        // input batches were consumed to produce this output batch, an empty batch with the EMIT outcome needs to
+        // be sent in the subsequent next() call.
+        if(returnOutcome == OK_NEW_SCHEMA) {
+          sendEmit = (aggregator == null) || aggregator.previousBatchProcessed();
         }
         // Release external sort batches after EMIT is seen
         ExternalSortBatch.releaseBatches(incoming);
-        return lastKnownOutcome;
+        lastKnownOutcome = EMIT;
+        return returnOutcome;
       case RETURN_OUTCOME:
         // In case of complex writer expression, vectors would be added to batch run-time.
         // We have to re-build the schema.
         if (complexWriters != null) {
           container.buildSchema(SelectionVectorMode.NONE);
         }
-        if (lastKnownOutcome == IterOutcome.NONE ) {
+        if (returnOutcome == IterOutcome.NONE ) {
+          lastKnownOutcome = NONE;
           // we will set the 'done' flag in the next call to innerNext and use the lastKnownOutcome
           // to determine whether we should set the flag or not.
           // This is so that if someone calls getRecordCount in between calls to innerNext, we will
@@ -330,11 +345,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
           } else {
             return OK;
           }
-        } else if (lastKnownOutcome == OK && first) {
+        } else if (returnOutcome == OK && first) {
           lastKnownOutcome = OK_NEW_SCHEMA;
+          returnOutcome = OK_NEW_SCHEMA;
         }
         first = false;
-        return lastKnownOutcome;
+        return returnOutcome;
       case UPDATE_AGGREGATOR:
         // We could get this either between data sets or within a data set.
         // If the former, we can handle the change and so need to update the aggregator and
@@ -342,8 +358,9 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
         // and exception
         // This case is not tested since there are no unit tests for this and there is no support
         // from the sort operator for this case
-        if (lastKnownOutcome == EMIT) {
+        if (returnOutcome == EMIT) {
           createAggregator();
+          lastKnownOutcome = EMIT;
           return OK_NEW_SCHEMA;
         } else {
           context.getExecutorState().fail(UserException.unsupportedError().message(SchemaChangeException
@@ -351,6 +368,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
                   incoming.getSchema()).getMessage()).build(logger));
           close();
           killIncoming(false);
+          lastKnownOutcome = STOP;
           return IterOutcome.STOP;
         }
       default:
@@ -433,7 +451,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-    //  cg.getCodeGenerator().saveCodeForDebugging(true);
+    //cg.getCodeGenerator().saveCodeForDebugging(true);
     container.clear();
 
     LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().size()];
@@ -506,7 +524,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
     container.buildSchema(SelectionVectorMode.NONE);
     StreamingAggregator agg = context.getImplementationClass(cg);
-    agg.setup(oContext, incoming, this);
+    agg.setup(oContext, incoming, this, maxOutputRowCount);
     allocateComplexWriters();
     return agg;
   }
@@ -651,7 +669,11 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
   @Override
   public void dump() {
-    logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]",
-        container, popConfig, aggregator, incomingSchema);
+    logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]", container, popConfig, aggregator, incomingSchema);
+  }
+
+  @VisibleForTesting
+  public void setMaxOutputRowCount(int maxOutputRowCount) {
+    this.maxOutputRowCount = maxOutputRowCount;
   }
 }
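
As an illustration of the resulting contract, the new test t8_1 below feeds the operator input outcomes OK_NEW_SCHEMA, OK_NEW_SCHEMA, OK, OK, EMIT with the output capped at two rows, and asserts the following sequence from the caller's point of view:

    strAggBatch.next() -> OK_NEW_SCHEMA   (schema-only batch)
    strAggBatch.next() -> OK_NEW_SCHEMA   (first two aggregated groups; input not yet consumed)
    strAggBatch.next() -> EMIT            (remaining group; input now fully consumed)
    strAggBatch.next() -> NONE
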
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 4bde7ab..cc89f23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
 
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
@@ -33,7 +34,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA
 public abstract class StreamingAggTemplate implements StreamingAggregator {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggregator.class);
   private static final boolean EXTRA_DEBUG = false;
-  private static final int OUTPUT_BATCH_SIZE = 32*1024;
+  private int maxOutputRows = ValueVector.MAX_ROW_COUNT;
 
   // lastOutcome is set ONLY if the lastOutcome was NONE or STOP
   private IterOutcome lastOutcome = null;
@@ -54,7 +55,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   // (i.e if a selection vector the sv4/sv2 entry has been dereferenced or if a vector then the record index itself)
   private int previousIndex = -1;  // the last index that has been processed. Initialized to -1 every time a new
                                    // aggregate group begins (including every time a new data set begins)
-  private int currentIndex; // current index being processed
+  private int currentIndex = Integer.MAX_VALUE; // current index being processed
   /**
    * Number of records added to the current aggregation group.
    */
@@ -72,10 +73,12 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
 
 
   @Override
-  public void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException {
+  public void setup(OperatorContext context, RecordBatch incoming,
+                    StreamingAggBatch outgoing, int outputRowCount) throws SchemaChangeException {
     this.context = context;
     this.incoming = incoming;
     this.outgoing = outgoing;
+    this.maxOutputRows = outputRowCount;
     setupInterior(incoming, outgoing);
   }
 
@@ -109,7 +112,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
       allocateOutgoing();
 
       if (firstBatchForDataSet) {
-        this.currentIndex = incoming.getRecordCount() == 0 ? 0 : this.getVectorIndex(underlyingIndex);
+        this.currentIndex = incoming.getRecordCount() == 0 ? Integer.MAX_VALUE : this.getVectorIndex(underlyingIndex);
 
         if (outerOutcome == OK_NEW_SCHEMA) {
           firstBatchForSchema = true;
@@ -178,9 +181,10 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
         // loop through existing records, adding as necessary.
         if(!processRemainingRecordsInBatch()) {
           // output batch is full. Return.
-          return setOkAndReturn();
+          return setOkAndReturn(outerOutcome);
         }
-        // if the current batch came with an EMIT, we're done
+        // if the current batch came with an EMIT, we're done: reaching this point means the output batch consumed
+        // all the rows in the incoming batch
         if(outerOutcome == EMIT) {
           // output the last record
           outputToBatch(previousIndex);
@@ -215,14 +219,14 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                 done = true;
                 lastOutcome = out;
                 if (firstBatchForDataSet && addedRecordCount == 0) {
-                  return setOkAndReturn();
+                  return setOkAndReturn(NONE);
                 } else if (addedRecordCount > 0) {
                   outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
                   // (output container full or not) as we are not going to insert any more records.
                   if (EXTRA_DEBUG) {
                     logger.debug("Received no more batches, returning.");
                   }
-                  return setOkAndReturn();
+                  return setOkAndReturn(NONE);
                 } else {
                   // not first batch and record Count == 0
                   outcome = out;
@@ -237,6 +241,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                   }
                 } else {
                   resetIndex();
+                  currentIndex = this.getVectorIndex(underlyingIndex);
                   if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) {
                     if (EXTRA_DEBUG) {
                       logger.debug("New value was same as last value of previous batch, adding.");
@@ -256,13 +261,16 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                         if (EXTRA_DEBUG) {
                           logger.debug("Output container is full. flushing it.");
                         }
-                        return setOkAndReturnEmit();
+                        return setOkAndReturn(EMIT);
                       }
                     }
                     // important to set the previous index to -1 since we start a new group
                     previousIndex = -1;
                   }
-                  processRemainingRecordsInBatch();
+                  if (!processRemainingRecordsInBatch()) {
+                    // output batch is full. Return.
+                    return setOkAndReturn(EMIT);
+                  }
+                  outputToBatch(previousIndex); // currentIndex has been reset to Integer.MAX_VALUE, so use the previous index.
                 }
                 resetIndex();
@@ -285,7 +293,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                     logger.debug("Wrote out end of previous batch, returning.");
                   }
                   newSchema = true;
-                  return setOkAndReturn();
+                  return setOkAndReturn(OK_NEW_SCHEMA);
                 }
                 cleanup();
                 return AggOutcome.UPDATE_AGGREGATOR;
@@ -294,6 +302,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                 if (incoming.getRecordCount() == 0) {
                   continue;
                 } else {
+                  currentIndex = this.getVectorIndex(underlyingIndex);
                   if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) {
                     if (EXTRA_DEBUG) {
                       logger.debug("New value was same as last value of previous batch, adding.");
@@ -315,7 +324,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                           logger.debug("Output container is full. flushing it.");
                         }
                         previousIndex = -1;
-                        return setOkAndReturn();
+                        return setOkAndReturn(OK);
                       }
                     }
                     previousIndex = -1;
@@ -405,8 +414,8 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   }
 
   private final void resetIndex() {
-    underlyingIndex = -1;
-    incIndex();
+    underlyingIndex = 0;
+    currentIndex = Integer.MAX_VALUE;
   }
 
   /**
@@ -414,7 +423,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
    *
    * @return outcome
    */
-  private final AggOutcome setOkAndReturn() {
+  private final AggOutcome setOkAndReturn(IterOutcome seenOutcome) {
     IterOutcome outcomeToReturn;
     firstBatchForDataSet = false;
     if (firstBatchForSchema) {
@@ -428,7 +437,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     for (VectorWrapper<?> v : outgoing) {
       v.getValueVector().getMutator().setValueCount(outputCount);
     }
-    return AggOutcome.RETURN_OUTCOME;
+    return (seenOutcome == EMIT) ? AggOutcome.RETURN_AND_RESET : AggOutcome.RETURN_OUTCOME;
   }
 
   /**
@@ -457,7 +466,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   // Returns output container status after insertion of the given record. Caller must check the return value if it
   // plans to insert more records into outgoing container.
   private final boolean outputToBatch(int inIndex) {
-    assert outputCount < OUTPUT_BATCH_SIZE:
+    assert outputCount < maxOutputRows :
         "Outgoing RecordBatch is not flushed. It reached its max capacity in the last update";
 
     outputRecordKeys(inIndex, outputCount);
@@ -470,14 +479,13 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     resetValues();
     outputCount++;
     addedRecordCount = 0;
-
-    return outputCount == OUTPUT_BATCH_SIZE;
+    return outputCount == maxOutputRows;
   }
 
   // Returns output container status after insertion of the given record. Caller must check the return value if it
  // plans to insert more records into outgoing container.
   private final boolean outputToBatchPrev(InternalBatch b1, int inIndex, int outIndex) {
-    assert outputCount < OUTPUT_BATCH_SIZE:
+    assert outputCount < maxOutputRows :
         "Outgoing RecordBatch is not flushed. It reached its max capacity in the last update";
 
     outputRecordKeysPrev(b1, inIndex, outIndex);
@@ -485,8 +493,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     resetValues();
     outputCount++;
     addedRecordCount = 0;
-
-    return outputCount == OUTPUT_BATCH_SIZE;
+    return outputCount == maxOutputRows;
   }
 
   private void addRecordInc(int index) {
@@ -508,6 +515,11 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
         + "]";
   }
 
+  @Override
+  public boolean previousBatchProcessed() {
+    return (currentIndex == Integer.MAX_VALUE);
+  }
+
   public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
   public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
   public abstract boolean isSamePrev(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int b2Index);
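
A note on the sentinel used above: currentIndex is initialized and reset to Integer.MAX_VALUE rather than 0 or -1 because 0 is a valid row index in a non-empty batch and so cannot double as a "fully consumed" marker. The contract, restated as a small hedged sketch (simplified, not a verbatim excerpt from the template):

    // Integer.MAX_VALUE can never be a real vector index (a batch holds at
    // most ValueVector.MAX_ROW_COUNT, i.e. 64K, rows), so it safely marks
    // the incoming batch as fully consumed.
    class SentinelExample {
      private static final int CONSUMED = Integer.MAX_VALUE;
      private int currentIndex = CONSUMED;

      public boolean previousBatchProcessed() {
        // resetIndex() restores the sentinel once the incoming batch is
        // exhausted; an in-flight row leaves a real index here instead.
        return currentIndex == CONSUMED;
      }
    }
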
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index 23fdcc1..57caa9f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -25,7 +25,8 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 
 public interface StreamingAggregator {
 
-  public static TemplateClassDefinition<StreamingAggregator> TEMPLATE_DEFINITION = new TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, StreamingAggTemplate.class);
+  TemplateClassDefinition<StreamingAggregator> TEMPLATE_DEFINITION =
+    new TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, StreamingAggTemplate.class);
 
 
   /**
@@ -45,25 +46,27 @@ public interface StreamingAggregator {
    * <p>
    * @see org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome HashAggregator.AggOutcome
    */
-  public static enum AggOutcome {
+  enum AggOutcome {
     RETURN_OUTCOME,
     CLEANUP_AND_RETURN,
     UPDATE_AGGREGATOR,
     RETURN_AND_RESET;
   }
 
-  public abstract void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException;
+  void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing, int outputRowCount)
+    throws SchemaChangeException;
 
-  public abstract IterOutcome getOutcome();
+  IterOutcome getOutcome();
 
-  public abstract int getOutputCount();
+  int getOutputCount();
 
  // do the work. Also pass in the IterOutcome of the batch already read in case it might be an EMIT. If the
   // outerOutcome is EMIT, we need to do the work without reading any more batches.
-  public abstract AggOutcome doWork(IterOutcome outerOutcome);
+  AggOutcome doWork(IterOutcome outerOutcome);
 
-  public abstract boolean isDone();
+  boolean isDone();
 
-  public abstract void cleanup();
+  void cleanup();
 
+  boolean previousBatchProcessed();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 242687f..735f11f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -389,7 +389,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
   private boolean handleSchemaChange() {
     try {
       stats.startSetup();
-      logger.debug("Setting up new schema based on incoming batch. Old output schema: %s", container.getSchema());
+      logger.debug("Setting up new schema based on incoming batch. Old output schema: {}", container.getSchema());
       setupNewSchema();
       return true;
     } catch (SchemaChangeException ex) {
@@ -805,7 +805,7 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
    */
   private void setupNewSchema() throws SchemaChangeException {
 
-    logger.debug("Setting up new schema based on incoming batch. New left schema: %s and New right schema: %s",
+    logger.debug("Setting up new schema based on incoming batch. New left schema: {} and New right schema: {}",
       left.getSchema(), right.getSchema());
 
     // Clear up the container
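
The two LateralJoinBatch changes above (and the UnnestImpl logging change in the next file) fix a common SLF4J pitfall: the messages used String.format-style %s conversions, but SLF4J only substitutes {} placeholders, so the schemas were being logged as the literal text "%s". A minimal illustration (class name is hypothetical; assumes a standard SLF4J setup):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class PlaceholderDemo {
      private static final Logger logger = LoggerFactory.getLogger(PlaceholderDemo.class);

      void demo(Object schema) {
        logger.debug("Old output schema: %s", schema); // logs the literal "%s"
        logger.debug("Old output schema: {}", schema); // substitutes schema.toString()
      }
    }

The old UnnestImpl debug line had a related defect: four {} placeholders but only three arguments, which SLF4J tolerates by leaving the last placeholder unfilled; the replacement trace call realigns labels and arguments.
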
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
index 508999f..a9c9598 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unnest/UnnestImpl.java
@@ -118,6 +118,7 @@ public class UnnestImpl implements Unnest {
     Preconditions.checkArgument(svMode == NONE, "Unnest does not support selection vector inputs.");
 
     final int initialInnerValueIndex = runningInnerValueIndex;
+    int nonEmptyArray = 0;
 
     outer:
     {
@@ -126,8 +127,12 @@ public class UnnestImpl implements Unnest {
 
       for (; valueIndex < valueCount; valueIndex++) {
         final int innerValueCount = accessor.getInnerValueCountAt(valueIndex);
-        logger.debug("Unnest: currentRecord: {}, innerValueCount: {}, record count: {}, output limit: {}",
-            innerValueCount, recordCount, outputLimit);
+        logger.trace("Unnest: CurrentRowId: {}, innerValueCount: {}, outputIndex: {},  output limit: {}",
+            valueIndex, innerValueCount, outputIndex, outputLimit);
+
+        if (innerValueCount > 0) {
+          ++nonEmptyArray;
+        }
 
         for (; innerValueIndex < innerValueCount; innerValueIndex++) {
           // If we've hit the batch size limit, stop and flush what we've got so far.
@@ -148,6 +153,9 @@ public class UnnestImpl implements Unnest {
      }  // for every value in the array
     }  // for every incoming record
     final int delta = runningInnerValueIndex - initialInnerValueIndex;
+    logger.debug("Unnest: Finished processing current batch. [Details: LastProcessedRowIndex: {}, " +
+      "RowsWithNonEmptyArrays: {}, outputIndex: {}, outputLimit: {}, TotalIncomingRecords: {}]",
+      valueIndex, nonEmptyArray, delta, outputLimit, accessor.getValueCount());
     final SchemaChangeCallBack callBack = new SchemaChangeCallBack();
     for (TransferPair t : transfers) {
       t.splitAndTransfer(initialInnerValueIndex, delta);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 362ea29..eb6112d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -135,13 +135,15 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
       return next;
     }
 
-    switch(next) {
+    boolean isNewSchema = false;
+    logger.debug("Received next batch for index: {} with outcome: {}", inputIndex, next);
+    switch (next) {
       case OK_NEW_SCHEMA:
-        stats.batchReceived(inputIndex, b.getRecordCount(), true);
-        break;
+        isNewSchema = true;
       case OK:
       case EMIT:
-        stats.batchReceived(inputIndex, b.getRecordCount(), false);
+        stats.batchReceived(inputIndex, b.getRecordCount(), isNewSchema);
+        logger.debug("Number of records in received batch: {}", b.getRecordCount());
         break;
       default:
         break;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
index cead984..37a44ea 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
@@ -527,6 +527,232 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
     nonEmptyInputRowSet2.clear();
   }
 
+  /**
+   * Verifies that the scenario where multiple incoming batches arrive with OK_NEW_SCHEMA, OK, OK, EMIT and their
+   * output is split into multiple output batches is handled correctly: the first output batch is produced with
+   * OK_NEW_SCHEMA and is then followed by an EMIT outcome.
+   */
+  @Test
+  public void t8_1_testStreamingAggr_InputSplitToMultipleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 20, "item1")
+      .build();
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 30, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 40, "item2")
+      .addRow(2, 50, "item2")
+      .addRow(2, 60, "item2")
+      .addRow(2, 70, "item2")
+      .addRow(3, 100, "item3")
+      .addRow(3, 200, "item3")
+      .addRow(3, 300, "item3")
+      .addRow(3, 400, "item3")
+      .build();
+
+    TupleMetadata resultSchema2 = new SchemaBuilder()
+      .add("name", TypeProtos.MinorType.VARCHAR)
+      .add("id", TypeProtos.MinorType.INT)
+      .add("total_count", TypeProtos.MinorType.BIGINT)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item1", 1, (long)2)
+      .addRow("item2", 2, (long)5)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item3", 3, (long)4)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name", "id_left", "id"),
+      parseExprs("count(cost_left)", "total_count"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+    strAggBatch.setMaxOutputRowCount(2);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // Expect OK_NEW_SCHEMA first for the output covering the second input batch onwards, since the output batch is
+    // full after producing 2 groups
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    // The last group is produced in a different output batch with the EMIT outcome
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    nonEmptyInputRowSet4.clear();
+
+    expectedRowSet1.clear();
+    expectedRowSet2.clear();
+  }
+
+  /**
+   * Verifies that output splitting is handled correctly both for incoming batches received with OK_NEW_SCHEMA, OK,
+   * OK, EMIT whose output spans multiple output batches, and for subsequent incoming batches received with OK, OK,
+   * EMIT whose output also spans multiple output batches.
+   */
+  @Test
+  public void t8_2_testStreamingAggr_Inputs_OK_EMIT_SplitToMultipleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 20, "item1")
+      .build();
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 30, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 40, "item2")
+      .addRow(2, 50, "item2")
+      .addRow(2, 60, "item2")
+      .addRow(2, 70, "item2")
+      .addRow(3, 100, "item3")
+      .addRow(3, 200, "item3")
+      .addRow(3, 300, "item3")
+      .addRow(3, 400, "item3")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet5 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 40, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet6 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 50, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet7 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(3, 130, "item3")
+      .addRow(3, 130, "item3")
+      .addRow(4, 140, "item4")
+      .addRow(4, 140, "item4")
+      .build();
+
+    TupleMetadata resultSchema2 = new SchemaBuilder()
+      .add("name", TypeProtos.MinorType.VARCHAR)
+      .add("id", TypeProtos.MinorType.INT)
+      .add("total_count", TypeProtos.MinorType.BIGINT)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item1", 1, (long)2)
+      .addRow("item2", 2, (long)5)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item3", 3, (long)4)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet3 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item2", 2, (long)2)
+      .addRow("item3", 3, (long)2)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet4 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item4", 4, (long)2)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+    inputContainer.add(nonEmptyInputRowSet5.container());
+    inputContainer.add(nonEmptyInputRowSet6.container());
+    inputContainer.add(nonEmptyInputRowSet7.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name", "id_left", "id"),
+      parseExprs("count(cost_left)", "total_count"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+    strAggBatch.setMaxOutputRowCount(2);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    // Output batches for input batches 2 to 5
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    // Output batches for input batches 6 to 8
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK);
+    // output batch is full after producing 2 rows
+    assertEquals(2, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet3).verify(actualRowSet);
+
+    // output batch with pending rows
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet4).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    nonEmptyInputRowSet4.clear();
+    nonEmptyInputRowSet5.clear();
+    nonEmptyInputRowSet6.clear();
+    nonEmptyInputRowSet7.clear();
+
+    expectedRowSet1.clear();
+    expectedRowSet2.clear();
+    expectedRowSet3.clear();
+    expectedRowSet4.clear();
+  }
 
   /*****************************************************************************************
    Tests for validating regular StreamingAggr behavior with no EMIT outcome
@@ -620,6 +846,88 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
   }
 
+  @Test
+  public void t10_1_testStreamingAggr_InputSplitToMultipleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(1, 20, "item1")
+      .build();
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 30, "item2")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet4 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 40, "item2")
+      .addRow(2, 50, "item2")
+      .addRow(2, 60, "item2")
+      .addRow(2, 70, "item2")
+      .addRow(3, 100, "item3")
+      .addRow(3, 200, "item3")
+      .addRow(3, 300, "item3")
+      .addRow(3, 400, "item3")
+      .build();
+
+    TupleMetadata resultSchema2 = new SchemaBuilder()
+      .add("name", TypeProtos.MinorType.VARCHAR)
+      .add("id", TypeProtos.MinorType.INT)
+      .add("total_count", TypeProtos.MinorType.BIGINT)
+      .buildSchema();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item1", 1, (long)2)
+      .addRow("item2", 2, (long)5)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema2)
+      .addRow("item3", 3, (long)4)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(nonEmptyInputRowSet4.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name", "id_left", "id"),
+      parseExprs("count(cost_left)", "total_count"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+    strAggBatch.setMaxOutputRowCount(2);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK);
+    assertEquals(1, strAggBatch.getRecordCount());
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    nonEmptyInputRowSet4.clear();
+
+    expectedRowSet1.clear();
+    expectedRowSet2.clear();
+  }
+
   /*******************************************************
    * Tests for EMIT with empty batches and no group by
    * (Tests t1-t8 are repeated with no group by)
@@ -813,14 +1121,15 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
 
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount()); // special batch
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
     assertEquals(0, strAggBatch.getRecordCount());
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
-    assertEquals(1, strAggBatch.getRecordCount());
+    assertEquals(1, strAggBatch.getRecordCount()); // special batch
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
-    assertEquals(1, strAggBatch.getRecordCount());
+    assertEquals(1, strAggBatch.getRecordCount()); // special batch
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
-    assertEquals(1, strAggBatch.getRecordCount());
+    assertEquals(1, strAggBatch.getRecordCount()); // data batch
 
     RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
     new RowSetComparison(expectedRowSet).verify(actualRowSet);