You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@drill.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/08/01 18:37:01 UTC

[jira] [Commented] (DRILL-6631) Wrong result from LateralUnnest query with aggregation and order by

    [ https://issues.apache.org/jira/browse/DRILL-6631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565795#comment-16565795 ] 

ASF GitHub Bot commented on DRILL-6631:
---------------------------------------

ilooner closed pull request #1399: DRILL-6631: Streaming agg causes queries with Lateral and Unnest to r…
URL: https://github.com/apache/drill/pull/1399
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 882c36d746d..70880c62efa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -188,16 +188,19 @@ public void buildSchema() throws SchemaChangeException {
   public IterOutcome innerNext() {
 
     // if a special batch has been sent, we have no data in the incoming so exit early
-    if ( done || specialBatchSent) {
+    if (done || specialBatchSent) {
+      assert (sendEmit != true); // if special batch sent with emit then flag will not be set
       return NONE;
     }
 
     // We sent an OK_NEW_SCHEMA and also encountered the end of a data set. So we need to send
     // an EMIT with an empty batch now
     if (sendEmit) {
+      first = false; // first is set only in the case when we see a NONE after an empty first (and only) batch
       sendEmit = false;
       firstBatchForDataSet = true;
       recordCount = 0;
+      specialBatchSent = false;
       return EMIT;
     }
 
@@ -212,15 +215,19 @@ public IterOutcome innerNext() {
       logger.debug("Next outcome of {}", lastKnownOutcome);
       switch (lastKnownOutcome) {
         case NONE:
-          if (firstBatchForDataSet && popConfig.getKeys().size() == 0) {
+
+          if (first && popConfig.getKeys().size() == 0) {
             // if we have a straight aggregate and empty input batch, we need to handle it in a different way
+            // Wewant to produce the special batch only if we got a NONE as the first outcome after
+            // OK_NEW_SCHEMA. If we get a NONE immediately after we see an EMIT, then we have already handled
+            // the case of the empty batch
             constructSpecialBatch();
             // set state to indicate the fact that we have sent a special batch and input is empty
             specialBatchSent = true;
             // If outcome is NONE then we send the special batch in the first iteration and the NONE
             // outcome in the next iteration. If outcome is EMIT, we can send the special
             // batch and the EMIT outcome at the same time.
-            return getFinalOutcome();
+            return IterOutcome.OK;
           }
           // else fall thru
         case OUT_OF_MEMORY:
@@ -238,13 +245,12 @@ public IterOutcome innerNext() {
           // we have to do the special handling
           if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && incoming.getRecordCount() == 0) {
             constructSpecialBatch();
-            // set state to indicate the fact that we have sent a special batch and input is empty
-            specialBatchSent = true;
             firstBatchForDataSet = true; // reset on the next iteration
             // If outcome is NONE then we send the special batch in the first iteration and the NONE
             // outcome in the next iteration. If outcome is EMIT, we can send the special
-            // batch and the EMIT outcome at the same time.
-            return getFinalOutcome();
+            // batch and the EMIT outcome at the same time. (unless the finalOutcome is OK_NEW_SCHEMA)
+            IterOutcome finalOutcome =  getFinalOutcome();
+            return finalOutcome;
           }
           // else fall thru
         case OK:
@@ -269,13 +275,6 @@ public IterOutcome innerNext() {
           }
         }
       }
-      // We sent an EMIT in the previous iteration, so we must be starting a new data set
-      if (firstBatchForDataSet) {
-        done = false;
-        sendEmit = false;
-        specialBatchSent = false;
-        firstBatchForDataSet = false;
-      }
     }
     AggOutcome aggOutcome = aggregator.doWork(lastKnownOutcome);
     recordCount = aggregator.getOutputCount();
@@ -296,14 +295,15 @@ public IterOutcome innerNext() {
         if (firstBatchForDataSet && popConfig.getKeys().size() == 0 && recordCount == 0) {
           // if we have a straight aggregate and empty input batch, we need to handle it in a different way
           constructSpecialBatch();
-          // set state to indicate the fact that we have sent a special batch and input is empty
-          specialBatchSent = true;
           // If outcome is NONE then we send the special batch in the first iteration and the NONE
           // outcome in the next iteration. If outcome is EMIT, we can send the special
           // batch and the EMIT outcome at the same time.
-          return getFinalOutcome();
+
+          IterOutcome finalOutcome =  getFinalOutcome();
+          return finalOutcome;
         }
         firstBatchForDataSet = true;
+        firstBatchForSchema = false;
         if(first) {
           first = false;
         }
@@ -332,9 +332,8 @@ public IterOutcome innerNext() {
           }
         } else if (lastKnownOutcome == OK && first) {
           lastKnownOutcome = OK_NEW_SCHEMA;
-        } else if (lastKnownOutcome != IterOutcome.OUT_OF_MEMORY) {
-          first = false;
         }
+        first = false;
         return lastKnownOutcome;
       case UPDATE_AGGREGATOR:
         // We could get this either between data sets or within a data set.
@@ -629,12 +628,12 @@ private IterOutcome getFinalOutcome() {
     }
     if (firstBatchForSchema) {
       outcomeToReturn = OK_NEW_SCHEMA;
+      sendEmit = true;
       firstBatchForSchema = false;
     } else if (lastKnownOutcome == EMIT) {
       firstBatchForDataSet = true;
       outcomeToReturn = EMIT;
     } else {
-      // get the outcome to return before calling refresh since that resets the lastKnowOutcome to OK
       outcomeToReturn = (recordCount == 0) ? NONE : OK;
     }
     return outcomeToReturn;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index a752c7e7fcf..916585088f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -40,7 +40,7 @@
 
   // First batch after build schema phase
   private boolean first = true;
-  private boolean firstBatchForSchema = true; // true if the current batch came in with an OK_NEW_SCHEMA.
+  private boolean firstBatchForSchema = false; // true if the current batch came in with an OK_NEW_SCHEMA.
   private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set
 
   private boolean newSchema = false;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
index 75c4598baf2..2183efa1db5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestStreamingAggEmitOutcome.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl.agg;
 
 import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
 import org.apache.drill.exec.physical.impl.BaseTestOpBatchEmitOutcome;
@@ -33,6 +34,8 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.util.ArrayList;
+
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
 import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
 import static org.junit.Assert.assertEquals;
@@ -42,6 +45,7 @@
 public class TestStreamingAggEmitOutcome extends BaseTestOpBatchEmitOutcome {
   //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestStreamingAggEmitOutcome.class);
   protected static TupleMetadata resultSchema;
+  protected static TupleMetadata resultSchemaNoGroupBy;
 
   @BeforeClass
   public static void setUpBeforeClass2() throws Exception {
@@ -49,6 +53,9 @@ public static void setUpBeforeClass2() throws Exception {
         .add("name", TypeProtos.MinorType.VARCHAR)
         .addNullable("total_sum", TypeProtos.MinorType.BIGINT)
         .buildSchema();
+    resultSchemaNoGroupBy = new SchemaBuilder()
+        .addNullable("total_sum", TypeProtos.MinorType.BIGINT)
+        .buildSchema();
   }
 
   /**
@@ -611,4 +618,550 @@ public void t10_testStreamingAggrWithEmptyDataSet() {
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
     assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
   }
+
+  /*******************************************************
+   * Tests for EMIT with empty batches and no group by
+   * (Tests t1-t8 are repeated with no group by)
+   *******************************************************/
+
+
+  /**
+   * Repeats t1_testStreamingAggrEmptyBatchEmitOutcome with no group by
+   */
+  @Test
+  public void t11_testStreamingAggrEmptyBatchEmitOutcome() {
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(OK_NEW_SCHEMA);
+    inputOutcomes.add(EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+  }
+
+  /**
+   Repeats t2_testStreamingAggrNonEmptyBatchEmitOutcome with no group by
+   */
+  @Test
+  public void t12_testStreamingAggrNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(13, 130, "item13")
+        .addRow(13, 130, "item13")
+        .addRow(2, 20, "item2")
+        .addRow(2, 20, "item2")
+        .addRow(4, 40, "item4")
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)385)
+        .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    // Data before EMIT is returned with an OK_NEW_SCHEMA.
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    // EMIT comes with an empty batch
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   Repeats t3_testStreamingAggrEmptyBatchFollowedByNonEmptyBatchEmitOutcome with no group by
+   */
+  @Test
+  public void t13_testStreamingAggrEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(13, 130, "item13")
+        .addRow(0, 1300, "item13")
+        .addRow(2, 20, "item2")
+        .addRow(0, 2000, "item2")
+        .addRow(4, 40, "item4")
+        .addRow(0, 4000, "item4")
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)7509)
+        .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   Repeats t4_testStreamingAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome with no group by
+   */
+  @Test
+  public void t14_testStreamingAggrMultipleEmptyBatchFollowedByNonEmptyBatchEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(13, 130, "item13")
+        .addRow(0, 0, "item13")
+        .addRow(1, 33000, "item13")
+        .addRow(2, 20, "item2")
+        .addRow(0, 0, "item2")
+        .addRow(1, 11000, "item2")
+        .addRow(4, 40, "item4")
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)44211)
+        .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   Repeats t5_testStreamingAgrResetsAfterFirstEmitOutcome with no group by
+   */
+  @Test
+  public void t15_testStreamingAgrResetsAfterFirstEmitOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(2, 20, "item2")
+        .addRow(2, 20, "item2")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .addRow(3, 30, "item3")
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)11)
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)374)
+        .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
+  /**
+   Repeats t6_testStreamingAggrOkFollowedByNone with no group by
+   */
+  @Test
+  public void t16_testStreamingAggrOkFollowedByNone() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(2, 20, "item2")
+        .addRow(3, 30, "item3")
+        .addRow(4, 40, "item4")
+        .addRow(4, 40, "item4")
+        .addRow(5, 50, "item5")
+        .addRow(5, 50, "item5")
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet1 = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)11)
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet2 = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)253)
+        .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet1).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet2).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    // Release memory for row sets
+    nonEmptyInputRowSet2.clear();
+    expectedRowSet2.clear();
+    expectedRowSet1.clear();
+  }
+
+  /**
+   Repeats t7_testStreamingAggrMultipleEMITOutcome with no group by
+   */
+  @Test
+  public void t17_testStreamingAggrMultipleEMITOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(2, 20, "item2")
+        .addRow(3, 30, "item3")
+        .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(1, strAggBatch.getRecordCount());
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+  }
+
+  /**
+   Repeats t8_testStreamingAggrMultipleInputToSingleOutputBatch with no group by
+   */
+  @Test
+  public void t18_testStreamingAggrMultipleInputToSingleOutputBatch() {
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(2, 20, "item2")
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)33)
+        .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.EMIT);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.EMIT);
+    assertEquals(0, strAggBatch.getRecordCount());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+  }
+
+
+  /**
+   Repeats t9_testStreamingAgr_WithEmptyNonEmptyBatchesAndOKOutcome with no group by
+   */
+  @Test
+  public void t19_testStreamingAgr_WithEmptyNonEmptyBatchesAndOKOutcome() {
+    final RowSet.SingleRowSet nonEmptyInputRowSet2 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(2, 20, "item1")
+        .addRow(13, 130, "item13")
+        .addRow(13, 130, "item13")
+        .addRow(13, 130, "item13")
+        .addRow(130, 1300, "item130")
+        .addRow(0, 0, "item130")
+        .build();
+
+    final RowSet.SingleRowSet nonEmptyInputRowSet3 = operatorFixture.rowSetBuilder(inputSchema)
+        .addRow(23, 230, "item23")
+        .addRow(3, 33, "item3")
+        .addRow(7, 70, "item7")
+        .addRow(17, 170, "item7")
+        .build();
+
+    final RowSet.SingleRowSet expectedRowSet = operatorFixture.rowSetBuilder(resultSchemaNoGroupBy)
+        .addRow((long)2445)
+        .build();
+
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet2.container());
+    inputContainer.add(emptyInputRowSet.container());
+    inputContainer.add(nonEmptyInputRowSet3.container());
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+    inputOutcomes.add(RecordBatch.IterOutcome.OK);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertEquals(1, strAggBatch.getRecordCount());
+
+    RowSet actualRowSet = DirectRowSet.fromContainer(strAggBatch.getContainer());
+    new RowSetComparison(expectedRowSet).verify(actualRowSet);
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+
+    nonEmptyInputRowSet2.clear();
+    nonEmptyInputRowSet3.clear();
+    expectedRowSet.clear();
+  }
+
+  /**
+   Repeats t10_testStreamingAggrWithEmptyDataSet with no group by
+   */
+  @Test
+  public void t20_testStreamingAggrWithEmptyDataSet() {
+    inputContainer.add(emptyInputRowSet.container());
+
+    inputOutcomes.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+
+    final MockRecordBatch mockInputBatch = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+        inputContainer, inputOutcomes, emptyInputRowSet.container().getSchema());
+
+    final StreamingAggregate streamAggrConfig = new StreamingAggregate(null,
+        new ArrayList<NamedExpression>(),
+        parseExprs("sum(id_left+cost_left)", "total_sum"),
+        1.0f);
+
+    final StreamingAggBatch strAggBatch = new StreamingAggBatch(streamAggrConfig, mockInputBatch,
+        operatorFixture.getFragmentContext());
+
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.OK);
+    assertTrue(strAggBatch.next() == RecordBatch.IterOutcome.NONE);
+  }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Wrong result from LateralUnnest query with aggregation and order by
> -------------------------------------------------------------------
>
>                 Key: DRILL-6631
>                 URL: https://issues.apache.org/jira/browse/DRILL-6631
>             Project: Apache Drill
>          Issue Type: Bug
>    Affects Versions: 1.14.0
>            Reporter: Parth Chandra
>            Assignee: Parth Chandra
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.15.0
>
>
> Reported by Chun:
> The following query gives correct result:
> {noformat}
> 0: jdbc:drill:zk=10.10.30.166:5181> select customer.c_custkey, customer.c_name, orders.totalprice from customer, lateral (select sum(t.o.o_totalprice) as totalprice from unnest(customer.c_orders) t(o) WHERE t.o.o_totalprice in (89230.03,270087.44,246408.53,82657.72,153941.38,65277.06,180309.76)) orders where customer.c_custkey = 101276;
> +------------+---------------------+-------------+
> | c_custkey  |       c_name        | totalprice  |
> +------------+---------------------+-------------+
> | 101276     | Customer#000101276  | 82657.72    |
> +------------+---------------------+-------------+
> 1 row selected (6.184 seconds)
> {noformat}
> But if I remove the where clause and replace it with order by and limit, I got the following empty result set. This is wrong.
> {noformat}
> 0: jdbc:drill:zk=10.10.30.166:5181> select customer.c_custkey, customer.c_name, orders.totalprice from customer, lateral (select sum(t.o.o_totalprice) as totalprice from unnest(customer.c_orders) t(o) WHERE t.o.o_totalprice in (89230.03,270087.44,246408.53,82657.72,153941.38,65277.06,180309.76)) orders order by customer.c_custkey limit 50;
> +------------+---------+-------------+
> | c_custkey  | c_name  | totalprice  |
> +------------+---------+-------------+
> +------------+---------+-------------+
> No rows selected (2.753 seconds)
> {noformat}
> Here is the plan for the query giving the correct result:
> {noformat}
> 00-00    Screen : rowType = RecordType(ANY c_custkey, ANY c_name, ANY totalprice): rowcount = 472783.35, cumulative cost = {8242193.7349999985 rows, 4.102185433499999E7 cpu, 0.0 io, 5.809561804799999E9 network, 0.0 memory}, id = 14410
> 00-01      Project(c_custkey=[$0], c_name=[$1], totalprice=[$2]) : rowType = RecordType(ANY c_custkey, ANY c_name, ANY totalprice): rowcount = 472783.35, cumulative cost = {8194915.3999999985 rows, 4.097457599999999E7 cpu, 0.0 io, 5.809561804799999E9 network, 0.0 memory}, id = 14409
> 00-02        UnionExchange : rowType = RecordType(ANY c_custkey, ANY c_name, ANY totalprice): rowcount = 472783.35, cumulative cost = {7722132.049999999 rows, 3.9556225949999996E7 cpu, 0.0 io, 5.809561804799999E9 network, 0.0 memory}, id = 14408
> 01-01          LateralJoin(correlation=[$cor1], joinType=[inner], requiredColumns=[{0}], column excluded from output: =[`c_orders`]) : rowType = RecordType(ANY c_custkey, ANY c_name, ANY totalprice): rowcount = 472783.35, cumulative cost = {7249348.699999999 rows, 3.577395915E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 14407
> 01-03            SelectionVectorRemover : rowType = RecordType(ANY c_orders, ANY c_custkey, ANY c_name): rowcount = 472783.35, cumulative cost = {6776561.35 rows, 2.442713975E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 14403
> 01-05              Filter(condition=[=($1, 101276)]) : rowType = RecordType(ANY c_orders, ANY c_custkey, ANY c_name): rowcount = 472783.35, cumulative cost = {6303778.0 rows, 2.39543564E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 14402
> 01-07                Scan(groupscan=[EasyGroupScan [selectionRoot=maprfs:/drill/testdata/lateral/tpchsf1/json/customer, numFiles=10, columns=[`c_orders`, `c_custkey`, `c_name`], files=[maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_6.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_4.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_3.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_7.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_5.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_2.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_0.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_8.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_1.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_9.json]]]) : rowType = RecordType(ANY c_orders, ANY c_custkey, ANY c_name): rowcount = 3151889.0, cumulative cost = {3151889.0 rows, 9455667.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 14401
> 01-02            StreamAgg(group=[{}], totalprice=[SUM($0)]) : rowType = RecordType(ANY totalprice): rowcount = 1.0, cumulative cost = {4.0 rows, 19.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 14406
> 01-04              Filter(condition=[OR(=($0, 89230.03), =($0, 270087.44), =($0, 246408.53), =($0, 82657.72), =($0, 153941.38), =($0, 65277.06), =($0, 180309.76))]) : rowType = RecordType(ANY ITEM): rowcount = 1.0, cumulative cost = {3.0 rows, 7.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 14405
> 01-06                Project(ITEM=[ITEM($0, 'o_totalprice')]) : rowType = RecordType(ANY ITEM): rowcount = 1.0, cumulative cost = {2.0 rows, 2.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 14404
> 01-08                  Unnest [srcOp=01-01]  : rowType = RecordType(ANY c_orders): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 14240
> {noformat}
> And here is the plan for the query giving wrong result:
> {noformat}
> 00-00    Screen : rowType = RecordType(ANY c_custkey, ANY c_name, ANY totalprice): rowcount = 50.0, cumulative cost = {2.2063382E7 rows, 2.6342088596853146E8 cpu, 0.0 io, 7.7460824064E10 network, 0.0 memory}, id = 15076
> 00-01      Project(c_custkey=[$0], c_name=[$1], totalprice=[$2]) : rowType = RecordType(ANY c_custkey, ANY c_name, ANY totalprice): rowcount = 50.0, cumulative cost = {2.2063377E7 rows, 2.6342088096853146E8 cpu, 0.0 io, 7.7460824064E10 network, 0.0 memory}, id = 15075
> 00-02        SelectionVectorRemover : rowType = RecordType(ANY c_custkey, ANY c_name, ANY totalprice): rowcount = 50.0, cumulative cost = {2.2063327E7 rows, 2.6342073096853146E8 cpu, 0.0 io, 7.7460824064E10 network, 0.0 memory}, id = 15074
> 00-03          Limit(fetch=[50]) : rowType = RecordType(ANY c_custkey, ANY c_name, ANY totalprice): rowcount = 50.0, cumulative cost = {2.2063277E7 rows, 2.6342068096853146E8 cpu, 0.0 io, 7.7460824064E10 network, 0.0 memory}, id = 15073
> 00-04            SingleMergeExchange(sort0=[0]) : rowType = RecordType(ANY c_custkey, ANY c_name, ANY totalprice): rowcount = 3151889.0, cumulative cost = {2.2063227E7 rows, 2.6342048096853146E8 cpu, 0.0 io, 7.7460824064E10 network, 0.0 memory}, id = 15072
> 01-01              OrderedMuxExchange(sort0=[0]) : rowType = RecordType(ANY c_custkey, ANY c_name, ANY totalprice): rowcount = 3151889.0, cumulative cost = {1.8911338E7 rows, 2.1299025696853146E8 cpu, 0.0 io, 3.8730412032E10 network, 0.0 memory}, id = 15071
> 02-01                SelectionVectorRemover : rowType = RecordType(ANY c_custkey, ANY c_name, ANY totalprice): rowcount = 3151889.0, cumulative cost = {1.5759449E7 rows, 2.0983836796853146E8 cpu, 0.0 io, 3.8730412032E10 network, 0.0 memory}, id = 15070
> 02-02                  TopN(limit=[50]) : rowType = RecordType(ANY c_custkey, ANY c_name, ANY totalprice): rowcount = 3151889.0, cumulative cost = {1.260756E7 rows, 2.0668647896853146E8 cpu, 0.0 io, 3.8730412032E10 network, 0.0 memory}, id = 15069
> 02-03                    HashToRandomExchange(dist0=[[$0]]) : rowType = RecordType(ANY c_custkey, ANY c_name, ANY totalprice): rowcount = 3151889.0, cumulative cost = {9455671.0 rows, 1.35531246E8 cpu, 0.0 io, 3.8730412032E10 network, 0.0 memory}, id = 15068
> 03-01                      LateralJoin(correlation=[$cor1], joinType=[inner], requiredColumns=[{0}], column excluded from output: =[`c_orders`]) : rowType = RecordType(ANY c_custkey, ANY c_name, ANY totalprice): rowcount = 3151889.0, cumulative cost = {6303782.0 rows, 8.5101022E7 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 15067
> 03-03                        Scan(groupscan=[EasyGroupScan [selectionRoot=maprfs:/drill/testdata/lateral/tpchsf1/json/customer, numFiles=10, columns=[`c_orders`, `c_custkey`, `c_name`], files=[maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_6.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_4.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_3.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_7.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_5.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_2.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_0.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_8.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_1.json, maprfs:///drill/testdata/lateral/tpchsf1/json/customer/0_0_9.json]]]) : rowType = RecordType(ANY c_orders, ANY c_custkey, ANY c_name): rowcount = 3151889.0, cumulative cost = {3151889.0 rows, 9455667.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 15063
> 03-02                        StreamAgg(group=[{}], totalprice=[SUM($0)]) : rowType = RecordType(ANY totalprice): rowcount = 1.0, cumulative cost = {4.0 rows, 19.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 15066
> 03-04                          Filter(condition=[OR(=($0, 89230.03), =($0, 270087.44), =($0, 246408.53), =($0, 82657.72), =($0, 153941.38), =($0, 65277.06), =($0, 180309.76))]) : rowType = RecordType(ANY ITEM): rowcount = 1.0, cumulative cost = {3.0 rows, 7.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 15065
> 03-05                            Project(ITEM=[ITEM($0, 'o_totalprice')]) : rowType = RecordType(ANY ITEM): rowcount = 1.0, cumulative cost = {2.0 rows, 2.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 15064
> 03-06                              Unnest [srcOp=03-01]  : rowType = RecordType(ANY c_orders): rowcount = 1.0, cumulative cost = {1.0 rows, 1.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 14860{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)