You are viewing a plain-text version of this content. The canonical version is linked from the original archive page (its "here" hyperlink did not survive the plain-text conversion).
Posted to commits@hive.apache.org by se...@apache.org on 2014/11/07 19:54:09 UTC
svn commit: r1637432 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec: ./ mapjoin/ mr/ persistence/ tez/
Author: sershe
Date: Fri Nov 7 18:54:08 2014
New Revision: 1637432
URL: http://svn.apache.org/r1637432
Log:
HIVE-8556 : introduce overflow control and sanity check to BytesBytesMapJoin (Sergey Shelukhin, reviewed by Mostafa Mokhtar and Prasanth J)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java?rev=1637432&r1=1637431&r2=1637432&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableLoader.java Fri Nov 7 18:54:08 2014
@@ -33,6 +33,6 @@ public interface HashTableLoader {
void init(ExecMapperContext context, Configuration hconf, MapJoinOperator joinOp);
- void load(MapJoinTableContainer[] mapJoinTables, MapJoinTableContainerSerDe[] mapJoinTableSerdes)
- throws HiveException;
+ void load(MapJoinTableContainer[] mapJoinTables,
+ MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1637432&r1=1637431&r2=1637432&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Nov 7 18:54:08 2014
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
@@ -187,7 +188,9 @@ public class MapJoinOperator extends Abs
}
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
loader.init(getExecContext(), hconf, this);
- loader.load(mapJoinTables, mapJoinTableSerdes);
+ long memUsage = (long)(MapJoinMemoryExhaustionHandler.getMaxHeapSize()
+ * conf.getHashTableMemoryUsage());
+ loader.load(mapJoinTables, mapJoinTableSerdes, memUsage);
if (!conf.isBucketMapJoin()) {
/*
* The issue with caching in case of bucket map join is that different tasks
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java?rev=1637432&r1=1637431&r2=1637432&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java Fri Nov 7 18:54:08 2014
@@ -55,22 +55,30 @@ public class MapJoinMemoryExhaustionHand
this.console = console;
this.maxMemoryUsage = maxMemoryUsage;
this.memoryMXBean = ManagementFactory.getMemoryMXBean();
- long maxHeapSize = memoryMXBean.getHeapMemoryUsage().getMax();
+ this.maxHeapSize = getMaxHeapSize(memoryMXBean);
+ percentageNumberFormat = NumberFormat.getInstance();
+ percentageNumberFormat.setMinimumFractionDigits(2);
+ LOG.info("JVM Max Heap Size: " + this.maxHeapSize);
+ }
+
+ public static long getMaxHeapSize() {
+ return getMaxHeapSize(ManagementFactory.getMemoryMXBean());
+ }
+
+ private static long getMaxHeapSize(MemoryMXBean bean) {
+ long maxHeapSize = bean.getHeapMemoryUsage().getMax();
/*
* According to the javadoc, getMax() can return -1. In this case
* default to 200MB. This will probably never actually happen.
*/
if(maxHeapSize == -1) {
- this.maxHeapSize = 200L * 1024L * 1024L;
LOG.warn("MemoryMXBean.getHeapMemoryUsage().getMax() returned -1, " +
"defaulting maxHeapSize to 200MB");
- } else {
- this.maxHeapSize = maxHeapSize;
+ return 200L * 1024L * 1024L;
}
- percentageNumberFormat = NumberFormat.getInstance();
- percentageNumberFormat.setMinimumFractionDigits(2);
- LOG.info("JVM Max Heap Size: " + this.maxHeapSize);
+ return maxHeapSize;
}
+
/**
* Throws MapJoinMemoryExhaustionException when the JVM has consumed the
* configured percentage of memory. The arguments are used simply for the error
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java?rev=1637432&r1=1637431&r2=1637432&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HashTableLoader.java Fri Nov 7 18:54:08 2014
@@ -72,7 +72,7 @@ public class HashTableLoader implements
@Override
public void load(
MapJoinTableContainer[] mapJoinTables,
- MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+ MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
String currentInputPath = context.getCurrentInputPath().toString();
LOG.info("******* Load from HashTable for input file: " + currentInputPath);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java?rev=1637432&r1=1637431&r2=1637432&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java Fri Nov 7 18:54:08 2014
@@ -149,13 +149,27 @@ public final class BytesBytesMultiHashMa
/** We have 39 bits to store list pointer from the first record; this is size limit */
final static long MAX_WB_SIZE = ((long)1) << 38;
+ /** 8 Gb of refs is the max capacity if memory limit is not specified. If someone has 100s of
+ * Gbs of memory (this might happen pretty soon) we'd need to string together arrays anyway. */
+ private final static int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024;
- public BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize) {
+ public BytesBytesMultiHashMap(int initialCapacity,
+ float loadFactor, int wbSize, long memUsage, int defaultCapacity) {
if (loadFactor < 0 || loadFactor > 1) {
throw new AssertionError("Load factor must be between (0, 1].");
}
+ assert initialCapacity > 0;
initialCapacity = (Long.bitCount(initialCapacity) == 1)
? initialCapacity : nextHighestPowerOfTwo(initialCapacity);
+ // 8 bytes per long in the refs, assume data will be empty. This is just a sanity check.
+ int maxCapacity = (memUsage <= 0) ? DEFAULT_MAX_CAPACITY
+ : (int)Math.min((long)DEFAULT_MAX_CAPACITY, memUsage / 8);
+ if (maxCapacity < initialCapacity || initialCapacity <= 0) {
+ // Either initialCapacity is too large, or nextHighestPowerOfTwo overflows
+ initialCapacity = (Long.bitCount(maxCapacity) == 1)
+ ? maxCapacity : nextLowestPowerOfTwo(maxCapacity);
+ }
+
validateCapacity(initialCapacity);
startingHashBitCount = 63 - Long.numberOfLeadingZeros(initialCapacity);
this.loadFactor = loadFactor;
@@ -164,6 +178,11 @@ public final class BytesBytesMultiHashMa
resizeThreshold = (int)(initialCapacity * this.loadFactor);
}
+ @VisibleForTesting
+ BytesBytesMultiHashMap(int initialCapacity, float loadFactor, int wbSize) {
+ this(initialCapacity, loadFactor, wbSize, -1, 100000);
+ }
+
/** The source of keys and values to put into hashtable; avoids byte copying. */
public static interface KvSource {
/** Write key into output. */
@@ -644,6 +663,10 @@ public final class BytesBytesMultiHashMa
return Integer.highestOneBit(v) << 1;
}
+ private static int nextLowestPowerOfTwo(int v) {
+ return Integer.highestOneBit(v);
+ }
+
@VisibleForTesting
int getCapacity() {
return refs.length;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java?rev=1637432&r1=1637431&r2=1637432&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java Fri Nov 7 18:54:08 2014
@@ -60,17 +60,20 @@ public class MapJoinBytesTableContainer
private final List<Object> EMPTY_LIST = new ArrayList<Object>(0);
public MapJoinBytesTableContainer(Configuration hconf,
- MapJoinObjectSerDeContext valCtx, long keyCount) throws SerDeException {
+ MapJoinObjectSerDeContext valCtx, long keyCount, long memUsage) throws SerDeException {
this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
- HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE), valCtx, keyCount);
+ HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
+ valCtx, keyCount, memUsage);
}
private MapJoinBytesTableContainer(float keyCountAdj, int threshold, float loadFactor,
- int wbSize, MapJoinObjectSerDeContext valCtx, long keyCount) throws SerDeException {
- threshold = HashMapWrapper.calculateTableSize(keyCountAdj, threshold, loadFactor, keyCount);
- hashMap = new BytesBytesMultiHashMap(threshold, loadFactor, wbSize);
+ int wbSize, MapJoinObjectSerDeContext valCtx, long keyCount, long memUsage)
+ throws SerDeException {
+ int newThreshold = HashMapWrapper.calculateTableSize(
+ keyCountAdj, threshold, loadFactor, keyCount);
+ hashMap = new BytesBytesMultiHashMap(newThreshold, loadFactor, wbSize, memUsage, threshold);
}
private LazyBinaryStructObjectInspector createInternalOi(
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1637432&r1=1637431&r2=1637432&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Fri Nov 7 18:54:08 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
@@ -69,7 +70,7 @@ public class HashTableLoader implements
@Override
public void load(
MapJoinTableContainer[] mapJoinTables,
- MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+ MapJoinTableContainerSerDe[] mapJoinTableSerdes, long memUsage) throws HiveException {
TezContext tezContext = (TezContext) MapredContext.get();
Map<Integer, String> parentToInput = desc.getParentToInput();
@@ -106,7 +107,7 @@ public class HashTableLoader implements
Long keyCountObj = parentKeyCounts.get(pos);
long keyCount = (keyCountObj == null) ? -1 : keyCountObj.longValue();
MapJoinTableContainer tableContainer = useOptimizedTables
- ? new MapJoinBytesTableContainer(hconf, valCtx, keyCount)
+ ? new MapJoinBytesTableContainer(hconf, valCtx, keyCount, memUsage)
: new HashMapWrapper(hconf, keyCount);
while (kvReader.next()) {