You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@hive.apache.org by se...@apache.org on 2014/11/07 19:54:09 UTC

svn commit: r1637432 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec: ./ mapjoin/ mr/ persistence/ tez/

Author: sershe
Date: Fri Nov  7 18:54:08 2014
New Revision: 1637432

URL: http://svn.apache.org/r1637432
Log:
HIVE-8556 : introduce overflow control and sanity check to BytesBytesMapJoin (Sergey Shelukhin, reviewed by Mostafa Mokhtar and Prasanth J)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java?rev=1637432&r1=1637431&r2=1637432&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java Fri Nov  7 18:54:08 2014
@@ -33,6 +33,6 @@ public interface HashTableLoader {
 
   void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp);
 
-  void load(MapJoinTableContainer[] mapJoinTables, MapJoinTableContainerSerDe[] mapJoinTableSerdes)
-      throws HiveException;
+  void load(MapJoinTableContainer[] mapJoinTables,
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException;
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1637432&r1=1637431&r2=1637432&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Nov  7 18:54:08 2014
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
@@ -187,7 +188,9 @@ public class MapJoinOperator extends Abs
     }
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
     loader.init(getExecContext(), hconf, this);
-    loader.load(mapJoinTables, mapJoinTableSerdes);
+    long memUsage = (long)(MapJoinMemoryExhaustionHandler.getMaxHeapSize()
+        * conf.getHashTableMemoryUsage());
+    loader.load(mapJoinTables, mapJoinTableSerdes, memUsage);
     if (!conf.isBucketMapJoin()) {
       /*
        * The issue with caching in case of bucket map join is that different tasks

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java?rev=1637432&r1=1637431&r2=1637432&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java Fri Nov  7 18:54:08 2014
@@ -55,22 +55,30 @@ public class MapJoinMemoryExhaustionHand
     this.console = console;
     this.maxMemoryUsage = maxMemoryUsage;
     this.memoryMXBean = ManagementFactory.getMemoryMXBean();
-    long maxHeapSize = memoryMXBean.getHeapMemoryUsage().getMax();
+    this.maxHeapSize = getMaxHeapSize(memoryMXBean);
+    percentageNumberFormat = NumberFormat.getInstance();
+    percentageNumberFormat.setMinimumFractionDigits(2);
+    LOG.info("JVM Max Heap Size: " + this.maxHeapSize);
+  }
+
+  public static long getMaxHeapSize() {
+    return getMaxHeapSize(ManagementFactory.getMemoryMXBean());
+  }
+
+  private static long getMaxHeapSize(MemoryMXBean bean) {
+    long maxHeapSize = bean.getHeapMemoryUsage().getMax();
     /*
      * According to the javadoc, getMax() can return -1. In this case
      * default to 200MB. This will probably never actually happen.
      */
     if(maxHeapSize == -1) {
-      this.maxHeapSize = 200L * 1024L * 1024L;
       LOG.warn("MemoryMXBean.getHeapMemoryUsage().getMax() returned -1, " +
           "defaulting maxHeapSize to 200MB");
-    } else {
-      this.maxHeapSize = maxHeapSize;
+      return 200L * 1024L * 1024L;
     }
-    percentageNumberFormat = NumberFormat.getInstance();
-    percentageNumberFormat.setMinimumFractionDigits(2);
-    LOG.info("JVM Max Heap Size: " + this.maxHeapSize);
+    return maxHeapSize;
   }
+
   /**
    * Throws MapJoinMemoryExhaustionException when the JVM has consumed the
    * configured percentage of memory. The arguments are used simply for the error

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java?rev=1637432&r1=1637431&r2=1637432&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java Fri Nov  7 18:54:08 2014
@@ -72,7 +72,7 @@ public class HashTableLoader implements 
   @Override
   public void load(
       MapJoinTableContainer[] mapJoinTables,
-      MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
 
     String currentInputPath = context.getCurrentInputPath().toString();
     LOG.info("******* Load from HashTable for input file: " + currentInputPath);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java?rev=1637432&r1=1637431&r2=1637432&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java Fri Nov  7 18:54:08 2014
@@ -149,13 +149,27 @@ public final class BytesBytesMultiHashMa
 
   /** We have 39 bits to store list pointer from the first record; this is size limit */
   final static long MAX_WB_SIZE = ((long)1) << 38;
+  /** 8 GB of refs (2^30 longs) is the max capacity if no memory limit is specified. If someone
+   * has 100s of GB of memory (this might happen soon) we'd need to chain arrays together anyway. */
+  private final static int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024;
 
