You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/04/22 20:15:33 UTC

[1/3] hive git commit: HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/branch-1 130293e56 -> 9a2bd0c48
  refs/heads/branch-2.0 75723a5d8 -> 5cbada84d
  refs/heads/master aac9263b4 -> 145e253df


HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/145e253d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/145e253d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/145e253d

Branch: refs/heads/master
Commit: 145e253df9c05e4e725c6aeab172ac0885bf5384
Parents: aac9263
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Apr 22 10:55:53 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Apr 22 10:55:53 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/GroupByOperator.java  |  1 +
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java      | 15 ++++++++++-----
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/145e253d/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index e39b75e..47b5793 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -1102,6 +1102,7 @@ public class GroupByOperator extends Operator<GroupByDesc> {
         throw new HiveException(e);
       }
     }
+    hashAggregations = null;
   }
 
   // Group by contains the columns needed - no need to aggregate from children

http://git-wip-us.apache.org/repos/asf/hive/blob/145e253d/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 1b8e7d2..ba71a1e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -115,7 +115,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
   protected transient String[] inputAliases;  // input aliases of this RS for join (used for PPD)
   protected transient boolean useUniformHash = false;
   // picks topN K:V pairs from input.
-  protected transient TopNHash reducerHash = new TopNHash();
+  protected transient TopNHash reducerHash;
   protected transient HiveKey keyWritable = new HiveKey();
   protected transient ObjectInspector keyObjectInspector;
   protected transient ObjectInspector valueObjectInspector;
@@ -237,7 +237,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
       float memUsage = conf.getTopNMemoryUsage();
 
       if (limit >= 0 && memUsage > 0) {
-        reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
+        reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : new TopNHash();
         reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
       }
 
@@ -385,8 +385,11 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
        */
       boolean partKeyNull = conf.isPTFReduceSink() && partitionKeysAreNull(row);
 
-      // Try to store the first key. If it's not excluded, we will proceed.
-      int firstIndex = reducerHash.tryStoreKey(firstKey, partKeyNull);
+      // Try to store the first key.
+      // if TopNHashes aren't active, always forward
+      // if TopNHashes are active, proceed if not already excluded (i.e order by limit)
+      final int firstIndex =
+          (reducerHash != null) ? reducerHash.tryStoreKey(firstKey, partKeyNull) : TopNHash.FORWARD;
       if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
       // Compute value and hashcode - we'd either store or forward them.
       BytesWritable value = makeValueWritable(row);
@@ -394,6 +397,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
       if (firstIndex == TopNHash.FORWARD) {
         collect(firstKey, value);
       } else {
+        // invariant: reducerHash != null
         assert firstIndex >= 0;
         reducerHash.storeValue(firstIndex, firstKey.hashCode(), value, false);
       }
@@ -563,12 +567,13 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
 
   @Override
   protected void closeOp(boolean abort) throws HiveException {
-    if (!abort) {
+    if (!abort && reducerHash != null) {
       reducerHash.flush();
     }
     super.closeOp(abort);
     out = null;
     random = null;
+    reducerHash = null;
     if (isLogInfoEnabled) {
       LOG.info(toString() + ": records written - " + numRows);
     }


[3/3] hive git commit: HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)

Posted by se...@apache.org.
HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9a2bd0c4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9a2bd0c4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9a2bd0c4

Branch: refs/heads/branch-1
Commit: 9a2bd0c48179f2b654b69adbbe88cd5485492658
Parents: 130293e
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Apr 22 10:55:53 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Apr 22 11:05:35 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/GroupByOperator.java  |  1 +
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java      | 15 ++++++++++-----
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9a2bd0c4/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 9867739..083d4fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -1095,6 +1095,7 @@ public class GroupByOperator extends Operator<GroupByDesc> {
         throw new HiveException(e);
       }
     }
+    hashAggregations = null;
   }
 
   // Group by contains the columns needed - no need to aggregate from children

http://git-wip-us.apache.org/repos/asf/hive/blob/9a2bd0c4/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 82d4078..23497d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -118,7 +118,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
   protected transient String[] inputAliases;  // input aliases of this RS for join (used for PPD)
   protected transient boolean useUniformHash = false;
   // picks topN K:V pairs from input.
-  protected transient TopNHash reducerHash = new TopNHash();
+  protected transient TopNHash reducerHash;
   protected transient HiveKey keyWritable = new HiveKey();
   protected transient ObjectInspector keyObjectInspector;
   protected transient ObjectInspector valueObjectInspector;
@@ -236,7 +236,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
       float memUsage = conf.getTopNMemoryUsage();
 
       if (limit >= 0 && memUsage > 0) {
-        reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
+        reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : new TopNHash();
         reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
       }
 
