You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by me...@apache.org on 2015/04/27 22:48:02 UTC

[4/4] drill git commit: DRILL-2277: Fix streaming aggregate when input batch is empty

DRILL-2277: Fix streaming aggregate when input batch is empty


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3689522d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3689522d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3689522d

Branch: refs/heads/master
Commit: 3689522d4a7035a966f19695a678c6881fdaeba6
Parents: 5189635
Author: Mehant Baid <me...@gmail.com>
Authored: Wed Apr 22 17:49:12 2015 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Mon Apr 27 13:08:18 2015 -0700

----------------------------------------------------------------------
 .../impl/aggregate/StreamingAggBatch.java       | 79 ++++++++++++++++++--
 .../exec/fn/impl/TestAggregateFunctions.java    | 14 ++++
 2 files changed, 86 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/3689522d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 33d2c7a..ed5b415 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -19,10 +19,12 @@ package org.apache.drill.exec.physical.impl.aggregate;
 
 import java.io.IOException;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -48,6 +50,8 @@ import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.sun.codemodel.JExpr;
@@ -60,7 +64,22 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   private final RecordBatch incoming;
   private boolean done = false;
   private boolean first = true;
-  private boolean schemaBuilt = false;
+  private int recordCount = 0;
+
+  /*
+   * DRILL-2277, DRILL-2411: For straight aggregates without a group by clause we need to perform special handling when
+   * the incoming batch is empty. In the case of the empty input into the streaming aggregate we need
+   * to return a single batch with one row. For count we need to return 0 and for all other aggregate
+   * functions like sum, avg etc we need to return an explicit row with NULL. Since we correctly allocate the type of
+   * the outgoing vectors (required for count and nullable for other aggregate functions) all we really need to do
+   * is simply set the record count to be 1 in such cases. For nullable vectors we don't need to do anything because
+   * if we don't set anything the output will be NULL, however for required vectors we explicitly zero out the vector
+   * since we don't zero it out while allocating it.
+   *
+   * We maintain some state to remember that we have done such special handling.
+   */
+  private boolean specialBatchSent = false;
+  private static final int SPECIAL_BATCH_COUNT = 1;
 
   public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
     super(popConfig, context);
@@ -69,13 +88,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
   @Override
   public int getRecordCount() {
-    if (done) {
-      return 0;
-    }
-    if (aggregator == null) {
+    if (done || aggregator == null) {
       return 0;
     }
-    return aggregator.getOutputCount();
+    return recordCount;
   }
 
   @Override
@@ -95,7 +111,13 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
   @Override
   public IterOutcome innerNext() {
-      // this is only called on the first batch. Beyond this, the aggregator manages batches.
+
+    // if a special batch has been sent, we have no data in the incoming so exit early
+    if (specialBatchSent) {
+      return IterOutcome.NONE;
+    }
+
+    // this is only called on the first batch. Beyond this, the aggregator manages batches.
     if (aggregator == null || first) {
       IterOutcome outcome;
       if (first && incoming.getRecordCount() > 0) {
@@ -107,6 +129,14 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
       logger.debug("Next outcome of {}", outcome);
       switch (outcome) {
       case NONE:
+        if (first && popConfig.getKeys().length == 0) {
+          // if we have a straight aggregate and empty input batch, we need to handle it in a different way
+          constructSpecialBatch();
+          first = false;
+          // set state to indicate the fact that we have sent a special batch and input is empty
+          specialBatchSent = true;
+          return IterOutcome.OK;
+        }
       case NOT_YET:
       case STOP:
         return outcome;
@@ -125,6 +155,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
 
     while (true) {
       AggOutcome out = aggregator.doWork();
+      recordCount = aggregator.getOutputCount();
       logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
       switch (out) {
       case CLEANUP_AND_RETURN:
@@ -156,6 +187,40 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
   }
 
 
+  /**
+   * Method is invoked when we have a straight aggregate (no group by expression) and our input is empty.
+   * In this case we construct an outgoing batch with record count as 1. For the nullable vectors we don't set anything
+   * as we want the output to be NULL. For the required vectors (only for count()) we set the value to be zero since
+   * we don't zero out our buffers initially while allocating them.
+   */
+  private void constructSpecialBatch() {
+    int exprIndex = 0;
+    for (VectorWrapper vw: container) {
+      ValueVector vv = vw.getValueVector();
+      AllocationHelper.allocateNew(vv, SPECIAL_BATCH_COUNT);
+      vv.getMutator().setValueCount(SPECIAL_BATCH_COUNT);
+      if (vv.getField().getType().getMode() == TypeProtos.DataMode.REQUIRED) {
+        if (vv instanceof FixedWidthVector) {
+          /*
+           * The only case we should have a required vector in the aggregate is for count function whose output is
+           * always a FixedWidthVector (BigIntVector). Zero out the vector.
+           */
+          ((FixedWidthVector) vv).zeroVector();
+        } else {
+          /*
+           * If we are in this else block it means that we have a required vector which is of variable length. We
+           * should not be here, raising an error since we have set the record count to be 1 and not cleared the
+           * buffer
+           */
+          throw new DrillRuntimeException("FixedWidth vectors is the expected output vector type. " +
+              "Corresponding expression: " + popConfig.getExprs()[exprIndex].toString());
+        }
+      }
+      exprIndex++;
+    }
+    container.setRecordCount(SPECIAL_BATCH_COUNT);
+    recordCount = SPECIAL_BATCH_COUNT;
+  }
 
   /**
    * Creates a new Aggregator based on the current schema. If setup fails, this method is responsible for cleaning up

http://git-wip-us.apache.org/repos/asf/drill/blob/3689522d/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
index 3b0fee0..01db7c2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
@@ -206,4 +206,18 @@ public class TestAggregateFunctions extends BaseTestQuery {
         .baselineValues(333.56708470261117d)
         .go();
   }
+
+  @Test
+  public void testAggregateWithEmptyInput() throws Exception {
+    String query = "select " +
+        "count(employee_id) col1, avg(employee_id) col2, sum(employee_id) col3 " +
+        "from cp.`employee.json` where 1 = 0";
+
+    testBuilder()
+        .sqlQuery(query)
+        .unOrdered()
+        .baselineColumns("col1", "col2", "col3")
+        .baselineValues(0l, null, null)
+        .go();
+  }
 }