Posted to commits@drill.apache.org by ti...@apache.org on 2018/08/04 01:01:21 UTC

[drill] branch master updated (7c35326 -> 11f3c33)

This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git.


    from 7c35326  DRILL-6655: Require package declaration in files.
     new 024fd8a  DRILL-6645: Transform TopN in Lateral Unnest pipeline to Sort and Limit.
     new 11f3c33  DRILL-6654: Data verification failure with lateral unnest query having filter in and order by

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../impl/aggregate/StreamingAggTemplate.java       |   2 +
 .../drill/exec/planner/physical/AggPrelBase.java   |   2 +-
 .../drill/exec/planner/physical/FilterPrel.java    |   2 +-
 .../drill/exec/planner/physical/LimitPrel.java     |   2 +-
 .../apache/drill/exec/planner/physical/Prel.java   |   8 +-
 .../drill/exec/planner/physical/ProjectPrel.java   |   2 +-
 .../physical/SelectionVectorRemoverPrel.java       |   2 +-
 .../drill/exec/planner/physical/SortPrel.java      |   2 +-
 .../drill/exec/planner/physical/TopNPrel.java      |  18 ++-
 .../drill/exec/planner/physical/UnnestPrel.java    |   2 +-
 .../visitor/LateralUnnestRowIDVisitor.java         |   6 +-
 .../impl/agg/TestStreamingAggEmitOutcome.java      | 137 +++++++++++++++++++++
 12 files changed, 172 insertions(+), 13 deletions(-)


[drill] 01/02: DRILL-6645: Transform TopN in Lateral Unnest pipeline to Sort and Limit.

Posted by ti...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 024fd8aa43e413bfa5a0f84f6c66b5402788b2bc
Author: Hanumath Rao Maduri <ha...@gmail.com>
AuthorDate: Wed Aug 1 15:41:29 2018 -0700

    DRILL-6645: Transform TopN in Lateral Unnest pipeline to Sort and Limit.
    
    closes #1417
---
 .../drill/exec/planner/physical/AggPrelBase.java       |  2 +-
 .../apache/drill/exec/planner/physical/FilterPrel.java |  2 +-
 .../apache/drill/exec/planner/physical/LimitPrel.java  |  2 +-
 .../org/apache/drill/exec/planner/physical/Prel.java   |  8 +++++++-
 .../drill/exec/planner/physical/ProjectPrel.java       |  2 +-
 .../planner/physical/SelectionVectorRemoverPrel.java   |  2 +-
 .../apache/drill/exec/planner/physical/SortPrel.java   |  2 +-
 .../apache/drill/exec/planner/physical/TopNPrel.java   | 18 ++++++++++++++++--
 .../apache/drill/exec/planner/physical/UnnestPrel.java |  2 +-
 .../physical/visitor/LateralUnnestRowIDVisitor.java    |  6 +++---
 10 files changed, 33 insertions(+), 13 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
index a4f51f3..ca68a7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -189,7 +189,7 @@ public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel
   }
 
   @Override
-  public Prel addImplicitRowIDCol(List<RelNode> children) {
+  public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     List<Integer> groupingCols = Lists.newArrayList();
     groupingCols.add(0);
     for (int groupingCol : groupSet.asList()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
index 1c9112c..33c2944 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrel.java
@@ -85,7 +85,7 @@ public class FilterPrel extends DrillFilterRelBase implements Prel {
   }
 
   @Override
-  public Prel addImplicitRowIDCol(List<RelNode> children) {
+  public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     RexBuilder builder = this.getCluster().getRexBuilder();
     // right shift the previous field indices.
     return (Prel) this.copy(this.traitSet, children.get(0), DrillRelOptUtil.transformExpr(builder,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
index 057cfae..ccbff17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
@@ -111,7 +111,7 @@ public class LimitPrel extends DrillLimitRelBase implements Prel {
   }
 
   @Override
-  public Prel addImplicitRowIDCol(List<RelNode> children) {
+  public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     return new LimitPrel(this.getCluster(), this.traitSet, children.get(0), getOffset(), getFetch(), isPushDown(), true);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java
index b72aff7..01d8e9c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java
@@ -56,7 +56,13 @@ public interface Prel extends DrillRelNode, Iterable<Prel> {
   SelectionVectorMode getEncoding();
   boolean needsFinalColumnReordering();
 
-  default Prel addImplicitRowIDCol(List<RelNode> children) {
+  /**
+   * If the operator is in Lateral/Unnest pipeline, then it generates a new operator which knows how to process
+   * the rows accordingly during execution.
+   * eg: TopNPrel -> SortPrel and LimitPrel
+   * Other operators like FilterPrel, ProjectPrel etc will add an implicit row id to the output.
+   */
+  default Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     throw new UnsupportedOperationException("Adding Implicit RowID column is not supported for " +
             this.getClass().getSimpleName() + " operator ");
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
index 0a9e8bf..4d5de20 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrel.java
@@ -136,7 +136,7 @@ public class ProjectPrel extends DrillProjectRelBase implements Prel{
   }
 
   @Override
-  public Prel addImplicitRowIDCol(List<RelNode> children) {
+  public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     RelDataTypeFactory typeFactory = this.getCluster().getTypeFactory();
     RexBuilder builder = this.getCluster().getRexBuilder();
     List<RexNode> projects = Lists.newArrayList();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
index a4cd921..3fad017 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SelectionVectorRemoverPrel.java
@@ -55,7 +55,7 @@ public class SelectionVectorRemoverPrel extends SinglePrel{
   }
 
   @Override
-  public Prel addImplicitRowIDCol(List<RelNode> children) {
+  public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     return (Prel) this.copy(this.traitSet, children);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
index 686e04a..aa7158a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
@@ -124,7 +124,7 @@ public class SortPrel extends org.apache.calcite.rel.core.Sort implements Prel {
   }
 
   @Override
-  public Prel addImplicitRowIDCol(List<RelNode> children) {
+  public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     List<RelFieldCollation> relFieldCollations = Lists.newArrayList();
     relFieldCollations.add(new RelFieldCollation(0,
                             RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
index 9bdcad0..3e407f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/TopNPrel.java
@@ -18,11 +18,14 @@
 package org.apache.drill.exec.planner.physical;
 
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.List;
 
 import com.google.common.collect.Lists;
 import org.apache.calcite.rel.RelCollationImpl;
 import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.TopN;
 import org.apache.drill.exec.planner.cost.DrillCostBase;
@@ -101,7 +104,7 @@ public class TopNPrel extends SinglePrel {
   }
 
   @Override
-  public Prel addImplicitRowIDCol(List<RelNode> children) {
+  public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     List<RelFieldCollation> relFieldCollations = Lists.newArrayList();
     relFieldCollations.add(new RelFieldCollation(0,
                           RelFieldCollation.Direction.ASCENDING, RelFieldCollation.NullDirection.FIRST));
@@ -115,6 +118,17 @@ public class TopNPrel extends SinglePrel {
                                     .replace(this.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE))
                                     .replace(collationTrait)
                                     .replace(DRILL_PHYSICAL);
-    return (Prel) this.copy(traits, children);
+    return transformTopNToSortAndLimit(children, traits, collationTrait);
+  }
+
+  private Prel transformTopNToSortAndLimit(List<RelNode> children, RelTraitSet traits, RelCollation collationTrait) {
+    SortPrel sortprel = new SortPrel(this.getCluster(), traits, children.get(0), collationTrait);
+    RexNode offset = this.getCluster().getRexBuilder().makeExactLiteral(BigDecimal.valueOf(0),
+            this.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER));
+    RexNode limit = this.getCluster().getRexBuilder().makeExactLiteral(BigDecimal.valueOf(this.limit),
+            this.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER));
+    //SMEX is not needed here because Lateral/Unnest pipeline doesn't support exchanges.
+    LimitPrel limitPrel = new LimitPrel(this.getCluster(), traits, sortprel, offset, limit, false, true);
+    return limitPrel;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
index 2331138..274f27a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnnestPrel.java
@@ -86,7 +86,7 @@ public class UnnestPrel extends DrillUnnestRelBase implements Prel {
   }
 
   @Override
-  public Prel addImplicitRowIDCol(List<RelNode> children) {
+  public Prel prepareForLateralUnnestPipeline(List<RelNode> children) {
     RelDataTypeFactory typeFactory = this.getCluster().getTypeFactory();
     List<String> fieldNames = new ArrayList<>();
     List<RelDataType> fieldTypes = new ArrayList<>();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java
index 4692202..dc4af5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/LateralUnnestRowIDVisitor.java
@@ -43,7 +43,7 @@ public class LateralUnnestRowIDVisitor extends BasePrelVisitor<Prel, Boolean, Ru
   public Prel visitPrel(Prel prel, Boolean isRightOfLateral) throws RuntimeException {
     List<RelNode> children = getChildren(prel, isRightOfLateral);
     if (isRightOfLateral) {
-      return prel.addImplicitRowIDCol(children);
+      return prel.prepareForLateralUnnestPipeline(children);
     } else {
       return (Prel) prel.copy(prel.getTraitSet(), children);
     }
@@ -61,7 +61,7 @@ public class LateralUnnestRowIDVisitor extends BasePrelVisitor<Prel, Boolean, Ru
   @Override
   public Prel visitLateral(LateralJoinPrel prel, Boolean value) throws RuntimeException {
     List<RelNode> children = Lists.newArrayList();
-    children.add(((Prel)prel.getInput(0)).accept(this, false));
+    children.add(((Prel)prel.getInput(0)).accept(this, value));
     children.add(((Prel) prel.getInput(1)).accept(this, true));
 
     return (Prel) prel.copy(prel.getTraitSet(), children);
@@ -69,6 +69,6 @@ public class LateralUnnestRowIDVisitor extends BasePrelVisitor<Prel, Boolean, Ru
 
   @Override
   public Prel visitUnnest(UnnestPrel prel, Boolean value) throws RuntimeException {
-    return prel.addImplicitRowIDCol(null);
+    return prel.prepareForLateralUnnestPipeline(null);
   }
 }
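
To make the shape of the DRILL-6645 rewrite easier to see, here is a small standalone model of what prepareForLateralUnnestPipeline / transformTopNToSortAndLimit does for a TopN sitting on the right side of a Lateral/Unnest pipeline. The classes below (TopNRewriteSketch, Node, SortKey, TopN, Sort, Limit, prepareTopNForLateralUnnest) are hypothetical stand-ins, not Drill's Prel classes, and the sketch assumes the implicit row id occupies field 0 after the visitor has pushed it through the branch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TopNRewriteSketch {

  /** A sort key: field index plus direction. */
  static final class SortKey {
    final int field;
    final boolean ascending;
    SortKey(int field, boolean ascending) { this.field = field; this.ascending = ascending; }
    @Override public String toString() { return "$" + field + (ascending ? " ASC" : " DESC"); }
  }

  /** Tiny plan-node model: a name plus at most one input. */
  static class Node {
    final String name;
    final Node input;
    Node(String name, Node input) { this.name = name; this.input = input; }
    String describe() {
      return input == null ? name : name + "\n  " + input.describe().replace("\n", "\n  ");
    }
  }

  static final class TopN extends Node {
    final List<SortKey> keys;
    final long limit;
    TopN(Node input, List<SortKey> keys, long limit) {
      super("TopN(limit=" + limit + ", keys=" + keys + ")", input);
      this.keys = keys;
      this.limit = limit;
    }
  }

  static final class Sort extends Node {
    Sort(Node input, List<SortKey> keys) { super("Sort(keys=" + keys + ")", input); }
  }

  static final class Limit extends Node {
    Limit(Node input, long offset, long fetch) {
      super("Limit(offset=" + offset + ", fetch=" + fetch + ")", input);
    }
  }

  // In the lateral/unnest branch every operator carries the implicit row id in
  // field 0, so the TopN collation is rebuilt with the row id as the leading
  // ascending key (original keys shift right by one in this model) and the
  // "N" part becomes a plain Limit with offset 0.  No exchange or
  // SelectionVectorRemover is added, since the pipeline does not use exchanges.
  static Node prepareTopNForLateralUnnest(TopN topN) {
    List<SortKey> keys = new ArrayList<>();
    keys.add(new SortKey(0, true));                     // implicit row id, ascending
    for (SortKey k : topN.keys) {
      keys.add(new SortKey(k.field + 1, k.ascending));  // shift past the row-id column
    }
    Sort sort = new Sort(topN.input, keys);
    return new Limit(sort, 0, topN.limit);
  }

  public static void main(String[] args) {
    Node unnest = new Node("Unnest", null);
    TopN topN = new TopN(unnest, Arrays.asList(new SortKey(1, false)), 10);
    System.out.println("Before:\n" + topN.describe() + "\n");
    System.out.println("After:\n" + prepareTopNForLateralUnnest(topN).describe());
  }
}

Running the sketch prints the TopN plan and its Sort + Limit replacement, which is the same substitution the real TopNPrel performs on copies of the Calcite rel nodes.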


[drill] 02/02: DRILL-6654: Data verification failure with lateral unnest query having filter in and order by

Posted by ti...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 11f3c332bb4b51ad43053cb3b1fad5891bda2132
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Tue Jul 31 15:53:57 2018 -0700

    DRILL-6654: Data verification failure with lateral unnest query having filter in and order by
    
    closes #1418
---
 .../impl/aggregate/StreamingAggTemplate.java       |   2 +
 .../impl/agg/TestStreamingAggEmitOutcome.java      | 137 +++++++++++++++++++++
 2 files changed, 139 insertions(+)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 9165850..f30616b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -136,12 +136,14 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                   outcome = out;
                   return AggOutcome.RETURN_OUTCOME;
                 case EMIT:
+                  outerOutcome = EMIT;
                   if (incoming.getRecordCount() == 0) {
                     // When we see an EMIT we let the  agg record batch know that it should either
                     // send out an EMIT or an OK_NEW_SCHEMA, followed by an EMIT. To do that we simply return
                     // RETURN_AND_RESET with the outcome so the record batch can take care of it.
                     return setOkAndReturnEmit();
                   } else {
+                    currentIndex = this.getVectorIndex(underlyingIndex);
                     break outer;
                   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
index 2183efa..cead984 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.test.rowSet.DirectRowSet;
@@ -1164,4 +1165,140 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
   }
 
+  /**
+   Repeats t21_testStreamingAggrRunsofEmpty_NonEmpty_Sv2 with no group by
+   */
+  @Test
+  public void t21_testStreamingAggrRunsofEmpty_NonEmpty_Sv2() {
+    TupleMetadata inputSchema_sv2 = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.INT)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .withSVMode(BatchSchema.SelectionVectorMode.TWO_BYTE)
+      .buildSchema();
+
+    final RowSet.SingleRowSet emptyRowSet_Sv2 = operatorFixture.rowSetBuilder(inputSchema_sv2)
+      .withSv2()
+      .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema_sv2)
+      .addSelection(false, 2, 20, "item2")
+      .addSelection(true, 3, 30, "item3")
+      .withSv2()
+      .build();
+
+    inputContainer.add(emptyRowSet_Sv2.container());
+    inputContainer.add(emptyRowSet_Sv2.container());
+    inputContainer.add(emptyRowSet_Sv2.container());
+    inputContainer.add(emptyRowSet_Sv2.container());
+    inputContainer.add(emptyRowSet_Sv2.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+    inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+    inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+    inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+    inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, inputContainerSv2, inputContainer.get(0).getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      new ArrayList<NamedExpression>(),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+      .addRow((long)33)
+      .build();
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // For special batch.
+    assertEquals(1, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    nonEmptyInputRowSet2.clear();
+    emptyRowSet_Sv2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   Repeats t22_testStreamingAggrRunsOfEmpty_NonEmpty with no group by
+   */
+  @Test
+  public void t22_testStreamingAggrRunsOfEmpty_NonEmpty() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2")
+      .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+      .build();
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, inputContainer.get(0).getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      new ArrayList<NamedExpression>(),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
 }
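
The StreamingAggTemplate change above records the EMIT outcome (outerOutcome = EMIT) and re-resolves currentIndex when the EMIT arrives with a non-empty batch. A minimal standalone model of that EMIT contract follows; StreamingAggEmitSketch, Batch, and Outcome are hypothetical stand-ins rather than Drill's RecordBatch API, and the batch sequence only loosely mirrors the new t21/t22 tests above.

import java.util.Arrays;
import java.util.List;

public class StreamingAggEmitSketch {

  enum Outcome { OK_NEW_SCHEMA, OK, EMIT }

  static final class Batch {
    final Outcome outcome;
    final int[] values;   // already-selected rows, i.e. after any SV2 resolution
    Batch(Outcome outcome, int... values) { this.outcome = outcome; this.values = values; }
  }

  public static void main(String[] args) {
    // Runs of empty batches ending in EMIT, then a non-empty batch arriving
    // together with an EMIT.
    List<Batch> incoming = Arrays.asList(
        new Batch(Outcome.OK_NEW_SCHEMA),
        new Batch(Outcome.EMIT),            // empty run: still produce one output row
        new Batch(Outcome.EMIT),
        new Batch(Outcome.OK, 2 + 20),      // id_left + cost_left for item2
        new Batch(Outcome.EMIT, 3 + 30));   // item3 arrives in the EMIT batch itself

    long sum = 0;
    boolean sawRow = false;
    for (Batch b : incoming) {
      // Restart the read index for every incoming batch.  Reusing an index left
      // over from the previous batch is the failure mode the fix addresses by
      // re-resolving currentIndex when a non-empty EMIT batch arrives.
      int index = 0;
      while (index < b.values.length) {
        sum += b.values[index++];
        sawRow = true;
      }
      if (b.outcome == Outcome.EMIT) {
        // One output row per EMIT boundary, then reset the running aggregate.
        String total = sawRow ? Long.toString(sum) : "null (empty run)";
        System.out.println("EMIT -> total_sum = " + total);
        sum = 0;
        sawRow = false;
      }
    }
  }
}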