@@ -376,8 +376,11 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
        */
       boolean partKeyNull = conf.isPTFReduceSink() && partitionKeysAreNull(row);
 
-      // Try to store the first key. If it's not excluded, we will proceed.
-      int firstIndex = reducerHash.tryStoreKey(firstKey, partKeyNull);
+      // Try to store the first key.
+      // if TopNHashes aren't active, always forward
+      // if TopNHashes are active, proceed if not already excluded (i.e order by limit)
+      final int firstIndex =
+          (reducerHash != null) ? reducerHash.tryStoreKey(firstKey, partKeyNull) : TopNHash.FORWARD;
       if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
       // Compute value and hashcode - we'd either store or forward them.
       BytesWritable value = makeValueWritable(row);
@@ -385,6 +388,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
       if (firstIndex == TopNHash.FORWARD) {
         collect(firstKey, value);
       } else {
+        // invariant: reducerHash != null
         assert firstIndex >= 0;
         reducerHash.storeValue(firstIndex, firstKey.hashCode(), value, false);
       }
@@ -561,12 +565,13 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
 
   @Override
   protected void closeOp(boolean abort) throws HiveException {
-    if (!abort) {
+    if (!abort && reducerHash != null) {
       reducerHash.flush();
     }
     super.closeOp(abort);
     out = null;
     random = null;
+    reducerHash = null;
     if (isLogInfoEnabled) {
       LOG.info(toString() + ": records written - " + numRows);
     }


[2/3] hive git commit: HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)

Posted by se...@apache.org.
HIVE-13240 : GroupByOperator: Drop the hash aggregates when closing operator (Gopal V, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5cbada84
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5cbada84
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5cbada84

Branch: refs/heads/branch-2.0
Commit: 5cbada84d0b020245936648f7785d4b3112b6934
Parents: 75723a5
Author: Sergey Shelukhin <se...@apache.org>
Authored: Fri Apr 22 10:55:53 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Fri Apr 22 11:00:15 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/GroupByOperator.java  |  1 +
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java      | 15 ++++++++++-----
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5cbada84/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
index 0839b42..b88722f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
@@ -1104,6 +1104,7 @@ public class GroupByOperator extends Operator<GroupByDesc> {
         throw new HiveException(e);
       }
     }
+    hashAggregations = null;
   }
 
   // Group by contains the columns needed - no need to aggregate from children

http://git-wip-us.apache.org/repos/asf/hive/blob/5cbada84/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 24642af..27dbf59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -115,7 +115,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
   protected transient String[] inputAliases;  // input aliases of this RS for join (used for PPD)
   protected transient boolean useUniformHash = false;
   // picks topN K:V pairs from input.
-  protected transient TopNHash reducerHash = new TopNHash();
+  protected transient TopNHash reducerHash;
   protected transient HiveKey keyWritable = new HiveKey();
   protected transient ObjectInspector keyObjectInspector;
   protected transient ObjectInspector valueObjectInspector;
@@ -242,7 +242,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
       float memUsage = conf.getTopNMemoryUsage();
 
       if (limit >= 0 && memUsage > 0) {
-        reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
+        reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : new TopNHash();
         reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
       }
 
@@ -381,8 +381,11 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
        */
       boolean partKeyNull = conf.isPTFReduceSink() && partitionKeysAreNull(row);
 
-      // Try to store the first key. If it's not excluded, we will proceed.
-      int firstIndex = reducerHash.tryStoreKey(firstKey, partKeyNull);
+      // Try to store the first key.
+      // if TopNHashes aren't active, always forward
+      // if TopNHashes are active, proceed if not already excluded (i.e order by limit)
+      final int firstIndex =
+          (reducerHash != null) ? reducerHash.tryStoreKey(firstKey, partKeyNull) : TopNHash.FORWARD;
       if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
       // Compute value and hashcode - we'd either store or forward them.
       BytesWritable value = makeValueWritable(row);
@@ -390,6 +393,7 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
       if (firstIndex == TopNHash.FORWARD) {
         collect(firstKey, value);
       } else {
+        // invariant: reducerHash != null
         assert firstIndex >= 0;
         reducerHash.storeValue(firstIndex, firstKey.hashCode(), value, false);
       }
@@ -566,12 +570,13 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
 
   @Override
   protected void closeOp(boolean abort) throws HiveException {
-    if (!abort) {
+    if (!abort && reducerHash != null) {
       reducerHash.flush();
     }
     super.closeOp(abort);
     out = null;
     random = null;
+    reducerHash = null;
     if (isLogInfoEnabled) {
       LOG.info(toString() + ": records written - " + numRows);
     }