You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@drill.apache.org by ti...@apache.org on 2018/08/04 01:01:23 UTC

[drill] 02/02: DRILL-6654: Data verification failure with lateral unnest query having filter in and order by

This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 11f3c332bb4b51ad43053cb3b1fad5891bda2132
Author: Sorabh Hamirwasia <sh...@maprtech.com>
AuthorDate: Tue Jul 31 15:53:57 2018 -0700

    DRILL-6654: Data verification failure with lateral unnest query having filter in and order by
    
    closes #1418
---
 .../impl/aggregate/StreamingAggTemplate.java       |   2 +
 .../impl/agg/TestStreamingAggEmitOutcome.java      | 137 +++++++++++++++++++++
 2 files changed, 139 insertions(+)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 9165850..f30616b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -136,12 +136,14 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
                   outcome = out;
                   return AggOutcome.RETURN_OUTCOME;
                 case EMIT:
+                  outerOutcome = EMIT;
                   if (incoming.getRecordCount() == 0) {
                     // When we see an EMIT we let the  agg record batch know that it should either
                     // send out an EMIT or an OK_NEW_SCHEMA, followed by an EMIT. To do that we simply return
                     // RETURN_AND_RESET with the outcome so the record batch can take care of it.
                     return setOkAndReturnEmit();
                   } else {
+                    currentIndex = this.getVectorIndex(underlyingIndex);
                     break outer;
                   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
index 2183efa..cead984 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
 import org.apache.drill.exec.physical.impl.MockRecordBatch;
 import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch;
+import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.test.rowSet.DirectRowSet;
@@ -1164,4 +1165,140 @@ public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
   }
 
+  /**
+   Runs of empty SV2 batches followed by a non-empty SV2 batch, aggregated with no group-by keys.
+   */
+  @Test
+  public void t21_testStreamingAggrRunsofEmpty_NonEmpty_Sv2() {
+    TupleMetadata inputSchema_sv2 = new SchemaBuilder()
+      .add("id_left", TypeProtos.MinorType.INT)
+      .add("cost_left", TypeProtos.MinorType.INT)
+      .add("name_left", TypeProtos.MinorType.VARCHAR)
+      .withSVMode(BatchSchema.SelectionVectorMode.TWO_BYTE) // batches in this test carry an SV2
+      .buildSchema();
+    // Empty batch with an SV2; the same row set is reused for every empty input below.
+    final RowSet.SingleRowSet emptyRowSet_Sv2 = operatorFixture.rowSetBuilder(inputSchema_sv2)
+      .withSv2()
+      .build();
+    // The first row is left out of the SV2, so only (3, 30) should be aggregated: 3 + 30 = 33.
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema_sv2)
+      .addSelection(false, 2, 20, "item2") // not selected
+      .addSelection(true, 3, 30, "item3") // selected
+      .withSv2()
+      .build();
+    // Five empty batches, then the non-empty one.
+    inputContainer.add(emptyRowSet_Sv2.container());
+    inputContainer.add(emptyRowSet_Sv2.container());
+    inputContainer.add(emptyRowSet_Sv2.container());
+    inputContainer.add(emptyRowSet_Sv2.container());
+    inputContainer.add(emptyRowSet_Sv2.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    // One outcome per input container (six each).
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    // SV2s matching the containers above, in the same order.
+    inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+    inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+    inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+    inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+    inputContainerSv2.add(emptyRowSet_Sv2.getSv2());
+    inputContainerSv2.add(nonEmptyInputRowSet2.getSv2());
+    // Mock upstream built from the containers, outcomes and SV2s above.
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, inputContainerSv2, inputContainer.get(0).getSchema());
+    // No group-by expressions; a single aggregate, total_sum = sum(id_left + cost_left).
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      new ArrayList<NamedExpression>(),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+    // Expected final output: a single row with total_sum = 33 (selected row only).
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+      .addRow((long)33)
+      .build();
+    // Expected sequence of outcomes and record counts from the aggregator.
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // Special batch: one row is expected here; every input batch except the last is empty.
+    assertEquals(1, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    // The last EMIT output is the one compared against the expected value 33 below.
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+    // Release the row sets built by this test.
+    nonEmptyInputRowSet2.clear();
+    emptyRowSet_Sv2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   Runs of empty batches followed by a non-empty batch (no SV2s), aggregated with no group-by keys.
+   */
+  @Test
+  public void t22_testStreamingAggrRunsOfEmpty_NonEmpty() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+      .addRow(2, 20, "item2") // sum(id_left + cost_left) for this row would be 22
+      .build();
+    // Four empty batches (shared emptyInputRowSet), then the non-empty one.
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    // One outcome per input container (five each).
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    // NOTE(review): no rows are added, yet outputs compared against this are asserted to have 1 row; confirm RowSetComparison checks more than the schema here.
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+      .build();
+    // Mock upstream built from the containers and outcomes above (no SV2 list passed).
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+      inputContainer, inputOutcomes, inputContainer.get(0).getSchema());
+    // No group-by expressions; a single aggregate, total_sum = sum(id_left + cost_left).
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+      new ArrayList<NamedExpression>(),
+      parseExprs("sum(id_left+cost_left)", "total_sum"),
+      1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+      operatorFixture.getFragmentContext());
+    // Expected sequence of outcomes and record counts from the aggregator.
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+    // NOTE(review): the row produced for the non-empty batch (presumably total_sum = 22) is counted but its value is never verified.
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    // Release the row sets built by this test.
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
 }