Posted to commits@hive.apache.org by mm...@apache.org on 2016/01/12 18:56:46 UTC
[15/18] hive git commit: HIVE-12625: Backport to branch-1 HIVE-11981 ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J)
HIVE-12728: Apply DDL restrictions for ORC schema evolution (Prasanth Jayachan
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
index ecd9b14..22f61ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
@@ -30,6 +30,9 @@ import java.util.List;
import java.util.Map;
import java.util.TimeZone;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
@@ -50,6 +53,7 @@ import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.HadoopShims.TextReaderShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
@@ -62,10 +66,64 @@ import org.apache.hadoop.io.Text;
*/
public class TreeReaderFactory {
- protected abstract static class TreeReader {
+ private static final Log LOG = LogFactory.getLog(TreeReaderFactory.class);
+
+ public static class TreeReaderSchema {
+
+ /**
+ * The types in the ORC file.
+ */
+ List<OrcProto.Type> fileTypes;
+
+ /**
+ * The treeReaderSchema that the reader should read as.
+ */
+ List<OrcProto.Type> schemaTypes;
+
+ /**
+ * The subtype of the row STRUCT. Different than 0 for ACID.
+ */
+ int innerStructSubtype;
+
+ public TreeReaderSchema() {
+ fileTypes = null;
+ schemaTypes = null;
+ innerStructSubtype = -1;
+ }
+
+ public TreeReaderSchema fileTypes(List<OrcProto.Type> fileTypes) {
+ this.fileTypes = fileTypes;
+ return this;
+ }
+
+ public TreeReaderSchema schemaTypes(List<OrcProto.Type> schemaTypes) {
+ this.schemaTypes = schemaTypes;
+ return this;
+ }
+
+ public TreeReaderSchema innerStructSubtype(int innerStructSubtype) {
+ this.innerStructSubtype = innerStructSubtype;
+ return this;
+ }
+
+ public List<OrcProto.Type> getFileTypes() {
+ return fileTypes;
+ }
+
+ public List<OrcProto.Type> getSchemaTypes() {
+ return schemaTypes;
+ }
+
+ public int getInnerStructSubtype() {
+ return innerStructSubtype;
+ }
+ }
+
+ public abstract static class TreeReader {
protected final int columnId;
protected BitFieldReader present = null;
protected boolean valuePresent = false;
+ protected int vectorColumnCount;
TreeReader(int columnId) throws IOException {
this(columnId, null);
@@ -79,6 +137,11 @@ public class TreeReaderFactory {
} else {
present = new BitFieldReader(in, 1);
}
+ vectorColumnCount = -1;
+ }
+
+ void setVectorColumnCount(int vectorColumnCount) {
+ this.vectorColumnCount = vectorColumnCount;
}
void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
@@ -1947,24 +2010,56 @@ public class TreeReaderFactory {
}
protected static class StructTreeReader extends TreeReader {
+ private final int fileColumnCount;
+ private final int resultColumnCount;
protected final TreeReader[] fields;
private final String[] fieldNames;
- StructTreeReader(int columnId,
- List<OrcProto.Type> types,
+ protected StructTreeReader(
+ int columnId,
+ TreeReaderSchema treeReaderSchema,
boolean[] included,
boolean skipCorrupt) throws IOException {
super(columnId);
- OrcProto.Type type = types.get(columnId);
- int fieldCount = type.getFieldNamesCount();
- this.fields = new TreeReader[fieldCount];
- this.fieldNames = new String[fieldCount];
- for (int i = 0; i < fieldCount; ++i) {
- int subtype = type.getSubtypes(i);
- if (included == null || included[subtype]) {
- this.fields[i] = createTreeReader(subtype, types, included, skipCorrupt);
+
+ OrcProto.Type fileStructType = treeReaderSchema.getFileTypes().get(columnId);
+ fileColumnCount = fileStructType.getFieldNamesCount();
+
+ OrcProto.Type schemaStructType = treeReaderSchema.getSchemaTypes().get(columnId);
+
+ if (columnId == treeReaderSchema.getInnerStructSubtype()) {
+ // If there are more result columns than reader columns, we will default those additional
+ // columns to NULL.
+ resultColumnCount = schemaStructType.getFieldNamesCount();
+ } else {
+ resultColumnCount = fileColumnCount;
+ }
+
+ this.fields = new TreeReader[fileColumnCount];
+ this.fieldNames = new String[fileColumnCount];
+
+ if (included == null) {
+ for (int i = 0; i < fileColumnCount; ++i) {
+ int subtype = schemaStructType.getSubtypes(i);
+ this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
+ // Use the treeReaderSchema evolution name since file/reader types may not have the real column name.
+ this.fieldNames[i] = schemaStructType.getFieldNames(i);
+ }
+ } else {
+ for (int i = 0; i < fileColumnCount; ++i) {
+ int subtype = schemaStructType.getSubtypes(i);
+ if (subtype >= included.length) {
+ throw new IOException("subtype " + subtype + " exceeds the included array size " +
+ included.length + " fileTypes " + treeReaderSchema.getFileTypes().toString() +
+ " schemaTypes " + treeReaderSchema.getSchemaTypes().toString() +
+ " innerStructSubtype " + treeReaderSchema.getInnerStructSubtype());
+ }
+ if (included[subtype]) {
+ this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
+ }
+ // Use the treeReaderSchema evolution name since file/reader types may not have the real column name.
+ this.fieldNames[i] = schemaStructType.getFieldNames(i);
}
- this.fieldNames[i] = type.getFieldNames(i);
}
}
@@ -1984,22 +2079,28 @@ public class TreeReaderFactory {
OrcStruct result = null;
if (valuePresent) {
if (previous == null) {
- result = new OrcStruct(fields.length);
+ result = new OrcStruct(resultColumnCount);
} else {
result = (OrcStruct) previous;
// If the input format was initialized with a file with a
// different number of fields, the number of fields needs to
// be updated to the correct number
- if (result.getNumFields() != fields.length) {
- result.setNumFields(fields.length);
+ if (result.getNumFields() != resultColumnCount) {
+ result.setNumFields(resultColumnCount);
}
}
- for (int i = 0; i < fields.length; ++i) {
+ for (int i = 0; i < fileColumnCount; ++i) {
if (fields[i] != null) {
result.setFieldValue(i, fields[i].next(result.getFieldValue(i)));
}
}
+ if (resultColumnCount > fileColumnCount) {
+ for (int i = fileColumnCount; i < resultColumnCount; ++i) {
+ // Default new treeReaderSchema evolution fields to NULL.
+ result.setFieldValue(i, null);
+ }
+ }
}
return result;
}
@@ -2008,13 +2109,13 @@ public class TreeReaderFactory {
public Object nextVector(Object previousVector, long batchSize) throws IOException {
final ColumnVector[] result;
if (previousVector == null) {
- result = new ColumnVector[fields.length];
+ result = new ColumnVector[fileColumnCount];
} else {
result = (ColumnVector[]) previousVector;
}
// Read all the members of struct as column vectors
- for (int i = 0; i < fields.length; i++) {
+ for (int i = 0; i < fileColumnCount; i++) {
if (fields[i] != null) {
if (result[i] == null) {
result[i] = (ColumnVector) fields[i].nextVector(null, batchSize);
@@ -2023,6 +2124,19 @@ public class TreeReaderFactory {
}
}
}
+
+ // Default additional treeReaderSchema evolution fields to NULL.
+ if (vectorColumnCount != -1 && vectorColumnCount > fileColumnCount) {
+ for (int i = fileColumnCount; i < vectorColumnCount; ++i) {
+ ColumnVector colVector = result[i];
+ if (colVector != null) {
+ colVector.isRepeating = true;
+ colVector.noNulls = false;
+ colVector.isNull[0] = true;
+ }
+ }
+ }
+
return result;
}
@@ -2053,18 +2167,18 @@ public class TreeReaderFactory {
protected final TreeReader[] fields;
protected RunLengthByteReader tags;
- UnionTreeReader(int columnId,
- List<OrcProto.Type> types,
+ protected UnionTreeReader(int columnId,
+ TreeReaderSchema treeReaderSchema,
boolean[] included,
boolean skipCorrupt) throws IOException {
super(columnId);
- OrcProto.Type type = types.get(columnId);
+ OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
int fieldCount = type.getSubtypesCount();
this.fields = new TreeReader[fieldCount];
for (int i = 0; i < fieldCount; ++i) {
int subtype = type.getSubtypes(i);
if (included == null || included[subtype]) {
- this.fields[i] = createTreeReader(subtype, types, included, skipCorrupt);
+ this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
}
}
}
@@ -2133,13 +2247,13 @@ public class TreeReaderFactory {
protected final TreeReader elementReader;
protected IntegerReader lengths = null;
- ListTreeReader(int columnId,
- List<OrcProto.Type> types,
+ protected ListTreeReader(int columnId,
+ TreeReaderSchema treeReaderSchema,
boolean[] included,
boolean skipCorrupt) throws IOException {
super(columnId);
- OrcProto.Type type = types.get(columnId);
- elementReader = createTreeReader(type.getSubtypes(0), types, included, skipCorrupt);
+ OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
+ elementReader = createTreeReader(type.getSubtypes(0), treeReaderSchema, included, skipCorrupt);
}
@Override
@@ -2223,21 +2337,21 @@ public class TreeReaderFactory {
protected final TreeReader valueReader;
protected IntegerReader lengths = null;
- MapTreeReader(int columnId,
- List<OrcProto.Type> types,
+ protected MapTreeReader(int columnId,
+ TreeReaderSchema treeReaderSchema,
boolean[] included,
boolean skipCorrupt) throws IOException {
super(columnId);
- OrcProto.Type type = types.get(columnId);
+ OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
int keyColumn = type.getSubtypes(0);
int valueColumn = type.getSubtypes(1);
if (included == null || included[keyColumn]) {
- keyReader = createTreeReader(keyColumn, types, included, skipCorrupt);
+ keyReader = createTreeReader(keyColumn, treeReaderSchema, included, skipCorrupt);
} else {
keyReader = null;
}
if (included == null || included[valueColumn]) {
- valueReader = createTreeReader(valueColumn, types, included, skipCorrupt);
+ valueReader = createTreeReader(valueColumn, treeReaderSchema, included, skipCorrupt);
} else {
valueReader = null;
}
@@ -2317,11 +2431,11 @@ public class TreeReaderFactory {
}
public static TreeReader createTreeReader(int columnId,
- List<OrcProto.Type> types,
+ TreeReaderSchema treeReaderSchema,
boolean[] included,
boolean skipCorrupt
) throws IOException {
- OrcProto.Type type = types.get(columnId);
+ OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
switch (type.getKind()) {
case BOOLEAN:
return new BooleanTreeReader(columnId);
@@ -2361,13 +2475,13 @@ public class TreeReaderFactory {
int scale = type.hasScale() ? type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE;
return new DecimalTreeReader(columnId, precision, scale);
case STRUCT:
- return new StructTreeReader(columnId, types, included, skipCorrupt);
+ return new StructTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
case LIST:
- return new ListTreeReader(columnId, types, included, skipCorrupt);
+ return new ListTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
case MAP:
- return new MapTreeReader(columnId, types, included, skipCorrupt);
+ return new MapTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
case UNION:
- return new UnionTreeReader(columnId, types, included, skipCorrupt);
+ return new UnionTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
default:
throw new IllegalArgumentException("Unsupported type " +
type.getKind());
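
The new TreeReaderSchema above is a small fluent holder that pairs the physical types found in the ORC file with the logical types the query expects, plus the subtype id of the row STRUCT (non-zero only for ACID layouts). The fragment below is only an illustrative sketch, not code from this patch: the helper name buildRootReader and the column names "a"/"b" are made up, and it assumes a class living in the same org.apache.hadoop.hive.ql.io.orc package. It shows how a caller such as RecordReaderImpl might construct the schema for a file written as struct<a:int> but read as struct<a:int,b:string>, with the new column defaulted to NULL by StructTreeReader (stripe/stream setup is omitted).

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only.
static TreeReaderFactory.TreeReader buildRootReader() throws IOException {
  // Types as written in the file: struct<a:int>
  OrcProto.Type fileStruct = OrcProto.Type.newBuilder()
      .setKind(OrcProto.Type.Kind.STRUCT)
      .addFieldNames("a").addSubtypes(1).build();
  OrcProto.Type intType =
      OrcProto.Type.newBuilder().setKind(OrcProto.Type.Kind.INT).build();
  List<OrcProto.Type> fileTypes = Arrays.asList(fileStruct, intType);

  // Types the reader should produce: struct<a:int,b:string>
  OrcProto.Type schemaStruct = OrcProto.Type.newBuilder()
      .setKind(OrcProto.Type.Kind.STRUCT)
      .addFieldNames("a").addSubtypes(1)
      .addFieldNames("b").addSubtypes(2).build();
  OrcProto.Type stringType =
      OrcProto.Type.newBuilder().setKind(OrcProto.Type.Kind.STRING).build();
  List<OrcProto.Type> schemaTypes = Arrays.asList(schemaStruct, intType, stringType);

  TreeReaderFactory.TreeReaderSchema treeReaderSchema =
      new TreeReaderFactory.TreeReaderSchema()
          .fileTypes(fileTypes)
          .schemaTypes(schemaTypes)
          .innerStructSubtype(0);   // root row struct; non-zero only for ACID files

  // Column 0 is the root STRUCT; StructTreeReader pads the extra schema
  // column "b" with NULLs because the file itself only has one field.
  return TreeReaderFactory.createTreeReader(0, treeReaderSchema, null, false);
}
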
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
new file mode 100644
index 0000000..3c0d590
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TypeDescription.java
@@ -0,0 +1,514 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This is the description of the types in an ORC file.
+ */
+public class TypeDescription {
+ private static final int MAX_PRECISION = 38;
+ private static final int MAX_SCALE = 38;
+ private static final int DEFAULT_PRECISION = 38;
+ private static final int DEFAULT_SCALE = 10;
+ private static final int DEFAULT_LENGTH = 256;
+ public enum Category {
+ BOOLEAN("boolean", true),
+ BYTE("tinyint", true),
+ SHORT("smallint", true),
+ INT("int", true),
+ LONG("bigint", true),
+ FLOAT("float", true),
+ DOUBLE("double", true),
+ STRING("string", true),
+ DATE("date", true),
+ TIMESTAMP("timestamp", true),
+ BINARY("binary", true),
+ DECIMAL("decimal", true),
+ VARCHAR("varchar", true),
+ CHAR("char", true),
+ LIST("array", false),
+ MAP("map", false),
+ STRUCT("struct", false),
+ UNION("union", false);
+
+ Category(String name, boolean isPrimitive) {
+ this.name = name;
+ this.isPrimitive = isPrimitive;
+ }
+
+ final boolean isPrimitive;
+ final String name;
+
+ public boolean isPrimitive() {
+ return isPrimitive;
+ }
+
+ public String getName() {
+ return name;
+ }
+ }
+
+ public static TypeDescription createBoolean() {
+ return new TypeDescription(Category.BOOLEAN);
+ }
+
+ public static TypeDescription createByte() {
+ return new TypeDescription(Category.BYTE);
+ }
+
+ public static TypeDescription createShort() {
+ return new TypeDescription(Category.SHORT);
+ }
+
+ public static TypeDescription createInt() {
+ return new TypeDescription(Category.INT);
+ }
+
+ public static TypeDescription createLong() {
+ return new TypeDescription(Category.LONG);
+ }
+
+ public static TypeDescription createFloat() {
+ return new TypeDescription(Category.FLOAT);
+ }
+
+ public static TypeDescription createDouble() {
+ return new TypeDescription(Category.DOUBLE);
+ }
+
+ public static TypeDescription createString() {
+ return new TypeDescription(Category.STRING);
+ }
+
+ public static TypeDescription createDate() {
+ return new TypeDescription(Category.DATE);
+ }
+
+ public static TypeDescription createTimestamp() {
+ return new TypeDescription(Category.TIMESTAMP);
+ }
+
+ public static TypeDescription createBinary() {
+ return new TypeDescription(Category.BINARY);
+ }
+
+ public static TypeDescription createDecimal() {
+ return new TypeDescription(Category.DECIMAL);
+ }
+
+ /**
+ * For decimal types, set the precision.
+ * @param precision the new precision
+ * @return this
+ */
+ public TypeDescription withPrecision(int precision) {
+ if (category != Category.DECIMAL) {
+ throw new IllegalArgumentException("precision is only allowed on decimal"+
+ " and not " + category.name);
+ } else if (precision < 1 || precision > MAX_PRECISION || scale > precision){
+ throw new IllegalArgumentException("precision " + precision +
+ " is out of range 1 .. " + scale);
+ }
+ this.precision = precision;
+ return this;
+ }
+
+ /**
+ * For decimal types, set the scale.
+ * @param scale the new scale
+ * @return this
+ */
+ public TypeDescription withScale(int scale) {
+ if (category != Category.DECIMAL) {
+ throw new IllegalArgumentException("scale is only allowed on decimal"+
+ " and not " + category.name);
+ } else if (scale < 0 || scale > MAX_SCALE || scale > precision) {
+ throw new IllegalArgumentException("scale is out of range at " + scale);
+ }
+ this.scale = scale;
+ return this;
+ }
+
+ public static TypeDescription createVarchar() {
+ return new TypeDescription(Category.VARCHAR);
+ }
+
+ public static TypeDescription createChar() {
+ return new TypeDescription(Category.CHAR);
+ }
+
+ /**
+ * Set the maximum length for char and varchar types.
+ * @param maxLength the maximum value
+ * @return this
+ */
+ public TypeDescription withMaxLength(int maxLength) {
+ if (category != Category.VARCHAR && category != Category.CHAR) {
+ throw new IllegalArgumentException("maxLength is only allowed on char" +
+ " and varchar and not " + category.name);
+ }
+ this.maxLength = maxLength;
+ return this;
+ }
+
+ public static TypeDescription createList(TypeDescription childType) {
+ TypeDescription result = new TypeDescription(Category.LIST);
+ result.children.add(childType);
+ childType.parent = result;
+ return result;
+ }
+
+ public static TypeDescription createMap(TypeDescription keyType,
+ TypeDescription valueType) {
+ TypeDescription result = new TypeDescription(Category.MAP);
+ result.children.add(keyType);
+ result.children.add(valueType);
+ keyType.parent = result;
+ valueType.parent = result;
+ return result;
+ }
+
+ public static TypeDescription createUnion() {
+ return new TypeDescription(Category.UNION);
+ }
+
+ public static TypeDescription createStruct() {
+ return new TypeDescription(Category.STRUCT);
+ }
+
+ /**
+ * Add a child to a union type.
+ * @param child a new child type to add
+ * @return the union type.
+ */
+ public TypeDescription addUnionChild(TypeDescription child) {
+ if (category != Category.UNION) {
+ throw new IllegalArgumentException("Can only add types to union type" +
+ " and not " + category);
+ }
+ children.add(child);
+ child.parent = this;
+ return this;
+ }
+
+ /**
+ * Add a field to a struct type as it is built.
+ * @param field the field name
+ * @param fieldType the type of the field
+ * @return the struct type
+ */
+ public TypeDescription addField(String field, TypeDescription fieldType) {
+ if (category != Category.STRUCT) {
+ throw new IllegalArgumentException("Can only add fields to struct type" +
+ " and not " + category);
+ }
+ fieldNames.add(field);
+ children.add(fieldType);
+ fieldType.parent = this;
+ return this;
+ }
+
+ /**
+ * Get the id for this type.
+ * The first call will cause all of the ids in the tree to be assigned, so
+ * it should not be called before the type is completely built.
+ * @return the sequential id
+ */
+ public int getId() {
+ // if the id hasn't been assigned, assign all of the ids from the root
+ if (id == -1) {
+ TypeDescription root = this;
+ while (root.parent != null) {
+ root = root.parent;
+ }
+ root.assignIds(0);
+ }
+ return id;
+ }
+
+ /**
+ * Get the maximum id assigned to this type or its children.
+ * The first call will cause all of the ids in the tree to be assigned, so
+ * it should not be called before the type is completely built.
+ * @return the maximum id assigned under this type
+ */
+ public int getMaximumId() {
+ // if the id hasn't been assigned, assign all of the ids from the root
+ if (maxId == -1) {
+ TypeDescription root = this;
+ while (root.parent != null) {
+ root = root.parent;
+ }
+ root.assignIds(0);
+ }
+ return maxId;
+ }
+
+ private ColumnVector createColumn() {
+ switch (category) {
+ case BOOLEAN:
+ case BYTE:
+ case SHORT:
+ case INT:
+ case LONG:
+ case TIMESTAMP:
+ case DATE:
+ return new LongColumnVector();
+ case FLOAT:
+ case DOUBLE:
+ return new DoubleColumnVector();
+ case DECIMAL:
+ return new DecimalColumnVector(precision, scale);
+ case STRING:
+ case BINARY:
+ case CHAR:
+ case VARCHAR:
+ return new BytesColumnVector();
+ default:
+ throw new IllegalArgumentException("Unknown type " + category);
+ }
+ }
+
+ public VectorizedRowBatch createRowBatch() {
+ VectorizedRowBatch result;
+ if (category == Category.STRUCT) {
+ result = new VectorizedRowBatch(children.size(),
+ VectorizedRowBatch.DEFAULT_SIZE);
+ for(int i=0; i < result.cols.length; ++i) {
+ result.cols[i] = children.get(i).createColumn();
+ }
+ } else {
+ result = new VectorizedRowBatch(1, VectorizedRowBatch.DEFAULT_SIZE);
+ result.cols[0] = createColumn();
+ }
+ result.reset();
+ return result;
+ }
+
+ /**
+ * Get the kind of this type.
+ * @return get the category for this type.
+ */
+ public Category getCategory() {
+ return category;
+ }
+
+ /**
+ * Get the maximum length of the type. Only used for char and varchar types.
+ * @return the maximum length of the string type
+ */
+ public int getMaxLength() {
+ return maxLength;
+ }
+
+ /**
+ * Get the precision of the decimal type.
+ * @return the number of digits for the precision.
+ */
+ public int getPrecision() {
+ return precision;
+ }
+
+ /**
+ * Get the scale of the decimal type.
+ * @return the number of digits for the scale.
+ */
+ public int getScale() {
+ return scale;
+ }
+
+ /**
+ * For struct types, get the list of field names.
+ * @return the list of field names.
+ */
+ public List<String> getFieldNames() {
+ return Collections.unmodifiableList(fieldNames);
+ }
+
+ /**
+ * Get the subtypes of this type.
+ * @return the list of children types
+ */
+ public List<TypeDescription> getChildren() {
+ return children == null ? null : Collections.unmodifiableList(children);
+ }
+
+ /**
+ * Assign ids to all of the nodes under this one.
+ * @param startId the lowest id to assign
+ * @return the next available id
+ */
+ private int assignIds(int startId) {
+ id = startId++;
+ if (children != null) {
+ for (TypeDescription child : children) {
+ startId = child.assignIds(startId);
+ }
+ }
+ maxId = startId - 1;
+ return startId;
+ }
+
+ private TypeDescription(Category category) {
+ this.category = category;
+ if (category.isPrimitive) {
+ children = null;
+ } else {
+ children = new ArrayList<>();
+ }
+ if (category == Category.STRUCT) {
+ fieldNames = new ArrayList<>();
+ } else {
+ fieldNames = null;
+ }
+ }
+
+ private int id = -1;
+ private int maxId = -1;
+ private TypeDescription parent;
+ private final Category category;
+ private final List<TypeDescription> children;
+ private final List<String> fieldNames;
+ private int maxLength = DEFAULT_LENGTH;
+ private int precision = DEFAULT_PRECISION;
+ private int scale = DEFAULT_SCALE;
+
+ public void printToBuffer(StringBuilder buffer) {
+ buffer.append(category.name);
+ switch (category) {
+ case DECIMAL:
+ buffer.append('(');
+ buffer.append(precision);
+ buffer.append(',');
+ buffer.append(scale);
+ buffer.append(')');
+ break;
+ case CHAR:
+ case VARCHAR:
+ buffer.append('(');
+ buffer.append(maxLength);
+ buffer.append(')');
+ break;
+ case LIST:
+ case MAP:
+ case UNION:
+ buffer.append('<');
+ for(int i=0; i < children.size(); ++i) {
+ if (i != 0) {
+ buffer.append(',');
+ }
+ children.get(i).printToBuffer(buffer);
+ }
+ buffer.append('>');
+ break;
+ case STRUCT:
+ buffer.append('<');
+ for(int i=0; i < children.size(); ++i) {
+ if (i != 0) {
+ buffer.append(',');
+ }
+ buffer.append(fieldNames.get(i));
+ buffer.append(':');
+ children.get(i).printToBuffer(buffer);
+ }
+ buffer.append('>');
+ break;
+ default:
+ break;
+ }
+ }
+
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ printToBuffer(buffer);
+ return buffer.toString();
+ }
+
+ private void printJsonToBuffer(String prefix, StringBuilder buffer,
+ int indent) {
+ for(int i=0; i < indent; ++i) {
+ buffer.append(' ');
+ }
+ buffer.append(prefix);
+ buffer.append("{\"category\": \"");
+ buffer.append(category.name);
+ buffer.append("\", \"id\": ");
+ buffer.append(getId());
+ buffer.append(", \"max\": ");
+ buffer.append(maxId);
+ switch (category) {
+ case DECIMAL:
+ buffer.append(", \"precision\": ");
+ buffer.append(precision);
+ buffer.append(", \"scale\": ");
+ buffer.append(scale);
+ break;
+ case CHAR:
+ case VARCHAR:
+ buffer.append(", \"length\": ");
+ buffer.append(maxLength);
+ break;
+ case LIST:
+ case MAP:
+ case UNION:
+ buffer.append(", \"children\": [");
+ for(int i=0; i < children.size(); ++i) {
+ buffer.append('\n');
+ children.get(i).printJsonToBuffer("", buffer, indent + 2);
+ if (i != children.size() - 1) {
+ buffer.append(',');
+ }
+ }
+ buffer.append("]");
+ break;
+ case STRUCT:
+ buffer.append(", \"fields\": [");
+ for(int i=0; i < children.size(); ++i) {
+ buffer.append('\n');
+ children.get(i).printJsonToBuffer("\"" + fieldNames.get(i) + "\": ",
+ buffer, indent + 2);
+ if (i != children.size() - 1) {
+ buffer.append(',');
+ }
+ }
+ buffer.append(']');
+ break;
+ default:
+ break;
+ }
+ buffer.append('}');
+ }
+
+ public String toJson() {
+ StringBuilder buffer = new StringBuilder();
+ printJsonToBuffer("", buffer, 0);
+ return buffer.toString();
+ }
+}
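
TypeDescription, added above, is a fluent, chainable description of an ORC schema: static factories create the leaf types, addField/addUnionChild assemble compound types, and createRowBatch builds a matching VectorizedRowBatch. As a quick illustration (not part of the patch; the class name TypeDescriptionSketch and the field names id/name/price are made up), a struct schema could be assembled and inspected like this:

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.orc.TypeDescription;

public class TypeDescriptionSketch {
  public static void main(String[] args) {
    // struct<id:bigint,name:varchar(64),price:decimal(10,2)>
    TypeDescription schema = TypeDescription.createStruct()
        .addField("id", TypeDescription.createLong())
        .addField("name", TypeDescription.createVarchar().withMaxLength(64))
        .addField("price", TypeDescription.createDecimal().withScale(2).withPrecision(10));

    System.out.println(schema);          // prints the struct<...> form above
    System.out.println(schema.toJson()); // same schema, with ids, as JSON

    // One ColumnVector per struct field (long, bytes, decimal), already reset.
    VectorizedRowBatch batch = schema.createRowBatch();
    System.out.println(batch.cols.length); // 3
  }
}

Note that getId() and getMaximumId() assign pre-order ids across the whole type tree on first use, so (as the javadoc above says) they should only be called once the type is completely built.
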
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
index a8e5c2e..a2725b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -46,31 +45,25 @@ class VectorizedOrcAcidRowReader
private final AcidInputFormat.RowReader<OrcStruct> innerReader;
private final RecordIdentifier key;
private final OrcStruct value;
- private final VectorizedRowBatchCtx rowBatchCtx;
+ private VectorizedRowBatchCtx rbCtx;
+ private Object[] partitionValues;
private final ObjectInspector objectInspector;
private final DataOutputBuffer buffer = new DataOutputBuffer();
VectorizedOrcAcidRowReader(AcidInputFormat.RowReader<OrcStruct> inner,
Configuration conf,
+ VectorizedRowBatchCtx vectorizedRowBatchCtx,
FileSplit split) throws IOException {
this.innerReader = inner;
this.key = inner.createKey();
- this.rowBatchCtx = new VectorizedRowBatchCtx();
+ rbCtx = vectorizedRowBatchCtx;
+ int partitionColumnCount = rbCtx.getPartitionColumnCount();
+ if (partitionColumnCount > 0) {
+ partitionValues = new Object[partitionColumnCount];
+ rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues);
+ }
this.value = inner.createValue();
this.objectInspector = inner.getObjectInspector();
- try {
- rowBatchCtx.init(conf, split);
- } catch (ClassNotFoundException e) {
- throw new IOException("Failed to initialize context", e);
- } catch (SerDeException e) {
- throw new IOException("Failed to initialize context", e);
- } catch (InstantiationException e) {
- throw new IOException("Failed to initialize context", e);
- } catch (IllegalAccessException e) {
- throw new IOException("Failed to initialize context", e);
- } catch (HiveException e) {
- throw new IOException("Failed to initialize context", e);
- }
}
@Override
@@ -82,23 +75,21 @@ class VectorizedOrcAcidRowReader
if (!innerReader.next(key, value)) {
return false;
}
- try {
- rowBatchCtx.addPartitionColsToBatch(vectorizedRowBatch);
- } catch (HiveException e) {
- throw new IOException("Problem adding partition column", e);
+ if (partitionValues != null) {
+ rbCtx.addPartitionColsToBatch(vectorizedRowBatch, partitionValues);
}
try {
VectorizedBatchUtil.acidAddRowToBatch(value,
(StructObjectInspector) objectInspector,
- vectorizedRowBatch.size++, vectorizedRowBatch, rowBatchCtx, buffer);
+ vectorizedRowBatch.size++, vectorizedRowBatch, rbCtx, buffer);
while (vectorizedRowBatch.size < vectorizedRowBatch.selected.length &&
innerReader.next(key, value)) {
VectorizedBatchUtil.acidAddRowToBatch(value,
(StructObjectInspector) objectInspector,
- vectorizedRowBatch.size++, vectorizedRowBatch, rowBatchCtx, buffer);
+ vectorizedRowBatch.size++, vectorizedRowBatch, rbCtx, buffer);
}
- } catch (HiveException he) {
- throw new IOException("error iterating", he);
+ } catch (Exception e) {
+ throw new IOException("error iterating", e);
}
return true;
}
@@ -110,11 +101,7 @@ class VectorizedOrcAcidRowReader
@Override
public VectorizedRowBatch createValue() {
- try {
- return rowBatchCtx.createVectorizedRowBatch();
- } catch (HiveException e) {
- throw new RuntimeException("Error creating a batch", e);
- }
+ return rbCtx.createVectorizedRowBatch();
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index bf09001..d90425a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -27,14 +26,12 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
@@ -47,7 +44,8 @@ import org.apache.hadoop.mapred.Reporter;
* A MapReduce/Hive input format for ORC files.
*/
public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
- implements InputFormatChecker, VectorizedInputFormatInterface {
+ implements InputFormatChecker, VectorizedInputFormatInterface,
+ SelfDescribingInputFormatInterface {
static class VectorizedOrcRecordReader
implements RecordReader<NullWritable, VectorizedRowBatch> {
@@ -56,12 +54,29 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
private final long length;
private float progress = 0.0f;
private VectorizedRowBatchCtx rbCtx;
+ private final boolean[] columnsToIncludeTruncated;
+ private final Object[] partitionValues;
private boolean addPartitionCols = true;
VectorizedOrcRecordReader(Reader file, Configuration conf,
FileSplit fileSplit) throws IOException {
+
+ // if HiveCombineInputFormat gives us FileSplits instead of OrcSplits,
+ // we know it is not ACID. (see a check in CombineHiveInputFormat.getSplits() that assures this).
+ //
+ // Why would an ACID table reach here instead of VectorizedOrcAcidRowReader?
+ // OrcInputFormat.getRecordReader will use this reader for original files that have no deltas.
+ //
+ boolean isAcid = (fileSplit instanceof OrcSplit);
+
+ /**
+ * Do we have schema on read in the configuration variables?
+ */
+ TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, isAcid);
+
List<OrcProto.Type> types = file.getTypes();
Reader.Options options = new Reader.Options();
+ options.schema(schema);
this.offset = fileSplit.getStart();
this.length = fileSplit.getLength();
options.range(offset, length);
@@ -69,11 +84,17 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
OrcInputFormat.setSearchArgument(options, types, conf, true);
this.reader = file.rowsOptions(options);
- try {
- rbCtx = new VectorizedRowBatchCtx();
- rbCtx.init(conf, fileSplit);
- } catch (Exception e) {
- throw new RuntimeException(e);
+
+ rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+
+ columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(conf);
+
+ int partitionColumnCount = rbCtx.getPartitionColumnCount();
+ if (partitionColumnCount > 0) {
+ partitionValues = new Object[partitionColumnCount];
+ rbCtx.getPartitionValues(rbCtx, conf, fileSplit, partitionValues);
+ } else {
+ partitionValues = null;
}
}
@@ -90,7 +111,9 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
// as this does not call CreateValue for each new RecordReader it creates, this check is
// required in next()
if (addPartitionCols) {
- rbCtx.addPartitionColsToBatch(value);
+ if (partitionValues != null) {
+ rbCtx.addPartitionColsToBatch(value, partitionValues);
+ }
addPartitionCols = false;
}
reader.nextBatch(value);
@@ -108,11 +131,7 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
@Override
public VectorizedRowBatch createValue() {
- try {
- return rbCtx.createVectorizedRowBatch();
- } catch (HiveException e) {
- throw new RuntimeException("Error creating a batch", e);
- }
+ return rbCtx.createVectorizedRowBatch(columnsToIncludeTruncated);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
index ed99615..c6070c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
@@ -14,8 +14,11 @@
package org.apache.hadoop.hive.ql.io.parquet;
import java.io.IOException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssign;
import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssignFactory;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
@@ -23,6 +26,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
@@ -32,7 +36,6 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-
import org.apache.parquet.hadoop.ParquetInputFormat;
/**
@@ -52,6 +55,7 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable,
private final ParquetRecordReaderWrapper internalReader;
private VectorizedRowBatchCtx rbCtx;
+ private Object[] partitionValues;
private ArrayWritable internalValues;
private NullWritable internalKey;
private VectorColumnAssign[] assigners;
@@ -65,11 +69,11 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable,
split,
conf,
reporter);
- try {
- rbCtx = new VectorizedRowBatchCtx();
- rbCtx.init(conf, split);
- } catch (Exception e) {
- throw new RuntimeException(e);
+ rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+ int partitionColumnCount = rbCtx.getPartitionColumnCount();
+ if (partitionColumnCount > 0) {
+ partitionValues = new Object[partitionColumnCount];
+ rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues);
}
}
@@ -81,13 +85,9 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable,
@Override
public VectorizedRowBatch createValue() {
- VectorizedRowBatch outputBatch = null;
- try {
- outputBatch = rbCtx.createVectorizedRowBatch();
- internalValues = internalReader.createValue();
- } catch (HiveException e) {
- throw new RuntimeException("Error creating a batch", e);
- }
+ VectorizedRowBatch outputBatch;
+ outputBatch = rbCtx.createVectorizedRowBatch();
+ internalValues = internalReader.createValue();
return outputBatch;
}
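
The three vectorized readers touched above (ORC ACID, plain ORC, Parquet) now share the same initialization pattern: instead of each reader constructing and init()-ing its own VectorizedRowBatchCtx, they fetch the context the Vectorizer serialized into the plan and only resolve partition values per split. Roughly, as an illustrative fragment only (conf and fileSplit stand for the reader's Configuration and FileSplit):

VectorizedRowBatchCtx rbCtx = Utilities.getVectorizedRowBatchCtx(conf);

// Partition values are resolved once per split ...
Object[] partitionValues = null;
int partitionColumnCount = rbCtx.getPartitionColumnCount();
if (partitionColumnCount > 0) {
  partitionValues = new Object[partitionColumnCount];
  rbCtx.getPartitionValues(rbCtx, conf, fileSplit, partitionValues);
}

// ... and added to each batch handed back to the operator pipeline.
VectorizedRowBatch batch = rbCtx.createVectorizedRowBatch();
if (partitionValues != null) {
  rbCtx.addPartitionColsToBatch(batch, partitionValues);
}
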
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 5708cb8..40d0e34 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -109,6 +109,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.InputFormat;
@@ -700,6 +702,11 @@ public final class GenMapRedUtils {
parseCtx.getGlobalLimitCtx().disableOpt();
}
+ if (topOp instanceof TableScanOperator) {
+ Utilities.addSchemaEvolutionToTableScanOperator(partsList.getSourceTable(),
+ (TableScanOperator) topOp);
+ }
+
Iterator<Path> iterPath = partDir.iterator();
Iterator<PartitionDesc> iterPartnDesc = partDesc.iterator();
@@ -761,6 +768,7 @@ public final class GenMapRedUtils {
* whether you need to add to map-reduce or local work
* @param tt_desc
* table descriptor
+ * @throws SerDeException
*/
public static void setTaskPlan(String path, String alias,
Operator<? extends OperatorDesc> topOp, MapWork plan, boolean local,
@@ -770,6 +778,16 @@ public final class GenMapRedUtils {
return;
}
+ if (topOp instanceof TableScanOperator) {
+ try {
+ Utilities.addSchemaEvolutionToTableScanOperator(
+ (StructObjectInspector) tt_desc.getDeserializer().getObjectInspector(),
+ (TableScanOperator) topOp);
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ }
+
if (!local) {
if (plan.getPathToAliases().get(path) == null) {
plan.getPathToAliases().put(path, new ArrayList<String>());
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index 2af6f9a..20e1ee6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -370,6 +370,7 @@ public class SimpleFetchOptimizer implements Transform {
private FetchWork convertToWork() throws HiveException {
inputs.clear();
+ Utilities.addSchemaEvolutionToTableScanOperator(table, scanOp);
TableDesc tableDesc = Utilities.getTableDesc(table);
if (!table.isPartitioned()) {
inputs.add(new ReadEntity(table, parent, !table.isView() && parent == null));
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 20f9400..5d010cc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.Stack;
@@ -33,6 +34,7 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.*;
@@ -63,6 +65,11 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.InConstantType
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -88,6 +95,7 @@ import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionConversion;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
@@ -100,6 +108,7 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.udf.UDFAcos;
import org.apache.hadoop.hive.ql.udf.UDFAsin;
@@ -149,6 +158,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import com.google.common.base.Joiner;
+
public class Vectorizer implements PhysicalPlanResolver {
protected static transient final Log LOG = LogFactory.getLog(Vectorizer.class);
@@ -311,17 +322,51 @@ public class Vectorizer implements PhysicalPlanResolver {
supportedAggregationUdfs.add("stddev_samp");
}
+ private class VectorTaskColumnInfo {
+ List<String> columnNames;
+ List<TypeInfo> typeInfos;
+ int partitionColumnCount;
+
+ String[] scratchTypeNameArray;
+
+ VectorTaskColumnInfo() {
+ partitionColumnCount = 0;
+ }
+
+ public void setColumnNames(List<String> columnNames) {
+ this.columnNames = columnNames;
+ }
+ public void setTypeInfos(List<TypeInfo> typeInfos) {
+ this.typeInfos = typeInfos;
+ }
+ public void setPartitionColumnCount(int partitionColumnCount) {
+ this.partitionColumnCount = partitionColumnCount;
+ }
+ public void setScratchTypeNameArray(String[] scratchTypeNameArray) {
+ this.scratchTypeNameArray = scratchTypeNameArray;
+ }
+
+ public void transferToBaseWork(BaseWork baseWork) {
+
+ String[] columnNameArray = columnNames.toArray(new String[0]);
+ TypeInfo[] typeInfoArray = typeInfos.toArray(new TypeInfo[0]);
+
+ VectorizedRowBatchCtx vectorizedRowBatchCtx =
+ new VectorizedRowBatchCtx(
+ columnNameArray,
+ typeInfoArray,
+ partitionColumnCount,
+ scratchTypeNameArray);
+ baseWork.setVectorizedRowBatchCtx(vectorizedRowBatchCtx);
+ }
+ }
+
class VectorizationDispatcher implements Dispatcher {
private final PhysicalContext physicalContext;
- private List<String> reduceColumnNames;
- private List<TypeInfo> reduceTypeInfos;
-
public VectorizationDispatcher(PhysicalContext physicalContext) {
this.physicalContext = physicalContext;
- reduceColumnNames = null;
- reduceTypeInfos = null;
}
@Override
@@ -359,9 +404,10 @@ public class Vectorizer implements PhysicalPlanResolver {
}
private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
- boolean ret = validateMapWork(mapWork, isTez);
+ VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
+ boolean ret = validateMapWork(mapWork, vectorTaskColumnInfo, isTez);
if (ret) {
- vectorizeMapWork(mapWork, isTez);
+ vectorizeMapWork(mapWork, vectorTaskColumnInfo, isTez);
}
}
@@ -372,40 +418,262 @@ public class Vectorizer implements PhysicalPlanResolver {
+ ReduceSinkOperator.getOperatorName()), np);
}
- private boolean validateMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
- LOG.info("Validating MapWork...");
+ private ImmutablePair<String, TableScanOperator> verifyOnlyOneTableScanOperator(MapWork mapWork) {
// Eliminate MR plans with more than one TableScanOperator.
+
LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mapWork.getAliasToWork();
if ((aliasToWork == null) || (aliasToWork.size() == 0)) {
- return false;
+ return null;
}
int tableScanCount = 0;
- for (Operator<?> op : aliasToWork.values()) {
+ String alias = "";
+ TableScanOperator tableScanOperator = null;
+ for (Entry<String, Operator<? extends OperatorDesc>> entry : aliasToWork.entrySet()) {
+ Operator<?> op = entry.getValue();
if (op == null) {
LOG.warn("Map work has invalid aliases to work with. Fail validation!");
- return false;
+ return null;
}
if (op instanceof TableScanOperator) {
tableScanCount++;
+ alias = entry.getKey();
+ tableScanOperator = (TableScanOperator) op;
}
}
if (tableScanCount > 1) {
- LOG.warn("Map work has more than 1 TableScanOperator aliases to work with. Fail validation!");
- return false;
+ LOG.warn("Map work has more than 1 TableScanOperator. Fail validation!");
+ return null;
+ }
+ return new ImmutablePair(alias, tableScanOperator);
+ }
+
+ private void getTableScanOperatorSchemaInfo(TableScanOperator tableScanOperator,
+ List<String> logicalColumnNameList, List<TypeInfo> logicalTypeInfoList) {
+
+ TableScanDesc tableScanDesc = tableScanOperator.getConf();
+
+ // Add all non-virtual columns to make a vectorization context for
+ // the TableScan operator.
+ RowSchema rowSchema = tableScanOperator.getSchema();
+ for (ColumnInfo c : rowSchema.getSignature()) {
+ // Validation will later exclude vectorization of virtual columns usage (HIVE-5560).
+ if (!isVirtualColumn(c)) {
+ String columnName = c.getInternalName();
+ String typeName = c.getTypeName();
+ TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName);
+
+ logicalColumnNameList.add(columnName);
+ logicalTypeInfoList.add(typeInfo);
+ }
+ }
+ }
+
+ private String getColumns(List<String> columnNames, int start, int length,
+ Character separator) {
+ return Joiner.on(separator).join(columnNames.subList(start, start + length));
+ }
+
+ private String getTypes(List<TypeInfo> typeInfos, int start, int length) {
+ return TypeInfoUtils.getTypesString(typeInfos.subList(start, start + length));
+ }
+
+ private boolean verifyAndSetVectorPartDesc(PartitionDesc pd) {
+
+ // Look for Pass-Thru case where InputFileFormat has VectorizedInputFormatInterface
+ // and reads VectorizedRowBatch as a "row".
+
+ if (Utilities.isInputFileFormatVectorized(pd)) {
+
+ pd.setVectorPartitionDesc(VectorPartitionDesc.createVectorizedInputFileFormat());
+
+ return true;
}
+ LOG.info("Input format: " + pd.getInputFileFormatClassName()
+ + ", doesn't provide vectorized input");
+
+ return false;
+ }
+
+ private boolean validateInputFormatAndSchemaEvolution(MapWork mapWork, String alias,
+ TableScanOperator tableScanOperator, VectorTaskColumnInfo vectorTaskColumnInfo) {
+
+ // These names/types are the data columns plus partition columns.
+ final List<String> allColumnNameList = new ArrayList<String>();
+ final List<TypeInfo> allTypeInfoList = new ArrayList<TypeInfo>();
+
+ getTableScanOperatorSchemaInfo(tableScanOperator, allColumnNameList, allTypeInfoList);
+ final int allColumnCount = allColumnNameList.size();
+
+ // Validate input format and schema evolution capability.
+
+ // For the table, enter a null value in the multi-key map indicating no conversion necessary
+ // if the schema matches the table.
+
+ HashMap<ImmutablePair, boolean[]> conversionMap = new HashMap<ImmutablePair, boolean[]>();
+
+ boolean isFirst = true;
+ int dataColumnCount = 0;
+ int partitionColumnCount = 0;
+
+ List<String> dataColumnList = null;
+ String dataColumnsString = "";
+ List<TypeInfo> dataTypeInfoList = null;
+
// Validate the input format
- for (String path : mapWork.getPathToPartitionInfo().keySet()) {
- PartitionDesc pd = mapWork.getPathToPartitionInfo().get(path);
- List<Class<?>> interfaceList =
- Arrays.asList(pd.getInputFileFormatClass().getInterfaces());
- if (!interfaceList.contains(VectorizedInputFormatInterface.class)) {
- LOG.info("Input format: " + pd.getInputFileFormatClassName()
- + ", doesn't provide vectorized input");
+ VectorPartitionConversion partitionConversion = new VectorPartitionConversion();
+ LinkedHashMap<String, ArrayList<String>> pathToAliases = mapWork.getPathToAliases();
+ LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = mapWork.getPathToPartitionInfo();
+ for (Entry<String, ArrayList<String>> entry: pathToAliases.entrySet()) {
+ String path = entry.getKey();
+ List<String> aliases = entry.getValue();
+ boolean isPresent = (aliases != null && aliases.indexOf(alias) != -1);
+ if (!isPresent) {
+ LOG.info("Alias " + alias + " not present in aliases " + aliases);
+ return false;
+ }
+ PartitionDesc partDesc = pathToPartitionInfo.get(path);
+ if (partDesc.getVectorPartitionDesc() != null) {
+ // We have seen this already.
+ continue;
+ }
+ if (!verifyAndSetVectorPartDesc(partDesc)) {
return false;
}
+ VectorPartitionDesc vectorPartDesc = partDesc.getVectorPartitionDesc();
+ LOG.info("Vectorizer path: " + path + ", read type " +
+ vectorPartDesc.getVectorMapOperatorReadType().name() + ", aliases " + aliases);
+
+ Properties partProps = partDesc.getProperties();
+
+ String nextDataColumnsString =
+ partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
+ String[] nextDataColumns = nextDataColumnsString.split(",");
+
+ String nextDataTypesString =
+ partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
+
+ // We convert to an array of TypeInfo using a library routine since it parses the information
+ // and can handle use of different separators, etc. We cannot use the raw type string
+ // for comparison in the map because of the different separators used.
+ List<TypeInfo> nextDataTypeInfoList =
+ TypeInfoUtils.getTypeInfosFromTypeString(nextDataTypesString);
+
+ if (isFirst) {
+
+ // We establish with the first one whether the table is partitioned or not.
+
+ LinkedHashMap<String, String> partSpec = partDesc.getPartSpec();
+ if (partSpec != null && partSpec.size() > 0) {
+ partitionColumnCount = partSpec.size();
+ dataColumnCount = allColumnCount - partitionColumnCount;
+ } else {
+ partitionColumnCount = 0;
+ dataColumnCount = allColumnCount;
+ }
+
+ dataColumnList = allColumnNameList.subList(0, dataColumnCount);
+ dataColumnsString = getColumns(allColumnNameList, 0, dataColumnCount, ',');
+ dataTypeInfoList = allTypeInfoList.subList(0, dataColumnCount);
+
+ // Add the table (non-partitioned) columns and types into the map as not needing
+ // conversion (i.e. null).
+ conversionMap.put(
+ new ImmutablePair(dataColumnsString, dataTypeInfoList), null);
+
+ isFirst = false;
+ }
+
+ ImmutablePair columnNamesAndTypesCombination =
+ new ImmutablePair(nextDataColumnsString, nextDataTypeInfoList);
+
+ boolean[] conversionFlags;
+ if (conversionMap.containsKey(columnNamesAndTypesCombination)) {
+
+ conversionFlags = conversionMap.get(columnNamesAndTypesCombination);
+
+ } else {
+
+ List<String> nextDataColumnList = Arrays.asList(nextDataColumns);
+
+ // Validate the column names that are present are the same. Missing columns will be
+ // implicitly defaulted to null.
+
+ if (nextDataColumnList.size() > dataColumnList.size()) {
+ LOG.info(
+ String.format("Could not vectorize partition %s. The number of partition columns %d is greater than the number of table columns %d",
+ path, nextDataColumnList.size(), dataColumnList.size()));
+ return false;
+ }
+ for (int i = 0; i < nextDataColumnList.size(); i++) {
+ String nextColumnName = nextDataColumnList.get(i);
+ String tableColumnName = dataColumnList.get(i);
+ if (!nextColumnName.equals(tableColumnName)) {
+ LOG.info(
+ String.format("Could not vectorize partition %s. The partition column name %s does not match the table column name %s",
+ path, nextColumnName, tableColumnName));
+ return false;
+ }
+ }
+
+ // The table column types might have been changed with ALTER. There are restrictions
+ // here for vectorization.
+
+ // Some readers / deserializers take responsibility for conversion themselves.
+
+ // If we need to check for conversion, the conversion object may come back null
+ // indicating from a vectorization point of view the conversion is implicit. That is,
+ // all implicit integer upgrades.
+
+ if (vectorPartDesc.getNeedsDataTypeConversionCheck() &&
+ !nextDataTypeInfoList.equals(dataTypeInfoList)) {
+
+ // The results will be in 2 members: validConversion and conversionFlags
+ partitionConversion.validateConversion(nextDataTypeInfoList, dataTypeInfoList);
+ if (!partitionConversion.getValidConversion()) {
+ return false;
+ }
+ conversionFlags = partitionConversion.getResultConversionFlags();
+ } else {
+ conversionFlags = null;
+ }
+
+ // We enter this in our map so we don't have to check again for subsequent partitions.
+
+ conversionMap.put(columnNamesAndTypesCombination, conversionFlags);
+ }
+
+ vectorPartDesc.setConversionFlags(conversionFlags);
+
+ vectorPartDesc.setTypeInfos(nextDataTypeInfoList);
+ }
+
+ vectorTaskColumnInfo.setColumnNames(allColumnNameList);
+ vectorTaskColumnInfo.setTypeInfos(allTypeInfoList);
+ vectorTaskColumnInfo.setPartitionColumnCount(partitionColumnCount);
+
+ return true;
+ }
+
+ private boolean validateMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez)
+ throws SemanticException {
+
+ LOG.info("Validating MapWork...");
+
+ ImmutablePair<String,TableScanOperator> pair = verifyOnlyOneTableScanOperator(mapWork);
+ if (pair == null) {
+ return false;
+ }
+ String alias = pair.left;
+ TableScanOperator tableScanOperator = pair.right;
+
+ // This call fills in the column names, types, and partition column count in
+ // vectorTaskColumnInfo.
+ if (!validateInputFormatAndSchemaEvolution(mapWork, alias, tableScanOperator, vectorTaskColumnInfo)) {
+ return false;
}
+
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(mapWork, isTez);
addMapWorkRules(opRules, vnp);
@@ -427,11 +695,14 @@ public class Vectorizer implements PhysicalPlanResolver {
return true;
}
- private void vectorizeMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+ private void vectorizeMapWork(MapWork mapWork, VectorTaskColumnInfo vectorTaskColumnInfo,
+ boolean isTez) throws SemanticException {
+
LOG.info("Vectorizing MapWork...");
mapWork.setVectorMode(true);
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- MapWorkVectorizationNodeProcessor vnp = new MapWorkVectorizationNodeProcessor(mapWork, isTez);
+ MapWorkVectorizationNodeProcessor vnp =
+ new MapWorkVectorizationNodeProcessor(mapWork, isTez, vectorTaskColumnInfo);
addMapWorkRules(opRules, vnp);
Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
GraphWalker ogw = new PreOrderWalker(disp);
@@ -441,9 +712,9 @@ public class Vectorizer implements PhysicalPlanResolver {
HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
ogw.startWalking(topNodes, nodeOutput);
- mapWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap());
- mapWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap());
- mapWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap());
+ vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames());
+
+ vectorTaskColumnInfo.transferToBaseWork(mapWork);
if (LOG.isDebugEnabled()) {
debugDisplayAllMaps(mapWork);
@@ -453,13 +724,19 @@ public class Vectorizer implements PhysicalPlanResolver {
}
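
vectorizeMapWork drives the rewrite through Hive's generic rule/dispatcher/graph-walker machinery: each operator name is matched against regular-expression rules, the matching node processor is invoked, and the operator tree is walked pre-order so parents are handled before their children. A minimal standalone sketch of that visitor pattern in plain Java (not the org.apache.hadoop.hive.ql.lib classes):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Minimal sketch of the rule/dispatcher/walker pattern: operator names are matched
// against regex rules, the first matching processor runs, and the tree is walked
// pre-order so a parent is processed before its children.
public class RuleWalkSketch {

  interface NodeProcessor { void process(OpNode node); }

  static class OpNode {
    final String name;
    final List<OpNode> children = new ArrayList<OpNode>();
    OpNode(String name) { this.name = name; }
  }

  static void walk(OpNode node, Map<Pattern, NodeProcessor> rules) {
    for (Map.Entry<Pattern, NodeProcessor> e : rules.entrySet()) {
      if (e.getKey().matcher(node.name).find()) {
        e.getValue().process(node);      // dispatch to the matching processor
        break;
      }
    }
    for (OpNode child : node.children) {
      walk(child, rules);                // pre-order: parent before children
    }
  }

  public static void main(String[] args) {
    OpNode ts = new OpNode("TS");        // table scan
    ts.children.add(new OpNode("SEL"));  // select below it
    Map<Pattern, NodeProcessor> rules = new LinkedHashMap<Pattern, NodeProcessor>();
    rules.put(Pattern.compile("TS.*"), n -> System.out.println("vectorize scan " + n.name));
    rules.put(Pattern.compile("SEL.*"), n -> System.out.println("vectorize select " + n.name));
    walk(ts, rules);
  }
}
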
private void convertReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException {
- boolean ret = validateReduceWork(reduceWork);
+ VectorTaskColumnInfo vectorTaskColumnInfo = new VectorTaskColumnInfo();
+ boolean ret = validateReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
if (ret) {
- vectorizeReduceWork(reduceWork, isTez);
+ vectorizeReduceWork(reduceWork, vectorTaskColumnInfo, isTez);
}
}
- private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork) throws SemanticException {
+ private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork,
+ VectorTaskColumnInfo vectorTaskColumnInfo) throws SemanticException {
+
+ ArrayList<String> reduceColumnNames = new ArrayList<String>();
+ ArrayList<TypeInfo> reduceTypeInfos = new ArrayList<TypeInfo>();
+
try {
// Check key ObjectInspector.
ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector();
@@ -483,9 +760,6 @@ public class Vectorizer implements PhysicalPlanResolver {
StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs();
- reduceColumnNames = new ArrayList<String>();
- reduceTypeInfos = new ArrayList<TypeInfo>();
-
for (StructField field: keyFields) {
reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
@@ -497,6 +771,10 @@ public class Vectorizer implements PhysicalPlanResolver {
} catch (Exception e) {
throw new SemanticException(e);
}
+
+ vectorTaskColumnInfo.setColumnNames(reduceColumnNames);
+ vectorTaskColumnInfo.setTypeInfos(reduceTypeInfos);
+
return true;
}
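
getOnlyStructObjectInspectors now records the reduce-side schema in the VectorTaskColumnInfo that was passed in: key struct fields become "KEY.<name>" columns and value struct fields become "VALUE.<name>" columns, with their type names collected in the same order. A small standalone illustration of that naming scheme, using plain lists instead of the ObjectInspector API:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone illustration (not the Hive ObjectInspector API): reduce-shuffle columns
// are the key fields prefixed with "KEY." followed by the value fields prefixed with
// "VALUE.", with type names kept in matching order.
public class ReduceSchemaSketch {

  static void addFields(String prefix, List<String> fieldNames, List<String> fieldTypes,
      List<String> outNames, List<String> outTypes) {
    for (int i = 0; i < fieldNames.size(); i++) {
      outNames.add(prefix + "." + fieldNames.get(i));
      outTypes.add(fieldTypes.get(i));
    }
  }

  public static void main(String[] args) {
    List<String> reduceColumnNames = new ArrayList<String>();
    List<String> reduceTypeNames = new ArrayList<String>();
    addFields("KEY", Arrays.asList("reducesinkkey0"), Arrays.asList("int"),
        reduceColumnNames, reduceTypeNames);
    addFields("VALUE", Arrays.asList("name", "amount"), Arrays.asList("string", "decimal(10,2)"),
        reduceColumnNames, reduceTypeNames);
    System.out.println(reduceColumnNames); // [KEY.reducesinkkey0, VALUE.name, VALUE.amount]
    System.out.println(reduceTypeNames);   // [int, string, decimal(10,2)]
  }
}
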
@@ -505,11 +783,13 @@ public class Vectorizer implements PhysicalPlanResolver {
opRules.put(new RuleRegExp("R2", SelectOperator.getOperatorName() + ".*"), np);
}
- private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException {
+ private boolean validateReduceWork(ReduceWork reduceWork,
+ VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException {
+
LOG.info("Validating ReduceWork...");
// Validate input to ReduceWork.
- if (!getOnlyStructObjectInspectors(reduceWork)) {
+ if (!getOnlyStructObjectInspectors(reduceWork, vectorTaskColumnInfo)) {
return false;
}
// Now check the reduce operator tree.
@@ -533,7 +813,9 @@ public class Vectorizer implements PhysicalPlanResolver {
return true;
}
- private void vectorizeReduceWork(ReduceWork reduceWork, boolean isTez) throws SemanticException {
+ private void vectorizeReduceWork(ReduceWork reduceWork,
+ VectorTaskColumnInfo vectorTaskColumnInfo, boolean isTez) throws SemanticException {
+
LOG.info("Vectorizing ReduceWork...");
reduceWork.setVectorMode(true);
@@ -542,7 +824,7 @@ public class Vectorizer implements PhysicalPlanResolver {
// VectorizationContext... Do we use PreOrderWalker instead of DefaultGraphWalker.
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
ReduceWorkVectorizationNodeProcessor vnp =
- new ReduceWorkVectorizationNodeProcessor(reduceColumnNames, reduceTypeInfos, isTez);
+ new ReduceWorkVectorizationNodeProcessor(vectorTaskColumnInfo, isTez);
addReduceWorkRules(opRules, vnp);
Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
GraphWalker ogw = new PreOrderWalker(disp);
@@ -557,9 +839,9 @@ public class Vectorizer implements PhysicalPlanResolver {
// Necessary since we are vectorizing the root operator in reduce.
reduceWork.setReducer(vnp.getRootVectorOp());
- reduceWork.setVectorColumnNameMap(vnp.getVectorColumnNameMap());
- reduceWork.setVectorColumnTypeMap(vnp.getVectorColumnTypeMap());
- reduceWork.setVectorScratchColumnTypeMap(vnp.getVectorScratchColumnTypeMap());
+ vectorTaskColumnInfo.setScratchTypeNameArray(vnp.getVectorScratchColumnTypeNames());
+
+ vectorTaskColumnInfo.transferToBaseWork(reduceWork);
if (LOG.isDebugEnabled()) {
debugDisplayAllMaps(reduceWork);
@@ -627,23 +909,11 @@ public class Vectorizer implements PhysicalPlanResolver {
// The vectorization context for the Map or Reduce task.
protected VectorizationContext taskVectorizationContext;
- // The input projection column type name map for the Map or Reduce task.
- protected Map<Integer, String> taskColumnTypeNameMap;
-
VectorizationNodeProcessor() {
- taskColumnTypeNameMap = new HashMap<Integer, String>();
- }
-
- public Map<String, Integer> getVectorColumnNameMap() {
- return taskVectorizationContext.getProjectionColumnMap();
}
- public Map<Integer, String> getVectorColumnTypeMap() {
- return taskColumnTypeNameMap;
- }
-
- public Map<Integer, String> getVectorScratchColumnTypeMap() {
- return taskVectorizationContext.getScratchColumnTypeMap();
+ public String[] getVectorScratchColumnTypeNames() {
+ return taskVectorizationContext.getScratchColumnTypeNames();
}
protected final Set<Operator<? extends OperatorDesc>> opsDone =
@@ -713,11 +983,14 @@ public class Vectorizer implements PhysicalPlanResolver {
class MapWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
private final MapWork mWork;
+ private VectorTaskColumnInfo vectorTaskColumnInfo;
private final boolean isTez;
- public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez) {
+ public MapWorkVectorizationNodeProcessor(MapWork mWork, boolean isTez,
+ VectorTaskColumnInfo vectorTaskColumnInfo) {
super();
this.mWork = mWork;
+ this.vectorTaskColumnInfo = vectorTaskColumnInfo;
this.isTez = isTez;
}
@@ -731,8 +1004,7 @@ public class Vectorizer implements PhysicalPlanResolver {
if (op instanceof TableScanOperator) {
if (taskVectorizationContext == null) {
- taskVectorizationContext = getVectorizationContext(op.getSchema(), op.getName(),
- taskColumnTypeNameMap);
+ taskVectorizationContext = getVectorizationContext(op.getName(), vectorTaskColumnInfo);
}
vContext = taskVectorizationContext;
} else {
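
The hunk above shows MapWorkVectorizationNodeProcessor creating the task-level vectorization context lazily at the first TableScanOperator it visits and reusing that same context afterwards. Roughly, with hypothetical stand-in types (in the real processor, non-scan operators obtain their context by walking parent operators rather than always reusing the task context):

import java.util.Arrays;
import java.util.List;

// Rough standalone sketch (hypothetical stand-in types, not the Hive classes): the first
// table scan seen in a task creates the shared context from the task-level column names;
// the sketch simply returns that shared instance for every later operator.
public class LazyTaskContextSketch {

  static class Context {
    final List<String> columnNames;
    Context(List<String> columnNames) { this.columnNames = columnNames; }
  }

  private Context taskContext;                    // one per Map or Reduce task

  Context contextFor(String operatorName, List<String> taskColumnNames) {
    if ("TS".equals(operatorName) && taskContext == null) {
      taskContext = new Context(taskColumnNames); // created exactly once per task
    }
    return taskContext;
  }

  public static void main(String[] args) {
    LazyTaskContextSketch processor = new LazyTaskContextSketch();
    List<String> cols = Arrays.asList("id", "name");
    Context first = processor.contextFor("TS", cols);
    Context second = processor.contextFor("SEL", cols);
    System.out.println(first == second);          // true: the task context is shared
  }
}
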
@@ -773,8 +1045,7 @@ public class Vectorizer implements PhysicalPlanResolver {
class ReduceWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
- private final List<String> reduceColumnNames;
- private final List<TypeInfo> reduceTypeInfos;
+ private VectorTaskColumnInfo vectorTaskColumnInfo;
private boolean isTez;
@@ -784,11 +1055,11 @@ public class Vectorizer implements PhysicalPlanResolver {
return rootVectorOp;
}
- public ReduceWorkVectorizationNodeProcessor(List<String> reduceColumnNames,
- List<TypeInfo> reduceTypeInfos, boolean isTez) {
+ public ReduceWorkVectorizationNodeProcessor(VectorTaskColumnInfo vectorTaskColumnInfo,
+ boolean isTez) {
+
super();
- this.reduceColumnNames = reduceColumnNames;
- this.reduceTypeInfos = reduceTypeInfos;
+ this.vectorTaskColumnInfo = vectorTaskColumnInfo;
rootVectorOp = null;
this.isTez = isTez;
}
@@ -804,15 +1075,11 @@ public class Vectorizer implements PhysicalPlanResolver {
boolean saveRootVectorOp = false;
if (op.getParentOperators().size() == 0) {
- LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + reduceColumnNames.toString());
+ LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + vectorTaskColumnInfo.columnNames.toString());
- vContext = new VectorizationContext("__Reduce_Shuffle__", reduceColumnNames);
+ vContext = new VectorizationContext("__Reduce_Shuffle__", vectorTaskColumnInfo.columnNames);
taskVectorizationContext = vContext;
- int i = 0;
- for (TypeInfo typeInfo : reduceTypeInfos) {
- taskColumnTypeNameMap.put(i, typeInfo.getTypeName());
- i++;
- }
+
saveRootVectorOp = true;
if (LOG.isDebugEnabled()) {
@@ -881,7 +1148,7 @@ public class Vectorizer implements PhysicalPlanResolver {
@Override
public PhysicalContext resolve(PhysicalContext physicalContext) throws SemanticException {
- this.physicalContext = physicalContext;
+
hiveConf = physicalContext.getConf();
boolean vectorPath = HiveConf.getBoolVar(hiveConf,
@@ -1022,65 +1289,6 @@ public class Vectorizer implements PhysicalPlanResolver {
return false;
}
- String columns = "";
- String types = "";
- String partitionColumns = "";
- String partitionTypes = "";
- boolean haveInfo = false;
-
- // This over-reaches slightly, since we can have > 1 table-scan per map-work.
- // It needs path to partition, path to alias, then check the alias == the same table-scan, to be accurate.
- // That said, that is a TODO item to be fixed when we support >1 TableScans per vectorized pipeline later.
- LinkedHashMap<String, PartitionDesc> partitionDescs = mWork.getPathToPartitionInfo();
-
- // For vectorization, compare each partition information for against the others.
- // We assume the table information will be from one of the partitions, so it will
- // work to focus on the partition information and not compare against the TableScanOperator
- // columns (in the VectorizationContext)....
- for (Map.Entry<String, PartitionDesc> entry : partitionDescs.entrySet()) {
- PartitionDesc partDesc = entry.getValue();
- if (partDesc.getPartSpec() == null || partDesc.getPartSpec().isEmpty()) {
- // No partition information -- we match because we would default to using the table description.
- continue;
- }
- Properties partProps = partDesc.getProperties();
- if (!haveInfo) {
- columns = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
- types = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
- partitionColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
- partitionTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
- haveInfo = true;
- } else {
- String nextColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMNS);
- String nextTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_COLUMN_TYPES);
- String nextPartitionColumns = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
- String nextPartitionTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
- if (!columns.equalsIgnoreCase(nextColumns)) {
- LOG.info(
- String.format("Could not vectorize partition %s. Its column names %s do not match the other column names %s",
- entry.getKey(), nextColumns, columns));
- return false;
- }
- if (!types.equalsIgnoreCase(nextTypes)) {
- LOG.info(
- String.format("Could not vectorize partition %s. Its column types %s do not match the other column types %s",
- entry.getKey(), nextTypes, types));
- return false;
- }
- if (!partitionColumns.equalsIgnoreCase(nextPartitionColumns)) {
- LOG.info(
- String.format("Could not vectorize partition %s. Its partition column names %s do not match the other partition column names %s",
- entry.getKey(), nextPartitionColumns, partitionColumns));
- return false;
- }
- if (!partitionTypes.equalsIgnoreCase(nextPartitionTypes)) {
- LOG.info(
- String.format("Could not vectorize partition %s. Its partition column types %s do not match the other partition column types %s",
- entry.getKey(), nextPartitionTypes, partitionTypes));
- return false;
- }
- }
- }
return true;
}
@@ -1412,23 +1620,10 @@ public class Vectorizer implements PhysicalPlanResolver {
return result;
}
- private VectorizationContext getVectorizationContext(RowSchema rowSchema, String contextName,
- Map<Integer, String> typeNameMap) {
+ private VectorizationContext getVectorizationContext(String contextName,
+ VectorTaskColumnInfo vectorTaskColumnInfo) {
- VectorizationContext vContext = new VectorizationContext(contextName);
-
- // Add all non-virtual columns to make a vectorization context for
- // the TableScan operator.
- int i = 0;
- for (ColumnInfo c : rowSchema.getSignature()) {
- // Earlier, validation code should have eliminated virtual columns usage (HIVE-5560).
- if (!isVirtualColumn(c)) {
- vContext.addInitialColumn(c.getInternalName());
- typeNameMap.put(i, c.getTypeName());
- i++;
- }
- }
- vContext.finishedAddingInitialColumns();
+ VectorizationContext vContext = new VectorizationContext(contextName, vectorTaskColumnInfo.columnNames);
return vContext;
}
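
With the schema now carried in VectorTaskColumnInfo, getVectorizationContext no longer walks the RowSchema and filters out virtual columns; it seeds the context directly with the task's column names. Conceptually, seeding a context with initial columns assigns each name the next projected column index, with scratch columns allocated after them. A simplified sketch of that idea, not the actual VectorizationContext API:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch (not the VectorizationContext API): initial column names are
// mapped to consecutive projected column indices; scratch columns continue from there.
public class InitialColumnsSketch {

  final Map<String, Integer> projectionColumnMap = new LinkedHashMap<String, Integer>();
  private int nextColumn = 0;

  InitialColumnsSketch(List<String> initialColumnNames) {
    for (String name : initialColumnNames) {
      projectionColumnMap.put(name, nextColumn++);
    }
  }

  int allocateScratchColumn() {
    return nextColumn++;                             // scratch columns follow the projected ones
  }

  public static void main(String[] args) {
    InitialColumnsSketch ctx = new InitialColumnsSketch(Arrays.asList("id", "name", "ds"));
    System.out.println(ctx.projectionColumnMap);     // {id=0, name=1, ds=2}
    System.out.println(ctx.allocateScratchColumn()); // 3
  }
}
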
@@ -1785,12 +1980,16 @@ public class Vectorizer implements PhysicalPlanResolver {
public void debugDisplayAllMaps(BaseWork work) {
- Map<String, Integer> columnNameMap = work.getVectorColumnNameMap();
- Map<Integer, String> columnTypeMap = work.getVectorColumnTypeMap();
- Map<Integer, String> scratchColumnTypeMap = work.getVectorScratchColumnTypeMap();
+ VectorizedRowBatchCtx vectorizedRowBatchCtx = work.getVectorizedRowBatchCtx();
+
+ String[] columnNames = vectorizedRowBatchCtx.getRowColumnNames();
+ Object columnTypeInfos = vectorizedRowBatchCtx.getRowColumnTypeInfos();
+ int partitionColumnCount = vectorizedRowBatchCtx.getPartitionColumnCount();
+ String[] scratchColumnTypeNames = vectorizedRowBatchCtx.getScratchColumnTypeNames();
- LOG.debug("debugDisplayAllMaps columnNameMap " + columnNameMap.toString());
- LOG.debug("debugDisplayAllMaps columnTypeMap " + columnTypeMap.toString());
- LOG.debug("debugDisplayAllMaps scratchColumnTypeMap " + scratchColumnTypeMap.toString());
+ LOG.debug("debugDisplayAllMaps columnNames " + Arrays.toString(columnNames));
+ LOG.debug("debugDisplayAllMaps columnTypeInfos " + Arrays.deepToString((Object[]) columnTypeInfos));
+ LOG.debug("debugDisplayAllMaps partitionColumnCount " + partitionColumnCount);
+ LOG.debug("debugDisplayAllMaps scratchColumnTypeNames " + Arrays.toString(scratchColumnTypeNames));
}
}