You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@hive.apache.org by pr...@apache.org on 2016/07/13 20:29:14 UTC

hive git commit: HIVE-14196: Disable LLAP IO when complex types are involved (Prasanth Jayachandran reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master 87101b73b -> 7f9438357


HIVE-14196: Disable LLAP IO when complex types are involved (Prasanth Jayachandran reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7f943835
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7f943835
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7f943835

Branch: refs/heads/master
Commit: 7f9438357656edfa39e2539abd69773c64c9af95
Parents: 87101b7
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Wed Jul 13 13:28:55 2016 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Wed Jul 13 13:28:55 2016 -0700

----------------------------------------------------------------------
 .../hive/llap/io/api/impl/LlapInputFormat.java  | 12 +++
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 84 ++++++++++++++++++++
 .../hive/ql/optimizer/physical/Vectorizer.java  | 41 +++++-----
 .../org/apache/hadoop/hive/ql/plan/MapWork.java | 27 +++++++
 .../llap/vector_complex_join.q.out              |  4 +-
 .../hive/serde2/typeinfo/TypeInfoUtils.java     | 17 ++--
 6 files changed, 156 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7f943835/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index c4ffb9f..cc4e10b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hive.llap.io.api.impl;
 
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
 import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat;
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -30,11 +31,14 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -64,8 +68,10 @@ import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -116,6 +122,12 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
       useLlapIo = ((LlapAwareSplit)split).canUseLlapIo();
     }
     boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(job);
+
+    // validate for supported types. Until we fix HIVE-14089 we need this check.
+    if (useLlapIo) {
+      useLlapIo = Utilities.checkLlapIOSupportedTypes(job);
+    }
+
     if (!useLlapIo) {
       LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
       return sourceInputFormat.getRecordReader(split, job, reporter);

http://git-wip-us.apache.org/repos/asf/hive/blob/7f943835/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 44a3699..a1f67f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -113,6 +113,7 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -140,6 +141,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.InputEstimator;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
@@ -159,6 +161,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.Serializer;
@@ -169,6 +172,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
@@ -3667,4 +3672,83 @@ public final class Utilities {
     }
     return value;
   }
+
+  /**
+   * Check if LLAP IO supports the column type that is being read
+   * @param conf - configuration
+   * @return false for types not supported by vectorization, true otherwise
+   */
+  public static boolean checkLlapIOSupportedTypes(final Configuration conf) {
+    final String[] readColumnNames = ColumnProjectionUtils.getReadColumnNames(conf);
+    final String columnNames = conf.get(serdeConstants.LIST_COLUMNS);
+    final String columnTypes = conf.get(serdeConstants.LIST_COLUMN_TYPES);
+    if (columnNames == null || columnTypes == null || columnNames.isEmpty() ||
+        columnTypes.isEmpty()) {
+      LOG.warn("Column names ({}) or types ({}) is null. Skipping type checking for LLAP IO.",
+          columnNames, columnTypes);
+      return true;
+    }
+    final List<String> allColumnNames = Lists.newArrayList(columnNames.split(","));
+    final List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);
+    final List<String> allColumnTypes = TypeInfoUtils.getTypeStringsFromTypeInfo(typeInfos);
+    return checkLlapIOSupportedTypes(Lists.newArrayList(readColumnNames), allColumnNames,
+        allColumnTypes);
+  }
+
+  /**
+   * Check if LLAP IO supports the column type that is being read
+   * @param readColumnNames - columns that will be read from the table/partition
+   * @param allColumnNames - all columns
+   * @param allColumnTypes - all column types
+   * @return false for types not supported by vectorization, true otherwise
+   */
+  public static boolean checkLlapIOSupportedTypes(final List<String> readColumnNames,
+      final List<String> allColumnNames, final List<String> allColumnTypes) {
+    final String[] readColumnTypes = getReadColumnTypes(readColumnNames, allColumnNames,
+        allColumnTypes);
+
+    if (readColumnTypes != null) {
+      for (String readColumnType : readColumnTypes) {
+        if (readColumnType != null) {
+          if (!Vectorizer.validateDataType(readColumnType,
+              VectorExpressionDescriptor.Mode.PROJECTION)) {
+            LOG.warn("Unsupported column type encountered ({}). Disabling LLAP IO.",
+                readColumnType);
+            return false;
+          }
+        }
+      }
+    } else {
+      LOG.warn("readColumnTypes is null. Skipping type checking for LLAP IO. " +
+          "readColumnNames: {} allColumnNames: {} allColumnTypes: {} readColumnTypes: {}",
+          readColumnNames, allColumnNames, allColumnTypes, readColumnTypes);
+    }
+    return true;
+  }
+
+  private static String[] getReadColumnTypes(final List<String> readColumnNames,
+      final List<String> allColumnNames, final List<String> allColumnTypes) {
+    if (readColumnNames == null || allColumnNames == null || allColumnTypes == null ||
+        readColumnNames.isEmpty() || allColumnNames.isEmpty() || allColumnTypes.isEmpty()) {
+      return null;
+    }
+    Map<String, String> columnNameToType = new HashMap<>();
+    List<TypeInfo> types = TypeInfoUtils.typeInfosFromTypeNames(allColumnTypes);
+    if (allColumnNames.size() != types.size()) {
+      LOG.warn("Column names count does not match column types count." +
+              " ColumnNames: {} [{}] ColumnTypes: {} [{}]", allColumnNames, allColumnNames.size(),
+          allColumnTypes, types.size());
+      return null;
+    }
+
+    for (int i = 0; i < allColumnNames.size(); i++) {
+      columnNameToType.put(allColumnNames.get(i), types.get(i).toString());
+    }
+
+    String[] result = new String[readColumnNames.size()];
+    for (int i = 0; i < readColumnNames.size(); i++) {
+      result[i] = columnNameToType.get(readColumnNames.get(i));
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/7f943835/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index f51a084..bce3853 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -186,25 +186,9 @@ public class Vectorizer implements PhysicalPlanResolver {
 
   protected static transient final Logger LOG = LoggerFactory.getLogger(Vectorizer.class);
 
-  Pattern supportedDataTypesPattern;
-  List<Task<? extends Serializable>> vectorizableTasks =
-      new ArrayList<Task<? extends Serializable>>();
-  Set<Class<?>> supportedGenericUDFs = new HashSet<Class<?>>();
-
-  Set<String> supportedAggregationUdfs = new HashSet<String>();
-
-  private HiveConf hiveConf;
-
-  private boolean isSpark;
-
-  boolean useVectorizedInputFileFormat;
-  boolean useVectorDeserialize;
-  boolean useRowDeserialize;
-
-  boolean isSchemaEvolution;
-
-  public Vectorizer() {
+  static Pattern supportedDataTypesPattern;
 
+  static {
     StringBuilder patternBuilder = new StringBuilder();
     patternBuilder.append("int");
     patternBuilder.append("|smallint");
@@ -235,6 +219,25 @@ public class Vectorizer implements PhysicalPlanResolver {
     patternBuilder.append("|varchar.*");
 
     supportedDataTypesPattern = Pattern.compile(patternBuilder.toString());
+  }
+
+  List<Task<? extends Serializable>> vectorizableTasks =
+      new ArrayList<Task<? extends Serializable>>();
+  Set<Class<?>> supportedGenericUDFs = new HashSet<Class<?>>();
+
+  Set<String> supportedAggregationUdfs = new HashSet<String>();
+
+  private HiveConf hiveConf;
+
+  private boolean isSpark;
+
+  boolean useVectorizedInputFileFormat;
+  boolean useVectorDeserialize;
+  boolean useRowDeserialize;
+
+  boolean isSchemaEvolution;
+
+  public Vectorizer() {
 
     supportedGenericUDFs.add(GenericUDFOPPlus.class);
     supportedGenericUDFs.add(GenericUDFOPMinus.class);
@@ -1928,7 +1931,7 @@ public class Vectorizer implements PhysicalPlanResolver {
     return new Pair<Boolean,Boolean>(true, outputIsPrimitive);
   }
 
-  private boolean validateDataType(String type, VectorExpressionDescriptor.Mode mode) {
+  public static boolean validateDataType(String type, VectorExpressionDescriptor.Mode mode) {
     type = type.toLowerCase();
     boolean result = supportedDataTypesPattern.matcher(type).matches();
     if (result && mode == VectorExpressionDescriptor.Mode.PROJECTION && type.equals("void")) {

http://git-wip-us.apache.org/repos/asf/hive/blob/7f943835/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index d908d48..8d329d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 
 import java.util.ArrayList;
@@ -31,6 +32,7 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
@@ -47,6 +49,9 @@ import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.BucketCo
 import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.JobConf;
 
 import com.google.common.collect.Interner;
@@ -243,6 +248,28 @@ public class MapWork extends BaseWork {
         }
       }
     }
+
+    // check if the column types that are read are supported by LLAP IO
+    for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) {
+      if (hasLlap) {
+        final String alias = entry.getKey();
+        Operator<? extends OperatorDesc> op = entry.getValue();
+        PartitionDesc partitionDesc = aliasToPartnInfo.get(alias);
+        if (op instanceof TableScanOperator && partitionDesc != null &&
+            partitionDesc.getTableDesc() != null) {
+          final TableScanOperator tsOp = (TableScanOperator) op;
+          final List<String> readColumnNames = tsOp.getNeededColumns();
+          final Properties props = partitionDesc.getTableDesc().getProperties();
+          final List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(
+              props.getProperty(serdeConstants.LIST_COLUMN_TYPES));
+          final List<String> allColumnTypes = TypeInfoUtils.getTypeStringsFromTypeInfo(typeInfos);
+          final List<String> allColumnNames = Utilities.getColumnNames(props);
+          hasLlap = Utilities.checkLlapIOSupportedTypes(readColumnNames, allColumnNames,
+              allColumnTypes);
+        }
+      }
+    }
+
     llapIoDesc = deriveLlapIoDescString(
         isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/7f943835/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out b/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
index 480627f..142bdee 100644
--- a/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
@@ -70,7 +70,7 @@ STAGE PLANS:
                               output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 2 
             Map Operator Tree:
                 TableScan
@@ -195,7 +195,7 @@ STAGE PLANS:
                               output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
         Map 2 
             Map Operator Tree:
                 TableScan

http://git-wip-us.apache.org/repos/asf/hive/blob/7f943835/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoUtils.java
index abd2838..8f7b799 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoUtils.java
@@ -814,15 +814,16 @@ public final class TypeInfoUtils {
     return parser.parseTypeInfos();
   }
 
-  public static String getTypesString(List<TypeInfo> typeInfos) {
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < typeInfos.size(); i++) {
-      if (i > 0) {
-        sb.append(":");
-      }
-      sb.append(typeInfos.get(i).getTypeName());
+  public static List<String> getTypeStringsFromTypeInfo(List<TypeInfo> typeInfos) {
+    if (typeInfos == null) {
+      return null;
     }
-    return sb.toString();
+
+    List<String> result = new ArrayList<>(typeInfos.size());
+    for (TypeInfo typeInfo : typeInfos) {
+      result.add(typeInfo.toString());
+    }
+    return result;
   }
 
   public static TypeInfo getTypeInfoFromTypeString(String typeString) {