You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/04/28 04:58:30 UTC

[3/5] drill git commit: DRILL-2856: Fix StreamingAgg infinite loop problem due to state management issue.

DRILL-2856: Fix StreamingAgg infinite loop problem due to state management issue.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3f781be0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3f781be0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3f781be0

Branch: refs/heads/master
Commit: 3f781be01451e3fe7d312de49b3c8d3c6d73e02d
Parents: 3689522
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun Apr 19 08:52:47 2015 -0700
Committer: vkorukanti <ve...@gmail.com>
Committed: Mon Apr 27 14:11:39 2015 -0700

----------------------------------------------------------------------
 .../physical/impl/aggregate/StreamingAggTemplate.java  | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/3f781be0/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index bf27187..05560c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -21,7 +21,6 @@ import javax.inject.Named;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -85,12 +84,12 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
 
       if (first) {
         this.currentIndex = incoming.getRecordCount() == 0 ? 0 : this.getVectorIndex(underlyingIndex);
-      }
 
-      if (incoming.getRecordCount() == 0) {
-        outer: while (true) {
-          IterOutcome out = outgoing.next(0, incoming);
-          switch (out) {
+        // consume empty batches until we get one with data.
+        if (incoming.getRecordCount() == 0) {
+          outer: while (true) {
+            IterOutcome out = outgoing.next(0, incoming);
+            switch (out) {
             case OK_NEW_SCHEMA:
             case OK:
               if (incoming.getRecordCount() == 0) {
@@ -106,10 +105,12 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
               outcome = out;
               done = true;
               return AggOutcome.CLEANUP_AND_RETURN;
+            }
           }
         }
       }
 
+
       if (newSchema) {
         return AggOutcome.UPDATE_AGGREGATOR;
       }