You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ti...@apache.org on 2018/08/01 18:36:13 UTC

[drill] 02/03: DRILL-6631: Streaming agg causes queries with Lateral and Unnest to return incorrect results.

This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit ee841643d0f2d746126f623d6a0c480b3011d38f
Author: Parth Chandra <pa...@apache.org>
AuthorDate: Fri Jul 20 17:24:38 2018 -0700

    DRILL-6631: Streaming agg causes queries with Lateral and Unnest to return incorrect results.
    
    This commit fixes issues with handling straight aggregates (no group by)
    with empty batches received between EMIT(s).
    
    closes #1399
---
 .../physical/impl/aggregate/StreamingAggBatch.java |  39 +-
 .../impl/aggregate/StreamingAggTemplate.java       |   2 +-
 .../impl/agg/TestStreamingAggEmitOutcome.java      | 553 +++++++++++++++++++++
 3 files changed, 573 insertions(+), 21 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 882c36d..70880c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -188,16 +188,19 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   public IterOutcome innerNext() {
 
     // if a special batch has been sent, we have no data in the incoming so exit early
-    if ( done || specialBatchSent) {
+    if (done || specialBatchSent) {
+      assert (sendEmit != true); // if special batch sent with emit then flag will not be set
       return NONE;
     }
 
     // We sent an OK_NEW_SCHEMA and also encountered the end of a data set. So we need to send
     // an EMIT with an empty batch now
     if (sendEmit) {
+      first = false; // first is set only in the case when we see a NONE after an empty first (and only) batch
       sendEmit = false;
       firstBatchForDataSet = true;
       recordCount = 0;
+      specialBatchSent = false;
       return EMIT;
     }
 
@@ -212,15 +215,19 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       logger.debug("Next outcome of {}", lastKnownOutcome);
       switch (lastKnownOutcome) {
         case NONE:
-          if (firstBatchForDataSet && popConfig.getKeys().size() == 0) {
+
+          if (first && popConfig.getKeys().size() == 0) {
             // if we have a straight aggregate and empty input batch, we need to handle it in a different way
+            // We want to produce the special batch only if we got a NONE as the first outcome after
+            // OK_NEW_SCHEMA. If we get a NONE immediately after we see an EMIT, then we have already handled
+            // the case of the empty batch
             constructSpecialBatch();
             // set state to indicate the fact that we have sent a special batch and input is empty
             specialBatchSent = true;
             // If outcome is NONE then we send the special batch in the first iteration and the NONE
             // outcome in the next iteration. If outcome is EMIT, we can send the special
             // batch and the EMIT outcome at the same time.
-            return getFinalOutcome();
+            return IterOutcome.OK;
           }
           // else fall thru
         case OUT_OF_MEMORY:
@@ -238,13 +245,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
           // we have to do the special handling
           if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && incoming.getRecordCount() == 0) {
             constructSpecialBatch();
-            // set state to indicate the fact that we have sent a special batch and input is empty
-            specialBatchSent = true;
             firstBatchForDataSet = true; // reset on the next iteration
             // If outcome is NONE then we send the special batch in the first iteration and the NONE
             // outcome in the next iteration. If outcome is EMIT, we can send the special
-            // batch and the EMIT outcome at the same time.
-            return getFinalOutcome();
+            // batch and the EMIT outcome at the same time. (unless the finalOutcome is OK_NEW_SCHEMA)
+            IterOutcome finalOutcome =  getFinalOutcome();
+            return finalOutcome;
           }
           // else fall thru
         case OK:
@@ -269,13 +275,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
           }
         }
       }
-      // We sent an EMIT in the previous iteration, so we must be starting a new data set
-      if (firstBatchForDataSet) {
-        done = false;
-        sendEmit = false;
-        specialBatchSent = false;
-        firstBatchForDataSet = false;
-      }
     }
     AggOutcome aggOutcome = aggregator.doWork(lastKnownOutcome);
     recordCount = aggregator.getOutputCount();
@@ -296,14 +295,15 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
         if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && recordCount == 0) {
           // if we have a straight aggregate and empty input batch, we need to handle it in a different way
           constructSpecialBatch();
-          // set state to indicate the fact that we have sent a special batch and input is empty
-          specialBatchSent = true;
           // If outcome is NONE then we send the special batch in the first iteration and the NONE
           // outcome in the next iteration. If outcome is EMIT, we can send the special
           // batch and the EMIT outcome at the same time.
-          return getFinalOutcome();
+
+          IterOutcome finalOutcome =  getFinalOutcome();
+          return finalOutcome;
         }
         firstBatchForDataSet = true;
+        firstBatchForSchema = false;
         if(first) {
           first = false;
         }
@@ -332,9 +332,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
           }
         } else if (lastKnownOutcome == OK && first) {
           lastKnownOutcome = OK_NEW_SCHEMA;
-        } else if (lastKnownOutcome != IterOutcome.OUT_OF_MEMORY) {
-          first = false;
         }
+        first = false;
         return lastKnownOutcome;
       case UPDATE_AGGREGATOR:
         // We could get this either between data sets or within a data set.
@@ -629,12 +628,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
     }
     if (firstBatchForSchema) {
       outcomeToReturn = OK_NEW_SCHEMA;
+      sendEmit = true;
       firstBatchForSchema = false;
     } else if (lastKnownOutcome == EMIT) {
       firstBatchForDataSet = true;
       outcomeToReturn = EMIT;
     } else {
-      // get the outcome to return before calling refresh since that resets the lastKnowOutcome to OK
       outcomeToReturn = (recordCount == 0) ? NONE : OK;
     }
     return outcomeToReturn;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index a752c7e..9165850 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -40,7 +40,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
 
   // First batch after build schema phase
   private boolean first = true;
