You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2018/07/12 01:43:44 UTC

[drill] branch master updated (b0314a3 -> a77fd14)

This is an automated email from the ASF dual-hosted git repository.

parthc pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from b0314a3  DRILL-6583: Add space between pagination links in Profiles (WebUI) list
     new 7655ec4  DRILL-6516: Fix memory leak issue with Sort and StreamingAgg together
     new a77fd14  DRILL-6516: EMIT support in streaming agg

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../physical/impl/aggregate/StreamingAggBatch.java | 274 ++++++---
 .../impl/aggregate/StreamingAggTemplate.java       | 425 +++++++++-----
 .../impl/aggregate/StreamingAggregator.java        |  30 +-
 .../impl/xsort/managed/ExternalSortBatch.java      |  54 +-
 .../impl/agg/TestStreamingAggEmitOutcome.java      | 614 +++++++++++++++++++++
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  |  63 +++
 6 files changed, 1226 insertions(+), 234 deletions(-)
 create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java


[drill] 02/02: DRILL-6516: EMIT support in streaming agg

Posted by pa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit a77fd142d86dd5648cda8866b8ff3af39c7b6b11
Author: Parth Chandra <pa...@apache.org>
AuthorDate: Mon Jun 18 21:34:20 2018 -0700

    DRILL-6516: EMIT support in streaming agg
    
    This closes #1358
---
 .../physical/impl/aggregate/StreamingAggBatch.java | 274 ++++++---
 .../impl/aggregate/StreamingAggTemplate.java       | 425 +++++++++-----
 .../impl/aggregate/StreamingAggregator.java        |  30 +-
 .../impl/agg/TestStreamingAggEmitOutcome.java      | 614 +++++++++++++++++++++
 .../impl/lateraljoin/TestE2EUnnestAndLateral.java  |  63 +++
 5 files changed, 1198 insertions(+), 208 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index caeed50..882c36d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -67,15 +67,49 @@ import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JVar;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
 public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatch.class);
 
   private StreamingAggregator aggregator;
   private final RecordBatch incoming;
   private List<BaseWriter.ComplexWriter> complexWriters;
-  private boolean done = false;
-  private boolean first = true;
-  private int recordCount = 0;
+  //
+  // Streaming agg can be in (a) a normal pipeline or (b) it may be in a pipeline that is part of a subquery involving
+  // lateral and unnest. In case(a), the aggregator proceeds normally until it sees a group change or a NONE. If a
+  // group has changed, the aggregated data is sent downstream and the aggregation continues with the next group. If
+  // a NONE is seen, the aggregator completes, sends data downstream and cleans up.
+  // In case (b), the aggregator behaves similarly to case (a) if a group change or NONE is observed. However, it will
+  // also encounter a new state EMIT, every time unnest processes a new row. In this case the aggregator must complete the
+  // aggregation, send out the results, AND reset to receive more data. To make the treatment of these two cases
+  // similar, we define the aggregation operation in terms of data sets.
+  //   Data Set = The set of data that the aggregator is currently aggregating. In a normal query, the entire data is
+  //   a single data set. In the case of a Lateral subquery, every row processed by unnest is a data set.  There can,
+  //   therefore, be one or more data sets in an aggregation.
+  //   Data Sets may have multiple batches and may contain one or more empty batches. A data set may consist entirely
+  //   of empty batches.
+  //   Schema may change across Data Sets.
+  //   A corner case is the case of a Data Set having many empty batches in the beginning. Such a data set may see a
+  //   schema change once the first non-empty batch is received.
+  //   Schema change within a Data Set is not supported.
+  //
+  //   We will define some states for internal management
+  //
+  private boolean done = false;  // END of all data
+  private boolean first = true;  // Beginning of new data set. True during the build schema phase. False once the first
+                                 // call to inner next is made.
+  private boolean sendEmit = false; // In the case where we see an OK_NEW_SCHEMA along with the end of a data set
+                                    // we send out a batch with OK_NEW_SCHEMA first, then in the next iteration,
+                                    // we send out an empty batch with EMIT.
+  private IterOutcome lastKnownOutcome = OK; // keep track of the outcome from the previous call to incoming.next
+  private boolean firstBatchForSchema = true; // true if the current batch came in with an OK_NEW_SCHEMA
+  private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set
+  private int recordCount = 0;  // number of records output in the current data set
+
   private BatchSchema incomingSchema;
 
   /*
@@ -154,83 +188,174 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   public IterOutcome innerNext() {
 
     // if a special batch has been sent, we have no data in the incoming so exit early
-    if (specialBatchSent) {
-      return IterOutcome.NONE;
+    if ( done || specialBatchSent) {
+      return NONE;
+    }
+
+    // We sent an OK_NEW_SCHEMA and also encountered the end of a data set. So we need to send
+    // an EMIT with an empty batch now
+    if (sendEmit) {
+      sendEmit = false;
+      firstBatchForDataSet = true;
+      recordCount = 0;
+      return EMIT;
     }
 
     // this is only called on the first batch. Beyond this, the aggregator manages batches.
     if (aggregator == null || first) {
-      IterOutcome outcome;
       if (first && incoming.getRecordCount() > 0) {
         first = false;
-        outcome = IterOutcome.OK_NEW_SCHEMA;
+        lastKnownOutcome = OK_NEW_SCHEMA;
       } else {
-        outcome = next(incoming);
+        lastKnownOutcome = next(incoming);
       }
-      logger.debug("Next outcome of {}", outcome);
-      switch (outcome) {
-      case NONE:
-        if (first && popConfig.getKeys().size() == 0) {
+      logger.debug("Next outcome of {}", lastKnownOutcome);
+      switch (lastKnownOutcome) {
+        case NONE:
+          if (firstBatchForDataSet && popConfig.getKeys().size() == 0) {
+            // if we have a straight aggregate and empty input batch, we need to handle it in a different way
+            constructSpecialBatch();
+            // set state to indicate the fact that we have sent a special batch and input is empty
+            specialBatchSent = true;
+            // If outcome is NONE then we send the special batch in the first iteration and the NONE
+            // outcome in the next iteration. If outcome is EMIT, we can send the special
+            // batch and the EMIT outcome at the same time.
+            return getFinalOutcome();
+          }
+          // else fall thru
+        case OUT_OF_MEMORY:
+        case NOT_YET:
+        case STOP:
+          return lastKnownOutcome;
+        case OK_NEW_SCHEMA:
+          if (!createAggregator()) {
+            done = true;
+            return IterOutcome.STOP;
+          }
+          break;
+        case EMIT:
+          // if we get an EMIT with an empty batch as the first (and therefore only) batch
+          // we have to do the special handling
+          if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && incoming.getRecordCount() == 0) {
+            constructSpecialBatch();
+            // set state to indicate the fact that we have sent a special batch and input is empty
+            specialBatchSent = true;
+            firstBatchForDataSet = true; // reset on the next iteration
+            // If outcome is NONE then we send the special batch in the first iteration and the NONE
+            // outcome in the next iteration. If outcome is EMIT, we can send the special
+            // batch and the EMIT outcome at the same time.
+            return getFinalOutcome();
+          }
+          // else fall thru
+        case OK:
+          break;
+        default:
+          throw new IllegalStateException(String.format("unknown outcome %s", lastKnownOutcome));
+      }
+    } else {
+      if ( lastKnownOutcome != NONE && firstBatchForDataSet && !aggregator.isDone()) {
+        lastKnownOutcome = incoming.next();
+        if (!first ) {
+          //Setup needs to be called again. During setup, generated code saves a reference to the vectors
+          // pointed to by the incoming batch so that the dereferencing of the vector wrappers to get to
+          // the vectors  does not have to be done at each call to eval. However, after an EMIT is seen,
+          // the vectors are replaced and the reference to the old vectors is no longer valid
+          try {
+            aggregator.setup(oContext, incoming, this);
+          } catch (SchemaChangeException e) {
+            UserException.Builder exceptionBuilder = UserException.functionError(e)
+                .message("A Schema change exception occured in calling setup() in generated code.");
+            throw exceptionBuilder.build(logger);
+          }
+        }
+      }
+      // We sent an EMIT in the previous iteration, so we must be starting a new data set
+      if (firstBatchForDataSet) {
+        done = false;
+        sendEmit = false;
+        specialBatchSent = false;
+        firstBatchForDataSet = false;
+      }
+    }
+    AggOutcome aggOutcome = aggregator.doWork(lastKnownOutcome);
+    recordCount = aggregator.getOutputCount();
+    container.setRecordCount(recordCount);
+    logger.debug("Aggregator response {}, records {}", aggOutcome, aggregator.getOutputCount());
+    // overwrite the outcome variable since we no longer need to remember the first batch outcome
+    lastKnownOutcome = aggregator.getOutcome();
+    switch (aggOutcome) {
+      case CLEANUP_AND_RETURN:
+        if (!first) {
+          container.zeroVectors();
+        }
+        done = true;
+        ExternalSortBatch.releaseBatches(incoming);
+        return lastKnownOutcome;
+      case RETURN_AND_RESET:
+        // We could have received a string of batches, all empty, until we hit an EMIT
+        if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && recordCount == 0) {
           // if we have a straight aggregate and empty input batch, we need to handle it in a different way
           constructSpecialBatch();
-          first = false;
           // set state to indicate the fact that we have sent a special batch and input is empty
           specialBatchSent = true;
-          return IterOutcome.OK;
+          // If outcome is NONE then we send the special batch in the first iteration and the NONE
+          // outcome in the next iteration. If outcome is EMIT, we can send the special
+          // batch and the EMIT outcome at the same time.
+          return getFinalOutcome();
         }
-      case OUT_OF_MEMORY:
-      case NOT_YET:
-      case STOP:
-        return outcome;
-      case OK_NEW_SCHEMA:
-        if (!createAggregator()) {
-          done = true;
+        firstBatchForDataSet = true;
+        if(first) {
+          first = false;
+        }
+        if(lastKnownOutcome == OK_NEW_SCHEMA) {
+          sendEmit = true;
+        }
+        // Release external sort batches after EMIT is seen
+        ExternalSortBatch.releaseBatches(incoming);
+        return lastKnownOutcome;
+      case RETURN_OUTCOME:
+        // In case of complex writer expression, vectors would be added to batch run-time.
+        // We have to re-build the schema.
+        if (complexWriters != null) {
+          container.buildSchema(SelectionVectorMode.NONE);
+        }
+        if (lastKnownOutcome == IterOutcome.NONE ) {
+          // we will set the 'done' flag in the next call to innerNext and use the lastKnownOutcome
+          // to determine whether we should set the flag or not.
+          // This is so that if someone calls getRecordCount in between calls to innerNext, we will
+          // return the correct record count (if the done flag is set, we will return 0).
+          if (first) {
+            first = false;
+            return OK_NEW_SCHEMA;
+          } else {
+            return OK;
+          }
+        } else if (lastKnownOutcome == OK && first) {
+          lastKnownOutcome = OK_NEW_SCHEMA;
+        } else if (lastKnownOutcome != IterOutcome.OUT_OF_MEMORY) {
+          first = false;
+        }
+        return lastKnownOutcome;
+      case UPDATE_AGGREGATOR:
+        // We could get this either between data sets or within a data set.
+        // If the former, we can handle the change and so need to update the aggregator and
+        // continue. If the latter, we cannot (currently) handle the schema change, so throw
+        // an exception
+        // This case is not tested since there are no unit tests for this and there is no support
+        // from the sort operator for this case
+        if (lastKnownOutcome == EMIT) {
+          createAggregator();
+          return OK_NEW_SCHEMA;
+        } else {
+          context.getExecutorState().fail(UserException.unsupportedError().message(SchemaChangeException
+              .schemaChanged("Streaming aggregate does not support schema changes", incomingSchema,
+                  incoming.getSchema()).getMessage()).build(logger));
+          close();
+          killIncoming(false);
           return IterOutcome.STOP;
         }
-        break;
-      case OK:
-        break;
       default:
-        throw new IllegalStateException(String.format("unknown outcome %s", outcome));
-      }
-    }
-    AggOutcome out = aggregator.doWork();
-    recordCount = aggregator.getOutputCount();
-    logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
-    switch (out) {
-    case CLEANUP_AND_RETURN:
-      if (!first) {
-        container.zeroVectors();
-      }
-      done = true;
-      ExternalSortBatch.releaseBatches(incoming);
-      // fall through
-    case RETURN_OUTCOME:
-      IterOutcome outcome = aggregator.getOutcome();
-      // In case of complex writer expression, vectors would be added to batch run-time.
-      // We have to re-build the schema.
-      if (complexWriters != null) {
-        container.buildSchema(SelectionVectorMode.NONE);
-      }
-      if (outcome == IterOutcome.NONE && first) {
-        first = false;
-        done = true;
-        return IterOutcome.OK_NEW_SCHEMA;
-      } else if (outcome == IterOutcome.OK && first) {
-        outcome = IterOutcome.OK_NEW_SCHEMA;
-      } else if (outcome != IterOutcome.OUT_OF_MEMORY) {
-        first = false;
-      }
-      return outcome;
-    case UPDATE_AGGREGATOR:
-      context.getExecutorState().fail(UserException.unsupportedError()
-        .message(SchemaChangeException.schemaChanged("Streaming aggregate does not support schema changes", incomingSchema, incoming.getSchema()).getMessage())
-        .build(logger));
-      close();
-      killIncoming(false);
-      return IterOutcome.STOP;
-    default:
-      throw new IllegalStateException(String.format("Unknown state %s.", out));
+        throw new IllegalStateException(String.format("Unknown state %s.", aggOutcome));
     }
   }
 
@@ -309,7 +434,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions());
     cg.getCodeGenerator().plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
-    //cg.getCodeGenerator().saveCodeForDebugging(true);
+    //  cg.getCodeGenerator().saveCodeForDebugging(true);
     container.clear();
 
     LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().size()];
@@ -496,6 +621,25 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     }
   }
 
+  private IterOutcome getFinalOutcome() {
+    IterOutcome outcomeToReturn;
+
+    if (firstBatchForDataSet) {
+      firstBatchForDataSet = false;
+    }
+    if (firstBatchForSchema) {
+      outcomeToReturn = OK_NEW_SCHEMA;
+      firstBatchForSchema = false;
+    } else if (lastKnownOutcome == EMIT) {
+      firstBatchForDataSet = true;
+      outcomeToReturn = EMIT;
+    } else {
+      // get the outcome to return before calling refresh since that resets the lastKnownOutcome to OK
+      outcomeToReturn = (recordCount == 0) ? NONE : OK;
+    }
+    return outcomeToReturn;
+  }
+
   @Override
   public void close() {
     super.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index fb4d508..a752c7e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -25,26 +25,49 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
 
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
 public abstract class StreamingAggTemplate implements StreamingAggregator {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggregator.class);
   private static final boolean EXTRA_DEBUG = false;
   private static final int OUTPUT_BATCH_SIZE = 32*1024;
 
+  // lastOutcome is set ONLY if the lastOutcome was NONE or STOP
   private IterOutcome lastOutcome = null;
+
+  // First batch after build schema phase
   private boolean first = true;
+  private boolean firstBatchForSchema = true; // true if the current batch came in with an OK_NEW_SCHEMA.
+  private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set
+
   private boolean newSchema = false;
-  private int previousIndex = -1;
+
+  // End of all data
+  private boolean done = false;
+
+  // index in the incoming (sv4/sv2/vector)
   private int underlyingIndex = 0;
-  private int currentIndex;
+  // The indexes below refer to the actual record indexes in input batch
+  // (i.e if a selection vector the sv4/sv2 entry has been dereferenced or if a vector then the record index itself)
+  private int previousIndex = -1;  // the last index that has been processed. Initialized to -1 every time a new
+                                   // aggregate group begins (including every time a new data set begins)
+  private int currentIndex; // current index being processed
   /**
    * Number of records added to the current aggregation group.
    */
   private long addedRecordCount = 0;
+  // There are two outcomes from the aggregator. One is the aggregator's outcome defined in
+  // StreamingAggregator.AggOutcome. The other is the outcome from the last call to incoming.next
   private IterOutcome outcome;
+  // Number of aggregation groups added into the output batch
   private int outputCount = 0;
   private RecordBatch incoming;
+  // the Streaming Agg Batch that this aggregator belongs to
   private StreamingAggBatch outgoing;
-  private boolean done = false;
+
   private OperatorContext context;
 
 
@@ -73,45 +96,67 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
   }
 
   @Override
-  public AggOutcome doWork() {
-    if (done) {
+  public AggOutcome doWork(IterOutcome outerOutcome) {
+    if (done || outerOutcome == NONE) {
       outcome = IterOutcome.NONE;
       return AggOutcome.CLEANUP_AND_RETURN;
     }
-    try { // outside loop to ensure that first is set to false after the first run.
+
+    try { // outside block to ensure that first is set to false after the first run.
       outputCount = 0;
       // allocate outgoing since either this is the first time or if a subsequent time we would
       // have sent the previous outgoing batch to downstream operator
       allocateOutgoing();
 
-      if (first) {
+      if (firstBatchForDataSet) {
         this.currentIndex = incoming.getRecordCount() == 0 ? 0 : this.getVectorIndex(underlyingIndex);
 
-        // consume empty batches until we get one with data.
-        if (incoming.getRecordCount() == 0) {
-          outer: while (true) {
-            IterOutcome out = outgoing.next(0, incoming);
-            switch (out) {
-            case OK_NEW_SCHEMA:
-            case OK:
-              if (incoming.getRecordCount() == 0) {
-                continue;
-              } else {
-                currentIndex = this.getVectorIndex(underlyingIndex);
-                break outer;
-              }
-            case OUT_OF_MEMORY:
-              outcome = out;
-              return AggOutcome.RETURN_OUTCOME;
-            case NONE:
-              out = IterOutcome.OK_NEW_SCHEMA;
-            case STOP:
-            default:
-              lastOutcome = out;
-              outcome = out;
-              done = true;
-              return AggOutcome.CLEANUP_AND_RETURN;
-            }
+        if (outerOutcome == OK_NEW_SCHEMA) {
+          firstBatchForSchema = true;
+        }
+        // consume empty batches until we get one with data (unless we got an EMIT). If we got an emit
+        // then this is the first batch, it was empty and we also got an emit.
+        if (incoming.getRecordCount() == 0 ) {
+          if (outerOutcome != EMIT) {
+            outer:
+            while (true) {
+              IterOutcome out = outgoing.next(0, incoming);
+              switch (out) {
+                case OK_NEW_SCHEMA:
+                  //lastOutcome = out;
+                  firstBatchForSchema = true;
+                case OK:
+                  if (incoming.getRecordCount() == 0) {
+                    continue;
+                  } else {
+                    currentIndex = this.getVectorIndex(underlyingIndex);
+                    break outer;
+                  }
+                case OUT_OF_MEMORY:
+                  outcome = out;
+                  return AggOutcome.RETURN_OUTCOME;
+                case EMIT:
+                  if (incoming.getRecordCount() == 0) {
+                    // When we see an EMIT we let the  agg record batch know that it should either
+                    // send out an EMIT or an OK_NEW_SCHEMA, followed by an EMIT. To do that we simply return
+                    // RETURN_AND_RESET with the outcome so the record batch can take care of it.
+                    return setOkAndReturnEmit();
+                  } else {
+                    break outer;
+                  }
+
+                case NONE:
+                  out = IterOutcome.OK_NEW_SCHEMA;
+                case STOP:
+                default:
+                  lastOutcome = out;
+                  outcome = out;
+                  done = true;
+                  return AggOutcome.CLEANUP_AND_RETURN;
+              } // switch (outcome)
+            } // while empty batches are seen
+          } else {
+            return setOkAndReturnEmit();
           }
         }
       }
@@ -121,49 +166,24 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
         return AggOutcome.UPDATE_AGGREGATOR;
       }
 
-      if (lastOutcome != null) {
+      // if the previous iteration has an outcome that was terminal, don't do anything.
+      if (lastOutcome != null /*&& lastOutcome != IterOutcome.OK_NEW_SCHEMA*/) {
         outcome = lastOutcome;
         return AggOutcome.CLEANUP_AND_RETURN;
       }
 
       outside: while(true) {
-      // loop through existing records, adding as necessary.
-        for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
-          if (EXTRA_DEBUG) {
-            logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
-          }
-          if (previousIndex == -1) {
-            if (EXTRA_DEBUG) {
-              logger.debug("Adding the initial row's keys and values.");
-            }
-            addRecordInc(currentIndex);
-          }
-          else if (isSame( previousIndex, currentIndex )) {
-            if (EXTRA_DEBUG) {
-              logger.debug("Values were found the same, adding.");
-            }
-            addRecordInc(currentIndex);
-          } else {
-            if (EXTRA_DEBUG) {
-              logger.debug("Values were different, outputting previous batch.");
-            }
-            if(!outputToBatch(previousIndex)) {
-              // There is still space in outgoing container, so proceed to the next input.
-              if (EXTRA_DEBUG) {
-                logger.debug("Output successful.");
-              }
-              addRecordInc(currentIndex);
-            } else {
-              if (EXTRA_DEBUG) {
-                logger.debug("Output container has reached its capacity. Flushing it.");
-              }
-
-              // Update the indices to set the state for processing next record in incoming batch in subsequent doWork calls.
-              previousIndex = -1;
-              return setOkAndReturn();
-            }
-          }
-          previousIndex = currentIndex;
+        // loop through existing records, adding as necessary.
+        if(!processRemainingRecordsInBatch()) {
+          // output batch is full. Return.
+          return setOkAndReturn();
+        }
+        // if the current batch came with an EMIT, we're done
+        if(outerOutcome == EMIT) {
+          // output the last record
+          outputToBatch(previousIndex);
+          resetIndex();
+          return setOkAndReturnEmit();
         }
 
         /**
@@ -189,83 +209,122 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
               logger.debug("Received IterOutcome of {}", out);
             }
             switch (out) {
-            case NONE:
-              done = true;
-              lastOutcome = out;
-              if (first && addedRecordCount == 0) {
-                return setOkAndReturn();
-              } else if (addedRecordCount > 0) {
-                outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
-                // (output container full or not) as we are not going to insert any more records.
-                if (EXTRA_DEBUG) {
-                  logger.debug("Received no more batches, returning.");
+              case NONE:
+                done = true;
+                lastOutcome = out;
+                if (firstBatchForDataSet && addedRecordCount == 0) {
+                  return setOkAndReturn();
+                } else if (addedRecordCount > 0) {
+                  outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
+                  // (output container full or not) as we are not going to insert any more records.
+                  if (EXTRA_DEBUG) {
+                    logger.debug("Received no more batches, returning.");
+                  }
+                  return setOkAndReturn();
+                } else {
+                  // not first batch and record Count == 0
+                  outcome = out;
+                  return AggOutcome.CLEANUP_AND_RETURN;
                 }
-                return setOkAndReturn();
-              } else {
-                if (first && out == IterOutcome.OK) {
-                  out = IterOutcome.OK_NEW_SCHEMA;
+                // EMIT is handled like OK, except that we do not loop back to process the
+                // next incoming batch; we return instead
+              case EMIT:
+                if (incoming.getRecordCount() == 0) {
+                  if (addedRecordCount > 0) {
+                    outputToBatchPrev(previous, previousIndex, outputCount);
+                  }
+                } else {
+                  resetIndex();
+                  if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) {
+                    if (EXTRA_DEBUG) {
+                      logger.debug("New value was same as last value of previous batch, adding.");
+                    }
+                    addRecordInc(currentIndex);
+                    previousIndex = currentIndex;
+                    incIndex();
+                    if (EXTRA_DEBUG) {
+                      logger.debug("Continuing outside");
+                    }
+                  } else { // not the same
+                    if (EXTRA_DEBUG) {
+                      logger.debug("This is not the same as the previous, add record and continue outside.");
+                    }
+                    if (addedRecordCount > 0) {
+                      if (outputToBatchPrev(previous, previousIndex, outputCount)) {
+                        if (EXTRA_DEBUG) {
+                          logger.debug("Output container is full. flushing it.");
+                        }
+                        return setOkAndReturnEmit();
+                      }
+                    }
+                    // important to set the previous index to -1 since we start a new group
+                    previousIndex = -1;
+                  }
+                  processRemainingRecordsInBatch();
+                  outputToBatch(previousIndex); // currentIndex has been reset to int_max so use previous index.
                 }
-                outcome = out;
-                return AggOutcome.CLEANUP_AND_RETURN;
-              }
-
-            case NOT_YET:
-              this.outcome = out;
-              return AggOutcome.RETURN_OUTCOME;
-
-            case OK_NEW_SCHEMA:
-              if (EXTRA_DEBUG) {
-                logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
-              }
-              if (addedRecordCount > 0) {
-                outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
-                // (output container full or not) as we are not going to insert anymore records.
+                resetIndex();
+                return setOkAndReturnEmit();
+
+              case NOT_YET:
+                this.outcome = out;
+                return AggOutcome.RETURN_OUTCOME;
+
+              case OK_NEW_SCHEMA:
+                firstBatchForSchema = true;
+                //lastOutcome = out;
                 if (EXTRA_DEBUG) {
-                  logger.debug("Wrote out end of previous batch, returning.");
+                  logger.debug("Received new schema.  Batch has {} records.", incoming.getRecordCount());
                 }
-                newSchema = true;
-                return setOkAndReturn();
-              }
-              cleanup();
-              return AggOutcome.UPDATE_AGGREGATOR;
-            case OK:
-              resetIndex();
-              if (incoming.getRecordCount() == 0) {
-                continue;
-              } else {
-                if (previousIndex != -1 && isSamePrev(previousIndex , previous, currentIndex)) {
-                  if (EXTRA_DEBUG) {
-                    logger.debug("New value was same as last value of previous batch, adding.");
-                  }
-                  addRecordInc(currentIndex);
-                  previousIndex = currentIndex;
-                  incIndex();
-                  if (EXTRA_DEBUG) {
-                    logger.debug("Continuing outside");
-                  }
-                  continue outside;
-                } else { // not the same
+                if (addedRecordCount > 0) {
+                  outputToBatchPrev(previous, previousIndex, outputCount); // No need to check the return value
+                  // (output container full or not) as we are not going to insert anymore records.
                   if (EXTRA_DEBUG) {
-                    logger.debug("This is not the same as the previous, add record and continue outside.");
+                    logger.debug("Wrote out end of previous batch, returning.");
                   }
-                  if (addedRecordCount > 0) {
-                    if (outputToBatchPrev(previous, previousIndex, outputCount)) {
-                      if (EXTRA_DEBUG) {
-                        logger.debug("Output container is full. flushing it.");
+                  newSchema = true;
+                  return setOkAndReturn();
+                }
+                cleanup();
+                return AggOutcome.UPDATE_AGGREGATOR;
+              case OK:
+                resetIndex();
+                if (incoming.getRecordCount() == 0) {
+                  continue;
+                } else {
+                  if (previousIndex != -1 && isSamePrev(previousIndex, previous, currentIndex)) {
+                    if (EXTRA_DEBUG) {
+                      logger.debug("New value was same as last value of previous batch, adding.");
+                    }
+                    addRecordInc(currentIndex);
+                    previousIndex = currentIndex;
+                    incIndex();
+                    if (EXTRA_DEBUG) {
+                      logger.debug("Continuing outside");
+                    }
+                    continue outside;
+                  } else { // not the same
+                    if (EXTRA_DEBUG) {
+                      logger.debug("This is not the same as the previous, add record and continue outside.");
+                    }
+                    if (addedRecordCount > 0) {
+                      if (outputToBatchPrev(previous, previousIndex, outputCount)) {
+                        if (EXTRA_DEBUG) {
+                          logger.debug("Output container is full. flushing it.");
+                        }
+                        previousIndex = -1;
+                        return setOkAndReturn();
                       }
-                      previousIndex = -1;
-                      return setOkAndReturn();
                     }
+                    previousIndex = -1;
+                    continue outside;
                   }
-                  previousIndex = -1;
-                  continue outside;
                 }
-              }
-            case STOP:
-            default:
-              lastOutcome = out;
-              outcome = out;
-              return AggOutcome.CLEANUP_AND_RETURN;
+              case STOP:
+              default:
+                lastOutcome = out;
+                outcome = out;
+                return AggOutcome.CLEANUP_AND_RETURN;
             }
           }
         } finally {
@@ -277,12 +336,63 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
       }
     } finally {
       if (first) {
-        first = !first;
+        first = false;
       }
     }
 
   }
 
+  @Override
+  public boolean isDone() { // Exposes the 'done' flag, which the NONE case of doWork() sets to true.
+    return done;
+  }
+
+  /**
+   * Aggregates the records of the current incoming batch, starting at underlyingIndex. A record is added when no previous
+   * record is pending (previousIndex == -1) or when isSame matches it to the previous one; otherwise the previous group is output first.
+   * @return true if every remaining record was processed; false if processing stopped because the output container is full
+   */
+  private boolean processRemainingRecordsInBatch() {
+    for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
+      if (EXTRA_DEBUG) {
+        logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
+      }
+      if (previousIndex == -1) { // no previous record is pending, so this record starts a new group
+        if (EXTRA_DEBUG) {
+          logger.debug("Adding the initial row's keys and values.");
+        }
+        addRecordInc(currentIndex);
+      }
+      else if (isSame( previousIndex, currentIndex )) { // same group as the previous record: keep accumulating
+        if (EXTRA_DEBUG) {
+          logger.debug("Values were found the same, adding.");
+        }
+        addRecordInc(currentIndex);
+      } else { // group boundary: the previous group has to be output before this record is added
+        if (EXTRA_DEBUG) {
+          logger.debug("Values were different, outputting previous batch.");
+        }
+        if(!outputToBatch(previousIndex)) {
+          // outputToBatch returned false: there is still space in the outgoing container, so start the next group with this record.
+          if (EXTRA_DEBUG) {
+            logger.debug("Output successful.");
+          }
+          addRecordInc(currentIndex);
+        } else {
+          if (EXTRA_DEBUG) {
+            logger.debug("Output container has reached its capacity. Flushing it.");
+          }
+
+          // Forget the group just output; this record was not added and the index did not advance, so the next call starts a new group with it.
+          previousIndex = -1;
+          return false; // leaves the loop before the incIndex() update of the for statement runs
+        }
+      }
+      previousIndex = currentIndex; // remember this record for the comparison in the next iteration
+    }
+    return true; // every record up to incoming.getRecordCount() has been visited
+  }
+
   private final void incIndex() {
     underlyingIndex++;
     if (underlyingIndex >= incoming.getRecordCount()) {
@@ -297,18 +407,51 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
     incIndex();
   }
 
+  /**
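+   * Sets the outcome to OK_NEW_SCHEMA if this is the first batch for the current schema, otherwise to OK, and
+   * sets the value count of every outgoing vector to outputCount.
+   * @return AggOutcome.RETURN_OUTCOME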
+   */
   private final AggOutcome setOkAndReturn() {
-    if (first) {
-      this.outcome = IterOutcome.OK_NEW_SCHEMA;
+    IterOutcome outcomeToReturn;
+    firstBatchForDataSet = false;
+    if (firstBatchForSchema) {
+      outcomeToReturn = OK_NEW_SCHEMA;
+      firstBatchForSchema = false;
     } else {
-      this.outcome = IterOutcome.OK;
+      outcomeToReturn = OK;
     }
+    this.outcome = outcomeToReturn;
+
     for (VectorWrapper<?> v : outgoing) {
       v.getValueVector().getMutator().setValueCount(outputCount);
     }
     return AggOutcome.RETURN_OUTCOME;
   }
 
+  /**
+   * EMIT counterpart of setOkAndReturn: records the outcome to report and sets the value count of every outgoing vector.
+   * The outcome is OK_NEW_SCHEMA while firstBatchForSchema is still set (the flag is then cleared), otherwise EMIT.
+   * @return always AggOutcome.RETURN_AND_RESET
+   */
+  private final AggOutcome setOkAndReturnEmit() {
+    IterOutcome outcomeToReturn;
+    firstBatchForDataSet = true; // unlike setOkAndReturn, which clears this flag
+    previousIndex = -1; // no previous record is pending, so the next record starts a new group
+    if (firstBatchForSchema) {
+      outcomeToReturn = OK_NEW_SCHEMA; // OK_NEW_SCHEMA is reported in place of EMIT here
+      firstBatchForSchema = false;
+    } else {
+      outcomeToReturn = EMIT;
+    }
+    this.outcome = outcomeToReturn;
+
+    for (VectorWrapper<?> v : outgoing) {
+      v.getValueVector().getMutator().setValueCount(outputCount);
+    }
+    return AggOutcome.RETURN_AND_RESET;
+  }
+
   // Returns output container status after insertion of the given record. Caller must check the return value if it
   // plans to insert more records into outgoing container.
   private final boolean outputToBatch(int inIndex) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index a300924..2a64b93 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -27,8 +27,30 @@ public interface StreamingAggregator {
 
   public static TemplateClassDefinition<StreamingAggregator> TEMPLATE_DEFINITION = new TemplateClassDefinition<StreamingAggregator>(StreamingAggregator.class, StreamingAggTemplate.class);
 
+
+  /**
+   * The Aggregator can return one of the following outcomes:
+   * <p>
+   * <b>RETURN_OUTCOME:</b> The aggregation has seen a change in the group and should send data downstream. If
+   * complex writers are involved, then rebuild schema.
+   * <p>
+   * <b>CLEANUP_AND_RETURN:</b> End of all data. Return the data downstream, and cleanup.
+   * <p>
+   * <b>UPDATE_AGGREGATOR:</b> A schema change was encountered. The aggregator's generated  code and (possibly)
+   * container need to be updated
+   * <p>
+   * <b>RETURN_AND_RESET:</b> If the aggregator encounters an EMIT, then that implies the end of a data set but
+   * not of all the data. Return the data (aggregated so far) downstream, reset the internal state variables and
+   * come back for the next data set.
+   * <p>
+   * @see org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome HashAggregator.AggOutcome
+   */
   public static enum AggOutcome {
-    RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR;
+    RETURN_OUTCOME,
+    CLEANUP_AND_RETURN,
+    UPDATE_AGGREGATOR,
+    RETURN_AND_RESET
+    ;
   }
 
   public abstract void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException;
@@ -37,7 +59,11 @@ public interface StreamingAggregator {
 
   public abstract int getOutputCount();
 
-  public abstract AggOutcome doWork();
+  // Do the work. The IterOutcome of the batch that has already been read is passed in because it may be an EMIT;
+  // if outerOutcome is EMIT, the work must be done without reading any more batches.
+  public abstract AggOutcome doWork(IterOutcome outerOutcome);
+
+  public abstract boolean isDone();
 
   public abstract void cleanup();
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
new file mode 100644
index 0000000..75c4598
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.agg;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
+import org.apache.drill.exec.physical.impl.MockRecordBatch;
+import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.schema.SchemaBuilder;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category(OperatorTest.class)
+public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
+  //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestStreamingAggEmitOutcome.class);
+  protected static TupleMetadata resultSchema;
+
+  @BeforeClass
+  public static void setUpBeforeClass2() throws Exception { // builds the schema used for the expected row sets of the tests
+    resultSchema = new SchemaBuilder()
+        .add("name", TypeProtos.MinorType.VARCHAR) // matches the "name" output of parseExprs("name_left", "name")
+        .addNullable("total_sum", TypeProtos.MinorType.BIGINT) // matches the "total_sum" output of the sum expression
+        .buildSchema();
+  }
+
+  /**
+   * Verifies that StreamingAggBatch produces empty output for empty input: three empty batches are fed in with outcomes
+   * OK_NEW_SCHEMA, OK_NEW_SCHEMA and EMIT, and next() must return OK_NEW_SCHEMA twice, then EMIT with zero records, then NONE.
+   */
+  @Test
+  public void t1_testStreamingAggrEmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount()); // the EMIT output carries no rows
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+  }
+
+  /**
+   * Verifies that StreamingAgg produces correct output when a non-empty batch arrives with EMIT after the build schema
+   * phase. The aggregated rows are returned with OK_NEW_SCHEMA, the next call returns EMIT, and the call after that
+   * returns NONE. The test verifies the record count and the order of the aggregated rows against the expected row set.
+   */
+  @Test
+  public void t2_testStreamingAggrNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(13, 130, "item13")
+      .addRow(2, 20, "item2")
+      .addRow(2, 20, "item2")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item1", (long)11) // expected from nonEmptyInputRowSet, which is defined outside this file section
+      .addRow("item13", (long)286) // = (13+130) + (13+130)
+      .addRow("item2", (long)44) // = (2+20) + (2+20)
+      .addRow("item4", (long)44) // = 4+40
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // The aggregated rows of both non-empty batches, including the one that arrived with EMIT, are returned with OK_NEW_SCHEMA.
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(4, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    // EMIT is expected to come with an empty batch (the record count is not asserted here)
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  @Test // Input: two empty OK_NEW_SCHEMA batches, an empty EMIT batch, then a non-empty EMIT batch.
+  public void t3_testStreamingAggrEmptyBatchFollowedByNonEmptyBatchEmitOutcome() { // expects an empty EMIT, then an EMIT with three groups
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(0, 1300, "item13")
+      .addRow(2, 20, "item2")
+      .addRow(0, 2000, "item2")
+      .addRow(4, 40, "item4")
+      .addRow(0, 4000, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item13", (long)1443) // = (13+130) + (0+1300)
+      .addRow("item2", (long)2022) // = (2+20) + (0+2000)
+      .addRow("item4", (long)4044) // = (4+40) + (0+4000)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount()); // first EMIT output is empty
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(3, strAggBatch.getRecordCount()); // second EMIT output holds one row per group
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  @Test // Input: two empty OK_NEW_SCHEMA batches, three empty EMIT batches, then a non-empty EMIT batch.
+  public void t4_testStreamingAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() { // expects three empty EMITs, then an EMIT with three groups
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(13, 130, "item13")
+      .addRow(0, 0, "item13")
+      .addRow(1, 33000, "item13")
+      .addRow(2, 20, "item2")
+      .addRow(0, 0, "item2")
+      .addRow(1, 11000, "item2")
+      .addRow(4, 40, "item4")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item13", (long)33144) // = (13+130) + (0+0) + (1+33000)
+      .addRow("item2", (long)11023) // = (2+20) + (0+0) + (1+11000)
+      .addRow("item4", (long)44) // = 4+40
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount()); // first EMIT output is empty
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount()); // second EMIT output is empty
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount()); // third EMIT output is empty
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(3, strAggBatch.getRecordCount()); // fourth EMIT output holds one row per group
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   * Verifies that StreamingAggr aggregates the data received after an EMIT separately from the data received before
+   * it. The input is an empty and then a non-empty batch, both with OK_NEW_SCHEMA, followed by an empty batch with
+   * EMIT, a second non-empty batch with OK and a final empty batch with EMIT. The expected output is OK_NEW_SCHEMA
+   * for the schema, OK_NEW_SCHEMA with the single aggregated row of the first non-empty batch, EMIT with an empty
+   * batch, EMIT with the two aggregated rows of the second non-empty batch, and then NONE. For each non-empty
+   * output the test validates the record count and the order of the records.
+   * The second expected row set holds only the item2 and item3 groups.
+   */
+  @Test
+  public void t5_testStreamingAgrResetsAfterFirstEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .addRow(3, 30, "item3")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item1", (long)11) // expected from nonEmptyInputRowSet, which is defined outside this file section
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item2", (long)44) // = (2+20) + (2+20)
+      .addRow("item3", (long)330) // = 10 * (3+30)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount()); // the single group of the first non-empty batch
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount()); // first EMIT output is empty
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(2, strAggBatch.getRecordCount()); // second EMIT output holds the item2 and item3 groups
+
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
+  /**
+   * Verifies that StreamingAggr correctly handles a non-empty batch with OK outcome that arrives after an EMIT and is
+   * followed by NONE. The input is an empty and then a non-empty batch, both with OK_NEW_SCHEMA, followed by an empty
+   * batch with EMIT and a second non-empty batch with OK. The expected output is OK_NEW_SCHEMA for the schema,
+   * OK_NEW_SCHEMA with the single aggregated row of the first non-empty batch, EMIT with an empty batch, OK with the
+   * four aggregated rows of the second non-empty batch, and then NONE.
+   * For each non-empty output the test validates the record count and the order of the records against the expected
+   * row set.
+   */
+  @Test
+  public void t6_testStreamingAggrOkFollowedByNone() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .addRow(4, 40, "item4")
+      .addRow(4, 40, "item4")
+      .addRow(5, 50, "item5")
+      .addRow(5, 50, "item5")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item1", (long)11)
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item2", (long)22)
+      .addRow("item3", (long)33)
+      .addRow("item4", (long)88)
+      .addRow("item5", (long)110)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK);
+    assertEquals(4, strAggBatch.getRecordCount());
+
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
+  /**
+   * Normal case: verifies the outcomes and record counts produced when the input delivers multiple EMIT outcomes.
+   */
+  @Test
+  public void t7_testStreamingAggrMultipleEMITOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .addRow(3, 30, "item3")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(2, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+  }
+
+  /**
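+   * Verifies that multiple input batches with OK outcome, followed by an EMIT, are aggregated into one output batch.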
+   */
+  @Test
+  public void t8_testStreamingAggrMultipleInputToSingleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item1", (long)11)
+      .addRow("item2", (long)22)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(2, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+  }
+
+
+  /*****************************************************************************************
+   Tests for validating regular StreamingAggr behavior with no EMIT outcome
+   ******************************************************************************************/
+  @Test
+  public void t9_testStreamingAgr_WithEmptyNonEmptyBatchesAndOKOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item1")
+      .addRow(13, 130, "item13")
+      .addRow(13, 130, "item13")
+      .addRow(13, 130, "item13")
+      .addRow(130, 1300, "item130")
+      .addRow(0, 0, "item130")
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(23, 230, "item23")
+      .addRow(3, 33, "item3")
+      .addRow(7, 70, "item7")
+      .addRow(17, 170, "item7")
+      .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchema)
+      .addRow("item1", (long)33)
+      .addRow("item13", (long)429)
+      .addRow("item130", (long)1430)
+      .addRow("item23", (long)253)
+      .addRow("item3", (long)36)
+      .addRow("item7", (long)264)
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      parseExprs("name_left", "name"),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(6, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    expectedRowSet.clear();
+  }
+
+  @Test
+  public void t10_testStreamingAggrWithEmptyDataSet() {
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        parseExprs("name_left", "name"),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
index c57093c..17a9d33 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/lateraljoin/TestE2EUnnestAndLateral.java
@@ -449,6 +449,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       + "dfs.`lateraljoin/multipleFiles/` customer) t1, LATERAL (SELECT CAST(MAX(t.ord.o_totalprice)"
       + " AS int) AS maxprice FROM UNNEST(t1.c_orders) t(ord) GROUP BY t.ord.o_orderstatus) t2";
 
+    try {
     testBuilder()
       .optionSettingQueriesForTestQuery("alter session set `%s` = false",
         PlannerSettings.STREAMAGG.getOptionName())
@@ -462,6 +463,9 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       .baselineValues(235695)
       .baselineValues(177819)
       .build().run();
+    } finally {
+      test("alter session set `" + PlannerSettings.STREAMAGG.getOptionName() + "` = true");
+    }
   }
 
   @Test
@@ -469,6 +473,7 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
     String sql = "SELECT key, t3.dsls FROM cp.`lateraljoin/with_nulls.json` t LEFT OUTER "
     + "JOIN LATERAL (SELECT DISTINCT t2.sls AS dsls FROM UNNEST(t.sales) t2(sls)) t3 ON TRUE";
 
+    try {
     testBuilder()
       .optionSettingQueriesForTestQuery("alter session set `%s` = false",
         PlannerSettings.STREAMAGG.getOptionName())
@@ -484,5 +489,63 @@ public class TestE2EUnnestAndLateral extends ClusterTest {
       .baselineValues("dd",111L)
       .baselineValues("dd",222L)
       .build().run();
+    } finally {
+      test("alter session set `" + PlannerSettings.STREAMAGG.getOptionName() + "` = true");
+    }
+  }
+
+  @Test
+  public void testMultipleBatchesLateral_WithStreamingAgg() throws Exception {
+    String sql = "SELECT t2.maxprice FROM (SELECT customer.c_orders AS c_orders FROM "
+        + "dfs.`lateraljoin/multipleFiles/` customer) t1, LATERAL (SELECT CAST(MAX(t.ord.o_totalprice)"
+        + " AS int) AS maxprice FROM UNNEST(t1.c_orders) t(ord) GROUP BY t.ord.o_orderstatus) t2";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("maxprice")
+        .baselineValues(367190)
+        .baselineValues(316347)
+        .baselineValues(146610)
+        .baselineValues(306996)
+        .baselineValues(235695)
+        .baselineValues(177819)
+        .build().run();
   }
+
+  @Test
+  public void testLateral_StreamingAgg_with_nulls() throws Exception {
+    String sql = "SELECT key, t3.dsls FROM cp.`lateraljoin/with_nulls.json` t LEFT OUTER "
+        + "JOIN LATERAL (SELECT DISTINCT t2.sls AS dsls FROM UNNEST(t.sales) t2(sls)) t3 ON TRUE";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("key","dsls")
+        .baselineValues("aa",null)
+        .baselineValues("bb",100L)
+        .baselineValues("bb",200L)
+        .baselineValues("bb",300L)
+        .baselineValues("bb",400L)
+        .baselineValues("cc",null)
+        .baselineValues("dd",111L)
+        .baselineValues("dd",222L)
+        .build().run();
+  }
+
+  @Test
+  public void testMultipleBatchesLateral_WithStreamingAggNoGroup() throws Exception {
+    String sql = "SELECT t2.maxprice FROM (SELECT customer.c_orders AS c_orders FROM "
+        + "dfs.`lateraljoin/multipleFiles/` customer) t1, LATERAL (SELECT CAST(MAX(t.ord.o_totalprice)"
+        + " AS int) AS maxprice FROM UNNEST(t1.c_orders) t(ord) ) t2";
+
+    testBuilder()
+        .sqlQuery(sql)
+        .unOrdered()
+        .baselineColumns("maxprice")
+        .baselineValues(367190)
+        .baselineValues(306996)
+        .build().run();
+  }
+
 }


[drill] 01/02: DRILL-6516: Fix memory leak issue with Sort and StreamingAgg together

Posted by pa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

parthc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 7655ec4f54976def63101daabf34e51697978c57
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Sun Jul 1 00:02:55 2018 -0600

    DRILL-6516: Fix memory leak issue with Sort and StreamingAgg together
---
 .../impl/xsort/managed/ExternalSortBatch.java      | 54 +++++++++++-----------
 1 file changed, 28 insertions(+), 26 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index ea7f51f..7db4d3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -315,7 +315,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     case START:
       return load();
     case LOAD:
-      resetSortState();
+      if (!this.retainInMemoryBatchesOnNone) {
+        resetSortState();
+      }
       return (sortState == SortState.DONE) ? NONE : load();
     case DELIVER:
       return nextOutputBatch();
@@ -578,36 +580,20 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     }
     if (incoming instanceof ExternalSortBatch) {
       ExternalSortBatch esb = (ExternalSortBatch) incoming;
-      esb.releaseResources();
+      esb.resetSortState();
     }
   }
 
   private void releaseResources() {
-    // This means if it has received NONE outcome and flag to retain is false OR if it has seen an EMIT
-    // then release the resources
-    if ((sortState == SortState.DONE && !this.retainInMemoryBatchesOnNone) ||
-      (sortState == SortState.LOAD)) {
-
-      // Close the iterator here to release any remaining resources such
-      // as spill files. This is important when a query has a join: the
-      // first branch sort may complete before the second branch starts;
-      // it may be quite a while after returning the last batch before the
-      // fragment executor calls this operator's close method.
-      //
-      // Note however, that the StreamingAgg operator REQUIRES that the sort
-      // retain the batches behind an SV4 when doing an in-memory sort because
-      // the StreamingAgg retains a reference to that data that it will use
-      // after receiving a NONE result code. See DRILL-5656.
-      //zeroResources();
-      if (resultsIterator != null) {
-        resultsIterator.close();
-      }
-      // We only zero vectors for actual output container
-      outputWrapperContainer.clear();
-      outputSV4.clear();
-      container.zeroVectors();
+    if (resultsIterator != null) {
+      resultsIterator.close();
     }
 
+    // We only zero vectors for actual output container
+    outputWrapperContainer.clear();
+    outputSV4.clear();
+    container.zeroVectors();
+
     // Close sortImpl for this boundary
     if (sortImpl != null) {
       sortImpl.close();
@@ -620,6 +606,20 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
    */
   private void resetSortState() {
     sortState = (lastKnownOutcome == EMIT) ? SortState.LOAD : SortState.DONE;
+    // This means it has received a NONE/EMIT outcome and the flag to retain is false, which will be the case in presence
+    // of StreamingAggBatch only, since it will explicitly call releaseBatches on ExternalSort when it's done consuming
+    // all the data buffers.
+
+    // Close the iterator here to release any remaining resources such
+    // as spill files. This is important when a query has a join: the
+    // first branch sort may complete before the second branch starts;
+    // it may be quite a while after returning the last batch before the
+    // fragment executor calls this operator's close method.
+    //
+    // Note however, that the StreamingAgg operator REQUIRES that the sort
+    // retain the batches behind an SV4 when doing an in-memory sort because
+    // the StreamingAgg retains a reference to that data that it will use
+    // after receiving a NONE result code. See DRILL-5656.
     releaseResources();
 
     if (lastKnownOutcome == EMIT) {
@@ -674,7 +674,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       sortState = SortState.DELIVER;
     } else if (getRecordCount() == 0) { // There is no record to send downstream
       outcomeToReturn = lastKnownOutcome == EMIT ? EMIT : NONE;
-      resetSortState();
+      if (!this.retainInMemoryBatchesOnNone) {
+        resetSortState();
+      }
     } else if (lastKnownOutcome == EMIT) {
       final boolean hasMoreRecords = outputSV4.hasNext();
       sortState = hasMoreRecords ? SortState.DELIVER : SortState.LOAD;