-  public BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize) {
+  public BytesBytesMultiHashMap(int initialCapacity,
+      float loadFactor, int wbSize, long memUsage, int defaultCapacity) {
     if (loadFactor < 0 || loadFactor > 1) {
       throw new AssertionError("Load factor must be between (0, 1].");
     }
+    assert initialCapacity > 0;
     initialCapacity = (Long.bitCount(initialCapacity) == 1)
         ? initialCapacity : nextHighestPowerOfTwo(initialCapacity);
+    // 8 bytes per long in the refs, assume data will be empty. This is just a sanity check.
+    int maxCapacity =  (memUsage <= 0) ? DEFAULT_MAX_CAPACITY
+        : (int)Math.min((long)DEFAULT_MAX_CAPACITY, memUsage / 8);
+    if (maxCapacity < initialCapacity || initialCapacity <= 0) {
+      // Either initialCapacity is too large, or nextHighestPowerOfTwo overflows
+      initialCapacity = (Long.bitCount(maxCapacity) == 1)
+          ? maxCapacity : nextLowestPowerOfTwo(maxCapacity);
+    }
+
     validateCapacity(initialCapacity);
     startingHashBitCount = 63 - Long.numberOfLeadingZeros(initialCapacity);
     this.loadFactor = loadFactor;
@@ -164,6 +178,11 @@ public final class BytesBytesMultiHashMa
     resizeThreshold = (int)(initialCapacity * this.loadFactor);
   }
 
+  @VisibleForTesting
+  BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize) {
+    this(initialCapacity, loadFactor, wbSize, -1, 100000);
+  }
+
   /** The source of keys and values to put into hashtable; avoids byte copying. */
   public static interface KvSource {
     /** Write key into output. */
@@ -644,6 +663,10 @@ public final class BytesBytesMultiHashMa
     return Integer.highestOneBit(v) << 1;
   }
 
+  private static int nextLowestPowerOfTwo(int v) {
+    return Integer.highestOneBit(v);
+  }
+
   @VisibleForTesting
   int getCapacity() {
     return refs.length;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java?rev=1637432&r1=1637431&r2=1637432&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java Fri Nov  7 18:54:08 2014
@@ -60,17 +60,20 @@ public class MapJoinBytesTableContainer 
   private final List<Object> EMPTY_LIST = new ArrayList<Object>(0);
 
   public MapJoinBytesTableContainer(Configuration hconf,
-      MapJoinObjectSerDeContext valCtx, long keyCount) throws SerDeException {
+      MapJoinObjectSerDeContext valCtx, long keyCount, long memUsage) throws SerDeException {
     this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
         HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
-        HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE), valCtx, keyCount);
+        HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
+        valCtx, keyCount, memUsage);
   }
 
   private MapJoinBytesTableContainer(float keyCountAdj, int threshold, float loadFactor,
-      int wbSize, MapJoinObjectSerDeContext valCtx, long keyCount) throws SerDeException {
-    threshold = HashMapWrapper.calculateTableSize(keyCountAdj, threshold, loadFactor, keyCount);
-    hashMap = new BytesBytesMultiHashMap(threshold, loadFactor, wbSize);
+      int wbSize, MapJoinObjectSerDeContext valCtx, long keyCount, long memUsage)
+          throws SerDeException {
+    int newThreshold = HashMapWrapper.calculateTableSize(
+        keyCountAdj, threshold, loadFactor, keyCount);
+    hashMap = new BytesBytesMultiHashMap(newThreshold, loadFactor, wbSize, memUsage, threshold);
   }
 
   private LazyBinaryStructObjectInspector createInternalOi(

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1637432&r1=1637431&r2=1637432&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Fri Nov  7 18:54:08 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
@@ -69,7 +70,7 @@ public class HashTableLoader implements 
   @Override
   public void load(
       MapJoinTableContainer[] mapJoinTables,
-      MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+      MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
 
     TezContext tezContext = (TezContext) MapredContext.get();
     Map<Integer, String> parentToInput = desc.getParentToInput();
@@ -106,7 +107,7 @@ public class HashTableLoader implements 
         Long keyCountObj = parentKeyCounts.get(pos);
         long keyCount = (keyCountObj == null) ? -1 : keyCountObj.longValue();
         MapJoinTableContainer tableContainer = useOptimizedTables
-            ? new MapJoinBytesTableContainer(hconf, valCtx, keyCount)
+            ? new MapJoinBytesTableContainer(hconf, valCtx, keyCount, memUsage)
             : new HashMapWrapper(hconf, keyCount);
 
         while (kvReader.next()) {