Posted to commits@hive.apache.org by om...@apache.org on 2015/11/18 23:41:05 UTC
[23/34] hive git commit: HIVE-11981: ORC Schema Evolution Issues
(Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J)
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
deleted file mode 100644
index 5ce7553..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.exec.vector;
-
-import java.nio.ByteBuffer;
-import java.sql.Timestamp;
-import java.util.List;
-
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.ByteStream;
-import org.apache.hadoop.hive.serde2.SerDe;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeStats;
-import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
-import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.lazy.LazyDate;
-import org.apache.hadoop.hive.serde2.lazy.LazyLong;
-import org.apache.hadoop.hive.serde2.lazy.LazyTimestamp;
-import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-/**
- * VectorizedColumnarSerDe is used by Vectorized query execution engine
- * for columnar based storage supported by RCFile.
- */
-public class VectorizedColumnarSerDe extends ColumnarSerDe implements VectorizedSerde {
-
- public VectorizedColumnarSerDe() throws SerDeException {
- }
-
- private final BytesRefArrayWritable[] byteRefArray = new BytesRefArrayWritable[VectorizedRowBatch.DEFAULT_SIZE];
- private final ObjectWritable ow = new ObjectWritable();
- private final ByteStream.Output serializeVectorStream = new ByteStream.Output();
-
- /**
- * Serialize a vectorized row batch
- *
- * @param vrg
- * Vectorized row batch to serialize
- * @param objInspector
- * The ObjectInspector for the row object
- * @return The serialized Writable object
- * @throws SerDeException
- * @see SerDe#serialize(Object, ObjectInspector)
- */
- @Override
- public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
- throws SerDeException {
- try {
- // Validate that the OI is of struct type
- if (objInspector.getCategory() != Category.STRUCT) {
- throw new UnsupportedOperationException(getClass().toString()
- + " can only serialize struct types, but we got: "
- + objInspector.getTypeName());
- }
-
- VectorizedRowBatch batch = (VectorizedRowBatch) vrg;
- StructObjectInspector soi = (StructObjectInspector) objInspector;
- List<? extends StructField> fields = soi.getAllStructFieldRefs();
-
- // Reset the byte buffer
- serializeVectorStream.reset();
- int count = 0;
- int rowIndex = 0;
- for (int i = 0; i < batch.size; i++) {
-
- // If selectedInUse is true then we need to serialize only
- // the selected indexes
- if (batch.selectedInUse) {
- rowIndex = batch.selected[i];
- } else {
- rowIndex = i;
- }
-
- BytesRefArrayWritable byteRow = byteRefArray[i];
- int numCols = fields.size();
-
- if (byteRow == null) {
- byteRow = new BytesRefArrayWritable(numCols);
- byteRefArray[i] = byteRow;
- }
-
- byteRow.resetValid(numCols);
-
- for (int p = 0; p < batch.projectionSize; p++) {
- int k = batch.projectedColumns[p];
- ObjectInspector foi = fields.get(k).getFieldObjectInspector();
- ColumnVector currentColVector = batch.cols[k];
-
- switch (foi.getCategory()) {
- case PRIMITIVE: {
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
- if (!currentColVector.noNulls
- && (currentColVector.isRepeating || currentColVector.isNull[rowIndex])) {
- // The column is null hence write null value
- serializeVectorStream.write(new byte[0], 0, 0);
- } else {
- // If here then the vector value is not null.
- if (currentColVector.isRepeating) {
- // If the vector has repeating values then set rowindex to zero
- rowIndex = 0;
- }
-
- switch (poi.getPrimitiveCategory()) {
- case BOOLEAN: {
- LongColumnVector lcv = (LongColumnVector) batch.cols[k];
- // In vectorization true is stored as 1 and false as 0
- boolean b = lcv.vector[rowIndex] == 1 ? true : false;
- if (b) {
- serializeVectorStream.write(LazyUtils.trueBytes, 0, LazyUtils.trueBytes.length);
- } else {
-            serializeVectorStream.write(LazyUtils.falseBytes, 0, LazyUtils.falseBytes.length);
- }
- }
- break;
- case BYTE:
- case SHORT:
- case INT:
- case LONG:
- LongColumnVector lcv = (LongColumnVector) batch.cols[k];
- LazyLong.writeUTF8(serializeVectorStream, lcv.vector[rowIndex]);
- break;
- case FLOAT:
- case DOUBLE:
- DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[k];
- ByteBuffer b = Text.encode(String.valueOf(dcv.vector[rowIndex]));
- serializeVectorStream.write(b.array(), 0, b.limit());
- break;
- case BINARY: {
- BytesColumnVector bcv = (BytesColumnVector) batch.cols[k];
- byte[] bytes = bcv.vector[rowIndex];
- serializeVectorStream.write(bytes, 0, bytes.length);
- }
- break;
- case STRING:
- case CHAR:
- case VARCHAR: {
- // Is it correct to escape CHAR and VARCHAR?
- BytesColumnVector bcv = (BytesColumnVector) batch.cols[k];
- LazyUtils.writeEscaped(serializeVectorStream, bcv.vector[rowIndex],
- bcv.start[rowIndex],
- bcv.length[rowIndex],
- serdeParams.isEscaped(), serdeParams.getEscapeChar(), serdeParams
- .getNeedsEscape());
- }
- break;
- case TIMESTAMP:
- LongColumnVector tcv = (LongColumnVector) batch.cols[k];
- long timeInNanoSec = tcv.vector[rowIndex];
- Timestamp t = new Timestamp(0);
- TimestampUtils.assignTimeInNanoSec(timeInNanoSec, t);
- TimestampWritable tw = new TimestampWritable();
- tw.set(t);
- LazyTimestamp.writeUTF8(serializeVectorStream, tw);
- break;
- case DATE:
- LongColumnVector dacv = (LongColumnVector) batch.cols[k];
- DateWritable daw = new DateWritable((int) dacv.vector[rowIndex]);
- LazyDate.writeUTF8(serializeVectorStream, daw);
- break;
- default:
- throw new UnsupportedOperationException(
-                  "Vectorization is not supported for datatype:"
- + poi.getPrimitiveCategory());
- }
- }
- break;
- }
- case LIST:
- case MAP:
- case STRUCT:
- case UNION:
-        throw new UnsupportedOperationException("Vectorization is not supported for datatype:"
- + foi.getCategory());
- default:
- throw new SerDeException("Unknown ObjectInspector category!");
-
- }
-
- byteRow.get(k).set(serializeVectorStream.getData(), count, serializeVectorStream
- .getLength() - count);
- count = serializeVectorStream.getLength();
- }
-
- }
- ow.set(byteRefArray);
- } catch (Exception e) {
- throw new SerDeException(e);
- }
- return ow;
- }
-
- @Override
- public SerDeStats getSerDeStats() {
- return null;
- }
-
- @Override
- public Class<? extends Writable> getSerializedClass() {
- return BytesRefArrayWritable.class;
- }
-
- @Override
- public Object deserialize(Writable blob) throws SerDeException {
-
-    // Ideally this should throw UnsupportedOperationException, as this is a
-    // vectorized serde. But since the RC file reader does not support vectorized reading,
-    // this is left as is. This function will be called from VectorizedRowBatchCtx::addRowToBatch
-    // to deserialize the rows one by one and populate the batch. Once the RC file reader supports
-    // vectorized reading, this serde can become a standalone serde with no dependency on ColumnarSerDe.
- return super.deserialize(blob);
- }
-
- @Override
- public ObjectInspector getObjectInspector() throws SerDeException {
- return cachedObjectInspector;
- }
-
- @Override
- public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Deserializes the rowBlob into Vectorized row batch
- * @param rowBlob
- * rowBlob row batch to deserialize
- * @param rowsInBlob
- * Total number of rows in rowBlob to deserialize
- * @param reuseBatch
-   *          VectorizedRowBatch into which the rows should be deserialized
- * @throws SerDeException
- */
- @Override
- public void deserializeVector(Object rowBlob, int rowsInBlob,
- VectorizedRowBatch reuseBatch) throws SerDeException {
-
- BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[]) rowBlob;
- DataOutputBuffer buffer = new DataOutputBuffer();
- for (int i = 0; i < rowsInBlob; i++) {
- Object row = deserialize(refArray[i]);
- try {
- VectorizedBatchUtil.addRowToBatch(row,
- (StructObjectInspector) cachedObjectInspector, i,
- reuseBatch, buffer);
- } catch (HiveException e) {
- throw new SerDeException(e);
- }
- }
- }
-}
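For reference, the removed serializer's use of batch.selected / batch.selectedInUse is the general convention for walking a VectorizedRowBatch: when selectedInUse is true, only the indices listed in the selected array are live rows. A minimal sketch of that iteration pattern (illustrative only; process() is a hypothetical per-row callback, not part of this patch):

    void forEachRow(VectorizedRowBatch batch) {
      for (int i = 0; i < batch.size; i++) {
        // Map the loop counter to the logical row: a selected index, or i itself.
        int rowIndex = batch.selectedInUse ? batch.selected[i] : i;
        process(batch, rowIndex);  // hypothetical per-row callback
      }
    }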
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index 6557002..a904a50 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -20,16 +20,10 @@ package org.apache.hadoop.hive.ql.exec.vector;
import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,337 +31,266 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.IOPrepareCache;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hive.common.util.DateUtils;
/**
- * Context for Vectorized row batch. this calss does eager deserialization of row data using serde
+ * Context for Vectorized row batch. This class does eager deserialization of row data using the serde
* in the RecordReader layer.
 * It supports partitions in this layer so that the vectorized batch is populated correctly
* with the partition column.
*/
public class VectorizedRowBatchCtx {
+ private static final long serialVersionUID = 1L;
+
private static final Logger LOG = LoggerFactory.getLogger(VectorizedRowBatchCtx.class.getName());
- // OI for raw row data (EG without partition cols)
- private StructObjectInspector rawRowOI;
+  // The following information is used to create the VectorizedRowBatch and to determine
+  // how the table is partitioned.
+ //
+ // It will be stored in MapWork and ReduceWork.
+ private String[] rowColumnNames;
+ private TypeInfo[] rowColumnTypeInfos;
+ private int dataColumnCount;
+ private int partitionColumnCount;
- // OI for the row (Raw row OI + partition OI)
- private StructObjectInspector rowOI;
+ private String[] scratchColumnTypeNames;
- // Deserializer for the row data
- private Deserializer deserializer;
+ /**
+ * Constructor for VectorizedRowBatchCtx
+ */
+ public VectorizedRowBatchCtx() {
+ }
- // Hash map of partition values. Key=TblColName value=PartitionValue
- private Map<String, Object> partitionValues;
-
- //partition types
- private Map<String, PrimitiveCategory> partitionTypes;
+ public VectorizedRowBatchCtx(String[] rowColumnNames, TypeInfo[] rowColumnTypeInfos,
+ int partitionColumnCount, String[] scratchColumnTypeNames) {
+ this.rowColumnNames = rowColumnNames;
+ this.rowColumnTypeInfos = rowColumnTypeInfos;
+ this.partitionColumnCount = partitionColumnCount;
+ this.scratchColumnTypeNames = scratchColumnTypeNames;
- // partition column positions, for use by classes that need to know whether a given column is a
- // partition column
- private Set<Integer> partitionCols;
-
- // Column projection list - List of column indexes to include. This
- // list does not contain partition columns
- private List<Integer> colsToInclude;
+ dataColumnCount = rowColumnTypeInfos.length - partitionColumnCount;
+ }
- private Map<Integer, String> scratchColumnTypeMap = null;
+ public String[] getRowColumnNames() {
+ return rowColumnNames;
+ }
- /**
- * Constructor for VectorizedRowBatchCtx
- *
- * @param rawRowOI
- * OI for raw row data (EG without partition cols)
- * @param rowOI
- * OI for the row (Raw row OI + partition OI)
- * @param deserializer
- * Deserializer for the row data
- * @param partitionValues
- * Hash map of partition values. Key=TblColName value=PartitionValue
- */
- public VectorizedRowBatchCtx(StructObjectInspector rawRowOI, StructObjectInspector rowOI,
- Deserializer deserializer, Map<String, Object> partitionValues,
- Map<String, PrimitiveCategory> partitionTypes) {
- this.rowOI = rowOI;
- this.rawRowOI = rawRowOI;
- this.deserializer = deserializer;
- this.partitionValues = partitionValues;
- this.partitionTypes = partitionTypes;
+ public TypeInfo[] getRowColumnTypeInfos() {
+ return rowColumnTypeInfos;
}
- /**
- * Constructor for VectorizedRowBatchCtx
- */
- public VectorizedRowBatchCtx() {
+ public int getDataColumnCount() {
+ return dataColumnCount;
+ }
+ public int getPartitionColumnCount() {
+ return partitionColumnCount;
+ }
+
+ public String[] getScratchColumnTypeNames() {
+ return scratchColumnTypeNames;
}
/**
- * Initializes the VectorizedRowBatch context based on an scratch column type map and
+   * Initializes the VectorizedRowBatch context based on scratch column type names and an
* object inspector.
- * @param scratchColumnTypeMap
- * @param rowOI
+ * @param structObjectInspector
+ * @param scratchColumnTypeNames
* Object inspector that shapes the column types
+ * @throws HiveException
*/
- public void init(Map<Integer, String> scratchColumnTypeMap,
- StructObjectInspector rowOI) {
- this.scratchColumnTypeMap = scratchColumnTypeMap;
- this.rowOI= rowOI;
- this.rawRowOI = rowOI;
+ public void init(StructObjectInspector structObjectInspector, String[] scratchColumnTypeNames)
+ throws HiveException {
+
+ // Row column information.
+ rowColumnNames = VectorizedBatchUtil.columnNamesFromStructObjectInspector(structObjectInspector);
+ rowColumnTypeInfos = VectorizedBatchUtil.typeInfosFromStructObjectInspector(structObjectInspector);
+ partitionColumnCount = 0;
+ dataColumnCount = rowColumnTypeInfos.length;
+
+ // Scratch column information.
+ this.scratchColumnTypeNames = scratchColumnTypeNames;
}
- /**
- * Initializes VectorizedRowBatch context based on the
- * split and Hive configuration (Job conf with hive Plan).
- *
- * @param hiveConf
- * Hive configuration using Hive plan is extracted
- * @param split
- * File split of the file being read
- * @throws ClassNotFoundException
- * @throws IOException
- * @throws SerDeException
- * @throws InstantiationException
- * @throws IllegalAccessException
- * @throws HiveException
- */
- public void init(Configuration hiveConf, FileSplit split) throws ClassNotFoundException,
- IOException,
- SerDeException,
- InstantiationException,
- IllegalAccessException,
- HiveException {
+ public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx, Configuration hiveConf,
+ FileSplit split, Object[] partitionValues) throws IOException {
Map<String, PartitionDesc> pathToPartitionInfo = Utilities
.getMapWork(hiveConf).getPathToPartitionInfo();
- PartitionDesc part = HiveFileFormatUtils
+ PartitionDesc partDesc = HiveFileFormatUtils
.getPartitionDescFromPathRecursively(pathToPartitionInfo,
split.getPath(), IOPrepareCache.get().getPartitionDescMap());
- String partitionPath = split.getPath().getParent().toString();
- scratchColumnTypeMap = Utilities.getMapWorkVectorScratchColumnTypeMap(hiveConf);
- // LOG.info("VectorizedRowBatchCtx init scratchColumnTypeMap " + scratchColumnTypeMap.toString());
-
- Properties partProps =
- (part.getPartSpec() == null || part.getPartSpec().isEmpty()) ?
- part.getTableDesc().getProperties() : part.getProperties();
-
- Class serdeclass = hiveConf.getClassByName(part.getSerdeClassName());
- Deserializer partDeserializer = (Deserializer) serdeclass.newInstance();
- SerDeUtils.initializeSerDe(partDeserializer, hiveConf, part.getTableDesc().getProperties(),
- partProps);
- StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer
- .getObjectInspector();
-
- deserializer = partDeserializer;
-
- // Check to see if this split is part of a partition of a table
- String pcols = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-
- String[] partKeys = null;
- if (pcols != null && pcols.length() > 0) {
-
- // Partitions exist for this table. Get the partition object inspector and
- // raw row object inspector (row with out partition col)
- LinkedHashMap<String, String> partSpec = part.getPartSpec();
- partKeys = pcols.trim().split("/");
- String pcolTypes = partProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
- String[] partKeyTypes = pcolTypes.trim().split(":");
-
- if (partKeys.length > partKeyTypes.length) {
- throw new HiveException("Internal error : partKeys length, " +partKeys.length +
- " greater than partKeyTypes length, " + partKeyTypes.length);
- }
-
- List<String> partNames = new ArrayList<String>(partKeys.length);
- List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(partKeys.length);
- partitionValues = new LinkedHashMap<String, Object>();
- partitionTypes = new LinkedHashMap<String, PrimitiveCategory>();
- for (int i = 0; i < partKeys.length; i++) {
- String key = partKeys[i];
- partNames.add(key);
- ObjectInspector objectInspector = null;
- Object objectVal;
- if (partSpec == null) {
- // for partitionless table, initialize partValue to empty string.
- // We can have partitionless table even if we have partition keys
- // when there is only only partition selected and the partition key is not
- // part of the projection/include list.
- objectVal = null;
- objectInspector = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
- partitionTypes.put(key, PrimitiveCategory.STRING);
- } else {
- // Create a Standard java object Inspector
- objectInspector = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
- TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]));
- objectVal =
- ObjectInspectorConverters.
- getConverter(PrimitiveObjectInspectorFactory.
- javaStringObjectInspector, objectInspector).
- convert(partSpec.get(key));
- partitionTypes.put(key, TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]).getPrimitiveCategory());
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Partition column: name: " + key + ", value: " + objectVal + ", type: " + partitionTypes.get(key));
- }
- partitionValues.put(key, objectVal);
- partObjectInspectors.add(objectInspector);
- }
-
- // Create partition OI
- StructObjectInspector partObjectInspector = ObjectInspectorFactory
- .getStandardStructObjectInspector(partNames, partObjectInspectors);
-
- // Get row OI from partition OI and raw row OI
- StructObjectInspector rowObjectInspector = ObjectInspectorFactory
- .getUnionStructObjectInspector(Arrays
- .asList(new StructObjectInspector[] {partRawRowObjectInspector, partObjectInspector}));
- rowOI = rowObjectInspector;
- rawRowOI = partRawRowObjectInspector;
-
- // We have to do this after we've set rowOI, as getColIndexBasedOnColName uses it
- partitionCols = new HashSet<Integer>();
- if (pcols != null && pcols.length() > 0) {
- for (int i = 0; i < partKeys.length; i++) {
- partitionCols.add(getColIndexBasedOnColName(partKeys[i]));
- }
- }
+ getPartitionValues(vrbCtx, partDesc, partitionValues);
- } else {
+ }
- // No partitions for this table, hence row OI equals raw row OI
- rowOI = partRawRowObjectInspector;
- rawRowOI = partRawRowObjectInspector;
+ public static void getPartitionValues(VectorizedRowBatchCtx vrbCtx, PartitionDesc partDesc,
+ Object[] partitionValues) {
+
+ LinkedHashMap<String, String> partSpec = partDesc.getPartSpec();
+
+ for (int i = 0; i < vrbCtx.partitionColumnCount; i++) {
+ Object objectValue;
+ if (partSpec == null) {
+        // For a partition-less table, initialize the partition value to null.
+        // We can have a partition-less table even if we have partition keys
+        // when there is only one partition selected and the partition key is not
+        // part of the projection/include list.
+ objectValue = null;
+ } else {
+ String key = vrbCtx.rowColumnNames[vrbCtx.dataColumnCount + i];
+
+ // Create a Standard java object Inspector
+ ObjectInspector objectInspector =
+ TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
+ vrbCtx.rowColumnTypeInfos[vrbCtx.dataColumnCount + i]);
+ objectValue =
+ ObjectInspectorConverters.
+ getConverter(PrimitiveObjectInspectorFactory.
+ javaStringObjectInspector, objectInspector).
+ convert(partSpec.get(key));
+ }
+ partitionValues[i] = objectValue;
}
-
- colsToInclude = ColumnProjectionUtils.getReadColumnIDs(hiveConf);
}
-
+
/**
* Creates a Vectorized row batch and the column vectors.
*
* @return VectorizedRowBatch
* @throws HiveException
*/
- public VectorizedRowBatch createVectorizedRowBatch() throws HiveException {
- List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
- VectorizedRowBatch result = new VectorizedRowBatch(fieldRefs.size());
- for (int j = 0; j < fieldRefs.size(); j++) {
- // If the column is included in the include list or if the column is a
- // partition column then create the column vector. Also note that partition columns are not
- // in the included list.
- if ((colsToInclude == null) || colsToInclude.contains(j)
- || ((partitionValues != null) &&
- partitionValues.containsKey(fieldRefs.get(j).getFieldName()))) {
- ObjectInspector foi = fieldRefs.get(j).getFieldObjectInspector();
- result.cols[j] = VectorizedBatchUtil.createColumnVector(foi);
- }
+ public VectorizedRowBatch createVectorizedRowBatch()
+ {
+ int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length;
+ VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount);
+
+ LOG.info("createVectorizedRowBatch columnsToIncludeTruncated NONE");
+ for (int i = 0; i < rowColumnTypeInfos.length; i++) {
+ TypeInfo typeInfo = rowColumnTypeInfos[i];
+ result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
+ }
+
+ for (int i = 0; i < scratchColumnTypeNames.length; i++) {
+ String typeName = scratchColumnTypeNames[i];
+ result.cols[rowColumnTypeInfos.length + i] =
+ VectorizedBatchUtil.createColumnVector(typeName);
}
- result.numCols = fieldRefs.size();
- this.addScratchColumnsToBatch(result);
+
+ result.setPartitionInfo(dataColumnCount, partitionColumnCount);
+
result.reset();
return result;
}
- /**
- * Adds the row to the batch after deserializing the row
- *
- * @param rowIndex
- * Row index in the batch to which the row is added
- * @param rowBlob
- * Row blob (serialized version of row)
- * @param batch
- * Vectorized batch to which the row is added
- * @param buffer a buffer to copy strings into
- * @throws HiveException
- * @throws SerDeException
- */
- public void addRowToBatch(int rowIndex, Writable rowBlob,
- VectorizedRowBatch batch,
- DataOutputBuffer buffer
- ) throws HiveException, SerDeException
+ public VectorizedRowBatch createVectorizedRowBatch(boolean[] columnsToIncludeTruncated)
{
- Object row = this.deserializer.deserialize(rowBlob);
- VectorizedBatchUtil.addRowToBatch(row, this.rawRowOI, rowIndex, batch, buffer);
- }
+ if (columnsToIncludeTruncated == null) {
+ return createVectorizedRowBatch();
+ }
- /**
- * Deserialized set of rows and populates the batch
- *
- * @param rowBlob
- * to deserialize
- * @param batch
- * Vectorized row batch which contains deserialized data
- * @throws SerDeException
- */
- public void convertRowBatchBlobToVectorizedBatch(Object rowBlob, int rowsInBlob,
- VectorizedRowBatch batch)
- throws SerDeException {
-
- if (deserializer instanceof VectorizedSerde) {
- ((VectorizedSerde) deserializer).deserializeVector(rowBlob, rowsInBlob, batch);
- } else {
- throw new SerDeException(
- "Not able to deserialize row batch. Serde does not implement VectorizedSerde");
+ LOG.info("createVectorizedRowBatch columnsToIncludeTruncated " + Arrays.toString(columnsToIncludeTruncated));
+ int totalColumnCount = rowColumnTypeInfos.length + scratchColumnTypeNames.length;
+ VectorizedRowBatch result = new VectorizedRowBatch(totalColumnCount);
+
+ for (int i = 0; i < columnsToIncludeTruncated.length; i++) {
+ if (columnsToIncludeTruncated[i]) {
+ TypeInfo typeInfo = rowColumnTypeInfos[i];
+ result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
+ }
+ }
+
+ for (int i = dataColumnCount; i < dataColumnCount + partitionColumnCount; i++) {
+ TypeInfo typeInfo = rowColumnTypeInfos[i];
+ result.cols[i] = VectorizedBatchUtil.createColumnVector(typeInfo);
}
+
+ for (int i = 0; i < scratchColumnTypeNames.length; i++) {
+ String typeName = scratchColumnTypeNames[i];
+ result.cols[rowColumnTypeInfos.length + i] =
+ VectorizedBatchUtil.createColumnVector(typeName);
+ }
+
+ result.setPartitionInfo(dataColumnCount, partitionColumnCount);
+
+ result.reset();
+ return result;
}
- private int getColIndexBasedOnColName(String colName) throws HiveException
- {
- List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
- for (int i = 0; i < fieldRefs.size(); i++) {
- if (fieldRefs.get(i).getFieldName().equals(colName)) {
- return i;
+ public boolean[] getColumnsToIncludeTruncated(Configuration conf) {
+ boolean[] columnsToIncludeTruncated = null;
+
+ List<Integer> columnsToIncludeTruncatedList = ColumnProjectionUtils.getReadColumnIDs(conf);
+    if (columnsToIncludeTruncatedList != null && columnsToIncludeTruncatedList.size() > 0) {
+
+ // Partitioned columns will not be in the include list.
+
+ boolean[] columnsToInclude = new boolean[dataColumnCount];
+ Arrays.fill(columnsToInclude, false);
+ for (int columnNum : columnsToIncludeTruncatedList) {
+ if (columnNum < dataColumnCount) {
+ columnsToInclude[columnNum] = true;
+ }
+ }
+
+ // Work backwards to find the highest wanted column.
+
+ int highestWantedColumnNum = -1;
+ for (int i = dataColumnCount - 1; i >= 0; i--) {
+ if (columnsToInclude[i]) {
+ highestWantedColumnNum = i;
+ break;
+ }
+ }
+ if (highestWantedColumnNum == -1) {
+ throw new RuntimeException("No columns to include?");
+ }
+ int newColumnCount = highestWantedColumnNum + 1;
+ if (newColumnCount == dataColumnCount) {
+ // Didn't trim any columns off the end. Use the original.
+ columnsToIncludeTruncated = columnsToInclude;
+ } else {
+ columnsToIncludeTruncated = Arrays.copyOf(columnsToInclude, newColumnCount);
}
}
- throw new HiveException("Not able to find column name in row object inspector");
+ return columnsToIncludeTruncated;
}
-
+
/**
* Add the partition values to the batch
*
* @param batch
+ * @param partitionValues
* @throws HiveException
*/
- public void addPartitionColsToBatch(VectorizedRowBatch batch) throws HiveException {
- int colIndex;
- Object value;
- PrimitiveCategory pCategory;
+ public void addPartitionColsToBatch(VectorizedRowBatch batch, Object[] partitionValues)
+ {
if (partitionValues != null) {
- for (String key : partitionValues.keySet()) {
- colIndex = getColIndexBasedOnColName(key);
- value = partitionValues.get(key);
- pCategory = partitionTypes.get(key);
-
- switch (pCategory) {
+ for (int i = 0; i < partitionColumnCount; i++) {
+ Object value = partitionValues[i];
+
+ int colIndex = dataColumnCount + i;
+ String partitionColumnName = rowColumnNames[colIndex];
+ PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) rowColumnTypeInfos[colIndex];
+ switch (primitiveTypeInfo.getPrimitiveCategory()) {
case BOOLEAN: {
LongColumnVector lcv = (LongColumnVector) batch.cols[colIndex];
if (value == null) {
@@ -519,7 +442,7 @@ public class VectorizedRowBatchCtx {
HiveDecimal hd = (HiveDecimal) value;
dv.set(0, hd);
dv.isRepeating = true;
- dv.isNull[0] = false;
+ dv.isNull[0] = false;
}
}
break;
@@ -548,15 +471,15 @@ public class VectorizedRowBatchCtx {
bcv.isNull[0] = true;
bcv.isRepeating = true;
} else {
- bcv.fill(sVal.getBytes());
+ bcv.fill(sVal.getBytes());
bcv.isNull[0] = false;
}
}
break;
-
+
default:
- throw new HiveException("Unable to recognize the partition type " + pCategory +
- " for column " + key);
+ throw new RuntimeException("Unable to recognize the partition type " + primitiveTypeInfo.getPrimitiveCategory() +
+ " for column " + partitionColumnName);
}
}
}
@@ -564,64 +487,12 @@ public class VectorizedRowBatchCtx {
/**
* Determine whether a given column is a partition column
- * @param colnum column number in
+ * @param colNum column number in
* {@link org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch}s created by this context.
* @return true if it is a partition column, false otherwise
*/
- public final boolean isPartitionCol(int colnum) {
- return (partitionCols == null) ? false : partitionCols.contains(colnum);
- }
-
- private void addScratchColumnsToBatch(VectorizedRowBatch vrb) throws HiveException {
- if (scratchColumnTypeMap != null && !scratchColumnTypeMap.isEmpty()) {
- int origNumCols = vrb.numCols;
- int newNumCols = vrb.cols.length+scratchColumnTypeMap.keySet().size();
- vrb.cols = Arrays.copyOf(vrb.cols, newNumCols);
- for (int i = origNumCols; i < newNumCols; i++) {
- String typeName = scratchColumnTypeMap.get(i);
- if (typeName == null) {
- throw new HiveException("No type entry found for column " + i + " in map " + scratchColumnTypeMap.toString());
- }
- vrb.cols[i] = allocateColumnVector(typeName,
- VectorizedRowBatch.DEFAULT_SIZE);
- }
- vrb.numCols = vrb.cols.length;
- }
+ public final boolean isPartitionCol(int colNum) {
+ return colNum >= dataColumnCount && colNum < rowColumnTypeInfos.length;
}
- /**
- * Get the scale and precision for the given decimal type string. The decimal type is assumed to be
- * of the format decimal(precision,scale) e.g. decimal(20,10).
- * @param decimalType The given decimal type string.
- * @return An integer array of size 2 with first element set to precision and second set to scale.
- */
- private static int[] getScalePrecisionFromDecimalType(String decimalType) {
- Pattern p = Pattern.compile("\\d+");
- Matcher m = p.matcher(decimalType);
- m.find();
- int precision = Integer.parseInt(m.group());
- m.find();
- int scale = Integer.parseInt(m.group());
- int [] precScale = { precision, scale };
- return precScale;
- }
-
- public static ColumnVector allocateColumnVector(String type, int defaultSize) {
- if (type.equalsIgnoreCase("double")) {
- return new DoubleColumnVector(defaultSize);
- } else if (VectorizationContext.isStringFamily(type)) {
- return new BytesColumnVector(defaultSize);
- } else if (VectorizationContext.decimalTypePattern.matcher(type).matches()){
- int [] precisionScale = getScalePrecisionFromDecimalType(type);
- return new DecimalColumnVector(defaultSize, precisionScale[0], precisionScale[1]);
- } else if (type.equalsIgnoreCase("long") ||
- type.equalsIgnoreCase("date") ||
- type.equalsIgnoreCase("timestamp") ||
- type.equalsIgnoreCase(serdeConstants.INTERVAL_YEAR_MONTH_TYPE_NAME) ||
- type.equalsIgnoreCase(serdeConstants.INTERVAL_DAY_TIME_TYPE_NAME)) {
- return new LongColumnVector(defaultSize);
- } else {
- throw new RuntimeException("Cannot allocate vector column for " + type);
- }
- }
}
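Taken together, the reworked VectorizedRowBatchCtx above drops the deserializer and object-inspector plumbing in favor of plain column names, TypeInfos and column counts that can be carried in MapWork/ReduceWork. A hedged sketch of how a record reader might drive the new API (vrbCtx, jobConf and fileSplit are assumed to be in scope; the flow is illustrative, not a verbatim call site from this patch):

    // Decide which data columns to materialize, then build the batch; partition and
    // scratch columns are allocated after the data columns automatically.
    boolean[] include = vrbCtx.getColumnsToIncludeTruncated(jobConf);
    VectorizedRowBatch batch = vrbCtx.createVectorizedRowBatch(include);

    // Resolve this split's partition values and pin them into the trailing
    // partition-column slots of the batch.
    Object[] partitionValues = new Object[vrbCtx.getPartitionColumnCount()];
    VectorizedRowBatchCtx.getPartitionValues(vrbCtx, jobConf, fileSplit, partitionValues);
    vrbCtx.addPartitionColsToBatch(batch, partitionValues);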
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
index 1d5a9de..6ecfaf7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java
@@ -20,12 +20,8 @@ package org.apache.hadoop.hive.ql.exec.vector.mapjoin;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Future;
-
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,12 +30,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
import org.apache.hadoop.hive.ql.exec.HashTableLoader;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorColumnMapping;
import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping;
@@ -51,15 +42,12 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.optimized.VectorMapJoinOptimizedCreateHashTable;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTable;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinTableContainer;
import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastHashTableLoader;
-import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -69,10 +57,8 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -576,7 +562,7 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
smallTableVectorDeserializeRow =
new VectorDeserializeRow<LazyBinaryDeserializeRead>(
new LazyBinaryDeserializeRead(
- VectorizedBatchUtil.primitiveTypeInfosFromTypeNames(
+ VectorizedBatchUtil.typeInfosFromTypeNames(
smallTableMapping.getTypeNames())));
smallTableVectorDeserializeRow.init(smallTableMapping.getOutputColumns());
}
@@ -649,22 +635,12 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
* build join output results in.
*/
protected VectorizedRowBatch setupOverflowBatch() throws HiveException {
+
+ int initialColumnCount = vContext.firstOutputColumnIndex();
VectorizedRowBatch overflowBatch;
- Map<Integer, String> scratchColumnTypeMap = vOutContext.getScratchColumnTypeMap();
- int maxColumn = 0;
- for (int i = 0; i < outputProjection.length; i++) {
- int outputColumn = outputProjection[i];
- if (maxColumn < outputColumn) {
- maxColumn = outputColumn;
- }
- }
- for (int outputColumn : scratchColumnTypeMap.keySet()) {
- if (maxColumn < outputColumn) {
- maxColumn = outputColumn;
- }
- }
- overflowBatch = new VectorizedRowBatch(maxColumn + 1);
+ int totalNumColumns = initialColumnCount + vOutContext.getScratchColumnTypeNames().length;
+ overflowBatch = new VectorizedRowBatch(totalNumColumns);
// First, just allocate just the projection columns we will be using.
for (int i = 0; i < outputProjection.length; i++) {
@@ -674,9 +650,9 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
}
// Now, add any scratch columns needed for children operators.
- for (int outputColumn : scratchColumnTypeMap.keySet()) {
- String typeName = scratchColumnTypeMap.get(outputColumn);
- allocateOverflowBatchColumnVector(overflowBatch, outputColumn, typeName);
+ int outputColumn = initialColumnCount;
+ for (String typeName : vOutContext.getScratchColumnTypeNames()) {
+ allocateOverflowBatchColumnVector(overflowBatch, outputColumn++, typeName);
}
overflowBatch.projectedColumns = outputProjection;
@@ -696,33 +672,9 @@ public abstract class VectorMapJoinCommonOperator extends MapJoinOperator implem
if (overflowBatch.cols[outputColumn] == null) {
typeName = VectorizationContext.mapTypeNameSynonyms(typeName);
- String columnVectorTypeName;
-
TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeName);
- Type columnVectorType = VectorizationContext.getColumnVectorTypeFromTypeInfo(typeInfo);
-
- switch (columnVectorType) {
- case LONG:
- columnVectorTypeName = "long";
- break;
-
- case DOUBLE:
- columnVectorTypeName = "double";
- break;
-
- case BYTES:
- columnVectorTypeName = "string";
- break;
-
- case DECIMAL:
- columnVectorTypeName = typeName; // Keep precision and scale.
- break;
-
- default:
- throw new HiveException("Unexpected column vector type " + columnVectorType);
- }
- overflowBatch.cols[outputColumn] = VectorizedRowBatchCtx.allocateColumnVector(columnVectorTypeName, VectorizedRowBatch.DEFAULT_SIZE);
+ overflowBatch.cols[outputColumn] = VectorizedBatchUtil.createColumnVector(typeInfo);
if (isLogDebugEnabled) {
LOG.debug(taskName + ", " + getOperatorId() + " VectorMapJoinCommonOperator initializeOp overflowBatch outputColumn " + outputColumn + " class " + overflowBatch.cols[outputColumn].getClass().getSimpleName());
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index b20cca4..2d9da84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.ByteStream.Output;
/**
@@ -413,8 +414,8 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC
private void setupSpillSerDe(VectorizedRowBatch batch) throws HiveException {
- PrimitiveTypeInfo[] inputObjInspectorsTypeInfos =
- VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector(
+ TypeInfo[] inputObjInspectorsTypeInfos =
+ VectorizedBatchUtil.typeInfosFromStructObjectInspector(
(StructObjectInspector) inputObjInspectors[posBigTable]);
List<Integer> projectedColumns = vContext.getProjectedColumns();
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index a883124..b63deb2 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -510,11 +510,16 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
// ensure filters are not set from previous pushFilters
jobConf.unset(TableScanDesc.FILTER_TEXT_CONF_STR);
jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
+
+ Utilities.unsetSchemaEvolution(jobConf);
+
TableScanDesc scanDesc = tableScan.getConf();
if (scanDesc == null) {
return;
}
+ Utilities.addTableSchemaToConf(jobConf, tableScan);
+
// construct column name list and types for reference by filter push down
Utilities.setColumnNameList(jobConf, tableScan);
Utilities.setColumnTypeList(jobConf, tableScan);
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
index 9879dfe..8d94da8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java
@@ -36,6 +36,17 @@ public final class IOConstants {
public static final String AVRO = "AVRO";
public static final String AVROFILE = "AVROFILE";
+ /**
+ * The desired TABLE column names and types for input format schema evolution.
+   * This is different from COLUMNS and COLUMNS_TYPES, which are based on individual
+   * partition metadata.
+   *
+   * Virtual columns and partition columns are not included.
+   *
+   */
+ public static final String SCHEMA_EVOLUTION_COLUMNS = "schema.evolution.columns";
+ public static final String SCHEMA_EVOLUTION_COLUMNS_TYPES = "schema.evolution.columns.types";
+
@VisibleForTesting
public static final String CUSTOM_TEXT_SERDE = "CustomTextSerde";
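A hedged sketch of how a self-describing reader might consume the two new properties above (this assumes the column names are comma-separated and that the types string is accepted by TypeInfoUtils.getTypeInfosFromTypeString; the variable names and surrounding code are illustrative, not taken from this patch):

    String colNames = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS);
    String colTypes = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES);
    if (colNames != null && colTypes != null) {
      // Desired TABLE schema: no virtual columns, no partition columns.
      List<String> names = Arrays.asList(colNames.split(","));
      List<TypeInfo> types = TypeInfoUtils.getTypeInfosFromTypeString(colTypes);
      // A reader can reconcile this against the file's own schema, converting or
      // null-filling the columns that differ.
    }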
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java
new file mode 100644
index 0000000..6c455bd
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+/**
+ * Marker interface to indicate a given input format is self-describing and
+ * can perform schema evolution itself.
+ */
+public interface SelfDescribingInputFormatInterface {
+
+}
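A minimal sketch of how calling code might branch on this marker (illustrative; the actual checks live in the input-format and schema push-down plumbing, not in this file):

    if (inputFormat instanceof SelfDescribingInputFormatInterface) {
      // The format (e.g. ORC) reads its own file schema and reconciles it with the
      // desired table schema, so per-partition column metadata need not drive it.
    }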
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
deleted file mode 100644
index e9e1d5a..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * A MapReduce/Hive Vectorized input format for RC files.
- */
-public class VectorizedRCFileInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
- implements InputFormatChecker {
-
- public VectorizedRCFileInputFormat() {
- setMinSplitSize(SequenceFile.SYNC_INTERVAL);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(InputSplit split, JobConf job,
- Reporter reporter) throws IOException {
-
- reporter.setStatus(split.toString());
-
- return new VectorizedRCFileRecordReader(job, (FileSplit) split);
- }
-
- @Override
- public boolean validateInput(FileSystem fs, HiveConf conf,
- List<FileStatus> files) throws IOException {
- if (files.size() <= 0) {
- return false;
- }
- for (int fileId = 0; fileId < files.size(); fileId++) {
- RCFile.Reader reader = null;
- try {
- reader = new RCFile.Reader(fs, files.get(fileId)
- .getPath(), conf);
- reader.close();
- reader = null;
- } catch (IOException e) {
- return false;
- } finally {
- if (null != reader) {
- reader.close();
- }
- }
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
deleted file mode 100644
index 4cc1c2f..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.ql.io;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.WeakHashMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
-import org.apache.hadoop.hive.ql.io.RCFile.KeyBuffer;
-import org.apache.hadoop.hive.ql.io.RCFile.Reader;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.RecordReader;
-
-/**
- * RCFileRecordReader.
- */
-public class VectorizedRCFileRecordReader implements RecordReader<NullWritable, VectorizedRowBatch> {
-
- private final Reader in;
- private final long start;
- private final long end;
- private boolean more = true;
- protected Configuration conf;
- private final FileSplit split;
- private final boolean useCache;
- private VectorizedRowBatchCtx rbCtx;
- private final LongWritable keyCache = new LongWritable();
- private final BytesRefArrayWritable colsCache = new BytesRefArrayWritable();
- private boolean addPartitionCols = true;
- private final DataOutputBuffer buffer = new DataOutputBuffer();
-
- private static RCFileSyncCache syncCache = new RCFileSyncCache();
-
- private static final class RCFileSyncEntry {
- long end;
- long endSync;
- }
-
- private static final class RCFileSyncCache {
-
- private final Map<String, RCFileSyncEntry> cache;
-
- public RCFileSyncCache() {
- cache = Collections.synchronizedMap(new WeakHashMap<String, RCFileSyncEntry>());
- }
-
- public void put(FileSplit split, long endSync) {
- Path path = split.getPath();
- long end = split.getStart() + split.getLength();
- String key = path.toString() + "+" + String.format("%d", end);
-
- RCFileSyncEntry entry = new RCFileSyncEntry();
- entry.end = end;
- entry.endSync = endSync;
- if (entry.endSync >= entry.end) {
- cache.put(key, entry);
- }
- }
-
- public long get(FileSplit split) {
- Path path = split.getPath();
- long start = split.getStart();
- String key = path.toString() + "+" + String.format("%d", start);
- RCFileSyncEntry entry = cache.get(key);
- if (entry != null) {
- return entry.endSync;
- }
- return -1;
- }
- }
-
- public VectorizedRCFileRecordReader(Configuration conf, FileSplit split)
- throws IOException {
-
- Path path = split.getPath();
- FileSystem fs = path.getFileSystem(conf);
- this.in = new RCFile.Reader(fs, path, conf);
- this.end = split.getStart() + split.getLength();
- this.conf = conf;
- this.split = split;
-
- useCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEUSERCFILESYNCCACHE);
-
- if (split.getStart() > in.getPosition()) {
- long oldSync = useCache ? syncCache.get(split) : -1;
- if (oldSync == -1) {
- in.sync(split.getStart()); // sync to start
- } else {
- in.seek(oldSync);
- }
- }
-
- this.start = in.getPosition();
-
- more = start < end;
- try {
- rbCtx = new VectorizedRowBatchCtx();
- rbCtx.init(conf, split);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public Class<?> getKeyClass() {
- return LongWritable.class;
- }
-
- public Class<?> getValueClass() {
- return BytesRefArrayWritable.class;
- }
-
- @Override
- public NullWritable createKey() {
- return NullWritable.get();
- }
-
- @Override
- public VectorizedRowBatch createValue() {
- VectorizedRowBatch result;
- try {
- result = rbCtx.createVectorizedRowBatch();
- } catch (HiveException e) {
- throw new RuntimeException("Error creating a batch", e);
- }
- return result;
- }
-
- public boolean nextBlock() throws IOException {
- return in.nextBlock();
- }
-
- @Override
- public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
-
- // Reset column fields noNull values to true
- VectorizedBatchUtil.setNoNullFields(value);
- buffer.reset();
- value.selectedInUse = false;
- for (int i = 0; i < value.numCols; i++) {
- value.cols[i].isRepeating = false;
- }
-
- int i = 0;
- try {
-
- for (; i < VectorizedRowBatch.DEFAULT_SIZE; i++) {
- more = next(keyCache);
- if (more) {
- // Check and update partition cols if necessary. Ideally this should be done
- // in CreateValue() as the partition is constant per split. But since Hive uses
- // CombineHiveRecordReader and as this does not call CreateValue() for
- // each new RecordReader it creates, this check is required in next()
- if (addPartitionCols) {
- rbCtx.addPartitionColsToBatch(value);
- addPartitionCols = false;
- }
- in.getCurrentRow(colsCache);
- // Currently RCFile reader does not support reading vectorized
- // data. Populating the batch by adding one row at a time.
- rbCtx.addRowToBatch(i, (Writable) colsCache, value, buffer);
- } else {
- break;
- }
- }
- } catch (Exception e) {
- throw new RuntimeException("Error while getting next row", e);
- }
- value.size = i;
- return more;
- }
-
- protected boolean next(LongWritable key) throws IOException {
- if (!more) {
- return false;
- }
-
- more = in.next(key);
-
- long lastSeenSyncPos = in.lastSeenSyncPos();
-
- if (lastSeenSyncPos >= end) {
- if (useCache) {
- syncCache.put(split, lastSeenSyncPos);
- }
- more = false;
- return more;
- }
- return more;
- }
-
- /**
- * Return the progress within the input split.
- *
- * @return 0.0 to 1.0 of the input byte range
- */
- public float getProgress() throws IOException {
- if (end == start) {
- return 0.0f;
- } else {
- return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
- }
- }
-
- public long getPos() throws IOException {
- return in.getPosition();
- }
-
- public KeyBuffer getKeyBuffer() {
- return in.getCurrentKeyBufferObj();
- }
-
- protected void seek(long pos) throws IOException {
- in.seek(pos);
- }
-
- public void sync(long pos) throws IOException {
- in.sync(pos);
- }
-
- public void resetBuffer() {
- in.resetBuffer();
- }
-
- public long getStart() {
- return start;
- }
-
- public void close() throws IOException {
- in.close();
- }
-}
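
For orientation, the removed VectorizedRCFileRecordReader was consumed like any other mapred RecordReader, except that each call to next() fills an entire VectorizedRowBatch (up to VectorizedRowBatch.DEFAULT_SIZE rows) and stamps the partition columns into the batch on the first call. A minimal, hypothetical driver loop is sketched below; conf, fileSplit, and consume() are assumed to exist in the caller and are not part of this patch.

    // Hypothetical usage sketch; not part of the patch.
    VectorizedRCFileRecordReader reader = new VectorizedRCFileRecordReader(conf, fileSplit);
    NullWritable key = reader.createKey();           // keys are always NullWritable
    VectorizedRowBatch batch = reader.createValue(); // built from VectorizedRowBatchCtx
    try {
      // next() returns false once the split's last sync point has been passed;
      // batch.size reports how many rows were populated in this call, so the
      // final (possibly partial) batch still has to be consumed.
      boolean more = true;
      while (more) {
        more = reader.next(key, batch);
        if (batch.size > 0) {
          consume(batch); // e.g. feed the batch to a vectorized operator pipeline
        }
      }
    } finally {
      reader.close();
    }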
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java
deleted file mode 100644
index aaf4eb4..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.io.orc;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Factory for creating ORC tree readers. These tree readers can handle type promotions and type
- * conversions.
- */
-public class ConversionTreeReaderFactory extends TreeReaderFactory {
-
- // TODO: This is currently only a placeholder for type conversions.
-
- public static TreeReader createTreeReader(int columnId,
- List<OrcProto.Type> types,
- boolean[] included,
- boolean skipCorrupt
- ) throws IOException {
- return TreeReaderFactory.createTreeReader(columnId, types, included, skipCorrupt);
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index b4dd4ab..56ac40b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -402,7 +402,7 @@ public final class OrcFile {
public WriterOptions inspector(ObjectInspector value) {
this.inspector = value;
if (!explicitSchema) {
- schema = OrcOutputFormat.convertTypeInfo(
+ schema = OrcUtils.convertTypeInfo(
TypeInfoUtils.getTypeInfoFromObjectInspector(value));
}
return this;
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 46862da..714af23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -61,8 +61,10 @@ import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterVersion;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
@@ -73,7 +75,6 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -116,7 +117,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
*/
public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
InputFormatChecker, VectorizedInputFormatInterface, LlapWrappableInputFormatInterface,
- AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination {
+ SelfDescribingInputFormatInterface, AcidInputFormat<NullWritable, OrcStruct>,
+ CombineHiveInputFormat.AvoidSplitCombination {
static enum SplitStrategyKind {
HYBRID,
@@ -232,7 +234,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
Configuration conf,
long offset, long length
) throws IOException {
+
+ /**
+ * Do we have schema on read in the configuration variables?
+ */
+ TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf);
+
Reader.Options options = new Reader.Options().range(offset, length);
+ options.schema(schema);
boolean isOriginal = isOriginal(file);
List<OrcProto.Type> types = file.getTypes();
options.include(genIncludedColumns(types, conf, isOriginal));
@@ -1415,7 +1424,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
if (vectorMode) {
return (org.apache.hadoop.mapred.RecordReader)
- new VectorizedOrcAcidRowReader(inner, conf, (FileSplit) inputSplit);
+ new VectorizedOrcAcidRowReader(inner, conf,
+ Utilities.getMapWork(conf).getVectorizedRowBatchCtx(), (FileSplit) inputSplit);
}
return new NullKeyRecordReader(inner, conf);
}
@@ -1467,10 +1477,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
+ // The schema type description does not include the ACID fields (i.e. it is the
+ // non-ACID original schema).
+ private static boolean SCHEMA_TYPES_IS_ORIGINAL = true;
@Override
public RowReader<OrcStruct> getReader(InputSplit inputSplit,
- Options options) throws IOException {
+ Options options)
+ throws IOException {
final OrcSplit split = (OrcSplit) inputSplit;
final Path path = split.getPath();
Path root;
@@ -1485,36 +1499,30 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas());
final Configuration conf = options.getConfiguration();
+
+
+ /**
+ * Do we have schema on read in the configuration variables?
+ */
+ TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf);
+
final Reader reader;
final int bucket;
- Reader.Options readOptions = new Reader.Options();
+ Reader.Options readOptions = new Reader.Options().schema(schema);
readOptions.range(split.getStart(), split.getLength());
+
+ // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription.
+ final List<Type> schemaTypes = OrcUtils.getOrcTypes(schema);
+ readOptions.include(genIncludedColumns(schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL));
+ setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL);
+
if (split.hasBase()) {
bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf)
.getBucket();
reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
- final List<OrcProto.Type> types = reader.getTypes();
- readOptions.include(genIncludedColumns(types, conf, split.isOriginal()));
- setSearchArgument(readOptions, types, conf, split.isOriginal());
} else {
bucket = (int) split.getStart();
reader = null;
- if(deltas != null && deltas.length > 0) {
- Path bucketPath = AcidUtils.createBucketFile(deltas[0], bucket);
- OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
- FileSystem fs = readerOptions.getFilesystem();
- if(fs == null) {
- fs = path.getFileSystem(options.getConfiguration());
- }
- if(fs.exists(bucketPath)) {
- /* Without schema evolution (which ACID doesn't support yet) all delta
- files have the same schema, so the first one is chosen. */
- final List<OrcProto.Type> types =
- OrcFile.createReader(bucketPath, readerOptions).getTypes();
- readOptions.include(genIncludedColumns(types, conf, split.isOriginal()));
- setSearchArgument(readOptions, types, conf, split.isOriginal());
- }
- }
}
String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY,
Long.MAX_VALUE + ":");
@@ -1527,9 +1535,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
@Override
public ObjectInspector getObjectInspector() {
- return ((StructObjectInspector) records.getObjectInspector())
- .getAllStructFieldRefs().get(OrcRecordUpdater.ROW)
- .getFieldObjectInspector();
+ return OrcStruct.createObjectInspector(0, schemaTypes);
}
@Override
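
Condensed, the getReader() changes above replace the per-file type lookups with a single schema-on-read path. The fragment below restates that flow using the patch's own names (Reader.Options.schema, OrcUtils.getDesiredRowTypeDescr, genIncludedColumns, setSearchArgument); it is a summary of the new constructor body, not standalone code, and assumes split and conf from the surrounding method.

    // Condensed restatement of the new flow in getReader(); context (split, conf,
    // genIncludedColumns, setSearchArgument) comes from OrcInputFormat itself.
    TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf); // schema on read
    Reader.Options readOptions = new Reader.Options().schema(schema);
    readOptions.range(split.getStart(), split.getLength());
    // Projection and SARG push-down are now resolved against the desired (non-ACID,
    // original) schema rather than the types stored in each base or delta file.
    List<OrcProto.Type> schemaTypes = OrcUtils.getOrcTypes(schema);
    readOptions.include(genIncludedColumns(schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL));
    setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL);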
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index 8a5de7f..2d0eaaf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -68,88 +68,6 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
private static final Logger LOG = LoggerFactory.getLogger(OrcOutputFormat.class);
- static TypeDescription convertTypeInfo(TypeInfo info) {
- switch (info.getCategory()) {
- case PRIMITIVE: {
- PrimitiveTypeInfo pinfo = (PrimitiveTypeInfo) info;
- switch (pinfo.getPrimitiveCategory()) {
- case BOOLEAN:
- return TypeDescription.createBoolean();
- case BYTE:
- return TypeDescription.createByte();
- case SHORT:
- return TypeDescription.createShort();
- case INT:
- return TypeDescription.createInt();
- case LONG:
- return TypeDescription.createLong();
- case FLOAT:
- return TypeDescription.createFloat();
- case DOUBLE:
- return TypeDescription.createDouble();
- case STRING:
- return TypeDescription.createString();
- case DATE:
- return TypeDescription.createDate();
- case TIMESTAMP:
- return TypeDescription.createTimestamp();
- case BINARY:
- return TypeDescription.createBinary();
- case DECIMAL: {
- DecimalTypeInfo dinfo = (DecimalTypeInfo) pinfo;
- return TypeDescription.createDecimal()
- .withScale(dinfo.getScale())
- .withPrecision(dinfo.getPrecision());
- }
- case VARCHAR: {
- BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
- return TypeDescription.createVarchar()
- .withMaxLength(cinfo.getLength());
- }
- case CHAR: {
- BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo;
- return TypeDescription.createChar()
- .withMaxLength(cinfo.getLength());
- }
- default:
- throw new IllegalArgumentException("ORC doesn't handle primitive" +
- " category " + pinfo.getPrimitiveCategory());
- }
- }
- case LIST: {
- ListTypeInfo linfo = (ListTypeInfo) info;
- return TypeDescription.createList
- (convertTypeInfo(linfo.getListElementTypeInfo()));
- }
- case MAP: {
- MapTypeInfo minfo = (MapTypeInfo) info;
- return TypeDescription.createMap
- (convertTypeInfo(minfo.getMapKeyTypeInfo()),
- convertTypeInfo(minfo.getMapValueTypeInfo()));
- }
- case UNION: {
- UnionTypeInfo minfo = (UnionTypeInfo) info;
- TypeDescription result = TypeDescription.createUnion();
- for (TypeInfo child: minfo.getAllUnionObjectTypeInfos()) {
- result.addUnionChild(convertTypeInfo(child));
- }
- return result;
- }
- case STRUCT: {
- StructTypeInfo sinfo = (StructTypeInfo) info;
- TypeDescription result = TypeDescription.createStruct();
- for(String fieldName: sinfo.getAllStructFieldNames()) {
- result.addField(fieldName,
- convertTypeInfo(sinfo.getStructFieldTypeInfo(fieldName)));
- }
- return result;
- }
- default:
- throw new IllegalArgumentException("ORC doesn't handle " +
- info.getCategory());
- }
- }
-
private static class OrcRecordWriter
implements RecordWriter<NullWritable, OrcSerdeRow>,
StatsProvidingRecordWriter {
@@ -242,7 +160,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
TypeDescription schema = TypeDescription.createStruct();
for (int i = 0; i < columnNames.size(); ++i) {
schema.addField(columnNames.get(i),
- convertTypeInfo(columnTypes.get(i)));
+ OrcUtils.convertTypeInfo(columnTypes.get(i)));
}
if (LOG.isDebugEnabled()) {
LOG.debug("ORC schema = " + schema);
http://git-wip-us.apache.org/repos/asf/hive/blob/30f20e99/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index ebe1afd..bc4d2ee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.io.orc;
import com.google.common.annotations.VisibleForTesting;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -25,23 +26,16 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
-import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -57,6 +51,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
private final Configuration conf;
private final boolean collapse;
private final RecordReader baseReader;
+ private final ObjectInspector objectInspector;
private final long offset;
private final long length;
private final ValidTxnList validTxnList;
@@ -443,6 +438,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
this.offset = options.getOffset();
this.length = options.getLength();
this.validTxnList = validTxnList;
+
+ TypeDescription typeDescr = OrcUtils.getDesiredRowTypeDescr(conf);
+ if (typeDescr == null) {
+ throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg());
+ }
+
+ objectInspector = OrcRecordUpdater.createEventSchema
+ (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr)));
+
// modify the options to reflect the event instead of the base row
Reader.Options eventOptions = createEventOptions(options);
if (reader == null) {
@@ -675,46 +679,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
@Override
public ObjectInspector getObjectInspector() {
- // Read the configuration parameters
- String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS);
- // NOTE: if "columns.types" is missing, all columns will be of String type
- String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
-
- // Parse the configuration parameters
- ArrayList<String> columnNames = new ArrayList<String>();
- Deque<Integer> virtualColumns = new ArrayDeque<Integer>();
- if (columnNameProperty != null && columnNameProperty.length() > 0) {
- String[] colNames = columnNameProperty.split(",");
- for (int i = 0; i < colNames.length; i++) {
- if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(colNames[i])) {
- virtualColumns.addLast(i);
- } else {
- columnNames.add(colNames[i]);
- }
- }
- }
- if (columnTypeProperty == null) {
- // Default type: all string
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < columnNames.size(); i++) {
- if (i > 0) {
- sb.append(":");
- }
- sb.append("string");
- }
- columnTypeProperty = sb.toString();
- }
-
- ArrayList<TypeInfo> fieldTypes =
- TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
- while (virtualColumns.size() > 0) {
- fieldTypes.remove(virtualColumns.removeLast());
- }
- StructTypeInfo rowType = new StructTypeInfo();
- rowType.setAllStructFieldNames(columnNames);
- rowType.setAllStructFieldTypeInfos(fieldTypes);
- return OrcRecordUpdater.createEventSchema
- (OrcStruct.createObjectInspector(rowType));
+ return objectInspector;
}
@Override