You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/04/22 20:15:35 UTC

[3/3] hive git commit: HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)

HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9a2bd0c4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9a2bd0c4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9a2bd0c4

Branch: refs/heads/branch-1
Commit: 9a2bd0c48179f2b654b69adbbe88cd5485492658
Parents: 130293e
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Apr 22 10:55:53 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Apr 22 11:05:35 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/GroupByOperator.java  |  1 +
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java      | 15 ++++++++++-----
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9a2bd0c4/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 9867739..083d4fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -1095,6 +1095,7 @@ public class GroupByOperator extends Operator<GroupByDesc> {
         throw new HiveException(e);
       }
     }
+    hashAggregations = null;
   }
 
   // Group by contains the columns needed - no need to aggregate from children

http://git-wip-us.apache.org/repos/asf/hive/blob/9a2bd0c4/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 82d4078..23497d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -118,7 +118,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
   protected transient String[] inputAliases;  // input aliases of this RS for join (used for PPD)
   protected transient boolean useUniformHash = false;
   // picks topN K:V pairs from input.
-  protected transient TopNHash reducerHash = new TopNHash();
+  protected transient TopNHash reducerHash;
   protected transient HiveKey keyWritable = new HiveKey();
   protected transient ObjectInspector keyObjectInspector;
   protected transient ObjectInspector valueObjectInspector;
@@ -236,7 +236,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
       float memUsage = conf.getTopNMemoryUsage();
 
       if (limit >= 0 && memUsage > 0) {
-        reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
+        reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : new TopNHash();
         reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
       }
 
@@ -376,8 +376,11 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
        */
       boolean partKeyNull = conf.isPTFReduceSink() && partitionKeysAreNull(row);
 
-      // Try to store the first key. If it's not excluded, we will proceed.
-      int firstIndex = reducerHash.tryStoreKey(firstKey, partKeyNull);
+      // Try to store the first key.
+      // If no TopNHash is active (reducerHash is null), always forward.
+      // If a TopNHash is active, proceed unless the key is already excluded (i.e. order by limit).
+      final int firstIndex =
+          (reducerHash != null) ? reducerHash.tryStoreKey(firstKey, partKeyNull) : TopNHash.FORWARD;
       if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
       // Compute value and hashcode - we'd either store or forward them.
       BytesWritable value = makeValueWritable(row);
@@ -385,6 +388,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
       if (firstIndex == TopNHash.FORWARD) {
         collect(firstKey, value);
       } else {
+        // invariant: reducerHash != null
         assert firstIndex >= 0;
         reducerHash.storeValue(firstIndex, firstKey.hashCode(), value, false);
       }
@@ -561,12 +565,13 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
 
   @Override
   protected void closeOp(boolean abort) throws HiveException {
-    if (!abort) {
+    if (!abort && reducerHash != null) {
       reducerHash.flush();
     }
     super.closeOp(abort);
     out = null;
     random = null;
+    reducerHash = null;
     if (isLogInfoEnabled) {
       LOG.info(toString() + ": records written - " + numRows);
     }