You are viewing a plain-text version of this content; the canonical hyperlink was not preserved in this extraction.
Posted to commits@hive.apache.org by pr...@apache.org on 2016/07/13 20:29:14 UTC
hive git commit: HIVE-14196: Disable LLAP IO when complex types are
involved (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master 87101b73b -> 7f9438357
HIVE-14196: Disable LLAP IO when complex types are involved (Prasanth Jayachandran reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7f943835
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7f943835
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7f943835
Branch: refs/heads/master
Commit: 7f9438357656edfa39e2539abd69773c64c9af95
Parents: 87101b7
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Wed Jul 13 13:28:55 2016 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Wed Jul 13 13:28:55 2016 -0700
----------------------------------------------------------------------
.../hive/llap/io/api/impl/LlapInputFormat.java | 12 +++
.../apache/hadoop/hive/ql/exec/Utilities.java | 84 ++++++++++++++++++++
.../hive/ql/optimizer/physical/Vectorizer.java | 41 +++++-----
.../org/apache/hadoop/hive/ql/plan/MapWork.java | 27 +++++++
.../llap/vector_complex_join.q.out | 4 +-
.../hive/serde2/typeinfo/TypeInfoUtils.java | 17 ++--
6 files changed, 156 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/7f943835/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index c4ffb9f..cc4e10b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.llap.io.api.impl;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
import org.apache.hadoop.hive.ql.io.BatchToRowInputFormat;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -30,11 +31,14 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -64,8 +68,10 @@ import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -116,6 +122,12 @@ public class LlapInputFormat implements InputFormat<NullWritable, VectorizedRowB
useLlapIo = ((LlapAwareSplit)split).canUseLlapIo();
}
boolean isVectorized = Utilities.getUseVectorizedInputFileFormat(job);
+
+ // validate for supported types. Until we fix HIVE-14089 we need this check.
+ if (useLlapIo) {
+ useLlapIo = Utilities.checkLlapIOSupportedTypes(job);
+ }
+
if (!useLlapIo) {
LlapIoImpl.LOG.warn("Not using LLAP IO for an unsupported split: " + split);
return sourceInputFormat.getRecordReader(split, job, reporter);
http://git-wip-us.apache.org/repos/asf/hive/blob/7f943835/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 44a3699..a1f67f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -113,6 +113,7 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -140,6 +141,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.InputEstimator;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
@@ -159,6 +161,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.Serializer;
@@ -169,6 +172,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
@@ -3667,4 +3672,83 @@ public final class Utilities {
}
return value;
}
+
+ /**
+ * Check if LLAP IO supports the column type that is being read
+ * @param conf - configuration
+ * @return false for types not supported by vectorization, true otherwise
+ */
+ public static boolean checkLlapIOSupportedTypes(final Configuration conf) {
+ final String[] readColumnNames = ColumnProjectionUtils.getReadColumnNames(conf);
+ final String columnNames = conf.get(serdeConstants.LIST_COLUMNS);
+ final String columnTypes = conf.get(serdeConstants.LIST_COLUMN_TYPES);
+ if (columnNames == null || columnTypes == null || columnNames.isEmpty() ||
+ columnTypes.isEmpty()) {
+ LOG.warn("Column names ({}) or types ({}) is null. Skipping type checking for LLAP IO.",
+ columnNames, columnTypes);
+ return true;
+ }
+ final List<String> allColumnNames = Lists.newArrayList(columnNames.split(","));
+ final List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);
+ final List<String> allColumnTypes = TypeInfoUtils.getTypeStringsFromTypeInfo(typeInfos);
+ return checkLlapIOSupportedTypes(Lists.newArrayList(readColumnNames), allColumnNames,
+ allColumnTypes);
+ }
+
+ /**
+ * Check if LLAP IO supports the column type that is being read
+ * @param readColumnNames - columns that will be read from the table/partition
+ * @param allColumnNames - all columns
+ * @param allColumnTypes - all column types
+ * @return false for types not supported by vectorization, true otherwise
+ */
+ public static boolean checkLlapIOSupportedTypes(final List<String> readColumnNames,
+ final List<String> allColumnNames, final List<String> allColumnTypes) {
+ final String[] readColumnTypes = getReadColumnTypes(readColumnNames, allColumnNames,
+ allColumnTypes);
+
+ if (readColumnTypes != null) {
+ for (String readColumnType : readColumnTypes) {
+ if (readColumnType != null) {
+ if (!Vectorizer.validateDataType(readColumnType,
+ VectorExpressionDescriptor.Mode.PROJECTION)) {
+ LOG.warn("Unsupported column type encountered ({}). Disabling LLAP IO.",
+ readColumnType);
+ return false;
+ }
+ }
+ }
+ } else {
+ LOG.warn("readColumnTypes is null. Skipping type checking for LLAP IO. " +
+ "readColumnNames: {} allColumnNames: {} allColumnTypes: {} readColumnTypes: {}",
+ readColumnNames, allColumnNames, allColumnTypes, readColumnTypes);
+ }
+ return true;
+ }
+
+ private static String[] getReadColumnTypes(final List<String> readColumnNames,
+ final List<String> allColumnNames, final List<String> allColumnTypes) {
+ if (readColumnNames == null || allColumnNames == null || allColumnTypes == null ||
+ readColumnNames.isEmpty() || allColumnNames.isEmpty() || allColumnTypes.isEmpty()) {
+ return null;
+ }
+ Map<String, String> columnNameToType = new HashMap<>();
+ List<TypeInfo> types = TypeInfoUtils.typeInfosFromTypeNames(allColumnTypes);
+ if (allColumnNames.size() != types.size()) {
+ LOG.warn("Column names count does not match column types count." +
+ " ColumnNames: {} [{}] ColumnTypes: {} [{}]", allColumnNames, allColumnNames.size(),
+ allColumnTypes, types.size());
+ return null;
+ }
+
+ for (int i = 0; i < allColumnNames.size(); i++) {
+ columnNameToType.put(allColumnNames.get(i), types.get(i).toString());
+ }
+
+ String[] result = new String[readColumnNames.size()];
+ for (int i = 0; i < readColumnNames.size(); i++) {
+ result[i] = columnNameToType.get(readColumnNames.get(i));
+ }
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/7f943835/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index f51a084..bce3853 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -186,25 +186,9 @@ public class Vectorizer implements PhysicalPlanResolver {
protected static transient final Logger LOG = LoggerFactory.getLogger(Vectorizer.class);
- Pattern supportedDataTypesPattern;
- List<Task<? extends Serializable>> vectorizableTasks =
- new ArrayList<Task<? extends Serializable>>();
- Set<Class<?>> supportedGenericUDFs = new HashSet<Class<?>>();
-
- Set<String> supportedAggregationUdfs = new HashSet<String>();
-
- private HiveConf hiveConf;
-
- private boolean isSpark;
-
- boolean useVectorizedInputFileFormat;
- boolean useVectorDeserialize;
- boolean useRowDeserialize;
-
- boolean isSchemaEvolution;
-
- public Vectorizer() {
+ static Pattern supportedDataTypesPattern;
+ static {
StringBuilder patternBuilder = new StringBuilder();
patternBuilder.append("int");
patternBuilder.append("|smallint");
@@ -235,6 +219,25 @@ public class Vectorizer implements PhysicalPlanResolver {
patternBuilder.append("|varchar.*");
supportedDataTypesPattern = Pattern.compile(patternBuilder.toString());
+ }
+
+ List<Task<? extends Serializable>> vectorizableTasks =
+ new ArrayList<Task<? extends Serializable>>();
+ Set<Class<?>> supportedGenericUDFs = new HashSet<Class<?>>();
+
+ Set<String> supportedAggregationUdfs = new HashSet<String>();
+
+ private HiveConf hiveConf;
+
+ private boolean isSpark;
+
+ boolean useVectorizedInputFileFormat;
+ boolean useVectorDeserialize;
+ boolean useRowDeserialize;
+
+ boolean isSchemaEvolution;
+
+ public Vectorizer() {
supportedGenericUDFs.add(GenericUDFOPPlus.class);
supportedGenericUDFs.add(GenericUDFOPMinus.class);
@@ -1928,7 +1931,7 @@ public class Vectorizer implements PhysicalPlanResolver {
return new Pair<Boolean,Boolean>(true, outputIsPrimitive);
}
- private boolean validateDataType(String type, VectorExpressionDescriptor.Mode mode) {
+ public static boolean validateDataType(String type, VectorExpressionDescriptor.Mode mode) {
type = type.toLowerCase();
boolean result = supportedDataTypesPattern.matcher(type).matches();
if (result && mode == VectorExpressionDescriptor.Mode.PROJECTION && type.equals("void")) {
http://git-wip-us.apache.org/repos/asf/hive/blob/7f943835/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index d908d48..8d329d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.plan;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import java.util.ArrayList;
@@ -31,6 +32,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Properties;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -47,6 +49,9 @@ import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.BucketCo
import org.apache.hadoop.hive.ql.optimizer.physical.BucketingSortingCtx.SortCol;
import org.apache.hadoop.hive.ql.parse.SplitSample;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.mapred.JobConf;
import com.google.common.collect.Interner;
@@ -243,6 +248,28 @@ public class MapWork extends BaseWork {
}
}
}
+
+ // check if the column types that are read are supported by LLAP IO
+ for (Map.Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) {
+ if (hasLlap) {
+ final String alias = entry.getKey();
+ Operator<? extends OperatorDesc> op = entry.getValue();
+ PartitionDesc partitionDesc = aliasToPartnInfo.get(alias);
+ if (op instanceof TableScanOperator && partitionDesc != null &&
+ partitionDesc.getTableDesc() != null) {
+ final TableScanOperator tsOp = (TableScanOperator) op;
+ final List<String> readColumnNames = tsOp.getNeededColumns();
+ final Properties props = partitionDesc.getTableDesc().getProperties();
+ final List<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(
+ props.getProperty(serdeConstants.LIST_COLUMN_TYPES));
+ final List<String> allColumnTypes = TypeInfoUtils.getTypeStringsFromTypeInfo(typeInfos);
+ final List<String> allColumnNames = Utilities.getColumnNames(props);
+ hasLlap = Utilities.checkLlapIOSupportedTypes(readColumnNames, allColumnNames,
+ allColumnTypes);
+ }
+ }
+ }
+
llapIoDesc = deriveLlapIoDescString(
isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/7f943835/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out b/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
index 480627f..142bdee 100644
--- a/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
@@ -70,7 +70,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Execution mode: llap
- LLAP IO: no inputs
+ LLAP IO: all inputs
Map 2
Map Operator Tree:
TableScan
@@ -195,7 +195,7 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Execution mode: llap
- LLAP IO: no inputs
+ LLAP IO: all inputs
Map 2
Map Operator Tree:
TableScan
http://git-wip-us.apache.org/repos/asf/hive/blob/7f943835/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoUtils.java
index abd2838..8f7b799 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/typeinfo/TypeInfoUtils.java
@@ -814,15 +814,16 @@ public final class TypeInfoUtils {
return parser.parseTypeInfos();
}
- public static String getTypesString(List<TypeInfo> typeInfos) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < typeInfos.size(); i++) {
- if (i > 0) {
- sb.append(":");
- }
- sb.append(typeInfos.get(i).getTypeName());
+ public static List<String> getTypeStringsFromTypeInfo(List<TypeInfo> typeInfos) {
+ if (typeInfos == null) {
+ return null;
}
- return sb.toString();
+
+ List<String> result = new ArrayList<>(typeInfos.size());
+ for (TypeInfo typeInfo : typeInfos) {
+ result.add(typeInfo.toString());
+ }
+ return result;
}
public static TypeInfo getTypeInfoFromTypeString(String typeString) {