You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ti...@apache.org on 2018/08/01 18:36:13 UTC
[drill] 02/03: DRILL-6631: Streaming agg causes queries with
Lateral and Unnest to return incorrect results.
This is an automated email from the ASF dual-hosted git repository.
timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit ee841643d0f2d746126f623d6a0c480b3011d38f
Author: Parth Chandra <pa...@apache.org>
AuthorDate: Fri Jul 20 17:24:38 2018 -0700
DRILL-6631: Streaming agg causes queries with Lateral and Unnest to return incorrect results.
This commit fixes issues with handling straight aggregates (no group by)
with empty batches received between EMIT(s).
closes #1399
---
.../physical/impl/aggregate/StreamingAggBatch.java | 39 +-
.../impl/aggregate/StreamingAggTemplate.java | 2 +-
.../impl/agg/TestStreamingAggEmitOutcome.java | 553 +++++++++++++++++++++
3 files changed, 573 insertions(+), 21 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 882c36d..70880c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -188,16 +188,19 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
public IterOutcome innerNext() {
// if a special batch has been sent, we have no data in the incoming so exit early
- if ( done || specialBatchSent) {
+ if (done || specialBatchSent) {
+ assert (sendEmit != true); // if special batch sent with emit then flag will not be set
return NONE;
}
// We sent an OK_NEW_SCHEMA and also encountered the end of a data set. So we need to send
// an EMIT with an empty batch now
if (sendEmit) {
+ first = false; // first is set only in the case when we see a NONE after an empty first (and only) batch
sendEmit = false;
firstBatchForDataSet = true;
recordCount = 0;
+ specialBatchSent = false;
return EMIT;
}
@@ -212,15 +215,19 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
logger.debug("Next outcome of {}", lastKnownOutcome);
switch (lastKnownOutcome) {
case NONE:
- if (firstBatchForDataSet && popConfig.getKeys().size() == 0) {
+
+ if (first && popConfig.getKeys().size() == 0) {
// if we have a straight aggregate and empty input batch, we need to handle it in a different way
+ // We want to produce the special batch only if we got a NONE as the first outcome after
+ // OK_NEW_SCHEMA. If we get a NONE immediately after we see an EMIT, then we have already handled
+ // the case of the empty batch
constructSpecialBatch();
// set state to indicate the fact that we have sent a special batch and input is empty
specialBatchSent = true;
// If outcome is NONE then we send the special batch in the first iteration and the NONE
// outcome in the next iteration. If outcome is EMIT, we can send the special
// batch and the EMIT outcome at the same time.
- return getFinalOutcome();
+ return IterOutcome.OK;
}
// else fall thru
case OUT_OF_MEMORY:
@@ -238,13 +245,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
// we have to do the special handling
if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && incoming.getRecordCount() == 0) {
constructSpecialBatch();
- // set state to indicate the fact that we have sent a special batch and input is empty
- specialBatchSent = true;
firstBatchForDataSet = true; // reset on the next iteration
// If outcome is NONE then we send the special batch in the first iteration and the NONE
// outcome in the next iteration. If outcome is EMIT, we can send the special
- // batch and the EMIT outcome at the same time.
- return getFinalOutcome();
+ // batch and the EMIT outcome at the same time. (unless the finalOutcome is OK_NEW_SCHEMA)
+ IterOutcome finalOutcome = getFinalOutcome();
+ return finalOutcome;
}
// else fall thru
case OK:
@@ -269,13 +275,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
}
}
- // We sent an EMIT in the previous iteration, so we must be starting a new data set
- if (firstBatchForDataSet) {
- done = false;
- sendEmit = false;
- specialBatchSent = false;
- firstBatchForDataSet = false;
- }
}
AggOutcome aggOutcome = aggregator.doWork(lastKnownOutcome);
recordCount = aggregator.getOutputCount();
@@ -296,14 +295,15 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && recordCount == 0) {
// if we have a straight aggregate and empty input batch, we need to handle it in a different way
constructSpecialBatch();
- // set state to indicate the fact that we have sent a special batch and input is empty
- specialBatchSent = true;
// If outcome is NONE then we send the special batch in the first iteration and the NONE
// outcome in the next iteration. If outcome is EMIT, we can send the special
// batch and the EMIT outcome at the same time.
- return getFinalOutcome();
+
+ IterOutcome finalOutcome = getFinalOutcome();
+ return finalOutcome;
}
firstBatchForDataSet = true;
+ firstBatchForSchema = false;
if(first) {
first = false;
}
@@ -332,9 +332,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
} else if (lastKnownOutcome == OK && first) {
lastKnownOutcome = OK_NEW_SCHEMA;
- } else if (lastKnownOutcome != IterOutcome.OUT_OF_MEMORY) {
- first = false;
}
+ first = false;
return lastKnownOutcome;
case UPDATE_AGGREGATOR:
// We could get this either between data sets or within a data set.
@@ -629,12 +628,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
if (firstBatchForSchema) {
outcomeToReturn = OK_NEW_SCHEMA;
+ sendEmit = true;
firstBatchForSchema = false;
} else if (lastKnownOutcome == EMIT) {
firstBatchForDataSet = true;
outcomeToReturn = EMIT;
} else {
- // get the outcome to return before calling refresh since that resets the lastKnowOutcome to OK
outcomeToReturn = (recordCount == 0) ? NONE : OK;
}
return outcomeToReturn;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index a752c7e..9165850 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -40,7 +40,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
// First batch after build schema phase
private boolean first = true;
- private boolean firstBatchForSchema = true; // true if the current batch came in with an OK_NEW_SCHEMA.
+ private boolean firstBatchForSchema = false; // true if the current batch came in with an OK_NEW_SCHEMA.
private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set
private boolean newSchema = false;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
index 75c4598..2183efa 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.impl.agg;
import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
@@ -33,6 +34,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import java.util.ArrayList;
+
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
import static org.junit.Assert.assertEquals;
@@ -42,6 +45,7 @@ import static org.junit.Assert.assertTrue;
public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestStreamingAggEmitOutcome.class);
protected static TupleMetadata resultSchema;
+ protected static TupleMetadata resultSchemaNoGroupBy;
@BeforeClass
public static void setUpBeforeClass2() throws Exception {
@@ -49,6 +53,9 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
.add("name", TypeProtos.MinorType.VARCHAR)
.addNullable("total_sum", TypeProtos.MinorType.BIGINT)
.buildSchema();
+ resultSchemaNoGroupBy = new SchemaBuilder()
+ .addNullable("total_sum", TypeProtos.MinorType.BIGINT)
+ .buildSchema();
}
/**
@@ -611,4 +618,550 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
}
+
+ /*******************************************************
+ * Tests for EMIT with empty batches and no group by
+ * (Tests t1-t8 are repeated with no group by)
+ *******************************************************/
+
+
+ /**
+ * Repeats t1_testStreamingAggrEmptyBatchEmitOutcome with no group by
+ */
+ @Test
+ public void t11_testStreamingAggrEmptyBatchEmitOutcome() {
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(OK_NEW_SCHEMA);
+ inputOutcomes.add(OK_NEW_SCHEMA);
+ inputOutcomes.add(EMIT);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+ final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+ new ArrayList<NamedExpression>(),
+ parseExprs("sum(id_left+cost_left)", "total_sum"),
+ 1.0f);
+
+ final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(0, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+ }
+
+ /**
+ Repeats t2_testStreamingAggrNonEmptyBatchEmitOutcome with no group by
+ */
+ @Test
+ public void t12_testStreamingAggrNonEmptyBatchEmitOutcome() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(13, 130, "item13")
+ .addRow(13, 130, "item13")
+ .addRow(2, 20, "item2")
+ .addRow(2, 20, "item2")
+ .addRow(4, 40, "item4")
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+ .addRow((long)385)
+ .build();
+
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+ final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+ new ArrayList<NamedExpression>(),
+ parseExprs("sum(id_left+cost_left)", "total_sum"),
+ 1.0f);
+
+ final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // Data before EMIT is returned with an OK_NEW_SCHEMA.
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ // EMIT comes with an empty batch
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+ // Release memory for row sets
+ nonEmptyInputRowSet2.clear();
+ expectedRowSet.clear();
+ }
+
+ /**
+ Repeats t3_testStreamingAggrEmptyBatchFollowedByNonEmptyBatchEmitOutcome with no group by
+ */
+ @Test
+ public void t13_testStreamingAggrEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(13, 130, "item13")
+ .addRow(0, 1300, "item13")
+ .addRow(2, 20, "item2")
+ .addRow(0, 2000, "item2")
+ .addRow(4, 40, "item4")
+ .addRow(0, 4000, "item4")
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+ .addRow((long)7509)
+ .build();
+
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+ final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+ new ArrayList<NamedExpression>(),
+ parseExprs("sum(id_left+cost_left)", "total_sum"),
+ 1.0f);
+
+ final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(0, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+ // Release memory for row sets
+ nonEmptyInputRowSet2.clear();
+ expectedRowSet.clear();
+ }
+
+ /**
+ Repeats t4_testStreamingAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome with no group by
+ */
+ @Test
+ public void t14_testStreamingAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(13, 130, "item13")
+ .addRow(0, 0, "item13")
+ .addRow(1, 33000, "item13")
+ .addRow(2, 20, "item2")
+ .addRow(0, 0, "item2")
+ .addRow(1, 11000, "item2")
+ .addRow(4, 40, "item4")
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+ .addRow((long)44211)
+ .build();
+
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+ final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+ new ArrayList<NamedExpression>(),
+ parseExprs("sum(id_left+cost_left)", "total_sum"),
+ 1.0f);
+
+ final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(0, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(1, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(1, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+ // Release memory for row sets
+ nonEmptyInputRowSet2.clear();
+ expectedRowSet.clear();
+ }
+
+ /**
+ Repeats t5_testStreamingAgrResetsAfterFirstEmitOutcome with no group by
+ */
+ @Test
+ public void t15_testStreamingAgrResetsAfterFirstEmitOutcome() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .addRow(2, 20, "item2")
+ .addRow(3, 30, "item3")
+ .addRow(3, 30, "item3")
+ .addRow(3, 30, "item3")
+ .addRow(3, 30, "item3")
+ .addRow(3, 30, "item3")
+ .addRow(3, 30, "item3")
+ .addRow(3, 30, "item3")
+ .addRow(3, 30, "item3")
+ .addRow(3, 30, "item3")
+ .addRow(3, 30, "item3")
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+ .addRow((long)11)
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+ .addRow((long)374)
+ .build();
+
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+ final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+ new ArrayList<NamedExpression>(),
+ parseExprs("sum(id_left+cost_left)", "total_sum"),
+ 1.0f);
+
+ final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+ new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(0, strAggBatch.getRecordCount());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+ new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+ // Release memory for row sets
+ nonEmptyInputRowSet2.clear();
+ expectedRowSet2.clear();
+ expectedRowSet1.clear();
+ }
+
+ /**
+ Repeats t6_testStreamingAggrOkFollowedByNone with no group by
+ */
+ @Test
+ public void t16_testStreamingAggrOkFollowedByNone() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .addRow(3, 30, "item3")
+ .addRow(4, 40, "item4")
+ .addRow(4, 40, "item4")
+ .addRow(5, 50, "item5")
+ .addRow(5, 50, "item5")
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+ .addRow((long)11)
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+ .addRow((long)253)
+ .build();
+
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+ final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+ new ArrayList<NamedExpression>(),
+ parseExprs("sum(id_left+cost_left)", "total_sum"),
+ 1.0f);
+
+ final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+ new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(0, strAggBatch.getRecordCount());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+ new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+ // Release memory for row sets
+ nonEmptyInputRowSet2.clear();
+ expectedRowSet2.clear();
+ expectedRowSet1.clear();
+ }
+
+ /**
+ Repeats t7_testStreamingAggrMultipleEMITOutcome with no group by
+ */
+ @Test
+ public void t17_testStreamingAggrMultipleEMITOutcome() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .addRow(3, 30, "item3")
+ .build();
+
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+ final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+ new ArrayList<NamedExpression>(),
+ parseExprs("sum(id_left+cost_left)", "total_sum"),
+ 1.0f);
+
+ final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertEquals(1, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(0, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(1, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(1, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+ nonEmptyInputRowSet2.clear();
+ }
+
+ /**
+ Repeats t8_testStreamingAggrMultipleInputToSingleOutputBatch with no group by
+ */
+ @Test
+ public void t18_testStreamingAggrMultipleInputToSingleOutputBatch() {
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+ .addRow((long)33)
+ .build();
+
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+ final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+ new ArrayList<NamedExpression>(),
+ parseExprs("sum(id_left+cost_left)", "total_sum"),
+ 1.0f);
+
+ final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(0, strAggBatch.getRecordCount());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+ nonEmptyInputRowSet2.clear();
+ }
+
+
+ /**
+ Repeats t9_testStreamingAgr_WithEmptyNonEmptyBatchesAndOKOutcome with no group by
+ */
+ @Test
+ public void t19_testStreamingAgr_WithEmptyNonEmptyBatchesAndOKOutcome() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item1")
+ .addRow(13, 130, "item13")
+ .addRow(13, 130, "item13")
+ .addRow(13, 130, "item13")
+ .addRow(130, 1300, "item130")
+ .addRow(0, 0, "item130")
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(23, 230, "item23")
+ .addRow(3, 33, "item3")
+ .addRow(7, 70, "item7")
+ .addRow(17, 170, "item7")
+ .build();
+
+ final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+ .addRow((long)2445)
+ .build();
+
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet3.container());
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+ final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+ new ArrayList<NamedExpression>(),
+ parseExprs("sum(id_left+cost_left)", "total_sum"),
+ 1.0f);
+
+ final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+ nonEmptyInputRowSet2.clear();
+ nonEmptyInputRowSet3.clear();
+ expectedRowSet.clear();
+ }
+
+ /**
+ Repeats t10_testStreamingAggrWithEmptyDataSet with no group by
+ */
+ @Test
+ public void t20_testStreamingAggrWithEmptyDataSet() {
+ inputContainer.add(emptyInputRowSet.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+ final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+ new ArrayList<NamedExpression>(),
+ parseExprs("sum(id_left+cost_left)", "total_sum"),
+ 1.0f);
+
+ final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK);
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+ }
+
}