Posted to commits@hive.apache.org by ha...@apache.org on 2013/05/03 16:44:23 UTC
svn commit: r1478819 - in /hive/branches/vectorization/ql/src:
java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java
Author: hashutosh
Date: Fri May 3 14:44:22 2013
New Revision: 1478819
URL: http://svn.apache.org/r1478819
Log:
HIVE-4480 : Implement partition support for vectorized query execution (Sarvesh Sakalanaga via Ashutosh Chauhan)
Added:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1478819&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Fri May 3 14:44:22 2013
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.IOPrepareCache;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+
+/**
+ * Context for a vectorized row batch. This class does eager deserialization
+ * of row data using the SerDe in the RecordReader layer. It supports
+ * partitions at this layer so that the vectorized batch is populated
+ * correctly with the partition columns.
+ */
+public class VectorizedRowBatchCtx {
+
+ // OI for raw row data (i.e., without partition cols)
+ private StructObjectInspector rawRowOI;
+
+ // OI for the row (Raw row OI + partition OI)
+ private StructObjectInspector rowOI;
+
+ // Deserializer for the row data
+ private Deserializer deserializer;
+
+ // Hash map of partition values. Key=TblColName value=PartitionValue
+ private LinkedHashMap<String, String> partitionValues;
+
+ /**
+ * Constructor for VectorizedRowBatchCtx
+ *
+ * @param rawRowOI
+ * OI for raw row data (i.e., without partition cols)
+ * @param rowOI
+ * OI for the row (Raw row OI + partition OI)
+ * @param deserializer
+ * Deserializer for the row data
+ * @param partitionValues
+ * Hash map of partition values. Key=TblColName value=PartitionValue
+ */
+ public VectorizedRowBatchCtx(StructObjectInspector rawRowOI, StructObjectInspector rowOI,
+ Deserializer deserializer, LinkedHashMap<String, String> partitionValues) {
+ this.rowOI = rowOI;
+ this.rawRowOI = rawRowOI;
+ this.deserializer = deserializer;
+ this.partitionValues = partitionValues;
+ }
+
+ /**
+ * Constructor for VectorizedRowBatchCtx
+ */
+ public VectorizedRowBatchCtx() {
+
+ }
+
+ /**
+ * Initializes the VectorizedRowBatch context based on the
+ * split and the Hive configuration (job conf carrying the Hive plan).
+ *
+ * @param hiveConf
+ * Hive configuration from which the Hive plan is extracted
+ * @param split
+ * File split of the file being read
+ * @throws ClassNotFoundException
+ * @throws IOException
+ * @throws SerDeException
+ * @throws InstantiationException
+ * @throws IllegalAccessException
+ * @throws HiveException
+ */
+ public void Init(Configuration hiveConf, FileSplit split) throws ClassNotFoundException,
+ IOException,
+ SerDeException,
+ InstantiationException,
+ IllegalAccessException, HiveException {
+
+ Map<String, PartitionDesc> pathToPartitionInfo = Utilities
+ .getMapRedWork(hiveConf).getPathToPartitionInfo();
+
+ PartitionDesc part = HiveFileFormatUtils
+ .getPartitionDescFromPathRecursively(pathToPartitionInfo,
+ split.getPath(), IOPrepareCache.get().getPartitionDescMap());
+ Class<?> serdeclass = part.getDeserializerClass();
+
+ if (serdeclass == null) {
+ String className = part.getSerdeClassName();
+ if ((className == null) || (className.isEmpty())) {
+ throw new HiveException(
+ "SerDe class or the SerDe class name is not set for table: "
+ + part.getProperties().getProperty("name"));
+ }
+ serdeclass = hiveConf.getClassByName(className);
+ }
+
+ Properties partProps =
+ (part.getPartSpec() == null || part.getPartSpec().isEmpty()) ?
+ part.getTableDesc().getProperties() : part.getProperties();
+
+ Deserializer partDeserializer = (Deserializer) serdeclass.newInstance();
+ partDeserializer.initialize(hiveConf, partProps);
+ StructObjectInspector partRawRowObjectInspector = (StructObjectInspector) partDeserializer
+ .getObjectInspector();
+
+ deserializer = partDeserializer;
+
+ // Check to see if this split is part of a partitioned table
+ String pcols = partProps
+ .getProperty(org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
+
+ if (pcols != null && pcols.length() > 0) {
+
+ // Partitions exist for this table. Get the partition object inspector and
+ // raw row object inspector (row without partition cols)
+ LinkedHashMap<String, String> partSpec = part.getPartSpec();
+ String[] partKeys = pcols.trim().split("/");
+ List<String> partNames = new ArrayList<String>(partKeys.length);
+ partitionValues = new LinkedHashMap<String, String>();
+ List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(
+ partKeys.length);
+ for (int i = 0; i < partKeys.length; i++) {
+ String key = partKeys[i];
+ partNames.add(key);
+ partitionValues.put(key, partSpec.get(key));
+ partObjectInspectors
+ .add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
+ }
+
+ // Create partition OI
+ StructObjectInspector partObjectInspector = ObjectInspectorFactory
+ .getStandardStructObjectInspector(partNames, partObjectInspectors);
+
+ // Get row OI from partition OI and raw row OI
+ StructObjectInspector rowObjectInspector = ObjectInspectorFactory
+ .getUnionStructObjectInspector(Arrays
+ .asList(new StructObjectInspector[] {partRawRowObjectInspector, partObjectInspector}));
+ rowOI = rowObjectInspector;
+ rawRowOI = partRawRowObjectInspector;
+ } else {
+
+ // No partitions for this table, hence row OI equals raw row OI
+ rowOI = partRawRowObjectInspector;
+ rawRowOI = partRawRowObjectInspector;
+ }
+ }
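+
+ // A minimal usage sketch (illustrative only; "job" is an assumed JobConf
+ // carrying a serialized Hive plan and "split" the FileSplit handed to the
+ // RecordReader):
+ //
+ // VectorizedRowBatchCtx ctx = new VectorizedRowBatchCtx();
+ // ctx.Init(job, split);
+ // VectorizedRowBatch batch = ctx.CreateVectorizedRowBatch();
+ // ctx.AddPartitionColsToBatch(batch);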
+
+ /**
+ * Creates a Vectorized row batch and the column vectors.
+ *
+ * @return VectorizedRowBatch
+ * @throws HiveException
+ */
+ public VectorizedRowBatch CreateVectorizedRowBatch() throws HiveException
+ {
+ List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
+ VectorizedRowBatch result = new VectorizedRowBatch(fieldRefs.size());
+ for (int j = 0; j < fieldRefs.size(); j++) {
+ ObjectInspector foi = fieldRefs.get(j).getFieldObjectInspector();
+ switch (foi.getCategory()) {
+ case PRIMITIVE: {
+ PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
+ // Vectorization currently only supports the following data types:
+ // SHORT, INT, LONG, FLOAT, DOUBLE, STRING
+ switch (poi.getPrimitiveCategory()) {
+ case SHORT:
+ case INT:
+ case LONG:
+ result.cols[j] = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+ break;
+ case FLOAT:
+ case DOUBLE:
+ result.cols[j] = new DoubleColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+ break;
+ case STRING:
+ result.cols[j] = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+ break;
+ default:
+ throw new HiveException("Vectorization is not supported for datatype:"
+ + poi.getPrimitiveCategory());
+ }
+ break;
+ }
+ case LIST:
+ case MAP:
+ case STRUCT:
+ case UNION:
+ throw new HiveException("Vectorizaton is not supported for datatype:"
+ + foi.getCategory());
+ default:
+ throw new HiveException("Unknown ObjectInspector category!");
+
+ }
+ }
+ result.numCols = fieldRefs.size();
+ return result;
+ }
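+
+ // Type-to-vector mapping used above (a summary of the switch):
+ // SHORT / INT / LONG -> LongColumnVector
+ // FLOAT / DOUBLE -> DoubleColumnVector
+ // STRING -> BytesColumnVector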
+
+ /**
+ * Adds the row to the batch after deserializing the row
+ *
+ * @param rowIndex
+ * Row index in the batch to which the row is added
+ * @param rowBlob
+ * Row blob (serialized version of row)
+ * @param batch
+ * Vectorized batch to which the row is added
+ * @throws HiveException
+ * @throws SerDeException
+ */
+ public void AddRowToBatch(int rowIndex, Writable rowBlob, VectorizedRowBatch batch)
+ throws HiveException, SerDeException
+ {
+ List<? extends StructField> fieldRefs = rawRowOI.getAllStructFieldRefs();
+ Object row = this.deserializer.deserialize(rowBlob);
+ // Iterate thru the cols and load the batch
+ for (int i = 0; i < fieldRefs.size(); i++) {
+ Object fieldData = rawRowOI.getStructFieldData(row, fieldRefs.get(i));
+ ObjectInspector foi = fieldRefs.get(i).getFieldObjectInspector();
+
+ // Vectorization only supports PRIMITIVE data types. Assert the same
+ assert (foi.getCategory() == Category.PRIMITIVE);
+
+ // Get writable object
+ PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
+ Object writableCol = poi.getPrimitiveWritableObject(fieldData);
+
+ // NOTE: null long-type fields get the placeholder value 1 and null
+ // double-type fields get NaN; isNull[] is what actually marks them as null
+ switch (poi.getPrimitiveCategory()) {
+ case SHORT: {
+ LongColumnVector lcv = (LongColumnVector) batch.cols[i];
+ if (writableCol != null) {
+ lcv.vector[rowIndex] = ((ShortWritable) writableCol).get();
+ lcv.isNull[rowIndex] = false;
+ } else {
+ lcv.vector[rowIndex] = 1;
+ SetNullColIsNullValue(lcv, rowIndex);
+ }
+ }
+ break;
+ case INT: {
+ LongColumnVector lcv = (LongColumnVector) batch.cols[i];
+ if (writableCol != null) {
+ lcv.vector[rowIndex] = ((IntWritable) writableCol).get();
+ lcv.isNull[rowIndex] = false;
+ } else {
+ lcv.vector[rowIndex] = 1;
+ SetNullColIsNullValue(lcv, rowIndex);
+ }
+ }
+ break;
+ case LONG: {
+ LongColumnVector lcv = (LongColumnVector) batch.cols[i];
+ if (writableCol != null) {
+ lcv.vector[rowIndex] = ((LongWritable) writableCol).get();
+ lcv.isNull[rowIndex] = false;
+ } else {
+ lcv.vector[rowIndex] = 1;
+ SetNullColIsNullValue(lcv, rowIndex);
+ }
+ }
+ break;
+ case FLOAT: {
+ DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[i];
+ if (writableCol != null) {
+ dcv.vector[rowIndex] = ((FloatWritable) writableCol).get();
+ dcv.isNull[rowIndex] = false;
+ } else {
+ dcv.vector[rowIndex] = Double.NaN;
+ SetNullColIsNullValue(dcv, rowIndex);
+ }
+ }
+ break;
+ case DOUBLE: {
+ DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[i];
+ if (writableCol != null) {
+ dcv.vector[rowIndex] = ((DoubleWritable) writableCol).get();
+ dcv.isNull[rowIndex] = false;
+ } else {
+ dcv.vector[rowIndex] = Double.NaN;
+ SetNullColIsNullValue(dcv, rowIndex);
+ }
+ }
+ break;
+ case STRING: {
+ BytesColumnVector bcv = (BytesColumnVector) batch.cols[i];
+ if (writableCol != null) {
+ bcv.isNull[rowIndex] = false;
+ Text colText = (Text) writableCol;
+ bcv.setRef(rowIndex, colText.getBytes(), 0, colText.getLength());
+ } else {
+ SetNullColIsNullValue(bcv, rowIndex);
+ }
+ }
+ break;
+ default:
+ throw new HiveException("Vectorizaton is not supported for datatype:"
+ + poi.getPrimitiveCategory());
+ }
+ }
+ }
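+
+ // A minimal sketch of the intended caller (illustrative only; "reader" is
+ // an assumed RecordReader whose value object is the serialized row blob):
+ //
+ // int rowIdx = 0;
+ // while (rowIdx < VectorizedRowBatch.DEFAULT_SIZE && reader.next(key, value)) {
+ // ctx.AddRowToBatch(rowIdx++, value, batch);
+ // }
+ // batch.size = rowIdx;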
+
+ /**
+ * Iterates through all the column vectors and sets noNulls to the
+ * specified value.
+ *
+ * @param valueToSet
+ * noNulls value to set
+ * @param batch
+ * Batch on which noNulls is set
+ */
+ public void SetNoNullFields(boolean valueToSet, VectorizedRowBatch batch) {
+ for (int i = 0; i < batch.numCols; i++) {
+ batch.cols[i].noNulls = valueToSet;
+ }
+ }
+
+ public void ConvertRowBatchBlobToVectorizedBatch(Writable[] rowBlobs, VectorizedRowBatch batch) {
+ // No reader supports this operation yet. If a reader returns a set of rows,
+ // this function can be used to convert that row blob batch into a vectorized batch.
+ throw new UnsupportedOperationException();
+ }
+
+ private int GetColIndexBasedOnColName(String colName) throws HiveException
+ {
+ List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
+ for (int i = 0; i < fieldRefs.size(); i++) {
+ if (fieldRefs.get(i).getFieldName().equals(colName)) {
+ return i;
+ }
+ }
+ throw new HiveException("Not able to find column name in row object inspector");
+ }
+
+ /**
+ * Adds the partition values to the batch.
+ *
+ * @param batch
+ * Batch to which the partition columns are added
+ * @throws HiveException
+ */
+ public void AddPartitionColsToBatch(VectorizedRowBatch batch) throws HiveException
+ {
+ int colIndex;
+ String value;
+ BytesColumnVector bcv;
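+
+ // Each partition column is constant within a split, so only row 0 is
+ // populated below and isRepeating is set; consumers then treat every
+ // row as carrying the row-0 value.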
+ if (partitionValues != null) {
+ for (String key : partitionValues.keySet()) {
+ colIndex = GetColIndexBasedOnColName(key);
+ value = partitionValues.get(key);
+ bcv = (BytesColumnVector) batch.cols[colIndex];
+ byte[] valueBytes = value.getBytes();
+ bcv.setRef(0, valueBytes, 0, valueBytes.length);
+ bcv.isRepeating = true;
+ bcv.isNull[0] = false;
+ bcv.noNulls = true;
+ }
+ }
+ }
+
+ private void SetNullColIsNullValue(ColumnVector cv, int rowIndex) {
+ cv.isNull[rowIndex] = true;
+ if (cv.noNulls) {
+ cv.noNulls = false;
+ }
+ }
+
+}
Added: hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java?rev=1478819&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java (added)
+++ hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java Fri May 3 14:44:22 2013
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Class that tests the functionality of VectorizedRowBatchCtx
+ */
+public class TestVectorizedRowBatchCtx {
+
+ private Configuration conf;
+ private FileSystem fs;
+ private Path testFilePath;
+ private int colCount;
+ private ColumnarSerDe serDe;
+ private Properties tbl;
+
+ @Before
+ public void openFileSystem() throws Exception {
+ conf = new Configuration();
+ fs = FileSystem.getLocal(conf);
+ Path workDir = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp"));
+ fs.setWorkingDirectory(workDir);
+ testFilePath = new Path("TestVectorizedRowBatchCtx.testDump.rc");
+ fs.delete(testFilePath, false);
+ }
+
+ private void InitSerde() {
+ tbl = new Properties();
+
+ // Set the configuration parameters
+ tbl.setProperty(serdeConstants.SERIALIZATION_FORMAT, "6");
+ tbl.setProperty("columns",
+ "ashort,aint,along,adouble,afloat,astring");
+ tbl.setProperty("columns.types",
+ "smallint:int:bigint:double:float:string");
+ colCount = 6;
+ tbl.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, "NULL");
+
+ try {
+ serDe = new ColumnarSerDe();
+ serDe.initialize(conf, tbl);
+ } catch (SerDeException e) {
+ throw new RuntimeException(e);
+ }
+ }
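+
+ // The properties above describe the six-column schema used throughout
+ // this test; in DDL terms it is equivalent to:
+ //
+ // CREATE TABLE t (ashort SMALLINT, aint INT, along BIGINT,
+ // adouble DOUBLE, afloat FLOAT, astring STRING)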
+
+ private void WriteRCFile(FileSystem fs, Path file, Configuration conf)
+ throws IOException, SerDeException {
+ fs.delete(file, true);
+
+ RCFileOutputFormat.setColumnNumber(conf, colCount);
+ RCFile.Writer writer =
+ new RCFile.Writer(fs, conf, file, null, null,
+ new DefaultCodec());
+
+ for (int i = 0; i < 10; ++i) {
+ BytesRefArrayWritable bytes = new BytesRefArrayWritable(colCount);
+ BytesRefWritable cu;
+
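+ // Every third row (i % 3 == 0) is written with empty middle columns to
+ // exercise the null-handling paths; the other rows carry parseable values.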
+ if (i % 3 != 0) {
+ cu = new BytesRefWritable((i + "").getBytes("UTF-8"), 0, (i + "").getBytes("UTF-8").length);
+ bytes.set(0, cu);
+
+ cu = new BytesRefWritable((i + 100 + "").getBytes("UTF-8"), 0,
+ (i + 100 + "").getBytes("UTF-8").length);
+ bytes.set(1, cu);
+
+ cu = new BytesRefWritable((i + 200 + "").getBytes("UTF-8"), 0,
+ (i + 200 + "").getBytes("UTF-8").length);
+ bytes.set(2, cu);
+
+ cu = new BytesRefWritable((i + 1.23 + "").getBytes("UTF-8"), 0,
+ (i + 1.23 + "").getBytes("UTF-8").length);
+ bytes.set(3, cu);
+
+ cu = new BytesRefWritable((i + 2.23 + "").getBytes("UTF-8"), 0,
+ (i + 2.23 + "").getBytes("UTF-8").length);
+ bytes.set(4, cu);
+
+ cu = new BytesRefWritable(("Test string").getBytes("UTF-8"), 0,
+ ("Test string").getBytes("UTF-8").length);
+ bytes.set(5, cu);
+ } else {
+ cu = new BytesRefWritable((i + "").getBytes("UTF-8"), 0, (i + "").getBytes("UTF-8").length);
+ bytes.set(0, cu);
+
+ cu = new BytesRefWritable(new byte[0], 0, 0);
+ bytes.set(1, cu);
+
+ cu = new BytesRefWritable(new byte[0], 0, 0);
+ bytes.set(2, cu);
+
+ cu = new BytesRefWritable(new byte[0], 0, 0);
+ bytes.set(3, cu);
+
+ cu = new BytesRefWritable(new byte[0], 0, 0);
+ bytes.set(4, cu);
+
+ cu = new BytesRefWritable(("Test string").getBytes("UTF-8"), 0,
+ ("NULL").getBytes("UTF-8").length);
+ bytes.set(5, cu);
+ }
+ writer.append(bytes);
+ }
+ writer.close();
+ }
+
+ private VectorizedRowBatch GetRowBatch() throws SerDeException, HiveException, IOException {
+
+ RCFile.Reader reader = new RCFile.Reader(fs, this.testFilePath, conf);
+
+ // Get object inspector
+ StructObjectInspector oi = (StructObjectInspector) serDe
+ .getObjectInspector();
+ List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
+
+ Assert.assertEquals("Field size should be 6", colCount, fieldRefs.size());
+
+ // Create the context
+ VectorizedRowBatchCtx ctx = new VectorizedRowBatchCtx(oi, oi, serDe, null);
+ VectorizedRowBatch batch = ctx.CreateVectorizedRowBatch();
+ ctx.SetNoNullFields(true, batch);
+
+ // Iterate thru the rows and populate the batch
+ LongWritable rowID = new LongWritable();
+ for (int i = 0; i < 10; i++) {
+ reader.next(rowID);
+ BytesRefArrayWritable cols = new BytesRefArrayWritable();
+ reader.getCurrentRow(cols);
+ cols.resetValid(colCount);
+ ctx.AddRowToBatch(i, cols, batch);
+ }
+ reader.close();
+ batch.size = 10;
+ return batch;
+ }
+
+ void ValidateRowBatch(VectorizedRowBatch batch) throws IOException, SerDeException {
+
+ LongWritable rowID = new LongWritable();
+ RCFile.Reader reader = new RCFile.Reader(fs, this.testFilePath, conf);
+ for (int i = 0; i < batch.size; i++) {
+ reader.next(rowID);
+ BytesRefArrayWritable cols = new BytesRefArrayWritable();
+ reader.getCurrentRow(cols);
+ cols.resetValid(colCount);
+ Object row = serDe.deserialize(cols);
+
+ StructObjectInspector oi = (StructObjectInspector) serDe
+ .getObjectInspector();
+ List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
+
+ for (int j = 0; j < fieldRefs.size(); j++) {
+ Object fieldData = oi.getStructFieldData(row, fieldRefs.get(j));
+ ObjectInspector foi = fieldRefs.get(j).getFieldObjectInspector();
+
+ // Vectorization only supports PRIMITIVE data types. Assert the same
+ Assert.assertEquals(true, foi.getCategory() == Category.PRIMITIVE);
+
+ PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
+ Object writableCol = poi.getPrimitiveWritableObject(fieldData);
+ if (writableCol != null) {
+ switch (poi.getPrimitiveCategory()) {
+ case SHORT: {
+ LongColumnVector lcv = (LongColumnVector) batch.cols[j];
+ Assert.assertEquals(true, lcv.vector[i] == ((ShortWritable) writableCol).get());
+ }
+ break;
+ case INT: {
+ LongColumnVector lcv = (LongColumnVector) batch.cols[j];
+ Assert.assertEquals(true, lcv.vector[i] == ((IntWritable) writableCol).get());
+ }
+ break;
+ case LONG: {
+ LongColumnVector lcv = (LongColumnVector) batch.cols[j];
+ Assert.assertEquals(true, lcv.vector[i] == ((LongWritable) writableCol).get());
+ }
+ break;
+ case FLOAT: {
+ DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[j];
+ Assert.assertEquals(true, dcv.vector[i] == ((FloatWritable) writableCol).get());
+ }
+ break;
+ case DOUBLE: {
+ DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[j];
+ Assert.assertEquals(true, dcv.vector[i] == ((DoubleWritable) writableCol).get());
+ }
+ break;
+ case STRING: {
+ BytesColumnVector bcv = (BytesColumnVector) batch.cols[j];
+ Text colText = (Text) writableCol;
+ Text batchText = (Text) bcv.getWritableObject(i);
+ String a = colText.toString();
+ String b = batchText.toString();
+ Assert.assertEquals(true, a.equals(b));
+ }
+ break;
+ default:
+ Assert.assertEquals("Unknown type", false);
+ }
+ } else {
+ Assert.assertEquals(true, batch.cols[j].isNull[i]);
+ }
+ }
+
+ // Check repeating
+ Assert.assertEquals(false, batch.cols[0].isRepeating);
+ Assert.assertEquals(false, batch.cols[1].isRepeating);
+ Assert.assertEquals(false, batch.cols[2].isRepeating);
+ Assert.assertEquals(false, batch.cols[3].isRepeating);
+ Assert.assertEquals(false, batch.cols[4].isRepeating);
+
+ // Check non null
+ Assert.assertEquals(true, batch.cols[0].noNulls);
+ Assert.assertEquals(false, batch.cols[1].noNulls);
+ Assert.assertEquals(false, batch.cols[2].noNulls);
+ Assert.assertEquals(false, batch.cols[3].noNulls);
+ Assert.assertEquals(false, batch.cols[4].noNulls);
+ }
+ reader.close();
+ }
+
+ @Test
+ public void TestCtx() throws Exception {
+ InitSerde();
+ WriteRCFile(this.fs, this.testFilePath, this.conf);
+ VectorizedRowBatch batch = GetRowBatch();
+ ValidateRowBatch(batch);
+ }
+}