Posted to commits@hive.apache.org by sp...@apache.org on 2016/05/06 20:42:57 UTC

[26/50] [abbrv] hive git commit: HIVE-12837 : Better memory estimation/allocation for hybrid grace hash join during hash table loading (Wei Zheng, reviewed by Vikram Dixit K)

HIVE-12837 : Better memory estimation/allocation for hybrid grace hash join during hash table loading (Wei Zheng, reviewed by Vikram Dixit K)
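
The gist of the change: when deciding whether to spill, the container now reserves room for
the write buffers that BytesBytesMultiHashMap will lazily allocate, one per in-memory
partition, instead of comparing only current usage against the threshold. A minimal,
self-contained sketch of that estimate (the class and variable names here are illustrative,
not Hive's):

    public class MemoryCheckSketch {
      // True when current usage plus one pending write buffer per in-memory partition
      // would cross the threshold; this anticipates allocations that haven't happened yet.
      static boolean estimateIsMemoryFull(long memoryUsed, long memoryThreshold,
                                          int writeBufferSize, boolean[] hashMapOnDisk) {
        int numPartitionsInMem = 0;
        for (boolean onDisk : hashMapOnDisk) {
          if (!onDisk) {
            numPartitionsInMem++;
          }
        }
        return memoryUsed + (long) writeBufferSize * numPartitionsInMem >= memoryThreshold;
      }

      public static void main(String[] args) {
        boolean[] onDisk = new boolean[16];           // 16 partitions...
        onDisk[0] = onDisk[1] = onDisk[2] = true;     // ...3 of them already spilled
        // 90 MB used + 13 pending 1 MB buffers = 103 MB >= 100 MB threshold -> "full"
        System.out.println(estimateIsMemoryFull(90L << 20, 100L << 20, 1 << 20, onDisk));
      }
    }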


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cbebb4d7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cbebb4d7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cbebb4d7

Branch: refs/heads/java8
Commit: cbebb4d78064a9098e4145a0f7532f08885c9b27
Parents: a88050b
Author: Wei Zheng <we...@apache.org>
Authored: Wed May 4 23:09:08 2016 -0700
Committer: Wei Zheng <we...@apache.org>
Committed: Wed May 4 23:09:08 2016 -0700

----------------------------------------------------------------------
 .../persistence/HybridHashTableContainer.java   | 60 +++++++++++++++-----
 .../ql/exec/persistence/KeyValueContainer.java  |  4 ++
 2 files changed, 51 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
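
One hunk below also divides the write buffer cap by the partition count, so that the total
lazily allocated buffer memory stays small relative to the threshold. A standalone rendering
of that sizing arithmetic (method and parameter names are illustrative, not Hive's):

    public class WriteBufferCapSketch {
      static int capWriteBufferSize(int writeBufferSize, int numPartitions,
                                    int minWbSize, int maxWbSize) {
        // Round down to a power of two, as the real code does via Integer.highestOneBit.
        writeBufferSize = Integer.bitCount(writeBufferSize) == 1
            ? writeBufferSize : Integer.highestOneBit(writeBufferSize);
        // Splitting maxWbSize across partitions bounds the total write-buffer reservation,
        // so the isMemoryFull prediction does not become overly conservative.
        return writeBufferSize < minWbSize
            ? minWbSize : Math.min(maxWbSize / numPartitions, writeBufferSize);
      }

      public static void main(String[] args) {
        // A 3 MB request with 16 partitions and an 8 MB cap yields 8 MB / 16 = 512 KB.
        System.out.println(capWriteBufferSize(3 << 20, 16, 64 << 10, 8 << 20));
      }
    }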


http://git-wip-us.apache.org/repos/asf/hive/blob/cbebb4d7/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index f5da5a4..5552dfb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -90,6 +90,7 @@ public class HybridHashTableContainer
   private boolean lastPartitionInMem;           // only one (last one) partition is left in memory
   private final int memoryCheckFrequency;       // how often (# of rows apart) to check if memory is full
   private final HybridHashTableConf nwayConf;         // configuration for n-way join
+  private int writeBufferSize;                  // write buffer size for BytesBytesMultiHashMap
 
   /** The OI used to deserialize values. We never deserialize keys. */
   private LazyBinaryStructObjectInspector internalValueOi;
@@ -294,7 +295,6 @@ public class HybridHashTableContainer
     this.spillLocalDirs = spillLocalDirs;
 
     this.nwayConf = nwayConf;
-    int writeBufferSize;
     int numPartitions;
     if (nwayConf == null) { // binary join
       numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, minNumParts, minWbSize);
@@ -327,7 +327,9 @@ public class HybridHashTableContainer
         writeBufferSize : Integer.highestOneBit(writeBufferSize);
 
     // Cap WriteBufferSize to avoid large preallocations
-    writeBufferSize = writeBufferSize < minWbSize ? minWbSize : Math.min(maxWbSize, writeBufferSize);
+    // We also want to limit the write buffer size: with the usual 16 partitions, oversized
+    // buffers make the spilling prediction (isMemoryFull) too conservative and cause unnecessary spilling
+    writeBufferSize = writeBufferSize < minWbSize ? minWbSize : Math.min(maxWbSize / numPartitions, writeBufferSize);
 
     this.bloom1 = new BloomFilter(newKeyCount);
 
@@ -417,6 +419,11 @@ public class HybridHashTableContainer
     for (HashPartition hp : hashPartitions) {
       if (hp.hashMap != null) {
         memUsed += hp.hashMap.memorySize();
+      } else {
+        // also include the still-in-memory sidefile, before it has been truly spilled
+        if (hp.sidefileKVContainer != null) {
+          memUsed += hp.sidefileKVContainer.numRowsInReadBuffer() * tableRowSize;
+        }
       }
     }
     return memoryUsed = memUsed;
@@ -454,6 +461,8 @@ public class HybridHashTableContainer
   private MapJoinKey internalPutRow(KeyValueHelper keyValueHelper,
           Writable currentKey, Writable currentValue) throws SerDeException, IOException {
 
+    boolean putToSidefile = false; // by default we put the row into the in-memory partition
+
     // Next, put row into corresponding hash partition
     int keyHash = keyValueHelper.getHashFromKey();
     int partitionId = keyHash & (hashPartitions.length - 1);
@@ -461,15 +470,13 @@ public class HybridHashTableContainer
 
     bloom1.addLong(keyHash);
 
-    if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) {
-      KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer();
-      kvContainer.add((HiveKey) currentKey, (BytesWritable) currentValue);
-    } else {
-      hashPartition.hashMap.put(keyValueHelper, keyHash); // Pass along hashcode to avoid recalculation
-      totalInMemRowCount++;
-
-      if ((totalInMemRowCount & (this.memoryCheckFrequency - 1)) == 0 &&  // check periodically
-          !lastPartitionInMem) { // If this is the only partition in memory, proceed without check
+    if (isOnDisk(partitionId) || isHashMapSpilledOnCreation(partitionId)) { // destination on disk
+      putToSidefile = true;
+    } else {  // destination in memory
+      if (!lastPartitionInMem &&        // If this is the only partition in memory, proceed without checking
+          (hashPartition.size() == 0 || // An empty destination partition means a write buffer
+                                        // will be allocated, so we need to check if memory is full
+           (totalInMemRowCount & (this.memoryCheckFrequency - 1)) == 0)) {  // check periodically
         if (isMemoryFull()) {
           if ((numPartitionsSpilled == hashPartitions.length - 1) ) {
             LOG.warn("This LAST partition in memory won't be spilled!");
@@ -479,9 +486,16 @@ public class HybridHashTableContainer
               int biggest = biggestPartition();
               spillPartition(biggest);
               this.setSpill(true);
+              if (partitionId == biggest) { // destination hash partition has just been spilled
+                putToSidefile = true;
+              }
             } else {                // n-way join
               LOG.info("N-way spilling: spill tail partition from previously loaded small tables");
+              int biggest = nwayConf.getNextSpillPartition();
               memoryThreshold += nwayConf.spill();
+              if (biggest != 0 && partitionId == biggest) { // destination hash partition has just been spilled
+                putToSidefile = true;
+              }
               LOG.info("Memory threshold has been increased to: " + memoryThreshold);
             }
             numPartitionsSpilled++;
@@ -490,6 +504,15 @@ public class HybridHashTableContainer
       }
     }
 
+    // Now we know where to put the row
+    if (putToSidefile) {
+      KeyValueContainer kvContainer = hashPartition.getSidefileKVContainer();
+      kvContainer.add((HiveKey) currentKey, (BytesWritable) currentValue);
+    } else {
+      hashPartition.hashMap.put(keyValueHelper, keyHash); // Pass along hashcode to avoid recalculation
+      totalInMemRowCount++;
+    }
+
     return null; // there's no key to return
   }
 
@@ -513,11 +536,21 @@ public class HybridHashTableContainer
   }
 
   /**
-   * Check if the memory threshold is reached
+   * Check if the memory threshold is about to be reached.
+   * Since the write buffers in BytesBytesMultiHashMap are lazily allocated, we need to
+   * account for those as well.
    * @return true if memory is full, false if not
    */
   private boolean isMemoryFull() {
-    return refreshMemoryUsed() >= memoryThreshold;
+    int numPartitionsInMem = 0;
+
+    for (HashPartition hp : hashPartitions) {
+      if (!hp.isHashMapOnDisk()) {
+        numPartitionsInMem++;
+      }
+    }
+
+    return refreshMemoryUsed() + writeBufferSize * numPartitionsInMem >= memoryThreshold;
   }
 
   /**
@@ -561,6 +594,7 @@ public class HybridHashTableContainer
         new com.esotericsoftware.kryo.io.Output(outputStream);
     Kryo kryo = SerializationUtilities.borrowKryo();
     try {
+      LOG.info("Trying to spill hash partition " + partitionId + " ...");
       kryo.writeObject(output, partition.hashMap);  // use Kryo to serialize hashmap
       output.close();
       outputStream.close();
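
The internalPutRow refactor above defers the destination decision until after any spill
triggered by the same call, so a row can no longer land in a hash map that was just
serialized to disk. A toy, self-contained illustration of that pattern (all names and the
two-partition setup are hypothetical):

    import java.util.ArrayList;
    import java.util.List;

    public class RoutingSketch {
      static boolean[] onDisk = {false, false};                // two toy partitions
      static List<List<String>> maps = new ArrayList<>();      // in-memory "hash maps"
      static List<List<String>> sidefiles = new ArrayList<>(); // spill-side row files

      static void putRow(int pid, String row, boolean memoryFull) {
        boolean putToSidefile = onDisk[pid];                   // destination already spilled?
        if (!putToSidefile && memoryFull) {
          int biggest = 0;                                     // pretend partition 0 is biggest
          onDisk[biggest] = true;                              // "spill" it
          if (pid == biggest) {
            putToSidefile = true;                              // redirect this very row
          }
        }
        (putToSidefile ? sidefiles : maps).get(pid).add(row);  // single write site at the end
      }

      public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
          maps.add(new ArrayList<>());
          sidefiles.add(new ArrayList<>());
        }
        putRow(0, "r1", false);  // memory fine: row goes to the in-memory map
        putRow(0, "r2", true);   // spill hits partition 0: row is redirected to its sidefile
        System.out.println(maps.get(0) + " / " + sidefiles.get(0));  // prints [r1] / [r2]
      }
    }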

http://git-wip-us.apache.org/repos/asf/hive/blob/cbebb4d7/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
index e2b22d3..72faf8b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/KeyValueContainer.java
@@ -215,6 +215,10 @@ public class KeyValueContainer {
     return row;
   }
 
+  public int numRowsInReadBuffer() {
+    return rowsInReadBuffer;
+  }
+
   public int size() {
     return rowsInReadBuffer + rowsOnDisk;
   }
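
The new numRowsInReadBuffer() accessor lets refreshMemoryUsed() count only the sidefile rows
still held in memory; rows already flushed to disk cost nothing. A hedged sketch of that
accounting (the helper name is hypothetical, and estimatedRowSize stands in for the
container's tableRowSize):

    // Hypothetical helper mirroring the refreshMemoryUsed() change above.
    static long sidefileMemoryBytes(KeyValueContainer sidefile, long estimatedRowSize) {
      // Rows still in the read buffer are resident; rows already on disk are not counted.
      return sidefile == null ? 0L : (long) sidefile.numRowsInReadBuffer() * estimatedRowSize;
    }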