You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by su...@apache.org on 2015/07/20 22:12:59 UTC

[49/50] [abbrv] hive git commit: HIVE-11262: Skip MapJoin processing if the join hash table is empty (Jason Dere, reviewed by Vikram Dixit)

HIVE-11262: Skip MapJoin processing if the join hash table is empty (Jason Dere, reviewed by Vikram Dixit)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/941610f2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/941610f2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/941610f2

Branch: refs/heads/spark
Commit: 941610f2a343273d448e5344ee759c3cc7032863
Parents: a5cc034
Author: Jason Dere <jd...@hortonworks.com>
Authored: Mon Jul 20 10:47:37 2015 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Mon Jul 20 10:47:37 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/MapJoinOperator.java    | 59 ++++++++++++++++++--
 .../persistence/HybridHashTableContainer.java   | 27 +++++++++
 .../persistence/MapJoinBytesTableContainer.java |  5 ++
 .../exec/persistence/MapJoinTableContainer.java |  5 ++
 .../fast/VectorMapJoinFastHashTable.java        |  5 ++
 .../fast/VectorMapJoinFastTableContainer.java   |  5 ++
 .../hashtable/VectorMapJoinHashTable.java       |  4 ++
 .../VectorMapJoinOptimizedHashTable.java        |  4 ++
 8 files changed, 108 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/941610f2/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 15cafdd..a40f0a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -55,6 +55,8 @@ import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -176,8 +178,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
                 }
               });
       result.add(future);
-    } else if (mapContext == null || mapContext.getLocalWork() == null
-        || mapContext.getLocalWork().getInputFileChangeSensitive() == false) {
+    } else if (!isInputFileChangeSensitive(mapContext)) {
       loadHashTable(mapContext, mrContext);
       hashTblInitedOnce = true;
     }
@@ -276,9 +277,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
       ExecMapperContext mapContext, MapredContext mrContext) throws HiveException {
     loadCalled = true;
 
-    if (this.hashTblInitedOnce
-        && ((mapContext == null) || (mapContext.getLocalWork() == null) || (mapContext
-            .getLocalWork().getInputFileChangeSensitive() == false))) {
+    if (canSkipReload(mapContext)) {
       // no need to reload
       return new ImmutablePair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>(
           mapJoinTables, mapJoinTableSerdes);
@@ -306,6 +305,11 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
 
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
 
+    if (canSkipJoinProcessing(mapContext)) {
+      LOG.info("Skipping big table join processing for " + this.toString());
+      this.setDone(true);
+    }
+
     return pair;
   }
 
@@ -611,7 +615,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
     }
 
     container.setTotalInMemRowCount(container.getTotalInMemRowCount()
-        + restoredHashMap.getNumValues() + kvContainer.size());
+        + restoredHashMap.getNumValues());
     kvContainer.clear();
 
     spilledMapJoinTables[pos] = new MapJoinBytesTableContainer(restoredHashMap);
@@ -656,4 +660,47 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
   public OperatorType getType() {
     return OperatorType.MAPJOIN;
   }
