You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by me...@apache.org on 2015/03/18 22:18:48 UTC
[3/7] drill git commit: DRILL-2279: Raise exception if schema change
is encountered in hash and streaming aggregate
DRILL-2279: Raise exception if schema change is encountered in hash and streaming aggregate
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7a56df13
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7a56df13
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7a56df13
Branch: refs/heads/master
Commit: 7a56df134f0b43323173776861603ef5f786bb03
Parents: bfbc0e0
Author: Mehant Baid <me...@gmail.com>
Authored: Wed Feb 18 18:37:39 2015 -0800
Committer: Mehant Baid <me...@gmail.com>
Committed: Tue Mar 17 18:05:53 2015 -0700
----------------------------------------------------------------------
.../drill/exec/physical/impl/aggregate/HashAggBatch.java | 9 ++++-----
.../exec/physical/impl/aggregate/StreamingAggBatch.java | 10 ++++------
2 files changed, 8 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/7a56df13/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 5946d43..c29fbf2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -134,11 +134,10 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
IterOutcome outcome = aggregator.getOutcome();
return aggregator.getOutcome();
case UPDATE_AGGREGATOR:
- aggregator = null;
- if (!createAggregator()) {
- return IterOutcome.STOP;
- }
- continue;
+ context.fail(new SchemaChangeException("Hash aggregate does not support schema changes"));
+ cleanup();
+ killIncoming(false);
+ return IterOutcome.STOP;
default:
throw new IllegalStateException(String.format("Unknown state %s.", out));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/7a56df13/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 4acf3d6..33d2c7a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -145,12 +145,10 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
first = false;
return outcome;
case UPDATE_AGGREGATOR:
- first = false;
- aggregator = null;
- if (!createAggregator()) {
- return IterOutcome.STOP;
- }
- continue;
+ context.fail(new SchemaChangeException("Streaming aggregate does not support schema changes"));
+ cleanup();
+ killIncoming(false);
+ return IterOutcome.STOP;
default:
throw new IllegalStateException(String.format("Unknown state %s.", out));
}