Posted to commits@hive.apache.org by mm...@apache.org on 2015/11/18 15:52:17 UTC
[15/17] hive git commit: HIVE-11981: ORC Schema Evolution Issues
(Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J)
http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
index b654b64..aabea0b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java
@@ -17,11 +17,29 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+
+import com.google.common.collect.Lists;
public class OrcUtils {
private static final Logger LOG = LoggerFactory.getLogger(OrcUtils.class);
@@ -79,4 +97,533 @@ public class OrcUtils {
}
return null;
}
+
+ /**
+ * Convert a Hive type property string, which contains a list of separated type names, into a
+ * list of TypeDescription objects.
+ * @param hiveTypeProperty the type property string (e.g. the serde columns.types value)
+ * @return the list of TypeDescription objects.
+ */
+ public static ArrayList<TypeDescription> typeDescriptionsFromHiveTypeProperty(
+ String hiveTypeProperty) {
+
+ // CONSIDER: We need a type name parser for TypeDescription.
+
+ ArrayList<TypeInfo> typeInfoList = TypeInfoUtils.getTypeInfosFromTypeString(hiveTypeProperty);
+ ArrayList<TypeDescription> typeDescrList = new ArrayList<TypeDescription>(typeInfoList.size());
+ for (TypeInfo typeInfo : typeInfoList) {
+ typeDescrList.add(convertTypeInfo(typeInfo));
+ }
+ return typeDescrList;
+ }
+
+ public static TypeDescription convertTypeInfo(TypeInfo info) {
+ switch (info.getCategory()) {
+ case PRIMITIVE: {
+ PrimitiveTypeInfo pinfo = (PrimitiveTypeInfo) info;
+ switch (pinfo.getPrimitiveCategory()) {
+ case BOOLEAN:
+ return TypeDescription.createBoolean();
+ case BYTE:
+ return TypeDescription.createByte();
+ case SHORT:
+ return TypeDescription.createShort();
+ case INT:
+ return TypeDescription.createInt();
+ case LONG:
+ return TypeDescription.createLong();
+ case FLOAT:
+ return TypeDescription.createFloat();
+ case DOUBLE:
+ return TypeDescription.createDouble();
+ case STRING:
+ return TypeDescription.createString();
+ case DATE:
+ return TypeDescription.createDate();
+ case TIMESTAMP:
+ return TypeDescription.createTimestamp();
+ case BINARY:
+ return TypeDescription.createBinary();
+ case DECIMAL: {
+ DecimalTypeInfo dinfo = (DecimalTypeInfo) pinfo;
+ return TypeDescription.createDecimal()
+ .withScale(dinfo.getScale())
+ .withPrecision(dinfo.getPrecision());
+ }
+ case VARCHAR: {
+ BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
+ return TypeDescription.createVarchar()
+ .withMaxLength(cinfo.getLength());
+ }
+ case CHAR: {
+ BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
+ return TypeDescription.createChar()
+ .withMaxLength(cinfo.getLength());
+ }
+ default:
+ throw new IllegalArgumentException("ORC doesn't handle primitive" +
+ " category " + pinfo.getPrimitiveCategory());
+ }
+ }
+ case LIST: {
+ ListTypeInfo linfo = (ListTypeInfo) info;
+ return TypeDescription.createList
+ (convertTypeInfo(linfo.getListElementTypeInfo()));
+ }
+ case MAP: {
+ MapTypeInfo minfo = (MapTypeInfo) info;
+ return TypeDescription.createMap
+ (convertTypeInfo(minfo.getMapKeyTypeInfo()),
+ convertTypeInfo(minfo.getMapValueTypeInfo()));
+ }
+ case UNION: {
+ UnionTypeInfo minfo = (UnionTypeInfo) info;
+ TypeDescription result = TypeDescription.createUnion();
+ for (TypeInfo child: minfo.getAllUnionObjectTypeInfos()) {
+ result.addUnionChild(convertTypeInfo(child));
+ }
+ return result;
+ }
+ case STRUCT: {
+ StructTypeInfo sinfo = (StructTypeInfo) info;
+ TypeDescription result = TypeDescription.createStruct();
+ for(String fieldName: sinfo.getAllStructFieldNames()) {
+ result.addField(fieldName,
+ convertTypeInfo(sinfo.getStructFieldTypeInfo(fieldName)));
+ }
+ return result;
+ }
+ default:
+ throw new IllegalArgumentException("ORC doesn't handle " +
+ info.getCategory());
+ }
+ }
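As a usage sketch (hypothetical caller, not part of this patch), the two helpers above turn a serde-style type string into TypeDescription objects; the colon-separated form below assumes the usual columns.types convention:

    // Hypothetical: parse three column types, then assemble a row struct.
    ArrayList<TypeDescription> cols =
        OrcUtils.typeDescriptionsFromHiveTypeProperty("int:string:decimal(10,2)");
    TypeDescription rowType = TypeDescription.createStruct();
    rowType.addField("a", cols.get(0));
    rowType.addField("b", cols.get(1));
    rowType.addField("c", cols.get(2));   // carries precision 10, scale 2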
+
+ public static List<OrcProto.Type> getOrcTypes(TypeDescription typeDescr) {
+ List<OrcProto.Type> result = Lists.newArrayList();
+ appendOrcTypes(result, typeDescr);
+ return result;
+ }
+
+ private static void appendOrcTypes(List<OrcProto.Type> result, TypeDescription typeDescr) {
+ OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+ List<TypeDescription> children = typeDescr.getChildren();
+ switch (typeDescr.getCategory()) {
+ case BOOLEAN:
+ type.setKind(OrcProto.Type.Kind.BOOLEAN);
+ break;
+ case BYTE:
+ type.setKind(OrcProto.Type.Kind.BYTE);
+ break;
+ case SHORT:
+ type.setKind(OrcProto.Type.Kind.SHORT);
+ break;
+ case INT:
+ type.setKind(OrcProto.Type.Kind.INT);
+ break;
+ case LONG:
+ type.setKind(OrcProto.Type.Kind.LONG);
+ break;
+ case FLOAT:
+ type.setKind(OrcProto.Type.Kind.FLOAT);
+ break;
+ case DOUBLE:
+ type.setKind(OrcProto.Type.Kind.DOUBLE);
+ break;
+ case STRING:
+ type.setKind(OrcProto.Type.Kind.STRING);
+ break;
+ case CHAR:
+ type.setKind(OrcProto.Type.Kind.CHAR);
+ type.setMaximumLength(typeDescr.getMaxLength());
+ break;
+ case VARCHAR:
+ type.setKind(Type.Kind.VARCHAR);
+ type.setMaximumLength(typeDescr.getMaxLength());
+ break;
+ case BINARY:
+ type.setKind(OrcProto.Type.Kind.BINARY);
+ break;
+ case TIMESTAMP:
+ type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+ break;
+ case DATE:
+ type.setKind(OrcProto.Type.Kind.DATE);
+ break;
+ case DECIMAL:
+ type.setKind(OrcProto.Type.Kind.DECIMAL);
+ type.setPrecision(typeDescr.getPrecision());
+ type.setScale(typeDescr.getScale());
+ break;
+ case LIST:
+ type.setKind(OrcProto.Type.Kind.LIST);
+ type.addSubtypes(children.get(0).getId());
+ break;
+ case MAP:
+ type.setKind(OrcProto.Type.Kind.MAP);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
+ }
+ break;
+ case STRUCT:
+ type.setKind(OrcProto.Type.Kind.STRUCT);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
+ }
+ for(String field: typeDescr.getFieldNames()) {
+ type.addFieldNames(field);
+ }
+ break;
+ case UNION:
+ type.setKind(OrcProto.Type.Kind.UNION);
+ for(TypeDescription t: children) {
+ type.addSubtypes(t.getId());
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown category: " +
+ typeDescr.getCategory());
+ }
+ result.add(type.build());
+ if (children != null) {
+ for(TypeDescription child: children) {
+ appendOrcTypes(result, child);
+ }
+ }
+ }
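For a row type like struct<a:int,b:string>, getOrcTypes() emits a pre-order list whose subtype ids come from the TypeDescription itself (a sketch, assuming ids were assigned in that same pre-order):

    TypeDescription schema = TypeDescription.createStruct();
    schema.addField("a", TypeDescription.createInt());
    schema.addField("b", TypeDescription.createString());
    List<OrcProto.Type> flat = OrcUtils.getOrcTypes(schema);
    // flat.get(0): STRUCT with fieldNames [a, b] and subtypes [1, 2]
    // flat.get(1): INT    (column a)
    // flat.get(2): STRING (column b)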
+
+ /**
+ * NOTE: This method ignores the subtype numbers in the TypeDescription and rebuilds them
+ * from the current length of the result list being appended to.
+ *
+ * @param result the list to append the flattened types to
+ * @param typeDescr the type description to convert
+ */
+ public static void appendOrcTypesRebuildSubtypes(List<OrcProto.Type> result,
+ TypeDescription typeDescr) {
+
+ int subtype = result.size();
+ OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+ boolean needsAdd = true;
+ List<TypeDescription> children = typeDescr.getChildren();
+ switch (typeDescr.getCategory()) {
+ case BOOLEAN:
+ type.setKind(OrcProto.Type.Kind.BOOLEAN);
+ break;
+ case BYTE:
+ type.setKind(OrcProto.Type.Kind.BYTE);
+ break;
+ case SHORT:
+ type.setKind(OrcProto.Type.Kind.SHORT);
+ break;
+ case INT:
+ type.setKind(OrcProto.Type.Kind.INT);
+ break;
+ case LONG:
+ type.setKind(OrcProto.Type.Kind.LONG);
+ break;
+ case FLOAT:
+ type.setKind(OrcProto.Type.Kind.FLOAT);
+ break;
+ case DOUBLE:
+ type.setKind(OrcProto.Type.Kind.DOUBLE);
+ break;
+ case STRING:
+ type.setKind(OrcProto.Type.Kind.STRING);
+ break;
+ case CHAR:
+ type.setKind(OrcProto.Type.Kind.CHAR);
+ type.setMaximumLength(typeDescr.getMaxLength());
+ break;
+ case VARCHAR:
+ type.setKind(Type.Kind.VARCHAR);
+ type.setMaximumLength(typeDescr.getMaxLength());
+ break;
+ case BINARY:
+ type.setKind(OrcProto.Type.Kind.BINARY);
+ break;
+ case TIMESTAMP:
+ type.setKind(OrcProto.Type.Kind.TIMESTAMP);
+ break;
+ case DATE:
+ type.setKind(OrcProto.Type.Kind.DATE);
+ break;
+ case DECIMAL:
+ type.setKind(OrcProto.Type.Kind.DECIMAL);
+ type.setPrecision(typeDescr.getPrecision());
+ type.setScale(typeDescr.getScale());
+ break;
+ case LIST:
+ type.setKind(OrcProto.Type.Kind.LIST);
+ type.addSubtypes(++subtype);
+ result.add(type.build());
+ needsAdd = false;
+ appendOrcTypesRebuildSubtypes(result, children.get(0));
+ break;
+ case MAP:
+ {
+ // Make room for MAP type.
+ result.add(null);
+
+ // Add the MAP key and value types in order to determine their subtype values.
+ appendOrcTypesRebuildSubtypes(result, children.get(0));
+ int subtype2 = result.size();
+ appendOrcTypesRebuildSubtypes(result, children.get(1));
+ type.setKind(OrcProto.Type.Kind.MAP);
+ type.addSubtypes(subtype + 1);
+ type.addSubtypes(subtype2);
+ result.set(subtype, type.build());
+ needsAdd = false;
+ }
+ break;
+ case STRUCT:
+ {
+ List<String> fieldNames = typeDescr.getFieldNames();
+
+ // Make room for STRUCT type.
+ result.add(null);
+
+ List<Integer> fieldSubtypes = new ArrayList<Integer>(fieldNames.size());
+ for(TypeDescription child: children) {
+ int fieldSubtype = result.size();
+ fieldSubtypes.add(fieldSubtype);
+ appendOrcTypesRebuildSubtypes(result, child);
+ }
+
+ type.setKind(OrcProto.Type.Kind.STRUCT);
+
+ for (int i = 0 ; i < fieldNames.size(); i++) {
+ type.addSubtypes(fieldSubtypes.get(i));
+ type.addFieldNames(fieldNames.get(i));
+ }
+ result.set(subtype, type.build());
+ needsAdd = false;
+ }
+ break;
+ case UNION:
+ {
+ // Make room for UNION type.
+ result.add(null);
+
+ List<Integer> unionSubtypes = new ArrayList<Integer>(children.size());
+ for(TypeDescription child: children) {
+ int unionSubtype = result.size();
+ unionSubtypes.add(unionSubtype);
+ appendOrcTypesRebuildSubtypes(result, child);
+ }
+
+ type.setKind(OrcProto.Type.Kind.UNION);
+ for (int i = 0 ; i < children.size(); i++) {
+ type.addSubtypes(unionSubtypes.get(i));
+ }
+ result.set(subtype, type.build());
+ needsAdd = false;
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown category: " + typeDescr.getCategory());
+ }
+ if (needsAdd) {
+ result.add(type.build());
+ }
+ }
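In contrast to appendOrcTypes() above, the ids recorded inside the TypeDescription are irrelevant here: every subtype id is derived from the position the entry will occupy in result. That is what lets SchemaEvolution.createEventSchema() (below) append a row type behind the fixed ACID columns. A sketch with hypothetical names:

    // 'result' already holds n entries (e.g. the ACID wrapper columns).
    List<OrcProto.Type> result = buildAcidWrapperTypes();  // hypothetical helper
    int n = result.size();
    OrcUtils.appendOrcTypesRebuildSubtypes(result, rowTypeDescr);
    // The row STRUCT lands at index n and its children get ids n+1, n+2, ...
    // regardless of any ids stored in rowTypeDescr.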
+
+ /**
+ * NOTE: This method ignores the subtype numbers in the OrcProto.Type list and rebuilds them
+ * from the current length of the result list being appended to.
+ *
+ * @param result the list to append the flattened types to
+ * @param types the source pre-order type list
+ * @param columnId the position in types to copy from
+ * @return the next columnId after the copied subtree
+ */
+ public static int appendOrcTypesRebuildSubtypes(List<OrcProto.Type> result,
+ List<OrcProto.Type> types, int columnId) {
+
+ OrcProto.Type oldType = types.get(columnId++);
+
+ int subtype = result.size();
+ OrcProto.Type.Builder builder = OrcProto.Type.newBuilder();
+ boolean needsAdd = true;
+ switch (oldType.getKind()) {
+ case BOOLEAN:
+ builder.setKind(OrcProto.Type.Kind.BOOLEAN);
+ break;
+ case BYTE:
+ builder.setKind(OrcProto.Type.Kind.BYTE);
+ break;
+ case SHORT:
+ builder.setKind(OrcProto.Type.Kind.SHORT);
+ break;
+ case INT:
+ builder.setKind(OrcProto.Type.Kind.INT);
+ break;
+ case LONG:
+ builder.setKind(OrcProto.Type.Kind.LONG);
+ break;
+ case FLOAT:
+ builder.setKind(OrcProto.Type.Kind.FLOAT);
+ break;
+ case DOUBLE:
+ builder.setKind(OrcProto.Type.Kind.DOUBLE);
+ break;
+ case STRING:
+ builder.setKind(OrcProto.Type.Kind.STRING);
+ break;
+ case CHAR:
+ builder.setKind(OrcProto.Type.Kind.CHAR);
+ builder.setMaximumLength(oldType.getMaximumLength());
+ break;
+ case VARCHAR:
+ builder.setKind(Type.Kind.VARCHAR);
+ builder.setMaximumLength(oldType.getMaximumLength());
+ break;
+ case BINARY:
+ builder.setKind(OrcProto.Type.Kind.BINARY);
+ break;
+ case TIMESTAMP:
+ builder.setKind(OrcProto.Type.Kind.TIMESTAMP);
+ break;
+ case DATE:
+ builder.setKind(OrcProto.Type.Kind.DATE);
+ break;
+ case DECIMAL:
+ builder.setKind(OrcProto.Type.Kind.DECIMAL);
+ builder.setPrecision(oldType.getPrecision());
+ builder.setScale(oldType.getScale());
+ break;
+ case LIST:
+ builder.setKind(OrcProto.Type.Kind.LIST);
+ builder.addSubtypes(++subtype);
+ result.add(builder.build());
+ needsAdd = false;
+ columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+ break;
+ case MAP:
+ {
+ // Make room for MAP type.
+ result.add(null);
+
+ // Add the MAP key and value types in order to determine their subtype values.
+ columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+ int subtype2 = result.size();
+ columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+ builder.setKind(OrcProto.Type.Kind.MAP);
+ builder.addSubtypes(subtype + 1);
+ builder.addSubtypes(subtype2);
+ result.set(subtype, builder.build());
+ needsAdd = false;
+ }
+ break;
+ case STRUCT:
+ {
+ List<String> fieldNames = oldType.getFieldNamesList();
+
+ // Make room for STRUCT type.
+ result.add(null);
+
+ List<Integer> fieldSubtypes = new ArrayList<Integer>(fieldNames.size());
+ for(int i = 0 ; i < fieldNames.size(); i++) {
+ int fieldSubtype = result.size();
+ fieldSubtypes.add(fieldSubtype);
+ columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+ }
+
+ builder.setKind(OrcProto.Type.Kind.STRUCT);
+
+ for (int i = 0 ; i < fieldNames.size(); i++) {
+ builder.addSubtypes(fieldSubtypes.get(i));
+ builder.addFieldNames(fieldNames.get(i));
+ }
+ result.set(subtype, builder.build());
+ needsAdd = false;
+ }
+ break;
+ case UNION:
+ {
+ int subtypeCount = oldType.getSubtypesCount();
+
+ // Make room for UNION type.
+ result.add(null);
+
+ List<Integer> unionSubtypes = new ArrayList<Integer>(subtypeCount);
+ for(int i = 0 ; i < subtypeCount; i++) {
+ int unionSubtype = result.size();
+ unionSubtypes.add(unionSubtype);
+ columnId = appendOrcTypesRebuildSubtypes(result, types, columnId);
+ }
+
+ builder.setKind(OrcProto.Type.Kind.UNION);
+ for (int i = 0 ; i < subtypeCount; i++) {
+ builder.addSubtypes(unionSubtypes.get(i));
+ }
+ result.set(subtype, builder.build());
+ needsAdd = false;
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown category: " + oldType.getKind());
+ }
+ if (needsAdd) {
+ result.add(builder.build());
+ }
+ return columnId;
+ }
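This overload walks an existing pre-order OrcProto.Type list instead of a TypeDescription and returns the next columnId after the copied subtree, so callers can copy consecutive subtrees. A sketch (fileTypes is a hypothetical pre-order list):

    List<OrcProto.Type> renumbered = new ArrayList<OrcProto.Type>();
    int next = OrcUtils.appendOrcTypesRebuildSubtypes(renumbered, fileTypes, 0);
    // For a full schema, next == fileTypes.size(); the copy at index 0 now has
    // subtype ids that match positions in 'renumbered'.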
+
+ public static TypeDescription getDesiredRowTypeDescr(Configuration conf) {
+
+ String columnNameProperty = null;
+ String columnTypeProperty = null;
+
+ ArrayList<String> schemaEvolutionColumnNames = null;
+ ArrayList<TypeDescription> schemaEvolutionTypeDescrs = null;
+
+ boolean haveSchemaEvolutionProperties = false;
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION)) {
+
+ columnNameProperty = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS);
+ columnTypeProperty = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES);
+
+ haveSchemaEvolutionProperties =
+ (columnNameProperty != null && columnTypeProperty != null);
+
+ if (haveSchemaEvolutionProperties) {
+ schemaEvolutionColumnNames = Lists.newArrayList(columnNameProperty.split(","));
+ if (schemaEvolutionColumnNames.size() == 0) {
+ haveSchemaEvolutionProperties = false;
+ } else {
+ schemaEvolutionTypeDescrs =
+ OrcUtils.typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
+ if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
+ haveSchemaEvolutionProperties = false;
+ }
+ }
+ }
+ }
+
+ if (!haveSchemaEvolutionProperties) {
+
+ // Try regular properties.
+ columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
+ columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
+ if (columnTypeProperty == null || columnNameProperty == null) {
+ return null;
+ }
+
+ schemaEvolutionColumnNames = Lists.newArrayList(columnNameProperty.split(","));
+ if (schemaEvolutionColumnNames.size() == 0) {
+ return null;
+ }
+ schemaEvolutionTypeDescrs =
+ OrcUtils.typeDescriptionsFromHiveTypeProperty(columnTypeProperty);
+ if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) {
+ return null;
+ }
+ }
+
+ // Desired schema does not include virtual columns or partition columns.
+ TypeDescription result = TypeDescription.createStruct();
+ for (int i = 0; i < schemaEvolutionColumnNames.size(); i++) {
+ result.addField(schemaEvolutionColumnNames.get(i), schemaEvolutionTypeDescrs.get(i));
+ }
+
+ return result;
+ }
}
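A sketch of driving getDesiredRowTypeDescr() from configuration (property values are illustrative; the type string follows the columns.types convention assumed above):

    Configuration conf = new Configuration();
    conf.setBoolean(HiveConf.ConfVars.HIVE_SCHEMA_EVOLUTION.varname, true);
    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "a,b");
    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "int:string");
    TypeDescription readerType = OrcUtils.getDesiredRowTypeDescr(conf);
    // readerType is struct<a:int,b:string>; the method returns null when
    // neither the evolution nor the regular column properties are usable.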
http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
index cf81782..750cf8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -145,6 +146,7 @@ public interface Reader {
private boolean[] include;
private long offset = 0;
private long length = Long.MAX_VALUE;
+ private TypeDescription schema;
private SearchArgument sarg = null;
private String[] columnNames = null;
private Boolean useZeroCopy = null;
@@ -173,6 +175,14 @@ public interface Reader {
}
/**
+ * Set the schema-on-read type description.
+ */
+ public Options schema(TypeDescription schema) {
+ this.schema = schema;
+ return this;
+ }
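Callers opt in per reader via the options builder; a hypothetical call site:

    // Hypothetical: read the file as 'readerType', restricted to this split.
    Reader.Options options = new Reader.Options()
        .schema(readerType)
        .range(split.getStart(), split.getLength());
    RecordReader rows = reader.rowsOptions(options);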
+
+ /**
* Set search argument for predicate push down.
* @param sarg the search argument
* @param columnNames the column names for
@@ -216,6 +226,10 @@ public interface Reader {
return length;
}
+ public TypeDescription getSchema() {
+ return schema;
+ }
+
public SearchArgument getSearchArgument() {
return sarg;
}
@@ -245,6 +259,7 @@ public interface Reader {
result.include = include;
result.offset = offset;
result.length = length;
+ result.schema = schema;
result.sarg = sarg;
result.columnNames = columnNames;
result.useZeroCopy = useZeroCopy;
http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java
deleted file mode 100644
index 5e7d636..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde.serdeConstants;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
-
-import com.google.common.collect.Lists;
-
-/**
- * Factory to create ORC tree readers. It also compares file schema with schema specified on read
- * to see if type promotions are possible.
- */
-public class RecordReaderFactory {
- static final Logger LOG = LoggerFactory.getLogger(RecordReaderFactory.class);
- private static final boolean isLogInfoEnabled = LOG.isInfoEnabled();
-
- public static TreeReaderFactory.TreeReader createTreeReader(int colId,
- Configuration conf,
- List<OrcProto.Type> fileSchema,
- boolean[] included,
- boolean skipCorrupt) throws IOException {
- final boolean isAcid = checkAcidSchema(fileSchema);
- final List<OrcProto.Type> originalFileSchema;
- if (isAcid) {
- originalFileSchema = fileSchema.subList(fileSchema.get(0).getSubtypesCount(),
- fileSchema.size());
- } else {
- originalFileSchema = fileSchema;
- }
- final int numCols = originalFileSchema.get(0).getSubtypesCount();
- List<OrcProto.Type> schemaOnRead = getSchemaOnRead(numCols, conf);
- List<OrcProto.Type> schemaUsed = getMatchingSchema(fileSchema, schemaOnRead);
- if (schemaUsed == null) {
- return TreeReaderFactory.createTreeReader(colId, fileSchema, included, skipCorrupt);
- } else {
- return ConversionTreeReaderFactory.createTreeReader(colId, schemaUsed, included, skipCorrupt);
- }
- }
-
- static List<String> getAcidEventFields() {
- return Lists.newArrayList("operation", "originalTransaction", "bucket",
- "rowId", "currentTransaction", "row");
- }
-
- private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) {
- if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) {
- List<String> acidFields = getAcidEventFields();
- List<String> rootFields = fileSchema.get(0).getFieldNamesList();
- if (acidFields.equals(rootFields)) {
- return true;
- }
- }
- return false;
- }
-
- private static List<OrcProto.Type> getMatchingSchema(List<OrcProto.Type> fileSchema,
- List<OrcProto.Type> schemaOnRead) {
- if (schemaOnRead == null) {
- if (isLogInfoEnabled) {
- LOG.info("Schema is not specified on read. Using file schema.");
- }
- return null;
- }
-
- if (fileSchema.size() != schemaOnRead.size()) {
- if (isLogInfoEnabled) {
- LOG.info("Schema on read column count does not match file schema's column count." +
- " Falling back to using file schema.");
- }
- return null;
- } else {
- List<OrcProto.Type> result = Lists.newArrayList(fileSchema);
- // check type promotion. ORC can only support type promotions for integer types
- // short -> int -> bigint as same integer readers are used for the above types.
- boolean canPromoteType = false;
- for (int i = 0; i < fileSchema.size(); i++) {
- OrcProto.Type fColType = fileSchema.get(i);
- OrcProto.Type rColType = schemaOnRead.get(i);
- if (!fColType.getKind().equals(rColType.getKind())) {
-
- if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) {
-
- if (rColType.getKind().equals(OrcProto.Type.Kind.INT) ||
- rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
- // type promotion possible, converting SHORT to INT/LONG requested type
- result.set(i, result.get(i).toBuilder().setKind(rColType.getKind()).build());
- canPromoteType = true;
- } else {
- canPromoteType = false;
- }
-
- } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) {
-
- if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
- // type promotion possible, converting INT to LONG requested type
- result.set(i, result.get(i).toBuilder().setKind(rColType.getKind()).build());
- canPromoteType = true;
- } else {
- canPromoteType = false;
- }
-
- } else {
- canPromoteType = false;
- }
- }
- }
-
- if (canPromoteType) {
- if (isLogInfoEnabled) {
- LOG.info("Integer type promotion happened in ORC record reader. Using promoted schema.");
- }
- return result;
- }
- }
-
- return null;
- }
-
- private static List<OrcProto.Type> getSchemaOnRead(int numCols, Configuration conf) {
- String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
- final String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
- if (columnTypeProperty == null || columnNameProperty == null) {
- return null;
- }
-
- ArrayList<String> columnNames = Lists.newArrayList(columnNameProperty.split(","));
- ArrayList<TypeInfo> fieldTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
- StructTypeInfo structTypeInfo = new StructTypeInfo();
- // Column types from conf includes virtual and partition columns at the end. We consider only
- // the actual columns in the file.
- structTypeInfo.setAllStructFieldNames(Lists.newArrayList(columnNames.subList(0, numCols)));
- structTypeInfo.setAllStructFieldTypeInfos(Lists.newArrayList(fieldTypes.subList(0, numCols)));
- ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(structTypeInfo);
- return getOrcTypes(oi);
- }
-
- private static List<OrcProto.Type> getOrcTypes(ObjectInspector inspector) {
- List<OrcProto.Type> result = Lists.newArrayList();
- getOrcTypesImpl(result, inspector);
- return result;
- }
-
- private static void getOrcTypesImpl(List<OrcProto.Type> result, ObjectInspector inspector) {
- OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
- switch (inspector.getCategory()) {
- case PRIMITIVE:
- switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) {
- case BOOLEAN:
- type.setKind(OrcProto.Type.Kind.BOOLEAN);
- break;
- case BYTE:
- type.setKind(OrcProto.Type.Kind.BYTE);
- break;
- case SHORT:
- type.setKind(OrcProto.Type.Kind.SHORT);
- break;
- case INT:
- type.setKind(OrcProto.Type.Kind.INT);
- break;
- case LONG:
- type.setKind(OrcProto.Type.Kind.LONG);
- break;
- case FLOAT:
- type.setKind(OrcProto.Type.Kind.FLOAT);
- break;
- case DOUBLE:
- type.setKind(OrcProto.Type.Kind.DOUBLE);
- break;
- case STRING:
- type.setKind(OrcProto.Type.Kind.STRING);
- break;
- case CHAR:
- // The char length needs to be written to file and should be available
- // from the object inspector
- CharTypeInfo charTypeInfo = (CharTypeInfo) ((PrimitiveObjectInspector) inspector)
- .getTypeInfo();
- type.setKind(OrcProto.Type.Kind.CHAR);
- type.setMaximumLength(charTypeInfo.getLength());
- break;
- case VARCHAR:
- // The varchar length needs to be written to file and should be available
- // from the object inspector
- VarcharTypeInfo typeInfo = (VarcharTypeInfo) ((PrimitiveObjectInspector) inspector)
- .getTypeInfo();
- type.setKind(OrcProto.Type.Kind.VARCHAR);
- type.setMaximumLength(typeInfo.getLength());
- break;
- case BINARY:
- type.setKind(OrcProto.Type.Kind.BINARY);
- break;
- case TIMESTAMP:
- type.setKind(OrcProto.Type.Kind.TIMESTAMP);
- break;
- case DATE:
- type.setKind(OrcProto.Type.Kind.DATE);
- break;
- case DECIMAL:
- DecimalTypeInfo decTypeInfo = (DecimalTypeInfo) ((PrimitiveObjectInspector) inspector)
- .getTypeInfo();
- type.setKind(OrcProto.Type.Kind.DECIMAL);
- type.setPrecision(decTypeInfo.precision());
- type.setScale(decTypeInfo.scale());
- break;
- default:
- throw new IllegalArgumentException("Unknown primitive category: " +
- ((PrimitiveObjectInspector) inspector).getPrimitiveCategory());
- }
- result.add(type.build());
- break;
- case LIST:
- type.setKind(OrcProto.Type.Kind.LIST);
- result.add(type.build());
- getOrcTypesImpl(result, ((ListObjectInspector) inspector).getListElementObjectInspector());
- break;
- case MAP:
- type.setKind(OrcProto.Type.Kind.MAP);
- result.add(type.build());
- getOrcTypesImpl(result, ((MapObjectInspector) inspector).getMapKeyObjectInspector());
- getOrcTypesImpl(result, ((MapObjectInspector) inspector).getMapValueObjectInspector());
- break;
- case STRUCT:
- type.setKind(OrcProto.Type.Kind.STRUCT);
- result.add(type.build());
- for (StructField field : ((StructObjectInspector) inspector).getAllStructFieldRefs()) {
- getOrcTypesImpl(result, field.getFieldObjectInspector());
- }
- break;
- case UNION:
- type.setKind(OrcProto.Type.Kind.UNION);
- result.add(type.build());
- for (ObjectInspector oi : ((UnionObjectInspector) inspector).getObjectInspectors()) {
- getOrcTypesImpl(result, oi);
- }
- break;
- default:
- throw new IllegalArgumentException("Unknown category: " + inspector.getCategory());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 84d627a..7f550a4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -153,6 +155,18 @@ public class RecordReaderImpl implements RecordReader {
long strideRate,
Configuration conf
) throws IOException {
+
+ TreeReaderSchema treeReaderSchema;
+ if (options.getSchema() == null) {
+ treeReaderSchema = new TreeReaderSchema().fileTypes(types).schemaTypes(types);
+ } else {
+
+ // Now that we are creating a record reader for a file, validate that the schema to read
+ // is compatible with the file schema.
+ //
+ List<Type> schemaTypes = OrcUtils.getOrcTypes(options.getSchema());
+ treeReaderSchema = SchemaEvolution.validateAndCreate(types, schemaTypes);
+ }
this.path = path;
this.codec = codec;
this.types = types;
@@ -197,7 +211,7 @@ public class RecordReaderImpl implements RecordReader {
skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf);
}
- reader = RecordReaderFactory.createTreeReader(0, conf, types, included, skipCorrupt);
+ reader = TreeReaderFactory.createTreeReader(0, treeReaderSchema, included, skipCorrupt);
indexes = new OrcProto.RowIndex[types.size()];
bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
advanceToNextRow(reader, 0L, true);
@@ -1101,6 +1115,7 @@ public class RecordReaderImpl implements RecordReader {
} else {
result = (VectorizedRowBatch) previous;
result.selectedInUse = false;
+ reader.setVectorColumnCount(result.getDataColumnCount());
reader.nextVector(result.cols, (int) batchSize);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
new file mode 100644
index 0000000..9d00eb2
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
+import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema;
+
+/**
+ * Take the file types and the (optional) configuration column names/types and see if there
+ * has been schema evolution.
+ */
+public class SchemaEvolution {
+
+ private static final Log LOG = LogFactory.getLog(SchemaEvolution.class);
+
+ public static TreeReaderSchema validateAndCreate(List<OrcProto.Type> fileTypes,
+ List<OrcProto.Type> schemaTypes) throws IOException {
+
+ // For ACID, the row is the ROW field in the outer STRUCT.
+ final boolean isAcid = checkAcidSchema(fileTypes);
+ final List<OrcProto.Type> rowSchema;
+ int rowSubtype;
+ if (isAcid) {
+ rowSubtype = OrcRecordUpdater.ROW + 1;
+ rowSchema = fileTypes.subList(rowSubtype, fileTypes.size());
+ } else {
+ rowSubtype = 0;
+ rowSchema = fileTypes;
+ }
+
+ // Check the overlap between the file schema and the read schema. Additional columns will be
+ // defaulted to NULL.
+
+ int numFileColumns = rowSchema.get(0).getSubtypesCount();
+ int numDesiredColumns = schemaTypes.get(0).getSubtypesCount();
+
+ int numReadColumns = Math.min(numFileColumns, numDesiredColumns);
+
+ /**
+ * Check type promotion.
+ *
+ * Currently, we only support integer type promotions that can be done "implicitly".
+ * That is, we know that using a bigger integer tree reader on the original smaller integer
+ * column will "just work".
+ *
+ * In the future, other type promotions might require type conversion.
+ */
+ // short -> int -> bigint as same integer readers are used for the above types.
+
+ for (int i = 0; i < numReadColumns; i++) {
+ OrcProto.Type fColType = fileTypes.get(rowSubtype + i);
+ OrcProto.Type rColType = schemaTypes.get(i);
+ if (!fColType.getKind().equals(rColType.getKind())) {
+
+ boolean ok = false;
+ if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) {
+
+ if (rColType.getKind().equals(OrcProto.Type.Kind.INT) ||
+ rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
+ // type promotion possible, converting SHORT to INT/LONG requested type
+ ok = true;
+ }
+ } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) {
+
+ if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) {
+ // type promotion possible, converting INT to LONG requested type
+ ok = true;
+ }
+ }
+
+ if (!ok) {
+ throw new IOException("ORC does not support type conversion from " +
+ fColType.getKind().name() + " to " + rColType.getKind().name());
+ }
+ }
+ }
+
+ List<Type> fullSchemaTypes;
+
+ if (isAcid) {
+ fullSchemaTypes = new ArrayList<OrcProto.Type>();
+
+ // This copies the ACID struct type which is subtype = 0.
+ // It has field names "operation" through "row".
+ // And we copy the types for all fields EXCEPT ROW (which must be last!).
+
+ for (int i = 0; i < rowSubtype; i++) {
+ fullSchemaTypes.add(fileTypes.get(i).toBuilder().build());
+ }
+
+ // Add the row struct type.
+ OrcUtils.appendOrcTypesRebuildSubtypes(fullSchemaTypes, schemaTypes, 0);
+ } else {
+ fullSchemaTypes = schemaTypes;
+ }
+
+ int innerStructSubtype = rowSubtype;
+
+ // LOG.info("Schema evolution: (fileTypes) " + fileTypes.toString() +
+ //     " (fullSchemaTypes) " + fullSchemaTypes.toString());
+
+ return new TreeReaderSchema().
+ fileTypes(fileTypes).
+ schemaTypes(fullSchemaTypes).
+ innerStructSubtype(innerStructSubtype);
+ }
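Only widening integer promotions (SHORT to INT/LONG, INT to LONG) validate; any other kind mismatch throws. A sketch of the contract, with hypothetical type lists:

    // fileTypes column is SHORT, schemaTypes asks for LONG: accepted, since the
    // larger integer tree reader just works on the smaller data.
    TreeReaderSchema trs = SchemaEvolution.validateAndCreate(fileTypes, schemaTypes);
    // Asking for STRING over a SHORT column instead throws:
    //   IOException: ORC does not support type conversion from SHORT to STRING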
+
+ private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) {
+ if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) {
+ List<String> rootFields = fileSchema.get(0).getFieldNamesList();
+ if (acidEventFieldNames.equals(rootFields)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * @param typeDescr the row's type description
+ * @return ORC types for the ACID event struct based on that row type
+ */
+ public static List<Type> createEventSchema(TypeDescription typeDescr) {
+
+ List<Type> result = new ArrayList<Type>();
+
+ OrcProto.Type.Builder type = OrcProto.Type.newBuilder();
+ type.setKind(OrcProto.Type.Kind.STRUCT);
+ type.addAllFieldNames(acidEventFieldNames);
+ for (int i = 0; i < acidEventFieldNames.size(); i++) {
+ type.addSubtypes(i + 1);
+ }
+ result.add(type.build());
+
+ // Automatically add all fields except the last (ROW).
+ for (int i = 0; i < acidEventOrcTypeKinds.size() - 1; i ++) {
+ type.clear();
+ type.setKind(acidEventOrcTypeKinds.get(i));
+ result.add(type.build());
+ }
+
+ OrcUtils.appendOrcTypesRebuildSubtypes(result, typeDescr);
+ return result;
+ }
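For a row type of struct<a:int> (rowTypeDescr below is the hypothetical row TypeDescription), the generated event schema lays out as follows:

    List<OrcProto.Type> event = SchemaEvolution.createEventSchema(rowTypeDescr);
    // index 0: STRUCT, fieldNames [operation ... row], subtypes [1..6]
    // index 1: INT    (operation)
    // index 2: LONG   (originalTransaction)
    // index 3: INT    (bucket)
    // index 4: LONG   (rowId)
    // index 5: LONG   (currentTransaction)
    // index 6: STRUCT (row), subtypes [7]
    // index 7: INT    (column a)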
+
+ public static final List<String> acidEventFieldNames = new ArrayList<String>();
+ static {
+ acidEventFieldNames.add("operation");
+ acidEventFieldNames.add("originalTransaction");
+ acidEventFieldNames.add("bucket");
+ acidEventFieldNames.add("rowId");
+ acidEventFieldNames.add("currentTransaction");
+ acidEventFieldNames.add("row");
+ }
+ public static final List<OrcProto.Type.Kind> acidEventOrcTypeKinds =
+ new ArrayList<OrcProto.Type.Kind>();
+ static {
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT);
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG);
+ acidEventOrcTypeKinds.add(OrcProto.Type.Kind.STRUCT);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
index 4bcc621..8c13571 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
@@ -47,24 +47,80 @@ import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.HadoopShims.TextReaderShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Factory for creating ORC tree readers.
*/
public class TreeReaderFactory {
+ public static final Logger LOG = LoggerFactory.getLogger(TreeReaderFactory.class);
+
+ public static class TreeReaderSchema {
+
+ /**
+ * The types in the ORC file.
+ */
+ List<OrcProto.Type> fileTypes;
+
+ /**
+ * The types that the reader should present the data as.
+ */
+ List<OrcProto.Type> schemaTypes;
+
+ /**
+ * The subtype of the row STRUCT. Different from 0 for ACID files.
+ */
+ int innerStructSubtype;
+
+ public TreeReaderSchema() {
+ fileTypes = null;
+ schemaTypes = null;
+ innerStructSubtype = -1;
+ }
+
+ public TreeReaderSchema fileTypes(List<OrcProto.Type> fileTypes) {
+ this.fileTypes = fileTypes;
+ return this;
+ }
+
+ public TreeReaderSchema schemaTypes(List<OrcProto.Type> schemaTypes) {
+ this.schemaTypes = schemaTypes;
+ return this;
+ }
+
+ public TreeReaderSchema innerStructSubtype(int innerStructSubtype) {
+ this.innerStructSubtype = innerStructSubtype;
+ return this;
+ }
+
+ public List<OrcProto.Type> getFileTypes() {
+ return fileTypes;
+ }
+
+ public List<OrcProto.Type> getSchemaTypes() {
+ return schemaTypes;
+ }
+
+ public int getInnerStructSubtype() {
+ return innerStructSubtype;
+ }
+ }
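When no read schema is supplied, RecordReaderImpl builds the degenerate form where file and read types coincide (see the RecordReaderImpl hunk above); for schema evolution, SchemaEvolution.validateAndCreate() fills in all three fields:

    // Degenerate case: read exactly what the file contains.
    TreeReaderSchema same = new TreeReaderSchema()
        .fileTypes(fileTypes)
        .schemaTypes(fileTypes);
    // innerStructSubtype stays -1, so no struct is padded with NULL columns.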
+
public abstract static class TreeReader {
protected final int columnId;
protected BitFieldReader present = null;
protected boolean valuePresent = false;
+ protected int vectorColumnCount;
TreeReader(int columnId) throws IOException {
this(columnId, null);
@@ -78,6 +134,11 @@ public class TreeReaderFactory {
} else {
present = new BitFieldReader(in, 1);
}
+ vectorColumnCount = -1;
+ }
+
+ void setVectorColumnCount(int vectorColumnCount) {
+ this.vectorColumnCount = vectorColumnCount;
}
void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
@@ -1962,25 +2023,57 @@ public class TreeReaderFactory {
}
}
- public static class StructTreeReader extends TreeReader {
+ protected static class StructTreeReader extends TreeReader {
+ private final int fileColumnCount;
+ private final int resultColumnCount;
protected final TreeReader[] fields;
private final String[] fieldNames;
- protected StructTreeReader(int columnId,
- List<OrcProto.Type> types,
+ protected StructTreeReader(
+ int columnId,
+ TreeReaderSchema treeReaderSchema,
boolean[] included,
boolean skipCorrupt) throws IOException {
super(columnId);
- OrcProto.Type type = types.get(columnId);
- int fieldCount = type.getFieldNamesCount();
- this.fields = new TreeReader[fieldCount];
- this.fieldNames = new String[fieldCount];
- for (int i = 0; i < fieldCount; ++i) {
- int subtype = type.getSubtypes(i);
- if (included == null || included[subtype]) {
- this.fields[i] = createTreeReader(subtype, types, included, skipCorrupt);
+
+ OrcProto.Type fileStructType = treeReaderSchema.getFileTypes().get(columnId);
+ fileColumnCount = fileStructType.getFieldNamesCount();
+
+ OrcProto.Type schemaStructType = treeReaderSchema.getSchemaTypes().get(columnId);
+
+ if (columnId == treeReaderSchema.getInnerStructSubtype()) {
+ // If there are more result columns than reader columns, we will default those additional
+ // columns to NULL.
+ resultColumnCount = schemaStructType.getFieldNamesCount();
+ } else {
+ resultColumnCount = fileColumnCount;
+ }
+
+ this.fields = new TreeReader[fileColumnCount];
+ this.fieldNames = new String[fileColumnCount];
+
+ if (included == null) {
+ for (int i = 0; i < fileColumnCount; ++i) {
+ int subtype = schemaStructType.getSubtypes(i);
+ this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
+ // Use the schema evolution name since file/reader types may not have the real column name.
+ this.fieldNames[i] = schemaStructType.getFieldNames(i);
+ }
+ } else {
+ for (int i = 0; i < fileColumnCount; ++i) {
+ int subtype = schemaStructType.getSubtypes(i);
+ if (subtype >= included.length) {
+ throw new IOException("subtype " + subtype + " exceeds the included array size " +
+ included.length + " fileTypes " + treeReaderSchema.getFileTypes().toString() +
+ " schemaTypes " + treeReaderSchema.getSchemaTypes().toString() +
+ " innerStructSubtype " + treeReaderSchema.getInnerStructSubtype());
+ }
+ if (included[subtype]) {
+ this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
+ }
+ // Use the schema evolution name since file/reader types may not have the real column name.
+ this.fieldNames[i] = schemaStructType.getFieldNames(i);
}
- this.fieldNames[i] = type.getFieldNames(i);
}
}
@@ -2000,22 +2093,28 @@ public class TreeReaderFactory {
OrcStruct result = null;
if (valuePresent) {
if (previous == null) {
- result = new OrcStruct(fields.length);
+ result = new OrcStruct(resultColumnCount);
} else {
result = (OrcStruct) previous;
// If the input format was initialized with a file with a
// different number of fields, the number of fields needs to
// be updated to the correct number
- if (result.getNumFields() != fields.length) {
- result.setNumFields(fields.length);
+ if (result.getNumFields() != resultColumnCount) {
+ result.setNumFields(resultColumnCount);
}
}
- for (int i = 0; i < fields.length; ++i) {
+ for (int i = 0; i < fileColumnCount; ++i) {
if (fields[i] != null) {
result.setFieldValue(i, fields[i].next(result.getFieldValue(i)));
}
}
+ if (resultColumnCount > fileColumnCount) {
+ for (int i = fileColumnCount; i < resultColumnCount; ++i) {
+ // Default new schema evolution fields to NULL.
+ result.setFieldValue(i, null);
+ }
+ }
}
return result;
}
@@ -2024,13 +2123,13 @@ public class TreeReaderFactory {
public Object nextVector(Object previousVector, long batchSize) throws IOException {
final ColumnVector[] result;
if (previousVector == null) {
- result = new ColumnVector[fields.length];
+ result = new ColumnVector[fileColumnCount];
} else {
result = (ColumnVector[]) previousVector;
}
// Read all the members of struct as column vectors
- for (int i = 0; i < fields.length; i++) {
+ for (int i = 0; i < fileColumnCount; i++) {
if (fields[i] != null) {
if (result[i] == null) {
result[i] = (ColumnVector) fields[i].nextVector(null, batchSize);
@@ -2039,6 +2138,19 @@ public class TreeReaderFactory {
}
}
}
+
+ // Default additional schema evolution fields to NULL.
+ if (vectorColumnCount != -1 && vectorColumnCount > fileColumnCount) {
+ for (int i = fileColumnCount; i < vectorColumnCount; ++i) {
+ ColumnVector colVector = result[i];
+ if (colVector != null) {
+ colVector.isRepeating = true;
+ colVector.noNulls = false;
+ colVector.isNull[0] = true;
+ }
+ }
+ }
+
return result;
}
@@ -2070,17 +2182,17 @@ public class TreeReaderFactory {
protected RunLengthByteReader tags;
protected UnionTreeReader(int columnId,
- List<OrcProto.Type> types,
+ TreeReaderSchema treeReaderSchema,
boolean[] included,
boolean skipCorrupt) throws IOException {
super(columnId);
- OrcProto.Type type = types.get(columnId);
+ OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
int fieldCount = type.getSubtypesCount();
this.fields = new TreeReader[fieldCount];
for (int i = 0; i < fieldCount; ++i) {
int subtype = type.getSubtypes(i);
if (included == null || included[subtype]) {
- this.fields[i] = createTreeReader(subtype, types, included, skipCorrupt);
+ this.fields[i] = createTreeReader(subtype, treeReaderSchema, included, skipCorrupt);
}
}
}
@@ -2150,12 +2262,12 @@ public class TreeReaderFactory {
protected IntegerReader lengths = null;
protected ListTreeReader(int columnId,
- List<OrcProto.Type> types,
+ TreeReaderSchema treeReaderSchema,
boolean[] included,
boolean skipCorrupt) throws IOException {
super(columnId);
- OrcProto.Type type = types.get(columnId);
- elementReader = createTreeReader(type.getSubtypes(0), types, included, skipCorrupt);
+ OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
+ elementReader = createTreeReader(type.getSubtypes(0), treeReaderSchema, included, skipCorrupt);
}
@Override
@@ -2240,20 +2352,20 @@ public class TreeReaderFactory {
protected IntegerReader lengths = null;
protected MapTreeReader(int columnId,
- List<OrcProto.Type> types,
+ TreeReaderSchema treeReaderSchema,
boolean[] included,
boolean skipCorrupt) throws IOException {
super(columnId);
- OrcProto.Type type = types.get(columnId);
+ OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
int keyColumn = type.getSubtypes(0);
int valueColumn = type.getSubtypes(1);
if (included == null || included[keyColumn]) {
- keyReader = createTreeReader(keyColumn, types, included, skipCorrupt);
+ keyReader = createTreeReader(keyColumn, treeReaderSchema, included, skipCorrupt);
} else {
keyReader = null;
}
if (included == null || included[valueColumn]) {
- valueReader = createTreeReader(valueColumn, types, included, skipCorrupt);
+ valueReader = createTreeReader(valueColumn, treeReaderSchema, included, skipCorrupt);
} else {
valueReader = null;
}
@@ -2333,11 +2445,11 @@ public class TreeReaderFactory {
}
public static TreeReader createTreeReader(int columnId,
- List<OrcProto.Type> types,
+ TreeReaderSchema treeReaderSchema,
boolean[] included,
boolean skipCorrupt
) throws IOException {
- OrcProto.Type type = types.get(columnId);
+ OrcProto.Type type = treeReaderSchema.getSchemaTypes().get(columnId);
switch (type.getKind()) {
case BOOLEAN:
return new BooleanTreeReader(columnId);
@@ -2377,13 +2489,13 @@ public class TreeReaderFactory {
int scale = type.hasScale() ? type.getScale() : HiveDecimal.SYSTEM_DEFAULT_SCALE;
return new DecimalTreeReader(columnId, precision, scale);
case STRUCT:
- return new StructTreeReader(columnId, types, included, skipCorrupt);
+ return new StructTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
case LIST:
- return new ListTreeReader(columnId, types, included, skipCorrupt);
+ return new ListTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
case MAP:
- return new MapTreeReader(columnId, types, included, skipCorrupt);
+ return new MapTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
case UNION:
- return new UnionTreeReader(columnId, types, included, skipCorrupt);
+ return new UnionTreeReader(columnId, treeReaderSchema, included, skipCorrupt);
default:
throw new IllegalArgumentException("Unsupported type " +
type.getKind());
http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
index a8e5c2e..a2725b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowReader.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -46,31 +45,25 @@ class VectorizedOrcAcidRowReader
private final AcidInputFormat.RowReader<OrcStruct> innerReader;
private final RecordIdentifier key;
private final OrcStruct value;
- private final VectorizedRowBatchCtx rowBatchCtx;
+ private VectorizedRowBatchCtx rbCtx;
+ private Object[] partitionValues;
private final ObjectInspector objectInspector;
private final DataOutputBuffer buffer = new DataOutputBuffer();
VectorizedOrcAcidRowReader(AcidInputFormat.RowReader<OrcStruct> inner,
Configuration conf,
+ VectorizedRowBatchCtx vectorizedRowBatchCtx,
FileSplit split) throws IOException {
this.innerReader = inner;
this.key = inner.createKey();
- this.rowBatchCtx = new VectorizedRowBatchCtx();
+ rbCtx = vectorizedRowBatchCtx;
+ int partitionColumnCount = rbCtx.getPartitionColumnCount();
+ if (partitionColumnCount > 0) {
+ partitionValues = new Object[partitionColumnCount];
+ rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues);
+ }
this.value = inner.createValue();
this.objectInspector = inner.getObjectInspector();
- try {
- rowBatchCtx.init(conf, split);
- } catch (ClassNotFoundException e) {
- throw new IOException("Failed to initialize context", e);
- } catch (SerDeException e) {
- throw new IOException("Failed to initialize context", e);
- } catch (InstantiationException e) {
- throw new IOException("Failed to initialize context", e);
- } catch (IllegalAccessException e) {
- throw new IOException("Failed to initialize context", e);
- } catch (HiveException e) {
- throw new IOException("Failed to initialize context", e);
- }
}
@Override
@@ -82,23 +75,21 @@ class VectorizedOrcAcidRowReader
if (!innerReader.next(key, value)) {
return false;
}
- try {
- rowBatchCtx.addPartitionColsToBatch(vectorizedRowBatch);
- } catch (HiveException e) {
- throw new IOException("Problem adding partition column", e);
+ if (partitionValues != null) {
+ rbCtx.addPartitionColsToBatch(vectorizedRowBatch, partitionValues);
}
try {
VectorizedBatchUtil.acidAddRowToBatch(value,
(StructObjectInspector) objectInspector,
- vectorizedRowBatch.size++, vectorizedRowBatch, rowBatchCtx, buffer);
+ vectorizedRowBatch.size++, vectorizedRowBatch, rbCtx, buffer);
while (vectorizedRowBatch.size < vectorizedRowBatch.selected.length &&
innerReader.next(key, value)) {
VectorizedBatchUtil.acidAddRowToBatch(value,
(StructObjectInspector) objectInspector,
- vectorizedRowBatch.size++, vectorizedRowBatch, rowBatchCtx, buffer);
+ vectorizedRowBatch.size++, vectorizedRowBatch, rbCtx, buffer);
}
- } catch (HiveException he) {
- throw new IOException("error iterating", he);
+ } catch (Exception e) {
+ throw new IOException("error iterating", e);
}
return true;
}
@@ -110,11 +101,7 @@ class VectorizedOrcAcidRowReader
@Override
public VectorizedRowBatch createValue() {
- try {
- return rowBatchCtx.createVectorizedRowBatch();
- } catch (HiveException e) {
- throw new RuntimeException("Error creating a batch", e);
- }
+ return rbCtx.createVectorizedRowBatch();
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index bf09001..6d280c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -27,14 +26,12 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
@@ -47,7 +44,8 @@ import org.apache.hadoop.mapred.Reporter;
* A MapReduce/Hive input format for ORC files.
*/
public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
- implements InputFormatChecker, VectorizedInputFormatInterface {
+ implements InputFormatChecker, VectorizedInputFormatInterface,
+ SelfDescribingInputFormatInterface {
static class VectorizedOrcRecordReader
implements RecordReader<NullWritable, VectorizedRowBatch> {
@@ -56,12 +54,21 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
private final long length;
private float progress = 0.0f;
private VectorizedRowBatchCtx rbCtx;
+ private final boolean[] columnsToIncludeTruncated;
+ private final Object[] partitionValues;
private boolean addPartitionCols = true;
VectorizedOrcRecordReader(Reader file, Configuration conf,
FileSplit fileSplit) throws IOException {
+
+ /**
+ * Do we have schema on read in the configuration variables?
+ */
+ TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf);
+
List<OrcProto.Type> types = file.getTypes();
Reader.Options options = new Reader.Options();
+ options.schema(schema);
this.offset = fileSplit.getStart();
this.length = fileSplit.getLength();
options.range(offset, length);
@@ -69,11 +76,17 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
OrcInputFormat.setSearchArgument(options, types, conf, true);
this.reader = file.rowsOptions(options);
- try {
- rbCtx = new VectorizedRowBatchCtx();
- rbCtx.init(conf, fileSplit);
- } catch (Exception e) {
- throw new RuntimeException(e);
+
+ rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+
+ columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(conf);
+
+ int partitionColumnCount = rbCtx.getPartitionColumnCount();
+ if (partitionColumnCount > 0) {
+ partitionValues = new Object[partitionColumnCount];
+ rbCtx.getPartitionValues(rbCtx, conf, fileSplit, partitionValues);
+ } else {
+ partitionValues = null;
}
}
@@ -90,7 +103,9 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
// as this does not call CreateValue for each new RecordReader it creates, this check is
// required in next()
if (addPartitionCols) {
- rbCtx.addPartitionColsToBatch(value);
+ if (partitionValues != null) {
+ rbCtx.addPartitionColsToBatch(value, partitionValues);
+ }
addPartitionCols = false;
}
reader.nextBatch(value);
@@ -108,11 +123,7 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect
@Override
public VectorizedRowBatch createValue() {
- try {
- return rbCtx.createVectorizedRowBatch();
- } catch (HiveException e) {
- throw new RuntimeException("Error creating a batch", e);
- }
+ return rbCtx.createVectorizedRowBatch(columnsToIncludeTruncated);
}
@Override
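
The constructor hunk above is where "schema on read" enters the vectorized ORC path: the desired row type is pulled from the configuration and handed to the row reader through Reader.Options, so the reader can present the file's types as the requested ones. A minimal sketch of just that hookup, assuming it lives inside the org.apache.hadoop.hive.ql.io.orc package where OrcUtils, TypeDescription, and Reader reside (the wrapper method is hypothetical):

    import org.apache.hadoop.conf.Configuration;

    // Inside org.apache.hadoop.hive.ql.io.orc:
    static Reader.Options readOptionsFor(Configuration conf, long offset, long length) {
      // getDesiredRowTypeDescr() may return null when no schema was put into
      // the configuration; a null schema presumably leaves the file's own
      // types in effect, since the diff passes it through unconditionally.
      TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf);
      Reader.Options options = new Reader.Options();
      options.schema(schema);
      options.range(offset, length);
      return options;
    }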
http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
index b28d870..2072533 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java
@@ -14,8 +14,10 @@
package org.apache.hadoop.hive.ql.io.parquet;
import java.io.IOException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssign;
import org.apache.hadoop.hive.ql.exec.vector.VectorColumnAssignFactory;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
@@ -23,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
@@ -32,7 +35,6 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-
import org.apache.parquet.hadoop.ParquetInputFormat;
/**
@@ -52,6 +54,7 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable,
private final ParquetRecordReaderWrapper internalReader;
private VectorizedRowBatchCtx rbCtx;
+ private Object[] partitionValues;
private ArrayWritable internalValues;
private NullWritable internalKey;
private VectorColumnAssign[] assigners;
@@ -65,11 +68,11 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable,
split,
conf,
reporter);
- try {
- rbCtx = new VectorizedRowBatchCtx();
- rbCtx.init(conf, split);
- } catch (Exception e) {
- throw new RuntimeException(e);
+ rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+ int partitionColumnCount = rbCtx.getPartitionColumnCount();
+ if (partitionColumnCount > 0) {
+ partitionValues = new Object[partitionColumnCount];
+ rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues);
}
}
@@ -81,13 +84,9 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable,
@Override
public VectorizedRowBatch createValue() {
- VectorizedRowBatch outputBatch = null;
- try {
- outputBatch = rbCtx.createVectorizedRowBatch();
- internalValues = internalReader.createValue();
- } catch (HiveException e) {
- throw new RuntimeException("Error creating a batch", e);
- }
+ VectorizedRowBatch outputBatch;
+ outputBatch = rbCtx.createVectorizedRowBatch();
+ internalValues = internalReader.createValue();
return outputBatch;
}
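
The ORC and Parquet readers now share the same partition-column pattern: resolve the split's partition values once in the constructor, then stamp them into each output batch rather than re-deriving them per reader call. A minimal sketch of that pattern, assuming the plan-provided batch context as above (the class is hypothetical; every called method appears in these diffs):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
    import org.apache.hadoop.mapred.FileSplit;

    class PartitionStampSketch {
      private final VectorizedRowBatchCtx rbCtx;
      private final Object[] partitionValues;

      PartitionStampSketch(Configuration conf, FileSplit split) throws IOException {
        rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
        int partitionColumnCount = rbCtx.getPartitionColumnCount();
        if (partitionColumnCount > 0) {
          // Materialize the split's partition values exactly once ...
          partitionValues = new Object[partitionColumnCount];
          rbCtx.getPartitionValues(rbCtx, conf, split, partitionValues);
        } else {
          partitionValues = null;
        }
      }

      void addPartitionCols(VectorizedRowBatch batch) {
        // ... and write them into each batch as repeating partition columns.
        if (partitionValues != null) {
          rbCtx.addPartitionColsToBatch(batch, partitionValues);
        }
      }
    }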
http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 05dfc4b..82514d4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -111,6 +111,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.InputFormat;
@@ -693,6 +695,11 @@ public final class GenMapRedUtils {
parseCtx.getGlobalLimitCtx().disableOpt();
}
+ if (topOp instanceof TableScanOperator) {
+ Utilities.addSchemaEvolutionToTableScanOperator(partsList.getSourceTable(),
+ (TableScanOperator) topOp);
+ }
+
Iterator<Path> iterPath = partDir.iterator();
Iterator<PartitionDesc> iterPartnDesc = partDesc.iterator();
@@ -754,6 +761,7 @@ public final class GenMapRedUtils {
* whether you need to add to map-reduce or local work
* @param tt_desc
* table descriptor
+ * @throws SerDeException
*/
public static void setTaskPlan(String path, String alias,
Operator<? extends OperatorDesc> topOp, MapWork plan, boolean local,
@@ -763,6 +771,16 @@ public final class GenMapRedUtils {
return;
}
+ if (topOp instanceof TableScanOperator) {
+ try {
+ Utilities.addSchemaEvolutionToTableScanOperator(
+ (StructObjectInspector) tt_desc.getDeserializer().getObjectInspector(),
+ (TableScanOperator) topOp);
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ }
+
if (!local) {
if (plan.getPathToAliases().get(path) == null) {
plan.getPathToAliases().put(path, new ArrayList<String>());
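
Both GenMapRedUtils call sites follow the same shape: guard on the top operator actually being a TableScanOperator, then attach the schema-evolution metadata, wrapping checked failures (getObjectInspector() can throw SerDeException) in a SemanticException. Sketched in isolation, with a hypothetical wrapper method around the helper this patch adds:

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.TableScanOperator;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;
    import org.apache.hadoop.hive.ql.plan.TableDesc;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

    static void attachSchemaEvolution(Operator<? extends OperatorDesc> topOp,
        TableDesc ttDesc) throws SemanticException {
      if (topOp instanceof TableScanOperator) {
        try {
          // The deserializer's object inspector describes the table's row
          // schema; the struct inspector drives the schema-evolution setup.
          Utilities.addSchemaEvolutionToTableScanOperator(
              (StructObjectInspector) ttDesc.getDeserializer().getObjectInspector(),
              (TableScanOperator) topOp);
        } catch (Exception e) {
          throw new SemanticException(e);
        }
      }
    }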
http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index 588f407..332e53b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -370,6 +370,7 @@ public class SimpleFetchOptimizer implements Transform {
private FetchWork convertToWork() throws HiveException {
inputs.clear();
+ Utilities.addSchemaEvolutionToTableScanOperator(table, scanOp);
TableDesc tableDesc = Utilities.getTableDesc(table);
if (!table.isPartitioned()) {
inputs.add(new ReadEntity(table, parent, !table.isView() && parent == null));
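
The fetch-only path needs the same wiring because convertToWork() produces a FetchWork rather than a MapWork, so the plan-time call sites above never see it. Here the Table-based overload is used, so no object-inspector plumbing is required (names as in the hunk above):

    // Attach schema-evolution info before building the FetchWork.
    Utilities.addSchemaEvolutionToTableScanOperator(table, scanOp);
    TableDesc tableDesc = Utilities.getTableDesc(table);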