You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2016/05/05 06:10:04 UTC
hive git commit: HIVE-12837 : Better memory estimation/allocation for
hybrid grace hash join during hash table loading (Wei Zheng,
reviewed by Vikram Dixit K)
Repository: hive
Updated Branches:
refs/heads/master a88050bd9 -> cbebb4d78
HIVE-12837 : Better memory estimation/allocation for hybrid grace hash join during hash table loading (Wei Zheng, reviewed by Vikram Dixit K)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cbebb4d7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cbebb4d7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cbebb4d7
Branch: refs/heads/master
Commit: cbebb4d78064a9098e4145a0f7532f08885c9b27
Parents: a88050b
Author: Wei Zheng <we...@apache.org>
Authored: Wed May 4 23:09:08 2016 -0700
Committer: Wei Zheng <we...@apache.org>
Committed: Wed May 4 23:09:08 2016 -0700
----------------------------------------------------------------------
.../persistence/HybridHashTableContainer.java | 60 +++++++++++++++-----
.../ql/exec/persistence/KeyValueContainer.java | 4 ++
2 files changed, 51 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cbebb4d7/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index f5da5a4..5552dfb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -90,6 +90,7 @@ public class HybridHashTableContainer
private boolean lastPartitionInMem; // only one (last one) partition is left in memory
private final int memoryCheckFrequency; // how often (# of rows apart) to check if memory is full
private final HybridHashTableConf nwayConf; // configuration for n-way join
+ private int writeBufferSize; // write buffer size for BytesBytesMultiHashMap
/** The OI used to deserialize values. We never deserialize keys. */
private LazyBinaryStructObjectInspector internalValueOi;
@@ -294,7 +295,6 @@ public class HybridHashTableContainer
this.spillLocalDirs = spillLocalDirs;
this.nwayConf = nwayConf;
- int writeBufferSize;
int numPartitions;
if (nwayConf == null) { // binary join
numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, minNumParts, minWbSize);
@@ -327,7 +327,9 @@ public class HybridHashTableContainer
writeBufferSize : Integer.highestOneBit(writeBufferSize);
// Cap WriteBufferSize to avoid large preallocations
- writeBufferSize = writeBufferSize < minWbSize ? minWbSize : Math.min(maxWbSize, writeBufferSize);
+ // We also want to limit the size of writeBuffer, because we normally have 16 partitions; that
+ // makes the spilling prediction (isMemoryFull) too defensive, which results in unnecessary spilling
+ writeBufferSize = writeBufferSize < minWbSize ? minWbSize : Math.min(maxWbSize / numPartitions, writeBufferSize);
this.bloom1 = new BloomFilter(newKeyCount);
@@ -417,6 +419,11 @@ public class HybridHashTableContainer
for (HashPartition hp : hashPartitions) {
if (hp.hashMap != null) {
memUsed += hp.hashMap.memorySize();
+ } else {
+ // also include the still-in-memory sidefile, before it has been truly spilled
+ if (hp.sidefileKVContainer != null) {
+ memUsed += hp.sidefileKVContainer.numRowsInReadBuffer() * tableRowSize;
+ }
}
}
return memoryUsed = memUsed;
@@ -454,6 +461,8 @@ public class HybridHashTableContainer
private MapJoinKey internalPutRow(KeyValueHelper keyValueHelper,
Writable currentKey, Writable currentValue) throws SerDeException, IOException {
+ boolean putToSidefile = false; // by default we put row into partition in memory
+
// Next, put row into corresponding hash partition
int keyHash = keyValueHelper.getHashFromKey();
int partitionId = keyHash & (hashPartitions.length - 1);
@@ -461,15 +470,13 @@ public class HybridHashTableContainer
bloom1.addLong(keyHash);
- if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) {
- KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer();
- kvContainer.add((HiveKey) currentKey, (BytesWritable) currentValue);
- } else {
- hashPartition.hashMap.put(keyValueHelper, keyHash); // Pass along hashcode to avoid recalculation
- totalInMemRowCount++;
-
- if ((totalInMemRowCount & (this.memoryCheckFrequency - 1)) == 0 && // check periodically
- !lastPartitionInMem) { // If this is the only partition in memory, proceed without check
+ if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) { // destination on disk
+ putToSidefile = true;
+ } else { // destination in memory
+ if (!lastPartitionInMem && // If this is the only partition in memory, proceed without check
+ (hashPartition.size() == 0 || // Destination partition being empty indicates a write buffer
+ // will be allocated, thus need to check if memory is full
+ (totalInMemRowCount & (this.memoryCheckFrequency - 1)) == 0)) { // check periodically
if (isMemoryFull()) {
if ((numPartitionsSpilled == hashPartitions.length - 1) ) {
LOG.warn("This LAST partition in memory won't be spilled!");
@@ -479,9 +486,16 @@ public class HybridHashTableContainer
int biggest = biggestPartition();
spillPartition(biggest);
this.setSpill(true);
+ if (partitionId == biggest) { // destination hash partition has just been spilled
+ putToSidefile = true;
+ }
} else { // n-way join
LOG.info("N-way spilling: spill tail partition from previously loaded small tables");
+ int biggest = nwayConf.getNextSpillPartition();
memoryThreshold += nwayConf.spill();
+ if (biggest != 0 && partitionId == biggest) { // destination hash partition has just been spilled
+ putToSidefile = true;
+ }
LOG.info("Memory threshold has been increased to: " + memoryThreshold);
}
numPartitionsSpilled++;
@@ -490,6 +504,15 @@ public class HybridHashTableContainer
}
}
+ // Now we know where to put row
+ if (putToSidefile) {
+ KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer();
+ kvContainer.add((HiveKey) currentKey, (BytesWritable) currentValue);
+ } else {
+ hashPartition.hashMap.put(keyValueHelper, keyHash); // Pass along hashcode to avoid recalculation
+ totalInMemRowCount++;
+ }
+
return null; // there's no key to return
}
@@ -513,11 +536,21 @@ public class HybridHashTableContainer
}
/**
- * Check if the memory threshold is reached
- * Check if the memory threshold is reached
+ * Check if the memory threshold is about to be reached.
+ * Since all the write buffers will be lazily allocated in BytesBytesMultiHashMap, we need to
+ * account for those as well.
* @return true if memory is full, false if not
*/
private boolean isMemoryFull() {
- return refreshMemoryUsed() >= memoryThreshold;
+ int numPartitionsInMem = 0;
+
+ for (HashPartition hp : hashPartitions) {
+ if (!hp.isHashMapOnDisk()) {
+ numPartitionsInMem++;
+ }
+ }
+
+ return refreshMemoryUsed() + writeBufferSize * numPartitionsInMem >= memoryThreshold;
}
/**
@@ -561,6 +594,7 @@ public class HybridHashTableContainer
new com.esotericsoftware.kryo.io.Output(outputStream);
Kryo kryo = SerializationUtilities.borrowKryo();
try {
+ LOG.info("Trying to spill hash partition " + partitionId + " ...");
kryo.writeObject(output, partition.hashMap); // use Kryo to serialize hashmap
output.close();
outputStream.close();
http://git-wip-us.apache.org/repos/asf/hive/blob/cbebb4d7/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
index e2b22d3..72faf8b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
@@ -215,6 +215,10 @@ public class KeyValueContainer {
return row;
}
+ public int numRowsInReadBuffer() {
+ return rowsInReadBuffer;
+ }
+
public int size() {
return rowsInReadBuffer + rowsOnDisk;
}