Posted to commits@hive.apache.org by se...@apache.org on 2015/09/10 21:02:09 UTC
hive git commit: HIVE-11587 : Fix memory estimates for mapjoin hashtable (Wei Zheng, reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master 701440734 -> b4be31f4a
HIVE-11587 : Fix memory estimates for mapjoin hashtable (Wei Zheng, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b4be31f4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b4be31f4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b4be31f4
Branch: refs/heads/master
Commit: b4be31f4aa497cd09dd1e513eabe951044a7ff73
Parents: 7014407
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Sep 10 11:57:36 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Sep 10 11:57:36 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
.../hadoop/hive/ql/exec/MapJoinOperator.java | 5 ++
.../persistence/BytesBytesMultiHashMap.java | 11 +++-
.../persistence/HybridHashTableContainer.java | 68 ++++++++++++--------
.../hive/ql/exec/tez/HashTableLoader.java | 7 +-
.../apache/hadoop/hive/serde2/WriteBuffers.java | 10 ++-
6 files changed, 68 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/b4be31f4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d2c5885..7f29da2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -767,6 +767,8 @@ public class HiveConf extends Configuration {
HIVEMAPJOINUSEOPTIMIZEDTABLE("hive.mapjoin.optimized.hashtable", true,
"Whether Hive should use memory-optimized hash table for MapJoin. Only works on Tez,\n" +
"because memory-optimized hashtable cannot be serialized."),
+ HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT("hive.mapjoin.optimized.hashtable.probe.percent",
+ (float) 0.5, "Probing space percentage of the optimized hashtable"),
HIVEUSEHYBRIDGRACEHASHJOIN("hive.mapjoin.hybridgrace.hashtable", true, "Whether to use hybrid " +
"grace hash join as the join method for mapjoin. Tez only."),
HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ("hive.mapjoin.hybridgrace.memcheckfrequency", 1024, "For " +
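Note: the new hive.mapjoin.optimized.hashtable.probe.percent setting defaults to 0.5. When the container cannot derive a per-row probe fraction (see the HybridHashTableContainer hunk below), this fraction of the memory budget is treated as probe (ref array) space. A minimal sketch of the resulting arithmetic, assuming a 256 MB budget; the class and variable values here are illustrative, not from the patch:

public class ProbePercentSketch {
  public static void main(String[] args) {
    long memoryThreshold = 256L << 20;  // assumed 256 MB memory budget
    float probePercent = 0.5f;          // default of the new setting
    long maxProbeSize = (long) (memoryThreshold * probePercent); // bytes for refs
    int maxCapacity = (int) (maxProbeSize / 8); // each ref is an 8-byte long
    System.out.println(maxCapacity);    // 16777216 ref slots
  }
}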
http://git-wip-us.apache.org/repos/asf/hive/blob/b4be31f4/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 1b9d7ef..a9159a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -592,6 +592,11 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
// Deserialize the on-disk hash table
// We're sure this part is smaller than memory limit
+ if (rowCount <= 0) {
+ rowCount = 1024 * 1024; // Since rowCount is used later to instantiate a BytesBytesMultiHashMap
+ // as the initialCapacity which cannot be 0, we provide a reasonable
+ // positive number here
+ }
BytesBytesMultiHashMap restoredHashMap = partition.getHashMapFromDisk(rowCount);
rowCount += restoredHashMap.getNumValues();
LOG.info("Hybrid Grace Hash Join: Deserializing spilled hash partition...");
http://git-wip-us.apache.org/repos/asf/hive/blob/b4be31f4/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 3bba890..77c7ead 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -153,9 +153,11 @@ public final class BytesBytesMultiHashMap {
/** 8 Gb of refs is the max capacity if memory limit is not specified. If someone has 100s of
* Gbs of memory (this might happen pretty soon) we'd need to string together arrays anyway. */
private final static int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024;
+ /** Make sure maxCapacity has a lower limit */
+ private final static int DEFAULT_MIN_MAX_CAPACITY = 16 * 1024 * 1024;
public BytesBytesMultiHashMap(int initialCapacity,
- float loadFactor, int wbSize, long memUsage) {
+ float loadFactor, int wbSize, long maxProbeSize) {
if (loadFactor < 0 || loadFactor > 1) {
throw new AssertionError("Load factor must be between (0, 1].");
}
@@ -163,8 +165,11 @@ public final class BytesBytesMultiHashMap {
initialCapacity = (Long.bitCount(initialCapacity) == 1)
? initialCapacity : nextHighestPowerOfTwo(initialCapacity);
// 8 bytes per long in the refs, assume data will be empty. This is just a sanity check.
- int maxCapacity = (memUsage <= 0) ? DEFAULT_MAX_CAPACITY
- : (int)Math.min((long)DEFAULT_MAX_CAPACITY, memUsage / 8);
+ int maxCapacity = (maxProbeSize <= 0) ? DEFAULT_MAX_CAPACITY
+ : (int)Math.min((long)DEFAULT_MAX_CAPACITY, maxProbeSize / 8);
+ if (maxCapacity < DEFAULT_MIN_MAX_CAPACITY) {
+ maxCapacity = DEFAULT_MIN_MAX_CAPACITY;
+ }
if (maxCapacity < initialCapacity || initialCapacity <= 0) {
// Either initialCapacity is too large, or nextHighestPowerOfTwo overflows
initialCapacity = (Long.bitCount(maxCapacity) == 1)
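Note: taken together, these hunks rename memUsage to maxProbeSize, derive the ref-array cap from it at 8 bytes per ref, and clamp the result between the new 16M-slot floor and the existing 1G-slot ceiling. A compact restatement of that clamp; the method name is mine, the constants are from the patch:

public class CapacityClamp {
  static final int MAX = 1024 * 1024 * 1024;   // DEFAULT_MAX_CAPACITY
  static final int MIN_MAX = 16 * 1024 * 1024; // DEFAULT_MIN_MAX_CAPACITY

  static int maxCapacity(long maxProbeSize) {
    int cap = (maxProbeSize <= 0) ? MAX
        : (int) Math.min((long) MAX, maxProbeSize / 8); // 8 bytes per ref
    return Math.max(cap, MIN_MAX); // enforce the new lower limit
  }

  public static void main(String[] args) {
    System.out.println(maxCapacity(1024));     // tiny budget -> 16777216 (floor)
    System.out.println(maxCapacity(-1));       // unspecified -> 1073741824 (ceiling)
    System.out.println(maxCapacity(1L << 30)); // 1 GB of refs -> 134217728 slots
  }
}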
http://git-wip-us.apache.org/repos/asf/hive/blob/b4be31f4/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index ff64f52..52c02ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -76,7 +76,6 @@ public class HybridHashTableContainer
private int totalInMemRowCount = 0; // total number of small table rows in memory
private long memoryThreshold; // the max memory limit that can be allocated
private long memoryUsed; // the actual memory used
- private int writeBufferSize; // write buffer size for this HybridHashTableContainer
private final long tableRowSize; // row size of the small table
private boolean isSpilled; // whether there's any spilled partition
private int toSpillPartitionId; // the partition into which to spill the big table row;
@@ -107,7 +106,7 @@ public class HybridHashTableContainer
Path hashMapLocalPath; // Local file system path for spilled hashMap
boolean hashMapOnDisk; // Status of hashMap. true: on disk, false: in memory
boolean hashMapSpilledOnCreation; // When there's not enough memory, cannot create hashMap
- int threshold; // Used to create an empty BytesBytesMultiHashMap
+ int initialCapacity; // Used to create an empty BytesBytesMultiHashMap
float loadFactor; // Same as above
int wbSize; // Same as above
int rowsOnDisk; // How many rows saved to the on-disk hashmap (if on disk)
@@ -115,17 +114,17 @@ public class HybridHashTableContainer
/* It may happen that there's not enough memory to instantiate a hashmap for the partition.
* In that case, we don't create the hashmap, but pretend the hashmap is directly "spilled".
*/
- public HashPartition(int threshold, float loadFactor, int wbSize, long memUsage,
+ public HashPartition(int initialCapacity, float loadFactor, int wbSize, long maxProbeSize,
boolean createHashMap) {
if (createHashMap) {
- // Hash map should be at least the size of our designated wbSize
- memUsage = Math.max(memUsage, wbSize);
- hashMap = new BytesBytesMultiHashMap(threshold, loadFactor, wbSize, memUsage);
+ // Probe space should be at least equal to the size of our designated wbSize
+ maxProbeSize = Math.max(maxProbeSize, wbSize);
+ hashMap = new BytesBytesMultiHashMap(initialCapacity, loadFactor, wbSize, maxProbeSize);
} else {
hashMapSpilledOnCreation = true;
hashMapOnDisk = true;
}
- this.threshold = threshold;
+ this.initialCapacity = initialCapacity;
this.loadFactor = loadFactor;
this.wbSize = wbSize;
}
@@ -138,18 +137,18 @@ public class HybridHashTableContainer
/* Restore the hashmap from disk by deserializing it.
* Currently Kryo is used for this purpose.
*/
- public BytesBytesMultiHashMap getHashMapFromDisk(int initialCapacity)
+ public BytesBytesMultiHashMap getHashMapFromDisk(int rowCount)
throws IOException, ClassNotFoundException {
if (hashMapSpilledOnCreation) {
- return new BytesBytesMultiHashMap(Math.max(threshold, initialCapacity) , loadFactor, wbSize, -1);
+ return new BytesBytesMultiHashMap(rowCount, loadFactor, wbSize, -1);
} else {
InputStream inputStream = Files.newInputStream(hashMapLocalPath);
com.esotericsoftware.kryo.io.Input input = new com.esotericsoftware.kryo.io.Input(inputStream);
Kryo kryo = Utilities.runtimeSerializationKryo.get();
BytesBytesMultiHashMap restoredHashMap = kryo.readObject(input, BytesBytesMultiHashMap.class);
- if (initialCapacity > 0) {
- restoredHashMap.expandAndRehashToTarget(initialCapacity);
+ if (rowCount > 0) {
+ restoredHashMap.expandAndRehashToTarget(rowCount);
}
// some bookkeeping
@@ -237,7 +236,7 @@ public class HybridHashTableContainer
public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryAvailable,
long estimatedTableSize, HybridHashTableConf nwayConf)
- throws SerDeException, IOException {
+ throws SerDeException, IOException {
this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
HiveConf.getFloatVar(hconf,HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
@@ -245,12 +244,13 @@ public class HybridHashTableContainer
HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE),
HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
+ HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT),
estimatedTableSize, keyCount, memoryAvailable, nwayConf);
}
private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFactor,
- int memCheckFreq, int minWbSize, int maxWbSize, int minNumParts, long estimatedTableSize,
- long keyCount, long memoryAvailable, HybridHashTableConf nwayConf)
+ int memCheckFreq, int minWbSize, int maxWbSize, int minNumParts, float probePercent,
+ long estimatedTableSize, long keyCount, long memoryAvailable, HybridHashTableConf nwayConf)
throws SerDeException, IOException {
directWriteHelper = new MapJoinBytesTableContainer.DirectKeyValueWriter();
@@ -262,10 +262,10 @@ public class HybridHashTableContainer
memoryCheckFrequency = memCheckFreq;
this.nwayConf = nwayConf;
+ int writeBufferSize;
int numPartitions;
if (nwayConf == null) { // binary join
- numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, minNumParts, minWbSize,
- nwayConf);
+ numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, minNumParts, minWbSize);
writeBufferSize = (int)(estimatedTableSize / numPartitions);
} else { // n-way join
// It has been calculated in HashTableLoader earlier, so just need to retrieve that number
@@ -302,21 +302,33 @@ public class HybridHashTableContainer
int numPartitionsSpilledOnCreation = 0;
memoryUsed = 0;
int initialCapacity = Math.max(newKeyCount / numPartitions, threshold / numPartitions);
+ // maxCapacity is based on a percentage of memoryThreshold: the fraction of
+ // each row's memory footprint that its 8-byte (long) ref occupies
+ float probePercentage = (float) 8 / (tableRowSize + 8); // long_size / (tableRowSize + long_size)
+ if (probePercentage == 1) {
+ probePercentage = probePercent;
+ }
+ int maxCapacity = (int)(memoryThreshold * probePercentage);
for (int i = 0; i < numPartitions; i++) {
if (this.nwayConf == null || // binary join
nwayConf.getLoadedContainerList().size() == 0) { // n-way join, first (biggest) small table
if (i == 0) { // We unconditionally create a hashmap for the first hash partition
- hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize, memoryThreshold, true);
+ hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+ maxCapacity, true);
} else {
- hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize, memoryThreshold,
- memoryUsed + writeBufferSize < memoryThreshold);
+ // To check whether we have enough memory to allocate for another hash partition,
+ // we need to get the size of the first hash partition to get an idea.
+ hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+ maxCapacity, memoryUsed + hashPartitions[0].hashMap.memorySize() < memoryThreshold);
}
- } else { // n-way join
+ } else { // n-way join, all later small tables
// For all later small tables, follow the same pattern of the previously loaded tables.
if (this.nwayConf.doSpillOnCreation(i)) {
- hashPartitions[i] = new HashPartition(threshold, loadFactor, writeBufferSize, memoryThreshold, false);
+ hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+ maxCapacity, false);
} else {
- hashPartitions[i] = new HashPartition(threshold, loadFactor, writeBufferSize, memoryThreshold, true);
+ hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+ maxCapacity, true);
}
}
@@ -513,7 +525,8 @@ public class HybridHashTableContainer
Path path = Files.createTempFile("partition-" + partitionId + "-", null);
OutputStream outputStream = Files.newOutputStream(path);
- com.esotericsoftware.kryo.io.Output output = new com.esotericsoftware.kryo.io.Output(outputStream);
+ com.esotericsoftware.kryo.io.Output output =
+ new com.esotericsoftware.kryo.io.Output(outputStream);
Kryo kryo = Utilities.runtimeSerializationKryo.get();
kryo.writeObject(output, partition.hashMap); // use Kryo to serialize hashmap
output.close();
@@ -545,11 +558,10 @@ public class HybridHashTableContainer
* @param dataSize total data size for the table
* @param minNumParts minimum required number of partitions
* @param minWbSize minimum required write buffer size
- * @param nwayConf the n-way join configuration
* @return number of partitions needed
*/
public static int calcNumPartitions(long memoryThreshold, long dataSize, int minNumParts,
- int minWbSize, HybridHashTableConf nwayConf) throws IOException {
+ int minWbSize) throws IOException {
int numPartitions = minNumParts;
if (memoryThreshold < minNumParts * minWbSize) {
@@ -803,7 +815,8 @@ public class HybridHashTableContainer
return JoinUtil.JoinResult.SPILL;
}
else {
- aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(output.getData(), 0, output.getLength(), hashMapResult);
+ aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(output.getData(), 0,
+ output.getLength(), hashMapResult);
dummyRow = null;
if (hashMapResult.hasRows()) {
return JoinUtil.JoinResult.MATCH;
@@ -941,7 +954,8 @@ public class HybridHashTableContainer
return JoinUtil.JoinResult.SPILL;
}
else {
- aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(bytes, offset, length, hashMapResult);
+ aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(bytes, offset, length,
+ hashMapResult);
dummyRow = null;
if (hashMapResult.hasRows()) {
return JoinUtil.JoinResult.MATCH;
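Note: the central estimate in this file is probePercentage = 8 / (tableRowSize + 8). Of each small-table row's footprint, only its 8-byte ref lives in the probe array, so only that fraction of the memory threshold is granted as probe space; when tableRowSize is 0 the formula degenerates to 1 and the configured probe percent (default 0.5) is used instead. A worked example, assuming a 100-byte average row and a 512 MB container budget (both numbers are assumptions for illustration):

public class ProbeFractionExample {
  public static void main(String[] args) {
    long tableRowSize = 100;  // assumed average small-table row size in bytes
    float fallback = 0.5f;    // hive.mapjoin.optimized.hashtable.probe.percent
    float probePercentage = (float) 8 / (tableRowSize + 8); // ref share per row
    if (probePercentage == 1) { // degenerate case: tableRowSize == 0
      probePercentage = fallback;
    }
    long memoryThreshold = 512L << 20; // assumed 512 MB for this container
    long maxProbeBytes = (long) (memoryThreshold * probePercentage);
    System.out.printf("probe fraction %.3f -> %d probe bytes%n",
        probePercentage, maxProbeBytes); // ~0.074 -> ~39.8 MB
  }
}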
http://git-wip-us.apache.org/repos/asf/hive/blob/b4be31f4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 2b6571b..f7d165a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -84,6 +84,7 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
// Get the total available memory from memory manager
long totalMapJoinMemory = desc.getMemoryNeeded();
+ LOG.info("Memory manager allocates " + totalMapJoinMemory + " bytes for the loading hashtable.");
if (totalMapJoinMemory <= 0) {
totalMapJoinMemory = HiveConf.getLongVar(
hconf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
@@ -128,11 +129,9 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
long memory = tableMemorySizes.get(biggest);
int numPartitions = 0;
try {
- numPartitions = HybridHashTableContainer.calcNumPartitions(memory,
- maxSize,
+ numPartitions = HybridHashTableContainer.calcNumPartitions(memory, maxSize,
HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
- HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE),
- nwayConf);
+ HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE));
} catch (IOException e) {
throw new HiveException(e);
}
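Note: calcNumPartitions never used the n-way configuration, so the loader now passes only the four parameters the method reads. As a hedged sketch of the shape of such a heuristic (the real body is largely outside this hunk; only the minimum-memory precondition is visible in the diff):

public class PartitionCountSketch {
  // Illustrative only: grow the partition count until one partition's share
  // of the data could plausibly fit within the memory budget.
  static int calcNumPartitions(long memoryThreshold, long dataSize,
      int minNumParts, int minWbSize) {
    if (memoryThreshold < (long) minNumParts * minWbSize) {
      // the patch context shows this same precondition check
      throw new IllegalStateException("memory below minimum partition layout");
    }
    int numPartitions = minNumParts;
    while (dataSize / numPartitions > memoryThreshold
        && numPartitions < (1 << 20)) {
      numPartitions <<= 1; // power-of-two growth is an assumption of this sketch
    }
    return numPartitions;
  }

  public static void main(String[] args) {
    // 4 GB table, 256 MB budget, at least 16 partitions of >= 5 MB buffers:
    System.out.println(calcNumPartitions(256L << 20, 4L << 30, 16, 5 << 20)); // 16
  }
}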
http://git-wip-us.apache.org/repos/asf/hive/blob/b4be31f4/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
index 05d9359..62250ec 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
@@ -61,7 +61,6 @@ public final class WriteBuffers implements RandomAccessOutput {
this.offsetMask = this.wbSize - 1;
this.maxSize = maxSize;
writePos.bufferIndex = -1;
- nextBufferToWrite();
}
public int readVInt() {
@@ -207,6 +206,9 @@ public final class WriteBuffers implements RandomAccessOutput {
@Override
public void write(byte[] b, int off, int len) {
+ if (writePos.bufferIndex == -1) {
+ nextBufferToWrite();
+ }
int srcOffset = 0;
while (srcOffset < len) {
int toWrite = Math.min(len - srcOffset, wbSize - writePos.offset);
@@ -355,6 +357,9 @@ public final class WriteBuffers implements RandomAccessOutput {
public long getWritePoint() {
+ if (writePos.bufferIndex == -1) {
+ nextBufferToWrite();
+ }
return ((long)writePos.bufferIndex << wbSizeLog2) + writePos.offset;
}
@@ -498,6 +503,9 @@ public final class WriteBuffers implements RandomAccessOutput {
}
public void seal() {
+ if (writePos.bufferIndex == -1) {
+ return;
+ }
if (writePos.offset < (wbSize * 0.8)) { // arbitrary
byte[] smallerBuffer = new byte[writePos.offset];
System.arraycopy(writePos.buffer, 0, smallerBuffer, 0, writePos.offset);
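Note: the WriteBuffers hunks defer allocating the first write buffer from construction time to first use, so a hash partition that never receives a write no longer pays for an empty buffer, and seal() simply returns when nothing was allocated. A minimal, simplified stand-in showing the lazy-first-buffer pattern (not the real WriteBuffers class):

import java.util.ArrayList;
import java.util.List;

public class LazyBuffers {
  private final List<byte[]> buffers = new ArrayList<>();
  private final int wbSize;
  private int bufferIndex = -1; // -1 means nothing allocated yet
  private int offset;

  LazyBuffers(int wbSize) { this.wbSize = wbSize; }

  private void nextBufferToWrite() {
    buffers.add(new byte[wbSize]);
    bufferIndex = buffers.size() - 1;
    offset = 0;
  }

  public void write(byte b) {
    if (bufferIndex == -1) nextBufferToWrite(); // allocate on first write only
    if (offset == wbSize) nextBufferToWrite();  // roll over when a buffer fills
    buffers.get(bufferIndex)[offset++] = b;
  }

  public long memoryUsed() { return (long) buffers.size() * wbSize; }

  public static void main(String[] args) {
    LazyBuffers wb = new LazyBuffers(1 << 20);
    System.out.println(wb.memoryUsed()); // 0: constructed, but no buffer yet
    wb.write((byte) 1);
    System.out.println(wb.memoryUsed()); // 1048576: first buffer on demand
  }
}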