You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text conversion.
Posted to commits@hive.apache.org by ji...@apache.org on 2014/03/17 17:37:15 UTC

svn commit: r1578456 - /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java

Author: jitendra
Date: Mon Mar 17 16:37:15 2014
New Revision: 1578456

URL: http://svn.apache.org/r1578456
Log:
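HIVE-6518: Add a GC canary to the VectorGroupByOperator to flush whenever a GC is triggered (more precisely: whenever the garbage collector clears the canary's SoftReference, which the JVM does at its discretion under memory pressure rather than on every collection). (Gopal V via jitendra, reviewed by Remus, Gunther)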

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1578456&r1=1578455&r2=1578456&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Mon Mar 17 16:37:15 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.v
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryMXBean;
+import java.lang.ref.SoftReference;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -117,6 +118,9 @@ public class VectorGroupByOperator exten
    * Percent of entries to flush when memory threshold exceeded.
    */
   private transient float percentEntriesToFlush = 0.1f;
+  
+  private transient SoftReference<Object> gcCanary = new SoftReference<Object>(new Object());
+  private transient long gcCanaryFlushes = 0L;
 
   /**
    * The global key-aggregation hash map.
@@ -248,6 +252,10 @@ public class VectorGroupByOperator exten
     while (shouldFlush(batch)) {
       flush(false);
       
+      if(gcCanary.get() == null) {
+        gcCanaryFlushes++;
+        gcCanary = new SoftReference<Object>(new Object()); 
+      }
       //Validate that some progress is being made
       if (!(numEntriesHashTable < preFlushEntriesCount)) {
         if (LOG.isDebugEnabled()) {
@@ -281,11 +289,11 @@ public class VectorGroupByOperator exten
     int entriesFlushed = 0;
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("Flush %d %s entries:%d fixed:%d variable:%d (used:%dMb max:%dMb)",
+      LOG.debug(String.format("Flush %d %s entries:%d fixed:%d variable:%d (used:%dMb max:%dMb gcCanary: %s)",
           entriesToFlush, all ? "(all)" : "",
           numEntriesHashTable, fixedHashEntrySize, avgVariableSize,
           numEntriesHashTable * (fixedHashEntrySize + avgVariableSize)/1024/1024,
-          maxHashTblMemory/1024/1024));
+          maxHashTblMemory/1024/1024, gcCanary.get() == null ? "dead" : "alive"));
     }
 
     Object[] forwardCache = new Object[keyExpressions.length + aggregators.length];
@@ -349,8 +357,12 @@ public class VectorGroupByOperator exten
       updateAvgVariableSize(batch);
       numEntriesSinceCheck = 0;
     }
-    return numEntriesHashTable > this.maxHtEntries ||
-        numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory;
+    if(numEntriesHashTable > this.maxHtEntries ||
+        numEntriesHashTable * (fixedHashEntrySize + avgVariableSize) > maxHashTblMemory) {
+      return true;
+    } else {
+      return (gcCanary.get() == null);
+    }
   }
 
   /**
@@ -441,6 +453,9 @@ public class VectorGroupByOperator exten
 
   @Override
   public void closeOp(boolean aborted) throws HiveException {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug(String.format("GC canary caused %d flushes", gcCanaryFlushes));
+    }
     if (!aborted) {
       flush(true);
     }