You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by me...@apache.org on 2015/04/27 22:48:02 UTC
[4/4] drill git commit: DRILL-2277: Fix streaming aggregate when input batch is empty
DRILL-2277: Fix streaming aggregate when input batch is empty
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3689522d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3689522d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3689522d
Branch: refs/heads/master
Commit: 3689522d4a7035a966f19695a678c6881fdaeba6
Parents: 5189635
Author: Mehant Baid <me...@gmail.com>
Authored: Wed Apr 22 17:49:12 2015 -0700
Committer: Mehant Baid <me...@gmail.com>
Committed: Mon Apr 27 13:08:18 2015 -0700
----------------------------------------------------------------------
.../impl/aggregate/StreamingAggBatch.java | 79 ++++++++++++++++++--
.../exec/fn/impl/TestAggregateFunctions.java | 14 ++++
2 files changed, 86 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/3689522d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 33d2c7a..ed5b415 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -19,10 +19,12 @@ package org.apache.drill.exec.physical.impl.aggregate;
import java.io.IOException;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -48,6 +50,8 @@ import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ValueVector;
import com.sun.codemodel.JExpr;
@@ -60,7 +64,22 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
private final RecordBatch incoming;
private boolean done = false;
private boolean first = true;
- private boolean schemaBuilt = false;
+ private int recordCount = 0;
+
+ /*
+ * DRILL-2277, DRILL-2411: A straight aggregate (one without a group by clause) needs special handling when
+ * its incoming batch is empty. In that case the streaming aggregate must return a single batch containing
+ * exactly one row: 0 for count, and an explicit NULL for every other aggregate function such as sum or avg.
+ * Because the outgoing vectors are already allocated with the correct type (required for count, nullable
+ * for the other aggregate functions), all we really need to do is set the record count to 1. Nullable
+ * vectors need no further work, because a value that is never set reads as NULL. Required vectors must be
+ * zeroed out explicitly, since we don't zero out the buffer while allocating it.
+ *
+ * The flag below remembers that this special handling has already been done.
+ */
+ private boolean specialBatchSent = false;
+ private static final int SPECIAL_BATCH_COUNT = 1;
public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
super(popConfig, context);
@@ -69,13 +88,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
@Override
public int getRecordCount() {
- if (done) {
- return 0;
- }
- if (aggregator == null) {
+ if (done || aggregator == null) {
return 0;
}
- return aggregator.getOutputCount();
+ return recordCount;
}
@Override
@@ -95,7 +111,13 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
@Override
public IterOutcome innerNext() {
- // this is only called on the first batch. Beyond this, the aggregator manages batches.
+
+ // if a special batch has been sent, we have no data in the incoming so exit early
+ if (specialBatchSent) {
+ return IterOutcome.NONE;
+ }
+
+ // this is only called on the first batch. Beyond this, the aggregator manages batches.
if (aggregator == null || first) {
IterOutcome outcome;
if (first && incoming.getRecordCount() > 0) {
@@ -107,6 +129,14 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
logger.debug("Next outcome of {}", outcome);
switch (outcome) {
case NONE:
+ if (first && popConfig.getKeys().length == 0) {
+ // if we have a straight aggregate and empty input batch, we need to handle it in a different way
+ constructSpecialBatch();
+ first = false;
+ // set state to indicate the fact that we have sent a special batch and input is empty
+ specialBatchSent = true;
+ return IterOutcome.OK;
+ }
case NOT_YET:
case STOP:
return outcome;
@@ -125,6 +155,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
while (true) {
AggOutcome out = aggregator.doWork();
+ recordCount = aggregator.getOutputCount();
logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
switch (out) {
case CLEANUP_AND_RETURN:
@@ -156,6 +187,40 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
+ /**
+ * Builds the single-row output batch used when we have a straight aggregate (no group by expression) and our
+ * input is empty.
+ *
+ * For every vector in the container this calls AllocationHelper.allocateNew(vv, SPECIAL_BATCH_COUNT) and sets the
+ * vector's value count to SPECIAL_BATCH_COUNT. For the nullable vectors we don't set anything else, as we want
+ * the output to be NULL. For the required vectors (expected only for count()) we set the value to be zero since
+ * we don't zero out our buffers initially while allocating them. Finally both the container's record count and
+ * the recordCount field are set to SPECIAL_BATCH_COUNT.
+ *
+ * @throws DrillRuntimeException if a required output vector is not a FixedWidthVector
+ */
+ private void constructSpecialBatch() {
+ int exprIndex = 0; // position in popConfig.getExprs(); advanced once per vector, read only for the error message
+ for (VectorWrapper vw: container) {
+ ValueVector vv = vw.getValueVector();
+ AllocationHelper.allocateNew(vv, SPECIAL_BATCH_COUNT);
+ vv.getMutator().setValueCount(SPECIAL_BATCH_COUNT);
+ if (vv.getField().getType().getMode() == TypeProtos.DataMode.REQUIRED) {
+ if (vv instanceof FixedWidthVector) {
+ /*
+ * The only case where we should have a required vector in the aggregate is the count function, whose
+ * output is always a FixedWidthVector (BigIntVector). Zero out the vector so the single row reads as 0.
+ */
+ ((FixedWidthVector) vv).zeroVector();
+ } else {
+ /*
+ * If we are in this else block we have a required vector that is not fixed width. We should not be
+ * here: the value count has already been set to 1 but the buffer has not been cleared, so raise an
+ * error instead of emitting the batch.
+ */
+ throw new DrillRuntimeException("FixedWidth vectors is the expected output vector type. " +
+ "Corresponding expression: " + popConfig.getExprs()[exprIndex].toString());
+ }
+ }
+ exprIndex++;
+ }
+ container.setRecordCount(SPECIAL_BATCH_COUNT);
+ recordCount = SPECIAL_BATCH_COUNT;
+ }
/**
* Creates a new Aggregator based on the current schema. If setup fails, this method is responsible for cleaning up
http://git-wip-us.apache.org/repos/asf/drill/blob/3689522d/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
index 3b0fee0..01db7c2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
@@ -206,4 +206,18 @@ public class TestAggregateFunctions extends BaseTestQuery {
.baselineValues(333.56708470261117d)
.go();
}
+
+ @Test
+ public void testAggregateWithEmptyInput() throws Exception {
+ String query = "select " +
+ "count(employee_id) col1, avg(employee_id) col2, sum(employee_id) col3 " +
+ "from cp.`employee.json` where 1 = 0"; // "where 1 = 0" is never true, so the aggregates get no input rows
+
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("col1", "col2", "col3")
+ .baselineValues(0l, null, null) // one row expected: count is 0, avg and sum are NULL
+ .go();
+ }
}