You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2018/06/15 21:02:12 UTC

hive git commit: HIVE-19875: increase LLAP IO queue size for perf (Prasanth Jayachandran reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master ccfca8ec7 -> c93a15553


HIVE-19875: increase LLAP IO queue size for perf (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c93a1555
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c93a1555
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c93a1555

Branch: refs/heads/master
Commit: c93a15553a1cc383888a3ec1d27557bc0857fadd
Parents: ccfca8e
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri Jun 15 14:01:24 2018 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri Jun 15 14:01:24 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 +-
 .../hive/llap/io/api/impl/LlapRecordReader.java | 25 ++++++++++++++++----
 2 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c93a1555/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 99bb400..ef173de 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3881,7 +3881,7 @@ public class HiveConf extends Configuration {
         "MR LineRecordRedader into LLAP cache, if this feature is enabled. Safety flag."),
     LLAP_ORC_ENABLE_TIME_COUNTERS("hive.llap.io.orc.time.counters", true,
         "Whether to enable time counters for LLAP IO layer (time spent in HDFS, etc.)"),
-    LLAP_IO_VRB_QUEUE_LIMIT_BASE("hive.llap.io.vrb.queue.limit.base", 10000,
+    LLAP_IO_VRB_QUEUE_LIMIT_BASE("hive.llap.io.vrb.queue.limit.base", 50000,
         "The default queue size for VRBs produced by a LLAP IO thread when the processing is\n" +
         "slower than the IO. The actual queue size is set per fragment, and is adjusted down\n" +
         "from the base, depending on the schema."),

http://git-wip-us.apache.org/repos/asf/hive/blob/c93a1555/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index cb57a11..201c097 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.NullWritable;
@@ -163,7 +164,9 @@ class LlapRecordReader
 
     int queueLimitBase = getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_BASE, job, daemonConf);
     int queueLimitMin =  getQueueVar(ConfVars.LLAP_IO_VRB_QUEUE_LIMIT_MIN, job, daemonConf);
-    int limit = determineQueueLimit(queueLimitBase, queueLimitMin, rbCtx.getRowColumnTypeInfos());
+    final boolean decimal64Support = HiveConf.getVar(job, ConfVars.HIVE_VECTORIZED_INPUT_FORMAT_SUPPORTS_ENABLED)
+      .equalsIgnoreCase("decimal_64");
+    int limit = determineQueueLimit(queueLimitBase, queueLimitMin, rbCtx.getRowColumnTypeInfos(), decimal64Support);
     LOG.info("Queue limit for LlapRecordReader is " + limit);
     this.queue = new LinkedBlockingQueue<>(limit);
 
@@ -199,14 +202,14 @@ class LlapRecordReader
   private static final int COL_WEIGHT_COMPLEX = 16, COL_WEIGHT_HIVEDECIMAL = 4,
       COL_WEIGHT_STRING = 8;
   private static int determineQueueLimit(
-      int queueLimitBase, int queueLimitMin, TypeInfo[] typeInfos) {
+    int queueLimitBase, int queueLimitMin, TypeInfo[] typeInfos, final boolean decimal64Support) {
     // If the values are equal, the queue limit is fixed.
     if (queueLimitBase == queueLimitMin) return queueLimitBase;
     // If there are no columns (projection only join?) just assume no weight.
     if (typeInfos == null || typeInfos.length == 0) return queueLimitBase;
     double totalWeight = 0;
     for (TypeInfo ti : typeInfos) {
-      int colWeight = 1;
+      int colWeight;
       if (ti.getCategory() != Category.PRIMITIVE) {
         colWeight = COL_WEIGHT_COMPLEX;
       } else {
@@ -217,8 +220,22 @@ class LlapRecordReader
         case VARCHAR:
         case STRING:
           colWeight = COL_WEIGHT_STRING;
+          break;
         case DECIMAL:
-          colWeight = COL_WEIGHT_HIVEDECIMAL;
+          boolean useDecimal64 = false;
+          if (ti instanceof DecimalTypeInfo) {
+            DecimalTypeInfo dti = (DecimalTypeInfo) ti;
+            if (dti.getPrecision() <= TypeDescription.MAX_DECIMAL64_PRECISION && decimal64Support) {
+              useDecimal64 = true;
+            }
+          }
+          // decimal_64 column vectors gets the same weight as long column vectors
+          if (useDecimal64) {
+            colWeight = 1;
+          } else {
+            colWeight = COL_WEIGHT_HIVEDECIMAL;
+          }
+          break;
         default:
           colWeight = 1;
         }