You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/06/15 21:02:12 UTC
hive git commit: HIVE-19875: increase LLAP IO queue size for perf
(Prasanth Jayachandran reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master ccfca8ec7 -> c93a15553
HIVE-19875: increase LLAP IO queue size for perf (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c93a1555
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c93a1555
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c93a1555
Branch: refs/heads/master
Commit: c93a15553a1cc383888a3ec1d27557bc0857fadd
Parents: ccfca8e
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri Jun 15 14:01:24 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri Jun 15 14:01:24 2018 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
.../hive/llap/io/api/impl/LlapRecordReader.java | 25 ++++++++++++++++----
2 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c93a1555/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 99bb400..ef173de 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3881,7 +3881,7 @@ public class HiveConf extends Configuration {
"MR LineRecordRedader into LLAP cache, if this feature is enabled. Safety flag."),
LLAP_ORC_ENABLE_TIME_COUNTERS("hive.llap.io.orc.time.counters", true,
"Whether to enable time counters for LLAP IO layer (time spent in HDFS, etc.)"),
- LLAP_IO_VRB_QUEUE_LIMIT_BASE("hive.llap.io.vrb.queue.limit.base", 10000,
+ LLAP_IO_VRB_QUEUE_LIMIT_BASE("hive.llap.io.vrb.queue.limit.base", 50000,
"The default queue size for VRBs produced by a LLAP IO thread when the processing is\n" +
"slower than the IO. The actual queue size is set per fragment, and is adjusted down\n" +
"from the base, depending on the schema."),
http://git-wip-us.apache.org/repos/asf/hive/blob/c93a1555/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index cb57a11..201c097 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.NullWritable;
@@ -163,7 +164,9 @@ class LlapRecordReader
int queueLimitBase = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_BASE, job, daemonConf);
int queueLimitMin = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, job, daemonConf);
- int limit = determineQueueLimit(queueLimitBase, queueLimitMin, rbCtx.getRowColumnTypeInfos());
+ final boolean decimal64Support = HiveConf.getVar(job, ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED)
+ .equalsIgnoreCase("decimal_64");
+ int limit = determineQueueLimit(queueLimitBase, queueLimitMin, rbCtx.getRowColumnTypeInfos(), decimal64Support);
LOG.info("Queue limit for LlapRecordReader is " + limit);
this.queue = new LinkedBlockingQueue<>(limit);
@@ -199,14 +202,14 @@ class LlapRecordReader
private static final int COL_WEIGHT_COMPLEX = 16, COL_WEIGHT_HIVEDECIMAL = 4,
COL_WEIGHT_STRING = 8;
private static int determineQueueLimit(
- int queueLimitBase, int queueLimitMin, TypeInfo[] typeInfos) {
+ int queueLimitBase, int queueLimitMin, TypeInfo[] typeInfos, final boolean decimal64Support) {
// If the values are equal, the queue limit is fixed.
if (queueLimitBase == queueLimitMin) return queueLimitBase;
// If there are no columns (projection only join?) just assume no weight.
if (typeInfos == null || typeInfos.length == 0) return queueLimitBase;
double totalWeight = 0;
for (TypeInfo ti : typeInfos) {
- int colWeight = 1;
+ int colWeight;
if (ti.getCategory() != Category.PRIMITIVE) {
colWeight = COL_WEIGHT_COMPLEX;
} else {
@@ -217,8 +220,22 @@ class LlapRecordReader
case VARCHAR:
case STRING:
colWeight = COL_WEIGHT_STRING;
+ break;
case DECIMAL:
- colWeight = COL_WEIGHT_HIVEDECIMAL;
+ boolean useDecimal64 = false;
+ if (ti instanceof DecimalTypeInfo) {
+ DecimalTypeInfo dti = (DecimalTypeInfo) ti;
+ if (dti.getPrecision() <= TypeDescription.MAX_DECIMAL64_PRECISION && decimal64Support) {
+ useDecimal64 = true;
+ }
+ }
+ // decimal_64 column vectors get the same weight as long column vectors
+ if (useDecimal64) {
+ colWeight = 1;
+ } else {
+ colWeight = COL_WEIGHT_HIVEDECIMAL;
+ }
+ break;
default:
colWeight = 1;
}