You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/04/22 20:15:33 UTC
[1/3] hive git commit: HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/branch-1 130293e56 -> 9a2bd0c48
refs/heads/branch-2.0 75723a5d8 -> 5cbada84d
refs/heads/master aac9263b4 -> 145e253df
HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/145e253d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/145e253d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/145e253d
Branch: refs/heads/master
Commit: 145e253df9c05e4e725c6aeab172ac0885bf5384
Parents: aac9263
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Apr 22 10:55:53 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Apr 22 10:55:53 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/GroupByOperator.java | 1 +
.../hadoop/hive/ql/exec/ReduceSinkOperator.java | 15 ++++++++++-----
2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/145e253d/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index e39b75e..47b5793 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -1102,6 +1102,7 @@ public class GroupByOperator extends Operator<GroupByDesc> {
throw new HiveException(e);
}
}
+ hashAggregations = null;
}
// Group by contains the columns needed - no need to aggregate from children
http://git-wip-us.apache.org/repos/asf/hive/blob/145e253d/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 1b8e7d2..ba71a1e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -115,7 +115,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
protected transient String[] inputAliases; // input aliases of this RS for join (used for PPD)
protected transient boolean useUniformHash = false;
// picks topN K:V pairs from input.
- protected transient TopNHash reducerHash = new TopNHash();
+ protected transient TopNHash reducerHash;
protected transient HiveKey keyWritable = new HiveKey();
protected transient ObjectInspector keyObjectInspector;
protected transient ObjectInspector valueObjectInspector;
@@ -237,7 +237,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
float memUsage = conf.getTopNMemoryUsage();
if (limit >= 0 && memUsage > 0) {
- reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
+ reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : new TopNHash();
reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
}
@@ -385,8 +385,11 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
*/
boolean partKeyNull = conf.isPTFReduceSink() && partitionKeysAreNull(row);
- // Try to store the first key. If it's not excluded, we will proceed.
- int firstIndex = reducerHash.tryStoreKey(firstKey, partKeyNull);
+ // Try to store the first key.
+ // if TopNHashes aren't active, always forward
+ // if TopNHashes are active, proceed if not already excluded (i.e order by limit)
+ final int firstIndex =
+ (reducerHash != null) ? reducerHash.tryStoreKey(firstKey, partKeyNull) : TopNHash.FORWARD;
if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
// Compute value and hashcode - we'd either store or forward them.
BytesWritable value = makeValueWritable(row);
@@ -394,6 +397,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
if (firstIndex == TopNHash.FORWARD) {
collect(firstKey, value);
} else {
+ // invariant: reducerHash != null
assert firstIndex >= 0;
reducerHash.storeValue(firstIndex, firstKey.hashCode(), value, false);
}
@@ -563,12 +567,13 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
@Override
protected void closeOp(boolean abort) throws HiveException {
- if (!abort) {
+ if (!abort && reducerHash != null) {
reducerHash.flush();
}
super.closeOp(abort);
out = null;
random = null;
+ reducerHash = null;
if (isLogInfoEnabled) {
LOG.info(toString() + ": records written - " + numRows);
}
[3/3] hive git commit: HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)
Posted by se...@apache.org.
HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9a2bd0c4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9a2bd0c4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9a2bd0c4
Branch: refs/heads/branch-1
Commit: 9a2bd0c48179f2b654b69adbbe88cd5485492658
Parents: 130293e
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Apr 22 10:55:53 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Apr 22 11:05:35 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/GroupByOperator.java | 1 +
.../hadoop/hive/ql/exec/ReduceSinkOperator.java | 15 ++++++++++-----
2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/9a2bd0c4/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 9867739..083d4fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -1095,6 +1095,7 @@ public class GroupByOperator extends Operator<GroupByDesc> {
throw new HiveException(e);
}
}
+ hashAggregations = null;
}
// Group by contains the columns needed - no need to aggregate from children
http://git-wip-us.apache.org/repos/asf/hive/blob/9a2bd0c4/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 82d4078..23497d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -118,7 +118,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
protected transient String[] inputAliases; // input aliases of this RS for join (used for PPD)
protected transient boolean useUniformHash = false;
// picks topN K:V pairs from input.
- protected transient TopNHash reducerHash = new TopNHash();
+ protected transient TopNHash reducerHash;
protected transient HiveKey keyWritable = new HiveKey();
protected transient ObjectInspector keyObjectInspector;
protected transient ObjectInspector valueObjectInspector;
@@ -236,7 +236,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
float memUsage = conf.getTopNMemoryUsage();
if (limit >= 0 && memUsage > 0) {
- reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
+ reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : new TopNHash();
reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
}
@@ -376,8 +376,11 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
*/
boolean partKeyNull = conf.isPTFReduceSink() && partitionKeysAreNull(row);
- // Try to store the first key. If it's not excluded, we will proceed.
- int firstIndex = reducerHash.tryStoreKey(firstKey, partKeyNull);
+ // Try to store the first key.
+ // if TopNHashes aren't active, always forward
+ // if TopNHashes are active, proceed if not already excluded (i.e order by limit)
+ final int firstIndex =
+ (reducerHash != null) ? reducerHash.tryStoreKey(firstKey, partKeyNull) : TopNHash.FORWARD;
if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
// Compute value and hashcode - we'd either store or forward them.
BytesWritable value = makeValueWritable(row);
@@ -385,6 +388,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
if (firstIndex == TopNHash.FORWARD) {
collect(firstKey, value);
} else {
+ // invariant: reducerHash != null
assert firstIndex >= 0;
reducerHash.storeValue(firstIndex, firstKey.hashCode(), value, false);
}
@@ -561,12 +565,13 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
@Override
protected void closeOp(boolean abort) throws HiveException {
- if (!abort) {
+ if (!abort && reducerHash != null) {
reducerHash.flush();
}
super.closeOp(abort);
out = null;
random = null;
+ reducerHash = null;
if (isLogInfoEnabled) {
LOG.info(toString() + ": records written - " + numRows);
}
[2/3] hive git commit: HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)
Posted by se...@apache.org.
HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5cbada84
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5cbada84
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5cbada84
Branch: refs/heads/branch-2.0
Commit: 5cbada84d0b020245936648f7785d4b3112b6934
Parents: 75723a5
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Apr 22 10:55:53 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Apr 22 11:00:15 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/GroupByOperator.java | 1 +
.../hadoop/hive/ql/exec/ReduceSinkOperator.java | 15 ++++++++++-----
2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/5cbada84/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 0839b42..b88722f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -1104,6 +1104,7 @@ public class GroupByOperator extends Operator<GroupByDesc> {
throw new HiveException(e);
}
}
+ hashAggregations = null;
}
// Group by contains the columns needed - no need to aggregate from children
http://git-wip-us.apache.org/repos/asf/hive/blob/5cbada84/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 24642af..27dbf59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -115,7 +115,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
protected transient String[] inputAliases; // input aliases of this RS for join (used for PPD)
protected transient boolean useUniformHash = false;
// picks topN K:V pairs from input.
- protected transient TopNHash reducerHash = new TopNHash();
+ protected transient TopNHash reducerHash;
protected transient HiveKey keyWritable = new HiveKey();
protected transient ObjectInspector keyObjectInspector;
protected transient ObjectInspector valueObjectInspector;
@@ -242,7 +242,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
float memUsage = conf.getTopNMemoryUsage();
if (limit >= 0 && memUsage > 0) {
- reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
+ reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : new TopNHash();
reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
}
@@ -381,8 +381,11 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
*/
boolean partKeyNull = conf.isPTFReduceSink() && partitionKeysAreNull(row);
- // Try to store the first key. If it's not excluded, we will proceed.
- int firstIndex = reducerHash.tryStoreKey(firstKey, partKeyNull);
+ // Try to store the first key.
+ // if TopNHashes aren't active, always forward
+ // if TopNHashes are active, proceed if not already excluded (i.e order by limit)
+ final int firstIndex =
+ (reducerHash != null) ? reducerHash.tryStoreKey(firstKey, partKeyNull) : TopNHash.FORWARD;
if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
// Compute value and hashcode - we'd either store or forward them.
BytesWritable value = makeValueWritable(row);
@@ -390,6 +393,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
if (firstIndex == TopNHash.FORWARD) {
collect(firstKey, value);
} else {
+ // invariant: reducerHash != null
assert firstIndex >= 0;
reducerHash.storeValue(firstIndex, firstKey.hashCode(), value, false);
}
@@ -566,12 +570,13 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
@Override
protected void closeOp(boolean abort) throws HiveException {
- if (!abort) {
+ if (!abort && reducerHash != null) {
reducerHash.flush();
}
super.closeOp(abort);
out = null;
random = null;
+ reducerHash = null;
if (isLogInfoEnabled) {
LOG.info(toString() + ": records written - " + numRows);
}