You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by pr...@apache.org on 2017/05/17 06:14:37 UTC
hive git commit: HIVE-16582: HashTableLoader should log info about the input, rows, size etc. (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master 045b8da50 -> 5a0b42537
HIVE-16582: HashTableLoader should log info about the input, rows, size etc. (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5a0b4253
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5a0b4253
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5a0b4253
Branch: refs/heads/master
Commit: 5a0b42537f97e76430bcf0a62c9c6fa26c4b1e01
Parents: 045b8da
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Tue May 16 23:14:26 2017 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Tue May 16 23:14:26 2017 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/MapJoinOperator.java | 4 +
.../hadoop/hive/ql/exec/MemoryMonitorInfo.java | 166 +++++++++++++++++++
.../hive/ql/exec/SerializationUtilities.java | 1 +
.../hive/ql/exec/tez/HashTableLoader.java | 68 +++++---
.../fast/VectorMapJoinFastHashTableLoader.java | 73 ++++----
.../hive/ql/optimizer/ConvertJoinMapJoin.java | 48 +++---
.../hive/ql/optimizer/MapJoinProcessor.java | 4 +-
.../hive/ql/optimizer/SharedScanOptimizer.java | 4 +-
.../calcite/translator/HiveOpConverter.java | 2 +-
.../physical/GenMRSkewJoinProcessor.java | 2 +-
.../physical/GenSparkSkewJoinProcessor.java | 2 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 2 +-
.../apache/hadoop/hive/ql/plan/JoinDesc.java | 23 +--
.../apache/hadoop/hive/ql/plan/MapJoinDesc.java | 5 +-
.../hadoop/hive/ql/exec/TestOperators.java | 57 ++-----
.../clientpositive/llap/tez_smb_main.q.out | 18 +-
16 files changed, 328 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 07aa2ea..4971707 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -132,6 +132,10 @@ public class MapJoinOperator extends AbstractMapJoinOperator<MapJoinDesc> implem
return HashTableLoaderFactory.getLoader(hconf);
}
+ public String getCacheKey() {
+ return cacheKey;
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
this.hconf = hconf;
http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryMonitorInfo.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryMonitorInfo.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryMonitorInfo.java
new file mode 100644
index 0000000..24c5799
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MemoryMonitorInfo.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.io.Serializable;
+
+/**
+ * Contains information required for memory usage monitoring.
+ **/
+
+public class MemoryMonitorInfo implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // Variables for LLAP hash table loading memory monitor
+ private boolean isLlap;
+ private int executorsPerNode;
+ private int maxExecutorsOverSubscribeMemory;
+ private double memoryOverSubscriptionFactor;
+ private long noConditionalTaskSize;
+ private long adjustedNoConditionalTaskSize;
+ private long memoryCheckInterval;
+ private double hashTableInflationFactor;
+ private long threshold;
+
+ public MemoryMonitorInfo() {
+ }
+
+ public MemoryMonitorInfo(boolean isLlap, int executorsPerNode, int maxExecutorsOverSubscribeMemory,
+ double memoryOverSubscriptionFactor, long noConditionalTaskSize, long adjustedNoConditionalTaskSize,
+ long memoryCheckInterval, double hashTableInflationFactor) {
+ this.isLlap = isLlap;
+ this.executorsPerNode = executorsPerNode;
+ this.maxExecutorsOverSubscribeMemory = maxExecutorsOverSubscribeMemory;
+ this.memoryOverSubscriptionFactor = memoryOverSubscriptionFactor;
+ this.noConditionalTaskSize = noConditionalTaskSize;
+ this.adjustedNoConditionalTaskSize = adjustedNoConditionalTaskSize;
+ this.memoryCheckInterval = memoryCheckInterval;
+ this.hashTableInflationFactor = hashTableInflationFactor;
+ this.threshold = (long) (hashTableInflationFactor * adjustedNoConditionalTaskSize);
+ }
+
+ public MemoryMonitorInfo(MemoryMonitorInfo memoryMonitorInfo) {
+ this.isLlap = memoryMonitorInfo.isLlap;
+ this.executorsPerNode = memoryMonitorInfo.executorsPerNode;
+ this.maxExecutorsOverSubscribeMemory = memoryMonitorInfo.maxExecutorsOverSubscribeMemory;
+ this.memoryOverSubscriptionFactor = memoryMonitorInfo.memoryOverSubscriptionFactor;
+ this.noConditionalTaskSize = memoryMonitorInfo.noConditionalTaskSize;
+ this.adjustedNoConditionalTaskSize = memoryMonitorInfo.adjustedNoConditionalTaskSize;
+ this.memoryCheckInterval = memoryMonitorInfo.memoryCheckInterval;
+ this.hashTableInflationFactor = memoryMonitorInfo.hashTableInflationFactor;
+ this.threshold = memoryMonitorInfo.threshold;
+ }
+
+ public int getExecutorsPerNode() {
+ return executorsPerNode;
+ }
+
+ public void setExecutorsPerNode(final int executorsPerNode) {
+ this.executorsPerNode = executorsPerNode;
+ }
+
+ public int getMaxExecutorsOverSubscribeMemory() {
+ return maxExecutorsOverSubscribeMemory;
+ }
+
+ public void setMaxExecutorsOverSubscribeMemory(final int maxExecutorsOverSubscribeMemory) {
+ this.maxExecutorsOverSubscribeMemory = maxExecutorsOverSubscribeMemory;
+ }
+
+ public double getMemoryOverSubscriptionFactor() {
+ return memoryOverSubscriptionFactor;
+ }
+
+ public void setMemoryOverSubscriptionFactor(final double memoryOverSubscriptionFactor) {
+ this.memoryOverSubscriptionFactor = memoryOverSubscriptionFactor;
+ }
+
+ public long getNoConditionalTaskSize() {
+ return noConditionalTaskSize;
+ }
+
+ public void setNoConditionalTaskSize(final long noConditionalTaskSize) {
+ this.noConditionalTaskSize = noConditionalTaskSize;
+ }
+
+ public long getAdjustedNoConditionalTaskSize() {
+ return adjustedNoConditionalTaskSize;
+ }
+
+ public void setAdjustedNoConditionalTaskSize(final long adjustedNoConditionalTaskSize) {
+ this.adjustedNoConditionalTaskSize = adjustedNoConditionalTaskSize;
+ }
+
+ public long getMemoryCheckInterval() {
+ return memoryCheckInterval;
+ }
+
+ public void setMemoryCheckInterval(final long memoryCheckInterval) {
+ this.memoryCheckInterval = memoryCheckInterval;
+ }
+
+ public double getHashTableInflationFactor() {
+ return hashTableInflationFactor;
+ }
+
+ public void setHashTableInflationFactor(final double hashTableInflationFactor) {
+ this.hashTableInflationFactor = hashTableInflationFactor;
+ }
+
+ public long getThreshold() {
+ return threshold;
+ }
+
+ public void setLlap(final boolean llap) {
+ isLlap = llap;
+ }
+
+ public boolean isLlap() {
+ return isLlap;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+ sb.append(" isLlap: ").append(isLlap);
+ sb.append(" executorsPerNode: ").append(executorsPerNode);
+ sb.append(" maxExecutorsOverSubscribeMemory: ").append(maxExecutorsOverSubscribeMemory);
+ sb.append(" memoryOverSubscriptionFactor: ").append(memoryOverSubscriptionFactor);
+ sb.append(" memoryCheckInterval: ").append(memoryCheckInterval);
+ sb.append(" noConditionalTaskSize: ").append(noConditionalTaskSize);
+ sb.append(" adjustedNoConditionalTaskSize: ").append(adjustedNoConditionalTaskSize);
+ sb.append(" hashTableInflationFactor: ").append(hashTableInflationFactor);
+ sb.append(" threshold: ").append(threshold);
+ sb.append(" }");
+ return sb.toString();
+ }
+
+ public boolean doMemoryMonitoring() {
+ return isLlap && hashTableInflationFactor > 0.0d && noConditionalTaskSize > 0 &&
+ memoryCheckInterval > 0;
+ }
+
+ public long getEffectiveThreshold(final long maxMemoryPerExecutor) {
+ // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory
+ // available for container/executor
+ return (long) Math.max(threshold, (2.0 / 3.0) * maxMemoryPerExecutor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index a29dd85..8902f6c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -250,6 +250,7 @@ public class SerializationUtilities {
kryo.register(SparkEdgeProperty.class);
kryo.register(SparkWork.class);
kryo.register(Pair.class);
+ kryo.register(MemoryMonitorInfo.class);
// This must be called after all the explicit register calls.
return kryo.processHooks(kryoTypeHooks, globalHook);
http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 7011d23..5bb9d7e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -24,6 +24,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +65,7 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
private Configuration hconf;
private MapJoinDesc desc;
private TezContext tezContext;
+ private String cacheKey;
@Override
public void init(ExecMapperContext context, MapredContext mrContext, Configuration hconf,
@@ -70,6 +73,7 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
this.tezContext = (TezContext) mrContext;
this.hconf = hconf;
this.desc = joinOp.getConf();
+ this.cacheKey = joinOp.getCacheKey();
}
@Override
@@ -147,25 +151,36 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
}
nwayConf.setNumberOfPartitions(numPartitions);
}
- final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
- final long memoryCheckInterval = HiveConf.getLongVar(hconf,
- HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
- final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
- long numEntries = 0;
- long noCondTaskSize = desc.getNoConditionalTaskSize();
- boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0;
+ MemoryMonitorInfo memoryMonitorInfo = desc.getMemoryMonitorInfo();
+ boolean doMemCheck = false;
+ long effectiveThreshold = 0;
+ if (memoryMonitorInfo != null) {
+ effectiveThreshold = memoryMonitorInfo.getEffectiveThreshold(desc.getMaxMemoryAvailable());
+
+ // hash table loading happens in server side, LlapDecider could kick out some fragments to run outside of LLAP.
+ // Flip the flag at runtime in case if we are running outside of LLAP
+ if (!LlapDaemonInfo.INSTANCE.isLlap()) {
+ memoryMonitorInfo.setLlap(false);
+ }
+ if (memoryMonitorInfo.doMemoryMonitoring()) {
+ doMemCheck = true;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Memory monitoring for hash table loader enabled. {}", memoryMonitorInfo);
+ }
+ }
+ }
+
if (!doMemCheck) {
- LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " +
- "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval);
- } else {
- LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ",
- noCondTaskSize, inflationFactor);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Not doing hash table memory monitoring. {}", memoryMonitorInfo);
+ }
}
for (int pos = 0; pos < mapJoinTables.length; pos++) {
if (pos == desc.getPosBigTable()) {
continue;
}
+ long numEntries = 0;
String inputName = parentToInput.get(pos);
LogicalInput input = tezContext.getInput(inputName);
@@ -219,36 +234,39 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
tableContainer = new HashMapWrapper(hconf, keyCount);
}
- LOG.info("Using tableContainer: " + tableContainer.getClass().getSimpleName());
+ LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {}", inputName,
+ cacheKey, tableContainer.getClass().getSimpleName(), pos);
tableContainer.setSerde(keyCtx, valCtx);
while (kvReader.next()) {
tableContainer.putRow((Writable) kvReader.getCurrentKey(), (Writable) kvReader.getCurrentValue());
numEntries++;
- if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) {
+ if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
final long estMemUsage = tableContainer.getEstimatedMemorySize();
- final long threshold = (long) (inflationFactor * noCondTaskSize);
- // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory
- // available for container/executor
- final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable());
if (estMemUsage > effectiveThreshold) {
- String msg = "Hash table loading exceeded memory limits." +
- " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize +
- " inflationFactor: " + inflationFactor + " threshold: " + threshold +
- " effectiveThreshold: " + effectiveThreshold;
+ String msg = "Hash table loading exceeded memory limits for input: " + inputName +
+ " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
+ " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
LOG.error(msg);
throw new MapJoinMemoryExhaustionError(msg);
} else {
if (LOG.isInfoEnabled()) {
- LOG.info("Checking hash table loader memory usage.. numEntries: {} estimatedMemoryUsage: {} " +
- "effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold);
+ LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
+ "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
+ effectiveThreshold);
}
}
}
}
tableContainer.seal();
- LOG.info("Finished loading hashtable using " + tableContainer.getClass() + ". Small table position: " + pos);
mapJoinTables[pos] = tableContainer;
+ if (doMemCheck) {
+ LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {} estimatedMemoryUsage: {}",
+ inputName, cacheKey, numEntries, tableContainer.getEstimatedMemorySize());
+ } else {
+ LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {}", inputName, cacheKey,
+ numEntries);
+ }
} catch (Exception e) {
throw new HiveException(e);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
index b015e43..6c1ae2c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastHashTableLoader.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Map;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +53,7 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
private Configuration hconf;
protected MapJoinDesc desc;
private TezContext tezContext;
+ private String cacheKey;
@Override
public void init(ExecMapperContext context, MapredContext mrContext,
@@ -58,6 +61,7 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
this.tezContext = (TezContext) mrContext;
this.hconf = hconf;
this.desc = joinOp.getConf();
+ this.cacheKey = joinOp.getCacheKey();
}
@Override
@@ -68,26 +72,36 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
Map<Integer, String> parentToInput = desc.getParentToInput();
Map<Integer, Long> parentKeyCounts = desc.getParentKeyCounts();
- final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
- final long memoryCheckInterval = HiveConf.getLongVar(hconf,
- HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
- final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
- long numEntries = 0;
- long noCondTaskSize = desc.getNoConditionalTaskSize();
- boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0;
- if (!doMemCheck) {
- LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " +
- "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval);
- } else {
- LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ",
- noCondTaskSize, inflationFactor);
+ MemoryMonitorInfo memoryMonitorInfo = desc.getMemoryMonitorInfo();
+ boolean doMemCheck = false;
+ long effectiveThreshold = 0;
+ if (memoryMonitorInfo != null) {
+ effectiveThreshold = memoryMonitorInfo.getEffectiveThreshold(desc.getMaxMemoryAvailable());
+
+ // hash table loading happens in server side, LlapDecider could kick out some fragments to run outside of LLAP.
+ // Flip the flag at runtime in case if we are running outside of LLAP
+ if (!LlapDaemonInfo.INSTANCE.isLlap()) {
+ memoryMonitorInfo.setLlap(false);
+ }
+ if (memoryMonitorInfo.doMemoryMonitoring()) {
+ doMemCheck = true;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Memory monitoring for hash table loader enabled. {}", memoryMonitorInfo);
+ }
+ }
}
+ if (!doMemCheck) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Not doing hash table memory monitoring. {}", memoryMonitorInfo);
+ }
+ }
for (int pos = 0; pos < mapJoinTables.length; pos++) {
if (pos == desc.getPosBigTable()) {
continue;
}
+ long numEntries = 0;
String inputName = parentToInput.get(pos);
LogicalInput input = tezContext.getInput(inputName);
@@ -108,41 +122,42 @@ public class VectorMapJoinFastHashTableLoader implements org.apache.hadoop.hive.
VectorMapJoinFastTableContainer vectorMapJoinFastTableContainer =
new VectorMapJoinFastTableContainer(desc, hconf, keyCount);
- LOG.info("Using vectorMapJoinFastTableContainer: " + vectorMapJoinFastTableContainer.getClass().getSimpleName());
+ LOG.info("Loading hash table for input: {} cacheKey: {} tableContainer: {} smallTablePos: {}", inputName,
+ cacheKey, vectorMapJoinFastTableContainer.getClass().getSimpleName(), pos);
vectorMapJoinFastTableContainer.setSerde(null, null); // No SerDes here.
while (kvReader.next()) {
vectorMapJoinFastTableContainer.putRow((BytesWritable)kvReader.getCurrentKey(),
(BytesWritable)kvReader.getCurrentValue());
numEntries++;
- if (doMemCheck && numEntries >= memoryCheckInterval) {
- if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) {
+ if (doMemCheck && (numEntries % memoryMonitorInfo.getMemoryCheckInterval() == 0)) {
final long estMemUsage = vectorMapJoinFastTableContainer.getEstimatedMemorySize();
- final long threshold = (long) (inflationFactor * noCondTaskSize);
- // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory
- // available for container/executor
- final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable());
if (estMemUsage > effectiveThreshold) {
- String msg = "VectorMapJoin Hash table loading exceeded memory limits." +
- " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize +
- " inflationFactor: " + inflationFactor + " threshold: " + threshold +
- " effectiveThreshold: " + effectiveThreshold;
+ String msg = "Hash table loading exceeded memory limits for input: " + inputName +
+ " numEntries: " + numEntries + " estimatedMemoryUsage: " + estMemUsage +
+ " effectiveThreshold: " + effectiveThreshold + " memoryMonitorInfo: " + memoryMonitorInfo;
LOG.error(msg);
throw new MapJoinMemoryExhaustionError(msg);
} else {
if (LOG.isInfoEnabled()) {
- LOG.info("Checking vector mapjoin hash table loader memory usage.. numEntries: {} " +
- "estimatedMemoryUsage: {} effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold);
+ LOG.info("Checking hash table loader memory usage for input: {} numEntries: {} " +
+ "estimatedMemoryUsage: {} effectiveThreshold: {}", inputName, numEntries, estMemUsage,
+ effectiveThreshold);
}
}
- }
}
}
vectorMapJoinFastTableContainer.seal();
mapJoinTables[pos] = vectorMapJoinFastTableContainer;
- LOG.info("Finished loading hashtable using " + vectorMapJoinFastTableContainer.getClass() +
- ". Small table position: " + pos);
+ if (doMemCheck) {
+ LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {} " +
+ "estimatedMemoryUsage: {}", inputName, cacheKey, numEntries,
+ vectorMapJoinFastTableContainer.getEstimatedMemorySize());
+ } else {
+ LOG.info("Finished loading hash table for input: {} cacheKey: {} numEntries: {}", inputName, cacheKey,
+ numEntries);
+ }
} catch (IOException e) {
throw new HiveException(e);
} catch (SerDeException e) {
http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 0eec78e..98fec77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
import org.apache.hadoop.hive.ql.exec.MuxOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -97,10 +98,9 @@ public class ConvertJoinMapJoin implements NodeProcessor {
JoinOperator joinOp = (JoinOperator) nd;
long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
-
// adjust noconditional task size threshold for LLAP
- maxSize = getNoConditionalTaskSizeForLlap(maxSize, context.conf);
- joinOp.getConf().setNoConditionalTaskSize(maxSize);
+ MemoryMonitorInfo memoryMonitorInfo = getMemoryMonitorInfo(maxSize, context.conf);
+ joinOp.getConf().setMemoryMonitorInfo(memoryMonitorInfo);
TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
@@ -172,7 +172,12 @@ public class ConvertJoinMapJoin implements NodeProcessor {
}
@VisibleForTesting
- public long getNoConditionalTaskSizeForLlap(final long maxSize, final HiveConf conf) {
+ public MemoryMonitorInfo getMemoryMonitorInfo(final long maxSize, final HiveConf conf) {
+ final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
+ final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY);
+ final long memoryCheckInterval = conf.getLongVar(ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
+ final float inflationFactor = conf.getFloatVar(ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
+ final MemoryMonitorInfo memoryMonitorInfo;
if ("llap".equalsIgnoreCase(conf.getVar(ConfVars.HIVE_EXECUTION_MODE))) {
LlapClusterStateForCompile llapInfo = LlapClusterStateForCompile.getClusterInfo(conf);
llapInfo.initClusterInfo();
@@ -190,24 +195,23 @@ public class ConvertJoinMapJoin implements NodeProcessor {
executorsPerNode = numExecutorsPerNodeFromCluster;
}
}
- final int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE);
- if (numSessions > 0) {
- final int availableSlotsPerQuery = (int) ((double) executorsPerNode / numSessions);
- final double overSubscriptionFactor = conf.getFloatVar(ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
- final int maxSlotsPerQuery = conf.getIntVar(ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY);
- final int slotsPerQuery = Math.min(maxSlotsPerQuery, availableSlotsPerQuery);
- final long llapMaxSize = (long) (maxSize + (maxSize * overSubscriptionFactor * slotsPerQuery));
- LOG.info("No conditional task size adjusted for LLAP. executorsPerNode: {}, numSessions: {}, " +
- "availableSlotsPerQuery: {}, overSubscriptionFactor: {}, maxSlotsPerQuery: {}, slotsPerQuery: {}, " +
- "noconditionalTaskSize: {}, adjustedNoconditionalTaskSize: {}", executorsPerNode, numSessions,
- availableSlotsPerQuery, overSubscriptionFactor, maxSlotsPerQuery, slotsPerQuery, maxSize, llapMaxSize);
- return Math.max(maxSize, llapMaxSize);
- } else {
- LOG.warn(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname + " returned value {}. Returning {}" +
- " as no conditional task size for LLAP.", numSessions, maxSize);
- }
+
+ // bounded by max executors
+ final int slotsPerQuery = Math.min(maxSlotsPerQuery, executorsPerNode);
+ final long llapMaxSize = (long) (maxSize + (maxSize * overSubscriptionFactor * slotsPerQuery));
+ // prevents under subscription
+ final long adjustedMaxSize = Math.max(maxSize, llapMaxSize);
+ memoryMonitorInfo = new MemoryMonitorInfo(true, executorsPerNode, maxSlotsPerQuery,
+ overSubscriptionFactor, maxSize, adjustedMaxSize, memoryCheckInterval, inflationFactor);
+ } else {
+ // for non-LLAP mode most of these are not relevant. Only noConditionalTaskSize is used by shared scan optimizer.
+ memoryMonitorInfo = new MemoryMonitorInfo(false, 1, maxSlotsPerQuery, overSubscriptionFactor, maxSize, maxSize,
+ memoryCheckInterval, inflationFactor);
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Memory monitor info set to : {}", memoryMonitorInfo);
}
- return maxSize;
+ return memoryMonitorInfo;
}
@SuppressWarnings("unchecked")
@@ -282,7 +286,7 @@ public class ConvertJoinMapJoin implements NodeProcessor {
null, joinDesc.getExprs(), null, null,
joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null,
- joinDesc.getNoConditionalTaskSize(), joinDesc.getInMemoryDataSize());
+ joinDesc.getMemoryMonitorInfo(), joinDesc.getInMemoryDataSize());
mapJoinDesc.setNullSafes(joinDesc.getNullSafes());
mapJoinDesc.setFilterMap(joinDesc.getFilterMap());
mapJoinDesc.setResidualFilterExprs(joinDesc.getResidualFilterExprs());
http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
index f01fb9c..d84a1e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
@@ -432,7 +432,7 @@ public class MapJoinProcessor extends Transform {
smbJoinDesc.getOutputColumnNames(),
bigTablePos, smbJoinDesc.getConds(),
smbJoinDesc.getFilters(), smbJoinDesc.isNoOuterJoin(), smbJoinDesc.getDumpFilePrefix(),
- smbJoinDesc.getNoConditionalTaskSize(), smbJoinDesc.getInMemoryDataSize());
+ smbJoinDesc.getMemoryMonitorInfo(), smbJoinDesc.getInMemoryDataSize());
mapJoinDesc.setStatistics(smbJoinDesc.getStatistics());
@@ -1186,7 +1186,7 @@ public class MapJoinProcessor extends Transform {
new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs,
valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters,
op.getConf().getNoOuterJoin(), dumpFilePrefix,
- op.getConf().getNoConditionalTaskSize(), op.getConf().getInMemoryDataSize());
+ op.getConf().getMemoryMonitorInfo(), op.getConf().getInMemoryDataSize());
mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
mapJoinDescriptor.setTagOrder(tagOrder);
mapJoinDescriptor.setNullSafes(desc.getNullSafes());
http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java
index 5964fd4..3349fc0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SharedScanOptimizer.java
@@ -402,10 +402,10 @@ public class SharedScanOptimizer extends Transform {
if (op instanceof MapJoinOperator) {
MapJoinOperator mop = (MapJoinOperator) op;
dataSize = StatsUtils.safeAdd(dataSize, mop.getConf().getInMemoryDataSize());
- if (dataSize > mop.getConf().getNoConditionalTaskSize()) {
+ if (dataSize > mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize()) {
// Size surpasses limit, we cannot convert
LOG.debug("accumulated data size: {} / max size: {}",
- dataSize, mop.getConf().getNoConditionalTaskSize());
+ dataSize, mop.getConf().getMemoryMonitorInfo().getAdjustedNoConditionalTaskSize());
return false;
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
index b9b600d..471675b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/HiveOpConverter.java
@@ -1003,7 +1003,7 @@ public class HiveOpConverter {
// 4. We create the join operator with its descriptor
JoinDesc desc = new JoinDesc(exprMap, outputColumnNames, noOuterJoin, joinCondns,
- filters, joinExpressions, 0);
+ filters, joinExpressions, null);
desc.setReversedExprs(reversedExprs);
desc.setFilterMap(filterMap);
http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
index 53abb21..e0ccd04 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
@@ -282,7 +282,7 @@ public final class GenMRSkewJoinProcessor {
newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc,joinDescriptor
.getOutputColumnNames(), i, joinDescriptor.getConds(),
joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin(), dumpFilePrefix,
- joinDescriptor.getNoConditionalTaskSize(), joinDescriptor.getInMemoryDataSize());
+ joinDescriptor.getMemoryMonitorInfo(), joinDescriptor.getInMemoryDataSize());
mapJoinDescriptor.setTagOrder(tags);
mapJoinDescriptor.setHandleSkewJoin(false);
mapJoinDescriptor.setNullSafes(joinDescriptor.getNullSafes());
http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
index a5f0b2a..6b9d5b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
@@ -241,7 +241,7 @@ public class GenSparkSkewJoinProcessor {
newJoinValues, newJoinValueTblDesc, newJoinValueTblDesc, joinDescriptor
.getOutputColumnNames(), i, joinDescriptor.getConds(),
joinDescriptor.getFilters(), joinDescriptor.getNoOuterJoin(), dumpFilePrefix,
- joinDescriptor.getNoConditionalTaskSize(), joinDescriptor.getInMemoryDataSize());
+ joinDescriptor.getMemoryMonitorInfo(), joinDescriptor.getInMemoryDataSize());
mapJoinDescriptor.setTagOrder(tags);
mapJoinDescriptor.setHandleSkewJoin(false);
mapJoinDescriptor.setNullSafes(joinDescriptor.getNullSafes());
http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index c6b67d1..eb7ef00 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -8125,7 +8125,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
JoinDesc desc = new JoinDesc(exprMap, outputColumnNames,
- join.getNoOuterJoin(), joinCondns, filterMap, joinKeys, 0);
+ join.getNoOuterJoin(), joinCondns, filterMap, joinKeys, null);
desc.setReversedExprs(reversedExprs);
desc.setFilterMap(join.getFilterMap());
// For outer joins, add filters that apply to more than one input
http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
index 12e1ff5..eae80a7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
@@ -111,7 +112,7 @@ public class JoinDesc extends AbstractOperatorDesc {
protected transient long inMemoryDataSize;
// non-transient field, used at runtime to kill a task if it exceeded memory limits when running in LLAP
- protected long noConditionalTaskSize;
+ protected MemoryMonitorInfo memoryMonitorInfo;
public JoinDesc() {
}
@@ -119,14 +120,14 @@ public class JoinDesc extends AbstractOperatorDesc {
public JoinDesc(final Map<Byte, List<ExprNodeDesc>> exprs,
List<String> outputColumnNames, final boolean noOuterJoin,
final JoinCondDesc[] conds, final Map<Byte, List<ExprNodeDesc>> filters,
- ExprNodeDesc[][] joinKeys, final long noConditionalTaskSize) {
+ ExprNodeDesc[][] joinKeys, final MemoryMonitorInfo memoryMonitorInfo) {
this.exprs = exprs;
this.outputColumnNames = outputColumnNames;
this.noOuterJoin = noOuterJoin;
this.conds = conds;
this.filters = filters;
this.joinKeys = joinKeys;
- this.noConditionalTaskSize = noConditionalTaskSize;
+ this.memoryMonitorInfo = memoryMonitorInfo;
resetOrder();
}
@@ -153,7 +154,9 @@ public class JoinDesc extends AbstractOperatorDesc {
ret.setHandleSkewJoin(handleSkewJoin);
ret.setSkewKeyDefinition(getSkewKeyDefinition());
ret.setTagOrder(getTagOrder().clone());
- ret.setNoConditionalTaskSize(getNoConditionalTaskSize());
+ if (getMemoryMonitorInfo() != null) {
+ ret.setMemoryMonitorInfo(new MemoryMonitorInfo(getMemoryMonitorInfo()));
+ }
if (getKeyTableDesc() != null) {
ret.setKeyTableDesc((TableDesc) getKeyTableDesc().clone());
}
@@ -204,8 +207,8 @@ public class JoinDesc extends AbstractOperatorDesc {
this.filterMap = clone.filterMap;
this.residualFilterExprs = clone.residualFilterExprs;
this.statistics = clone.statistics;
- this.noConditionalTaskSize = clone.noConditionalTaskSize;
this.inMemoryDataSize = clone.inMemoryDataSize;
+ this.memoryMonitorInfo = clone.memoryMonitorInfo;
}
public Map<Byte, List<ExprNodeDesc>> getExprs() {
@@ -691,14 +694,12 @@ public class JoinDesc extends AbstractOperatorDesc {
streamAliases = joinDesc.streamAliases == null ? null : new ArrayList<String>(joinDesc.streamAliases);
}
- private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(JoinDesc.class);
-
- public long getNoConditionalTaskSize() {
- return noConditionalTaskSize;
+ public MemoryMonitorInfo getMemoryMonitorInfo() {
+ return memoryMonitorInfo;
}
- public void setNoConditionalTaskSize(final long noConditionalTaskSize) {
- this.noConditionalTaskSize = noConditionalTaskSize;
+ public void setMemoryMonitorInfo(final MemoryMonitorInfo memoryMonitorInfo) {
+ this.memoryMonitorInfo = memoryMonitorInfo;
}
public long getInMemoryDataSize() {
http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index f387e6a..0d8e459 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -31,6 +31,7 @@ import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.MemoryMonitorInfo;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
import org.apache.hadoop.hive.ql.plan.Explain.Vectorization;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
@@ -113,8 +114,8 @@ public class MapJoinDesc extends JoinDesc implements Serializable {
final List<TableDesc> valueTblDescs, final List<TableDesc> valueFilteredTblDescs, List<String> outputColumnNames,
final int posBigTable, final JoinCondDesc[] conds,
final Map<Byte, List<ExprNodeDesc>> filters, boolean noOuterJoin, String dumpFilePrefix,
- final long noConditionalTaskSize, final long inMemoryDataSize) {
- super(values, outputColumnNames, noOuterJoin, conds, filters, null, noConditionalTaskSize);
+ final MemoryMonitorInfo memoryMonitorInfo, final long inMemoryDataSize) {
+ super(values, outputColumnNames, noOuterJoin, conds, filters, null, memoryMonitorInfo);
vectorDesc = null;
this.keys = keys;
this.keyTblDesc = keyTblDesc;
http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
index 0287ff2..f62ba34 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
@@ -446,9 +446,9 @@ public class TestOperators extends TestCase {
long defaultNoConditionalTaskSize = 1024L * 1024L * 1024L;
HiveConf hiveConf = new HiveConf();
- // execution mode not set, default is returned
- long gotSize = convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf);
- assertEquals(defaultNoConditionalTaskSize, gotSize);
+ // execution mode not set, the unadjusted (default) noconditional task size is returned
+ assertEquals(defaultNoConditionalTaskSize, convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize,
+ hiveConf).getAdjustedNoConditionalTaskSize());
hiveConf.set(HiveConf.ConfVars.HIVE_EXECUTION_MODE.varname, "llap");
// default executors is 4, max slots is 3. so 3 * 20% of noconditional task size will be oversubscribed
@@ -457,7 +457,8 @@ public class TestOperators extends TestCase {
int maxSlots = 3;
long expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * maxSlots));
assertEquals(expectedSize,
- convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+ convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf)
+ .getAdjustedNoConditionalTaskSize());
// num executors is less than max executors per query (which is not expected case), default executors will be
// chosen. 4 * 20% of noconditional task size will be oversubscribed
@@ -465,40 +466,18 @@ public class TestOperators extends TestCase {
hiveConf.set(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname, "5");
expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * chosenSlots));
assertEquals(expectedSize,
- convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
-
- // 2 concurrent sessions, 4 executors. 2 * 20% of noconditional task size will be oversubscribed
- hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname);
- hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "2");
- expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * 2));
- assertEquals(expectedSize,
- convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
-
- // 4 concurrent sessions, 4 executors. 1 * 20% of noconditional task size will be oversubscribed
- hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname);
- hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "4");
- expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * 1));
- assertEquals(expectedSize,
- convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
-
- // 8 concurrent sessions, 4 executors. default noconditioanl task will be used (no oversubscription)
- hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname);
- hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "8");
- assertEquals(defaultNoConditionalTaskSize,
- convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
-
- // 2 * 120% of noconditional task size will be oversubscribed
- hiveConf.unset(HiveConf.ConfVars.LLAP_MEMORY_OVERSUBSCRIPTION_MAX_EXECUTORS_PER_QUERY.varname);
- hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "2");
- hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR.varname, "1.2");
- fraction = hiveConf.getFloatVar(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_OVERSUBSCRIBE_FACTOR);
- expectedSize = (long) (defaultNoConditionalTaskSize + (defaultNoConditionalTaskSize * fraction * 2));
- assertEquals(expectedSize,
- convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
-
- // 0 value for number of sessions
- hiveConf.set(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE.varname, "0");
- assertEquals(defaultNoConditionalTaskSize,
- convertJoinMapJoin.getNoConditionalTaskSizeForLlap(defaultNoConditionalTaskSize, hiveConf));
+ convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf)
+ .getAdjustedNoConditionalTaskSize());
+
+ // disable memory checking
+ hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL.varname, "0");
+ assertFalse(
+ convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf).doMemoryMonitoring());
+
+ // invalid inflation factor
+ hiveConf.set(HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL.varname, "10000");
+ hiveConf.set(HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR.varname, "0.0f");
+ assertFalse(
+ convertJoinMapJoin.getMemoryMonitorInfo(defaultNoConditionalTaskSize, hiveConf).doMemoryMonitoring());
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/5a0b4253/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out b/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out
index 4f9c95a..66d7aec 100644
--- a/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out
+++ b/ql/src/test/results/clientpositive/llap/tez_smb_main.q.out
@@ -692,11 +692,11 @@ STAGE PLANS:
Tez
#### A masked pattern was here ####
Edges:
- Map 2 <- Map 1 (CUSTOM_EDGE), Map 4 (BROADCAST_EDGE)
+ Map 2 <- Map 4 (BROADCAST_EDGE)
Reducer 3 <- Map 2 (CUSTOM_SIMPLE_EDGE)
#### A masked pattern was here ####
Vertices:
- Map 1
+ Map 2
Map Operator Tree:
TableScan
alias: a
@@ -708,15 +708,6 @@ STAGE PLANS:
expressions: key (type: int), value (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
- Reduce Output Operator
- key expressions: _col0 (type: int)
- sort order: +
- Map-reduce partition columns: _col0 (type: int)
- Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
- value expressions: _col1 (type: string)
- Execution mode: llap
- LLAP IO: no inputs
- Map 2
Map Operator Tree:
TableScan
alias: b
@@ -728,15 +719,13 @@ STAGE PLANS:
expressions: key (type: int)
outputColumnNames: _col0
Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
- Map Join Operator
+ Merge Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col0 (type: int)
1 _col0 (type: int)
outputColumnNames: _col1
- input vertices:
- 0 Map 1
Statistics: Num rows: 550 Data size: 5843 Basic stats: COMPLETE Column stats: NONE
Map Join Operator
condition map:
@@ -757,7 +746,6 @@ STAGE PLANS:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
value expressions: _col0 (type: bigint)
Execution mode: llap
- LLAP IO: no inputs
Map 4
Map Operator Tree:
TableScan