You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/04/28 04:58:30 UTC
[3/5] drill git commit: DRILL-2856: Fix StreamingAgg infinite loop
problem due to state management issue.
DRILL-2856: Fix StreamingAgg infinite loop problem due to state management issue.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3f781be0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3f781be0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3f781be0
Branch: refs/heads/master
Commit: 3f781be01451e3fe7d312de49b3c8d3c6d73e02d
Parents: 3689522
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun Apr 19 08:52:47 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Mon Apr 27 14:11:39 2015 -0700
----------------------------------------------------------------------
.../physical/impl/aggregate/StreamingAggTemplate.java | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/3f781be0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index bf27187..05560c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -21,7 +21,6 @@ import javax.inject.Named;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.VectorWrapper;
@@ -85,12 +84,12 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
if (first) {
this.currentIndex = incoming.getRecordCount() == 0 ? 0 : this.getVectorIndex(underlyingIndex);
- }
- if (incoming.getRecordCount() == 0) {
- outer: while (true) {
- IterOutcome out = outgoing.next(0, incoming);
- switch (out) {
+ // consume empty batches until we get one with data.
+ if (incoming.getRecordCount() == 0) {
+ outer: while (true) {
+ IterOutcome out = outgoing.next(0, incoming);
+ switch (out) {
case OK_NEW_SCHEMA:
case OK:
if (incoming.getRecordCount() == 0) {
@@ -106,10 +105,12 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
outcome = out;
done = true;
return AggOutcome.CLEANUP_AND_RETURN;
+ }
}
}
}
+
if (newSchema) {
return AggOutcome.UPDATE_AGGREGATOR;
}