You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in the plain-text conversion.)
Posted to commits@hive.apache.org by su...@apache.org on 2015/07/20 22:12:59 UTC
[49/50] [abbrv] hive git commit: HIVE-11262: Skip MapJoin processing if the join hash table is empty (Jason Dere, reviewed by Vikram Dixit)
HIVE-11262: Skip MapJoin processing if the join hash table is empty (Jason Dere, reviewed by Vikram Dixit)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/941610f2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/941610f2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/941610f2
Branch: refs/heads/spark
Commit: 941610f2a343273d448e5344ee759c3cc7032863
Parents: a5cc034
Author: Jason Dere <jd...@hortonworks.com>
Authored: Mon Jul 20 10:47:37 2015 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Mon Jul 20 10:47:37 2015 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/MapJoinOperator.java | 59 ++++++++++++++++++--
.../persistence/HybridHashTableContainer.java | 27 +++++++++
.../persistence/MapJoinBytesTableContainer.java | 5 ++
.../exec/persistence/MapJoinTableContainer.java | 5 ++
.../fast/VectorMapJoinFastHashTable.java | 5 ++
.../fast/VectorMapJoinFastTableContainer.java | 5 ++
.../hashtable/VectorMapJoinHashTable.java | 4 ++
.../VectorMapJoinOptimizedHashTable.java | 4 ++
8 files changed, 108 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/941610f2/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 15cafdd..a40f0a9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -55,6 +55,8 @@ import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -176,8 +178,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
}
});
result.add(future);
- } else if (mapContext == null || mapContext.getLocalWork() == null
- || mapContext.getLocalWork().getInputFileChangeSensitive() == false) {
+ } else if (!isInputFileChangeSensitive(mapContext)) {
loadHashTable(mapContext, mrContext);
hashTblInitedOnce = true;
}
@@ -276,9 +277,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
ExecMapperContext mapContext, MapredContext mrContext) throws HiveException {
loadCalled = true;
- if (this.hashTblInitedOnce
- && ((mapContext == null) || (mapContext.getLocalWork() == null) || (mapContext
- .getLocalWork().getInputFileChangeSensitive() == false))) {
+ if (canSkipReload(mapContext)) {
// no need to reload
return new ImmutablePair<MapJoinTableContainer[], MapJoinTableContainerSerDe[]>(
mapJoinTables, mapJoinTableSerdes);
@@ -306,6 +305,11 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
+ if (canSkipJoinProcessing(mapContext)) {
+ LOG.info("Skipping big table join processing for " + this.toString());
+ this.setDone(true);
+ }
+
return pair;
}
@@ -611,7 +615,7 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
}
container.setTotalInMemRowCount(container.getTotalInMemRowCount()
- + restoredHashMap.getNumValues() + kvContainer.size());
+ + restoredHashMap.getNumValues());
kvContainer.clear();
spilledMapJoinTables[pos] = new MapJoinBytesTableContainer(restoredHashMap);
@@ -656,4 +660,47 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
public OperatorType getType() {
return OperatorType.MAPJOIN;
}
+
+ protected boolean isInputFileChangeSensitive(ExecMapperContext mapContext) {
+ return !(mapContext == null
+ || mapContext.getLocalWork() == null
+ || mapContext.getLocalWork().getInputFileChangeSensitive() == false);
+ }
+
+ protected boolean canSkipReload(ExecMapperContext mapContext) {
+ return (this.hashTblInitedOnce && !isInputFileChangeSensitive(mapContext));
+ }
+
+ // If the loaded hash table is empty, for some conditions we can skip processing the big table rows.
+ protected boolean canSkipJoinProcessing(ExecMapperContext mapContext) {
+ if (!canSkipReload(mapContext)) {
+ return false;
+ }
+
+ JoinCondDesc[] joinConds = getConf().getConds();
+ if (joinConds.length > 0) {
+ for (JoinCondDesc joinCond : joinConds) {
+ if (joinCond.getType() != JoinDesc.INNER_JOIN) {
+ return false;
+ }
+ }
+ } else {
+ return false;
+ }
+
+ boolean skipJoinProcessing = false;
+ for (int idx = 0; idx < mapJoinTables.length; ++idx) {
+ if (idx == getConf().getPosBigTable()) {
+ continue;
+ }
+ MapJoinTableContainer mapJoinTable = mapJoinTables[idx];
+ if (mapJoinTable.size() == 0) {
+ // If any table is empty, an inner join involving the tables should yield 0 rows.
+ LOG.info("Hash table number " + idx + " is empty");
+ skipJoinProcessing = true;
+ break;
+ }
+ }
+ return skipJoinProcessing;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/941610f2/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index e338a31..0a6461f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -110,6 +110,7 @@ public class HybridHashTableContainer
int threshold; // Used to create an empty BytesBytesMultiHashMap
float loadFactor; // Same as above
int wbSize; // Same as above
+ int rowsOnDisk; // How many rows saved to the on-disk hashmap (if on disk)
/* It may happen that there's not enough memory to instantiate a hashmap for the partition.
* In that case, we don't create the hashmap, but pretend the hashmap is directly "spilled".
@@ -149,6 +150,10 @@ public class HybridHashTableContainer
restoredHashMap.expandAndRehashToTarget(initialCapacity);
}
+ // some bookkeeping
+ rowsOnDisk = 0;
+ hashMapOnDisk = false;
+
input.close();
inputStream.close();
Files.delete(hashMapLocalPath);
@@ -197,6 +202,8 @@ public class HybridHashTableContainer
} catch (Throwable ignored) {
}
hashMapLocalPath = null;
+ rowsOnDisk = 0;
+ hashMapOnDisk = false;
}
if (sidefileKVContainer != null) {
@@ -214,6 +221,16 @@ public class HybridHashTableContainer
matchfileRowBytesContainer = null;
}
}
+
+ public int size() {
+ if (isHashMapOnDisk()) {
+ // Rows are in a combination of the on-disk hashmap and the sidefile
+ return rowsOnDisk + (sidefileKVContainer != null ? sidefileKVContainer.size() : 0);
+ } else {
+ // All rows should be in the in-memory hashmap
+ return hashMap.size();
+ }
+ }
}
public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryAvailable,
@@ -507,6 +524,7 @@ public class HybridHashTableContainer
memoryUsed -= memFreed;
LOG.info("Memory usage after spilling: " + memoryUsed);
+ partition.rowsOnDisk = inMemRowCount;
totalInMemRowCount -= inMemRowCount;
partition.hashMap.clear();
return memFreed;
@@ -959,4 +977,13 @@ public class HybridHashTableContainer
numPartitionsInMem + " partitions in memory have been processed; " +
numPartitionsOnDisk + " partitions have been spilled to disk and will be processed next.");
}
+
+ @Override
+ public int size() {
+ int totalSize = 0;
+ for (HashPartition hashPartition : hashPartitions) {
+ totalSize += hashPartition.size();
+ }
+ return totalSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/941610f2/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
index 83a1521..5df8e2b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
@@ -736,4 +736,9 @@ public class MapJoinBytesTableContainer
public boolean hasSpill() {
return false;
}
+
+ @Override
+ public int size() {
+ return hashMap.size();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/941610f2/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
index 9d8cbcb..869aefd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
@@ -106,4 +106,9 @@ public interface MapJoinTableContainer {
* This is only applicable for HybridHashTableContainer.
*/
boolean hasSpill();
+
+ /**
+ * Return the size of the hash table
+ */
+ int size();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/941610f2/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
index fbe6b4c..666d666 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTable.java
@@ -65,4 +65,9 @@ public abstract class VectorMapJoinFastHashTable implements VectorMapJoinHashTab
this.loadFactor = loadFactor;
this.writeBuffersSize = writeBuffersSize;
}
+
+ @Override
+ public int size() {
+ return keysAssigned;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/941610f2/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
index 4b1d6f6..f2080f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java
@@ -213,6 +213,11 @@ public class VectorMapJoinFastTableContainer implements VectorMapJoinTableContai
return false;
}
+ @Override
+ public int size() {
+ return VectorMapJoinFastHashTable.size();
+ }
+
/*
@Override
public com.esotericsoftware.kryo.io.Output getHybridBigTableSpillOutput(int partitionId) {
http://git-wip-us.apache.org/repos/asf/hive/blob/941610f2/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
index 7e219ec..c7e585c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/hashtable/VectorMapJoinHashTable.java
@@ -40,4 +40,8 @@ public interface VectorMapJoinHashTable {
void putRow(BytesWritable currentKey, BytesWritable currentValue)
throws SerDeException, HiveException, IOException;
+ /**
+ * Get hash table size
+ */
+ int size();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/941610f2/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
index a2d4e4c..b2b86d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedHashTable.java
@@ -92,4 +92,8 @@ public abstract class VectorMapJoinOptimizedHashTable implements VectorMapJoinHa
adapatorDirectAccess = (ReusableGetAdaptorDirectAccess) hashMapRowGetter;
}
+ @Override
+ public int size() {
+ return originalTableContainer.size();
+ }
}