You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/04/22 20:15:35 UTC
[3/3] hive git commit: HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)
HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9a2bd0c4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9a2bd0c4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9a2bd0c4
Branch: refs/heads/branch-1
Commit: 9a2bd0c48179f2b654b69adbbe88cd5485492658
Parents: 130293e
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Apr 22 10:55:53 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Apr 22 11:05:35 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/GroupByOperator.java | 1 +
.../hadoop/hive/ql/exec/ReduceSinkOperator.java | 15 ++++++++++-----
2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9a2bd0c4/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 9867739..083d4fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -1095,6 +1095,7 @@ public class GroupByOperator extends Operator<GroupByDesc> {
throw new HiveException(e);
}
}
+ hashAggregations = null;
}
// Group by contains the columns needed - no need to aggregate from children
http://git-wip-us.apache.org/repos/asf/hive/blob/9a2bd0c4/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 82d4078..23497d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -118,7 +118,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
protected transient String[] inputAliases; // input aliases of this RS for join (used for PPD)
protected transient boolean useUniformHash = false;
// picks topN K:V pairs from input.
- protected transient TopNHash reducerHash = new TopNHash();
+ protected transient TopNHash reducerHash;
protected transient HiveKey keyWritable = new HiveKey();
protected transient ObjectInspector keyObjectInspector;
protected transient ObjectInspector valueObjectInspector;
@@ -236,7 +236,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
float memUsage = conf.getTopNMemoryUsage();
if (limit >= 0 && memUsage > 0) {
- reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
+ reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : new TopNHash();
reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
}
@@ -376,8 +376,11 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
*/
boolean partKeyNull = conf.isPTFReduceSink() && partitionKeysAreNull(row);
- // Try to store the first key. If it's not excluded, we will proceed.
- int firstIndex = reducerHash.tryStoreKey(firstKey, partKeyNull);
+ // Try to store the first key.
+ // if TopNHashes aren't active, always forward
+ // if TopNHashes are active, proceed if not already excluded (i.e order by limit)
+ final int firstIndex =
+ (reducerHash != null) ? reducerHash.tryStoreKey(firstKey, partKeyNull) : TopNHash.FORWARD;
if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
// Compute value and hashcode - we'd either store or forward them.
BytesWritable value = makeValueWritable(row);
@@ -385,6 +388,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
if (firstIndex == TopNHash.FORWARD) {
collect(firstKey, value);
} else {
+ // invariant: reducerHash != null
assert firstIndex >= 0;
reducerHash.storeValue(firstIndex, firstKey.hashCode(), value, false);
}
@@ -561,12 +565,13 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
@Override
protected void closeOp(boolean abort) throws HiveException {
- if (!abort) {
+ if (!abort && reducerHash != null) {
reducerHash.flush();
}
super.closeOp(abort);
out = null;
random = null;
+ reducerHash = null;
if (isLogInfoEnabled) {
LOG.info(toString() + ": records written - " + numRows);
}