You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/09/10 21:02:09 UTC

hive git commit: HIVE-11587 : Fix memory estimates for mapjoin hashtable (Wei Zheng, reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master 701440734 -> b4be31f4a


HIVE-11587 : Fix memory estimates for mapjoin hashtable (Wei Zheng, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b4be31f4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b4be31f4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b4be31f4

Branch: refs/heads/master
Commit: b4be31f4aa497cd09dd1e513eabe951044a7ff73
Parents: 7014407
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Sep 10 11:57:36 2015 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Thu Sep 10 11:57:36 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 +
 .../hadoop/hive/ql/exec/MapJoinOperator.java    |  5 ++
 .../persistence/BytesBytesMultiHashMap.java     | 11 +++-
 .../persistence/HybridHashTableContainer.java   | 68 ++++++++++++--------
 .../hive/ql/exec/tez/HashTableLoader.java       |  7 +-
 .../apache/hadoop/hive/serde2/WriteBuffers.java | 10 ++-
 6 files changed, 68 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b4be31f4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d2c5885..7f29da2 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -767,6 +767,8 @@ public class HiveConf extends Configuration {
     HIVEMAPJOINUSEOPTIMIZEDTABLE("hive.mapjoin.optimized.hashtable", true,
         "Whether Hive should use memory-optimized hash table for MapJoin. Only works on Tez,\n" +
         "because memory-optimized hashtable cannot be serialized."),
+    HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT("hive.mapjoin.optimized.hashtable.probe.percent",
+        (float) 0.5, "Probing space percentage of the optimized hashtable"),
     HIVEUSEHYBRIDGRACEHASHJOIN("hive.mapjoin.hybridgrace.hashtable", true, "Whether to use hybrid" +
         "grace hash join as the join method for mapjoin. Tez only."),
     HIVEHYBRIDGRACEHASHJOINMEMCHECKFREQ("hive.mapjoin.hybridgrace.memcheckfrequency", 1024, "For " +

http://git-wip-us.apache.org/repos/asf/hive/blob/b4be31f4/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 1b9d7ef..a9159a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -592,6 +592,11 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
 
     // Deserialize the on-disk hash table
     // We're sure this part is smaller than memory limit
+    if (rowCount <= 0) {
+      rowCount = 1024 * 1024; // Since rowCount is used later to instantiate a BytesBytesMultiHashMap
+                              // as the initialCapacity which cannot be 0, we provide a reasonable
+                              // positive number here
+    }
     BytesBytesMultiHashMap restoredHashMap = partition.getHashMapFromDisk(rowCount);
     rowCount += restoredHashMap.getNumValues();
     LOG.info("Hybrid Grace Hash Join: Deserializing spilled hash partition...");

http://git-wip-us.apache.org/repos/asf/hive/blob/b4be31f4/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 3bba890..77c7ead 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -153,9 +153,11 @@ public final class BytesBytesMultiHashMap {
   /** 8 Gb of refs is the max capacity if memory limit is not specified. If someone has 100s of
    * Gbs of memory (this might happen pretty soon) we'd need to string together arrays anyway. */
   private final static int DEFAULT_MAX_CAPACITY = 1024 * 1024 * 1024;
+  /** Make sure maxCapacity has a lower limit */
+  private final static int DEFAULT_MIN_MAX_CAPACITY = 16 * 1024 * 1024;
 
   public BytesBytesMultiHashMap(int initialCapacity,
-      float loadFactor, int wbSize, long memUsage) {
+      float loadFactor, int wbSize, long maxProbeSize) {
     if (loadFactor < 0 || loadFactor > 1) {
       throw new AssertionError("Load factor must be between (0, 1].");
     }
@@ -163,8 +165,11 @@ public final class BytesBytesMultiHashMap {
     initialCapacity = (Long.bitCount(initialCapacity) == 1)
         ? initialCapacity : nextHighestPowerOfTwo(initialCapacity);
     // 8 bytes per long in the refs, assume data will be empty. This is just a sanity check.
-    int maxCapacity =  (memUsage <= 0) ? DEFAULT_MAX_CAPACITY
-        : (int)Math.min((long)DEFAULT_MAX_CAPACITY, memUsage / 8);
+    int maxCapacity =  (maxProbeSize <= 0) ? DEFAULT_MAX_CAPACITY
+        : (int)Math.min((long)DEFAULT_MAX_CAPACITY, maxProbeSize / 8);
+    if (maxCapacity < DEFAULT_MIN_MAX_CAPACITY) {
+      maxCapacity = DEFAULT_MIN_MAX_CAPACITY;
+    }
     if (maxCapacity < initialCapacity || initialCapacity <= 0) {
       // Either initialCapacity is too large, or nextHighestPowerOfTwo overflows
       initialCapacity = (Long.bitCount(maxCapacity) == 1)

http://git-wip-us.apache.org/repos/asf/hive/blob/b4be31f4/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index ff64f52..52c02ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -76,7 +76,6 @@ public class HybridHashTableContainer
   private int totalInMemRowCount = 0;           // total number of small table rows in memory
   private long memoryThreshold;                 // the max memory limit that can be allocated
   private long memoryUsed;                      // the actual memory used
-  private int writeBufferSize;                  // write buffer size for this HybridHashTableContainer
   private final long tableRowSize;              // row size of the small table
   private boolean isSpilled;                    // whether there's any spilled partition
   private int toSpillPartitionId;               // the partition into which to spill the big table row;
@@ -107,7 +106,7 @@ public class HybridHashTableContainer
     Path hashMapLocalPath;                  // Local file system path for spilled hashMap
     boolean hashMapOnDisk;                  // Status of hashMap. true: on disk, false: in memory
     boolean hashMapSpilledOnCreation;       // When there's no enough memory, cannot create hashMap
-    int threshold;                          // Used to create an empty BytesBytesMultiHashMap
+    int initialCapacity;                    // Used to create an empty BytesBytesMultiHashMap
     float loadFactor;                       // Same as above
     int wbSize;                             // Same as above
     int rowsOnDisk;                         // How many rows saved to the on-disk hashmap (if on disk)
@@ -115,17 +114,17 @@ public class HybridHashTableContainer
     /* It may happen that there's not enough memory to instantiate a hashmap for the partition.
      * In that case, we don't create the hashmap, but pretend the hashmap is directly "spilled".
      */
-    public HashPartition(int threshold, float loadFactor, int wbSize, long memUsage,
+    public HashPartition(int initialCapacity, float loadFactor, int wbSize, long maxProbeSize,
                          boolean createHashMap) {
       if (createHashMap) {
-        // Hash map should be at least the size of our designated wbSize
-        memUsage = Math.max(memUsage, wbSize);
-        hashMap = new BytesBytesMultiHashMap(threshold, loadFactor, wbSize, memUsage);
+        // Probe space should be at least equal to the size of our designated wbSize
+        maxProbeSize = Math.max(maxProbeSize, wbSize);
+        hashMap = new BytesBytesMultiHashMap(initialCapacity, loadFactor, wbSize, maxProbeSize);
       } else {
         hashMapSpilledOnCreation = true;
         hashMapOnDisk = true;
       }
-      this.threshold = threshold;
+      this.initialCapacity = initialCapacity;
       this.loadFactor = loadFactor;
       this.wbSize = wbSize;
     }
@@ -138,18 +137,18 @@ public class HybridHashTableContainer
     /* Restore the hashmap from disk by deserializing it.
      * Currently Kryo is used for this purpose.
      */
-    public BytesBytesMultiHashMap getHashMapFromDisk(int initialCapacity)
+    public BytesBytesMultiHashMap getHashMapFromDisk(int rowCount)
         throws IOException, ClassNotFoundException {
       if (hashMapSpilledOnCreation) {
-        return new BytesBytesMultiHashMap(Math.max(threshold, initialCapacity) , loadFactor, wbSize, -1);
+        return new BytesBytesMultiHashMap(rowCount, loadFactor, wbSize, -1);
       } else {
         InputStream inputStream = Files.newInputStream(hashMapLocalPath);
         com.esotericsoftware.kryo.io.Input input = new com.esotericsoftware.kryo.io.Input(inputStream);
         Kryo kryo = Utilities.runtimeSerializationKryo.get();
         BytesBytesMultiHashMap restoredHashMap = kryo.readObject(input, BytesBytesMultiHashMap.class);
 
-        if (initialCapacity > 0) {
-          restoredHashMap.expandAndRehashToTarget(initialCapacity);
+        if (rowCount > 0) {
+          restoredHashMap.expandAndRehashToTarget(rowCount);
         }
 
         // some bookkeeping
@@ -237,7 +236,7 @@ public class HybridHashTableContainer
 
   public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryAvailable,
                                   long estimatedTableSize, HybridHashTableConf nwayConf)
- throws SerDeException, IOException {
+      throws SerDeException, IOException {
     this(HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEKEYCOUNTADJUSTMENT),
         HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD),
         HiveConf.getFloatVar(hconf,HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR),
@@ -245,12 +244,13 @@ public class HybridHashTableContainer
         HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE),
         HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHASHTABLEWBSIZE),
         HiveConf.getIntVar(hconf,HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
+        HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVEMAPJOINOPTIMIZEDTABLEPROBEPERCENT),
         estimatedTableSize, keyCount, memoryAvailable, nwayConf);
   }
 
   private HybridHashTableContainer(float keyCountAdj, int threshold, float loadFactor,
-      int memCheckFreq, int minWbSize, int maxWbSize, int minNumParts, long estimatedTableSize,
-      long keyCount, long memoryAvailable, HybridHashTableConf nwayConf)
+      int memCheckFreq, int minWbSize, int maxWbSize, int minNumParts, float probePercent,
+      long estimatedTableSize, long keyCount, long memoryAvailable, HybridHashTableConf nwayConf)
       throws SerDeException, IOException {
     directWriteHelper = new MapJoinBytesTableContainer.DirectKeyValueWriter();
 
@@ -262,10 +262,10 @@ public class HybridHashTableContainer
     memoryCheckFrequency = memCheckFreq;
 
     this.nwayConf = nwayConf;
+    int writeBufferSize;
     int numPartitions;
     if (nwayConf == null) { // binary join
-      numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, minNumParts, minWbSize,
-          nwayConf);
+      numPartitions = calcNumPartitions(memoryThreshold, estimatedTableSize, minNumParts, minWbSize);
       writeBufferSize = (int)(estimatedTableSize / numPartitions);
     } else {                // n-way join
       // It has been calculated in HashTableLoader earlier, so just need to retrieve that number
@@ -302,21 +302,33 @@ public class HybridHashTableContainer
     int numPartitionsSpilledOnCreation = 0;
     memoryUsed = 0;
     int initialCapacity = Math.max(newKeyCount / numPartitions, threshold / numPartitions);
+    // The probe space budget is a fraction of memoryThreshold: the share that an 8-byte long ref
+    // takes out of each row's total footprint (row data plus its long ref)
+    float probePercentage = (float) 8 / (tableRowSize + 8); // long_size / tableRowSize + long_size
+    if (probePercentage == 1) {
+      probePercentage = probePercent;
+    }
+    int maxCapacity = (int)(memoryThreshold * probePercentage);
     for (int i = 0; i < numPartitions; i++) {
       if (this.nwayConf == null ||                          // binary join
           nwayConf.getLoadedContainerList().size() == 0) {  // n-way join, first (biggest) small table
         if (i == 0) { // We unconditionally create a hashmap for the first hash partition
-          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize, memoryThreshold, true);
+          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+              maxCapacity, true);
         } else {
-          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize, memoryThreshold,
-              memoryUsed + writeBufferSize < memoryThreshold);
+          // To decide whether there is enough memory left for another hash partition, use the
+          // memory size of the first hash partition as the estimated cost of each new one.
+          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+              maxCapacity, memoryUsed + hashPartitions[0].hashMap.memorySize() < memoryThreshold);
         }
-      } else {                      // n-way join
+      } else {                                              // n-way join, all later small tables
         // For all later small tables, follow the same pattern of the previously loaded tables.
         if (this.nwayConf.doSpillOnCreation(i)) {
-          hashPartitions[i] = new HashPartition(threshold, loadFactor, writeBufferSize, memoryThreshold, false);
+          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+              maxCapacity, false);
         } else {
-          hashPartitions[i] = new HashPartition(threshold, loadFactor, writeBufferSize, memoryThreshold, true);
+          hashPartitions[i] = new HashPartition(initialCapacity, loadFactor, writeBufferSize,
+              maxCapacity, true);
         }
       }
 
@@ -513,7 +525,8 @@ public class HybridHashTableContainer
     Path path = Files.createTempFile("partition-" + partitionId + "-", null);
     OutputStream outputStream = Files.newOutputStream(path);
 
-    com.esotericsoftware.kryo.io.Output output = new com.esotericsoftware.kryo.io.Output(outputStream);
+    com.esotericsoftware.kryo.io.Output output =
+        new com.esotericsoftware.kryo.io.Output(outputStream);
     Kryo kryo = Utilities.runtimeSerializationKryo.get();
     kryo.writeObject(output, partition.hashMap);  // use Kryo to serialize hashmap
     output.close();
@@ -545,11 +558,10 @@ public class HybridHashTableContainer
    * @param dataSize total data size for the table
    * @param minNumParts minimum required number of partitions
    * @param minWbSize minimum required write buffer size
-   * @param nwayConf the n-way join configuration
    * @return number of partitions needed
    */
   public static int calcNumPartitions(long memoryThreshold, long dataSize, int minNumParts,
-      int minWbSize, HybridHashTableConf nwayConf) throws IOException {
+      int minWbSize) throws IOException {
     int numPartitions = minNumParts;
 
     if (memoryThreshold < minNumParts * minWbSize) {
@@ -803,7 +815,8 @@ public class HybridHashTableContainer
         return JoinUtil.JoinResult.SPILL;
       }
       else {
-        aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(output.getData(), 0, output.getLength(), hashMapResult);
+        aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(output.getData(), 0,
+            output.getLength(), hashMapResult);
         dummyRow = null;
         if (hashMapResult.hasRows()) {
           return JoinUtil.JoinResult.MATCH;
@@ -941,7 +954,8 @@ public class HybridHashTableContainer
         return JoinUtil.JoinResult.SPILL;
       }
       else {
-        aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(bytes, offset, length, hashMapResult);
+        aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(bytes, offset, length,
+            hashMapResult);
         dummyRow = null;
         if (hashMapResult.hasRows()) {
           return JoinUtil.JoinResult.MATCH;

http://git-wip-us.apache.org/repos/asf/hive/blob/b4be31f4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 2b6571b..f7d165a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -84,6 +84,7 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
 
     // Get the total available memory from memory manager
     long totalMapJoinMemory = desc.getMemoryNeeded();
+    LOG.info("Memory manager allocates " + totalMapJoinMemory + " bytes for the loading hashtable.");
     if (totalMapJoinMemory <= 0) {
       totalMapJoinMemory = HiveConf.getLongVar(
         hconf, HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
@@ -128,11 +129,9 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
       long memory = tableMemorySizes.get(biggest);
       int numPartitions = 0;
       try {
-        numPartitions = HybridHashTableContainer.calcNumPartitions(memory,
-            maxSize,
+        numPartitions = HybridHashTableContainer.calcNumPartitions(memory, maxSize,
             HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINNUMPARTITIONS),
-            HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE),
-            nwayConf);
+            HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHYBRIDGRACEHASHJOINMINWBSIZE));
       } catch (IOException e) {
         throw new HiveException(e);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/b4be31f4/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
index 05d9359..62250ec 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/WriteBuffers.java
@@ -61,7 +61,6 @@ public final class WriteBuffers implements RandomAccessOutput {
     this.offsetMask = this.wbSize - 1;
     this.maxSize = maxSize;
     writePos.bufferIndex = -1;
-    nextBufferToWrite();
   }
 
   public int readVInt() {
@@ -207,6 +206,9 @@ public final class WriteBuffers implements RandomAccessOutput {
 
   @Override
   public void write(byte[] b, int off, int len) {
+    if (writePos.bufferIndex == -1) {
+      nextBufferToWrite();
+    }
     int srcOffset = 0;
     while (srcOffset < len) {
       int toWrite = Math.min(len - srcOffset, wbSize - writePos.offset);
@@ -355,6 +357,9 @@ public final class WriteBuffers implements RandomAccessOutput {
 
 
   public long getWritePoint() {
+    if (writePos.bufferIndex == -1) {
+      nextBufferToWrite();
+    }
     return ((long)writePos.bufferIndex << wbSizeLog2) + writePos.offset;
   }
 
@@ -498,6 +503,9 @@ public final class WriteBuffers implements RandomAccessOutput {
   }
 
   public void seal() {
+    if (writePos.bufferIndex == -1) {
+      return;
+    }
     if (writePos.offset < (wbSize * 0.8)) { // arbitrary
       byte[] smallerBuffer = new byte[writePos.offset];
       System.arraycopy(writePos.buffer, 0, smallerBuffer, 0, writePos.offset);