You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by me...@apache.org on 2015/03/18 22:18:48 UTC

[3/7] drill git commit: DRILL-2279: Raise exception if schema change is encountered in hash and streaming aggregate

DRILL-2279: Raise exception if schema change is encountered in hash and streaming aggregate


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7a56df13
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7a56df13
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7a56df13

Branch: refs/heads/master
Commit: 7a56df134f0b43323173776861603ef5f786bb03
Parents: bfbc0e0
Author: Mehant Baid <me...@gmail.com>
Authored: Wed Feb 18 18:37:39 2015 -0800
Committer: Mehant Baid <me...@gmail.com>
Committed: Tue Mar 17 18:05:53 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/impl/aggregate/HashAggBatch.java  |  9 ++++-----
 .../exec/physical/impl/aggregate/StreamingAggBatch.java   | 10 ++++------
 2 files changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/7a56df13/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 5946d43..c29fbf2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -134,11 +134,10 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
         IterOutcome outcome = aggregator.getOutcome();
         return aggregator.getOutcome();
       case UPDATE_AGGREGATOR:
-        aggregator = null;
-        if (!createAggregator()) {
-          return IterOutcome.STOP;
-        }
-        continue;
+        context.fail(new SchemaChangeException("Hash aggregate does not support schema changes"));
+        cleanup();
+        killIncoming(false);
+        return IterOutcome.STOP;
       default:
         throw new IllegalStateException(String.format("Unknown state %s.", out));
       }

http://git-wip-us.apache.org/repos/asf/drill/blob/7a56df13/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 4acf3d6..33d2c7a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -145,12 +145,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
         first = false;
         return outcome;
       case UPDATE_AGGREGATOR:
-        first = false;
-        aggregator = null;
-        if (!createAggregator()) {
-          return IterOutcome.STOP;
-      }
-      continue;
+        context.fail(new SchemaChangeException("Streaming aggregate does not support schema changes"));
+        cleanup();
+        killIncoming(false);
+        return IterOutcome.STOP;
       default:
         throw new IllegalStateException(String.format("Unknown state %s.", out));
       }