You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ti...@apache.org on 2018/08/04 01:01:23 UTC
[drill] 02/02: DRILL-6654: Data verification failure with lateral
unnest query having filter in and order by
This is an automated email from the ASF dual-hosted git repository.
timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 11f3c332bb4b51ad43053cb3b1fad5891bda2132
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Tue Jul 31 15:53:57 2018 -0700
DRILL-6654: Data verification failure with lateral unnest query having filter in and order by
closes #1418
---
.../impl/aggregate/StreamingAggTemplate.java | 2 +
.../impl/agg/TestStreamingAggEmitOutcome.java | 137 +++++++++++++++++++++
2 files changed, 139 insertions(+)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 9165850..f30616b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -136,12 +136,14 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
outcome = out;
return AggOutcome.RETURN_OUTCOME;
case EMIT:
+ outerOutcome = EMIT;
if (incoming.getRecordCount() == 0) {
// When we see an EMIT we let the agg record batch know that it should either
// send out an EMIT or an OK_NEW_SCHEMA, followed by an EMIT. To do that we simply return
// RETURN_AND_RESET with the outcome so the record batch can take care of it.
return setOkAndReturnEmit();
} else {
+ currentIndex = this.getVectorIndex(underlyingIndex);
break outer;
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
index 2183efa..cead984 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
import org.apache.drill.exec.physical.impl.MockRecordBatch;
import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
+import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.test.rowSet.DirectRowSet;
@@ -1164,4 +1165,140 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
}
+ /**
+ Repeats t21_testStreamingAggrRunsofEmpty_NonEmpty_Sv2 with no group by
+ */
+ @Test
+ public void t21_testStreamingAggrRunsofEmpty_NonEmpty_Sv2() {
+ TupleMetadata inputSchema_sv2 = new SchemaBuilder()
+ .add("id_left", TypeProtos.MinorType.INT)
+ .add("cost_left", TypeProtos.MinorType.INT)
+ .add("name_left", TypeProtos.MinorType.VARCHAR)
+ .withSVMode(BatchSchema.SelectionVectorMode.TWO_BYTE)
+ .buildSchema();
+
+ final RowSet.SingleRowSet emptyRowSet_Sv2 = operatorFixture.rowSetBuilder(inputSchema_sv2)
+ .withSv2()
+ .build();
+
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema_sv2)
+ .addSelection(false, 2, 20, "item2")
+ .addSelection(true, 3, 30, "item3")
+ .withSv2()
+ .build();
+
+ inputContainer.add(emptyRowSet_Sv2.container());
+ inputContainer.add(emptyRowSet_Sv2.container());
+ inputContainer.add(emptyRowSet_Sv2.container());
+ inputContainer.add(emptyRowSet_Sv2.container());
+ inputContainer.add(emptyRowSet_Sv2.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+ inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+ inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+ inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+ inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+ inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, inputContainerSv2, inputContainer.get(0).getSchema());
+
+ final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+ new ArrayList<NamedExpression>(),
+ parseExprs("sum(id_left+cost_left)", "total_sum"),
+ 1.0f);
+
+ final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+ .addRow((long)33)
+ .build();
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertEquals(0, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ // For special batch.
+ assertEquals(1, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(0, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ nonEmptyInputRowSet2.clear();
+ emptyRowSet_Sv2.clear();
+ expectedRowSet.clear();
+ }
+
+ /**
+ Repeats t22_testStreamingAggrRunsOfEmpty_NonEmpty with no group by
+ */
+ @Test
+ public void t22_testStreamingAggrRunsOfEmpty_NonEmpty() {
+ final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+ .addRow(2, 20, "item2")
+ .build();
+
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(emptyInputRowSet.container());
+ inputContainer.add(nonEmptyInputRowSet2.container());
+
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+ inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+ final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+ .build();
+
+ final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+ inputContainer, inputOutcomes, inputContainer.get(0).getSchema());
+
+ final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+ new ArrayList<NamedExpression>(),
+ parseExprs("sum(id_left+cost_left)", "total_sum"),
+ 1.0f);
+
+ final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+ operatorFixture.getFragmentContext());
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertEquals(0, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(0, strAggBatch.getRecordCount());
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+ new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+ assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+ assertEquals(1, strAggBatch.getRecordCount());
+
+ nonEmptyInputRowSet2.clear();
+ expectedRowSet.clear();
+ }
}