+
+  protected boolean isInputFileChangeSensitive(ExecMapperContext mapContext) { // true only if local work exists and reports input-file-change sensitivity
+    return !(mapContext == null // a missing context or missing local work counts as "not sensitive"
+        || mapContext.getLocalWork() == null
+        || mapContext.getLocalWork().getInputFileChangeSensitive() == false);
+  }
+
+  protected boolean canSkipReload(ExecMapperContext mapContext) { // already-loaded tables can be reused: loaded once and not input-file-change sensitive
+    return (this.hashTblInitedOnce && !isInputFileChangeSensitive(mapContext));
+  }
+
+  // Decides whether big-table rows can be skipped: tables are reused (canSkipReload), every join condition is INNER_JOIN, and some non-big-table hash table is empty.
+  protected boolean canSkipJoinProcessing(ExecMapperContext mapContext) {
+    if (!canSkipReload(mapContext)) { // NOTE(review): the caller sets hashTblInitedOnce only after loadHashTable returns, so the initial load may never skip — confirm
+      return false;
+    }
+
+    JoinCondDesc[] joinConds = getConf().getConds();
+    if (joinConds.length > 0) {
+      for (JoinCondDesc joinCond : joinConds) {
+        if (joinCond.getType() != JoinDesc.INNER_JOIN) { // any non-inner join type disables the shortcut
+          return false;
+        }
+      }
+    } else {
+      return false; // no join conditions: do not skip
+    }
+
+    boolean skipJoinProcessing = false;
+    for (int idx = 0; idx < mapJoinTables.length; ++idx) {
+      if (idx == getConf().getPosBigTable()) { // the big table's slot is not checked
+        continue;
+      }
+      MapJoinTableContainer mapJoinTable = mapJoinTables[idx];
+      if (mapJoinTable.size() == 0) {
+        // If any table is empty, an inner join involving the tables should yield 0 rows.
+        LOG.info("Hash table number " + idx + " is empty");
+        skipJoinProcessing = true;
+        break;
+      }
+    }
+    return skipJoinProcessing;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/941610f2/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index e338a31..0a6461f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -110,6 +110,7 @@ public class HybridHashTableContainer
     int threshold;                          // Used to create an empty BytesBytesMultiHashMap
     float loadFactor;                       // Same as above
     int wbSize;                             // Same as above
+    int rowsOnDisk;                         // How many rows saved to the on-disk hashmap (if on disk)
 
     /* It may happen that there's not enough memory to instantiate a hashmap for the partition.
      * In that case, we don't create the hashmap, but pretend the hashmap is directly "spilled".
@@ -149,6 +150,10 @@ public class HybridHashTableContainer
           restoredHashMap.expandAndRehashToTarget(initialCapacity);
         }
 
+        // some bookkeeping
+        rowsOnDisk = 0;
+        hashMapOnDisk = false;
+
         input.close();
         inputStream.close();
         Files.delete(hashMapLocalPath);
@@ -197,6 +202,8 @@ public class HybridHashTableContainer
         } catch (Throwable ignored) {
         }
         hashMapLocalPath = null;
+        rowsOnDisk = 0;
+        hashMapOnDisk = false;
       }
 
       if (sidefileKVContainer != null) {
@@ -214,6 +221,16 @@ public class HybridHashTableContainer
         matchfileRowBytesContainer = null;
       }
     }
+
+    public int size() { // entry count for this partition; computed differently when the hashmap is spilled to disk
+      if (isHashMapOnDisk()) {
+        // Rows are in a combination of the on-disk hashmap and the sidefile
+        return rowsOnDisk + (sidefileKVContainer != null ? sidefileKVContainer.size() : 0);
+      } else {
+        // All rows should be in the in-memory hashmap
+        return hashMap.size(); // NOTE(review): may count keys rather than rows; adequate for a size() == 0 emptiness check — confirm
+      }
+    }
   }
 
   public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryAvailable,
@@ -507,6 +524,7 @@ public class HybridHashTableContainer
     memoryUsed -= memFreed;
     LOG.info("Memory usage after spilling: " + memoryUsed);
 
+    partition.rowsOnDisk = inMemRowCount;
     totalInMemRowCount -= inMemRowCount;
     partition.hashMap.clear();
     return memFreed;
@@ -959,4 +977,13 @@ public class HybridHashTableContainer
         numPartitionsInMem + " partitions in memory have been processed; " +
         numPartitionsOnDisk + " partitions have been spilled to disk and will be processed next.");
   }
+
+  @Override
+  public int size() { // sum of HashPartition.size() across all hash partitions
+    int totalSize = 0;
+    for (HashPartition hashPartition : hashPartitions) {
+      totalSize += hashPartition.size();
+    }
+    return totalSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/941610f2/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
index 83a1521..5df8e2b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
@@ -736,4 +736,9 @@ public class MapJoinBytesTableContainer
   public boolean hasSpill() {
     return false;
   }
+
+  @Override
+  public int size() { // delegates to the underlying hashMap's size()
+    return hashMap.size();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/941610f2/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
index 9d8cbcb..869aefd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
@@ -106,4 +106,9 @@ public interface MapJoinTableContainer {
    * This is only applicable for HybridHashTableContainer.
    */
   boolean hasSpill();
+
+  /**
+   * Return the size of the hash table. Units may differ between implementations (keys vs. rows); a value of 0 means the table is empty.
+   */
+  int size();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/941610f2/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
index fbe6b4c..666d666 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
@@ -65,4 +65,9 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab
     this.loadFactor = loadFactor;
     this.writeBuffersSize = writeBuffersSize;
   }
+
+  @Override
+  public int size() { // reports the keysAssigned counter
+    return keysAssigned;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/941610f2/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
index 4b1d6f6..f2080f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
@@ -213,6 +213,11 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
     return false;
   }
 
+  @Override
+  public int size() { // NOTE(review): size() is an instance method, so VectorMapJoinFastHashTable here presumably names a field with the same name as the type — confirm
+    return VectorMapJoinFastHashTable.size();
+  }
+
   /*
   @Override
   public com.esotericsoftware.kryo.io.Output getHybridBigTableSpillOutput(int partitionId) {

http://git-wip-us.apache.org/repos/asf/hive/blob/941610f2/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
index 7e219ec..c7e585c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
@@ -40,4 +40,8 @@ public interface VectorMapJoinHashTable {
   void putRow(BytesWritable currentKey, BytesWritable currentValue)
       throws SerDeException, HiveException, IOException;
 
+  /**
+   * Get hash table size; 0 indicates an empty hash table.
+   */
+  int size();
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/941610f2/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
index a2d4e4c..b2b86d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
@@ -92,4 +92,8 @@ public abstract class VectorMapJoinOptimizedHashTable implements VectorMapJoinHa
     adapatorDirectAccess = (ReusableGetAdaptorDirectAccess) hashMapRowGetter;
   }
 
+  @Override
+  public int size() { // delegates to originalTableContainer.size()
+    return originalTableContainer.size();
+  }
 }