-  private boolean firstBatchForSchema = true; // true if the current batch came in with an OK_NEW_SCHEMA.
+  private boolean firstBatchForSchema = false; // true if the current batch came in with an OK_NEW_SCHEMA.
   private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set
 
   private boolean newSchema = false;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
index 75c4598..2183efa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.agg;
 
 import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
@@ -33,6 +34,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.util.ArrayList;
+
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
 import static org.junit.Assert.assertEquals;
@@ -42,6 +45,7 @@ import static org.junit.Assert.assertTrue;
 public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
   //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestStreamingAggEmitOutcome.class);
   protected static TupleMetadata resultSchema;
+  protected static TupleMetadata resultSchemaNoGroupBy;
 
   @BeforeClass
   public static void setUpBeforeClass2() throws Exception {
@@ -49,6 +53,9 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
         .add("name", TypeProtos.MinorType.VARCHAR)
         .addNullable("total_sum", TypeProtos.MinorType.BIGINT)
         .buildSchema();
+    resultSchemaNoGroupBy = new SchemaBuilder()
+        .addNullable("total_sum", TypeProtos.MinorType.BIGINT)
+        .buildSchema();
   }
 
   /**
@@ -611,4 +618,550 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
   }
+
+  /*******************************************************
+   * Tests for EMIT with empty batches and no group by
+   * (Tests t1-t10 are repeated with no group by as t11-t20)
+   *******************************************************/
+
+
+  /**
+   * Repeats t1_testStreamingAggrEmptyBatchEmitOutcome with no group by
+   */
+  @Test
+  public void t11_testStreamingAggrEmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+  }
+
+  /**
+   Repeats t2_testStreamingAggrNonEmptyBatchEmitOutcome with no group by
+   */
+  @Test
+  public void t12_testStreamingAggrNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(13, 130, "item13")
+        .addRow(13, 130, "item13")
+        .addRow(2, 20, "item2")
+        .addRow(2, 20, "item2")
+        .addRow(4, 40, "item4")
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)385)
+        .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // Data before EMIT is returned with an OK_NEW_SCHEMA.
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    // EMIT comes with an empty batch
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   Repeats t3_testStreamingAggrEmptyBatchFollowedByNonEmptyBatchEmitOutcome with no group by
+   */
+  @Test
+  public void t13_testStreamingAggrEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(13, 130, "item13")
+        .addRow(0, 1300, "item13")
+        .addRow(2, 20, "item2")
+        .addRow(0, 2000, "item2")
+        .addRow(4, 40, "item4")
+        .addRow(0, 4000, "item4")
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)7509)
+        .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   Repeats t4_testStreamingAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome with no group by
+   */
+  @Test
+  public void t14_testStreamingAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(13, 130, "item13")
+        .addRow(0, 0, "item13")
+        .addRow(1, 33000, "item13")
+        .addRow(2, 20, "item2")
+        .addRow(0, 0, "item2")
+        .addRow(1, 11000, "item2")
+        .addRow(4, 40, "item4")
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)44211)
+        .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   Repeats t5_testStreamingAgrResetsAfterFirstEmitOutcome with no group by
+   */
+  @Test
+  public void t15_testStreamingAgrResetsAfterFirstEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(2, 20, "item2")
+        .addRow(2, 20, "item2")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)11)
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)374)
+        .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
+  /**
+   Repeats t6_testStreamingAggrOkFollowedByNone with no group by
+   */
+  @Test
+  public void t16_testStreamingAggrOkFollowedByNone() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(2, 20, "item2")
+        .addRow(3, 30, "item3")
+        .addRow(4, 40, "item4")
+        .addRow(4, 40, "item4")
+        .addRow(5, 50, "item5")
+        .addRow(5, 50, "item5")
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)11)
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)253)
+        .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
+  /**
+   Repeats t7_testStreamingAggrMultipleEMITOutcome with no group by
+   */
+  @Test
+  public void t17_testStreamingAggrMultipleEMITOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(2, 20, "item2")
+        .addRow(3, 30, "item3")
+        .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+  }
+
+  /**
+   Repeats t8_testStreamingAggrMultipleInputToSingleOutputBatch with no group by
+   */
+  @Test
+  public void t18_testStreamingAggrMultipleInputToSingleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(2, 20, "item2")
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)33)
+        .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+  }
+
+
+  /**
+   Repeats t9_testStreamingAgr_WithEmptyNonEmptyBatchesAndOKOutcome with no group by
+   */
+  @Test
+  public void t19_testStreamingAgr_WithEmptyNonEmptyBatchesAndOKOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(2, 20, "item1")
+        .addRow(13, 130, "item13")
+        .addRow(13, 130, "item13")
+        .addRow(13, 130, "item13")
+        .addRow(130, 1300, "item130")
+        .addRow(0, 0, "item130")
+        .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(23, 230, "item23")
+        .addRow(3, 33, "item3")
+        .addRow(7, 70, "item7")
+        .addRow(17, 170, "item7")
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)2445)
+        .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   Repeats t10_testStreamingAggrWithEmptyDataSet with no group by
+   */
+  @Test
+  public void t20_testStreamingAggrWithEmptyDataSet() {
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+  }
+
 }