You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/05/12 21:23:56 UTC
svn commit: r1481626 - in /hive/branches/vectorization:
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/
ql/src/java/org/apache/hadoop/hive/ql/io/
ql/src/java/org/apache/hadoop/hive/ql/io/orc/
ql/src/test/org/apache/hadoop/hive/ql/exec/vector/ serde/...
Author: hashutosh
Date: Sun May 12 19:23:55 2013
New Revision: 1481626
URL: http://svn.apache.org/r1481626
Log:
HIVE-4483 : Input format to read vector data from RC file (Sarvesh Sakalanaga via Ashutosh Chauhan)
Added:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
Modified:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java
hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java
hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java?rev=1481626&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java Sun May 12 19:23:55 2013
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Static helpers for maintaining the null flags of a {@link VectorizedRowBatch}
+ * and for loading deserialized rows into it.
+ */
+public class VectorizedBatchUtil {
+
+  /**
+   * Marks the entry at rowIndex of the given column vector as null and
+   * records on the vector that it now holds at least one null.
+   *
+   * @param cv
+   *          Column vector to update
+   * @param rowIndex
+   *          Index of the entry to flag as null
+   */
+  public static void SetNullColIsNullValue(ColumnVector cv, int rowIndex) {
+    cv.isNull[rowIndex] = true;
+    cv.noNulls = false;
+  }
+
+  /**
+   * Iterates thru all the column vectors and sets noNulls to
+   * specified value.
+   *
+   * @param valueToSet
+   *          noNulls value to set
+   * @param batch
+   *          Batch on which noNulls is set
+   */
+  public static void SetNoNullFields(boolean valueToSet, VectorizedRowBatch batch) {
+    for (int i = 0; i < batch.numCols; i++) {
+      // Honor the requested value; this was previously hard-coded to true.
+      batch.cols[i].noNulls = valueToSet;
+    }
+  }
+
+  /**
+   * Iterates thru all the columns in a given row and populates the batch
+   *
+   * @param row
+   *          Deserialized row object
+   * @param oi
+   *          Object inspector for that row
+   * @param rowIndex
+   *          Index to which the row should be added to batch
+   * @param batch
+   *          Vectorized batch to which the row is added at rowIndex
+   * @throws HiveException
+   *           If a field is not a primitive, or is a primitive type that is
+   *           not handled below
+   */
+  public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIndex,
+      VectorizedRowBatch batch) throws HiveException {
+    List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
+    // Iterate thru the cols and load the batch
+    for (int i = 0; i < fieldRefs.size(); i++) {
+      StructField fieldRef = fieldRefs.get(i);
+      Object fieldData = oi.getStructFieldData(row, fieldRef);
+      ObjectInspector foi = fieldRef.getFieldObjectInspector();
+
+      // Vectorization only supports PRIMITIVE data types. Fail explicitly instead of
+      // relying on an assert, which is skipped when assertions are disabled and would
+      // let the cast below fail with a ClassCastException.
+      if (foi.getCategory() != Category.PRIMITIVE) {
+        throw new HiveException("Vectorizaton is not supported for datatype:"
+            + foi.getCategory());
+      }
+
+      // Get writable object
+      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
+      Object writableCol = poi.getPrimitiveWritableObject(fieldData);
+
+      // NOTE: The default value for null fields in vectorization is 1 for int types, NaN for
+      // float/double. String types have no default value for null.
+      switch (poi.getPrimitiveCategory()) {
+      case SHORT: {
+        LongColumnVector lcv = (LongColumnVector) batch.cols[i];
+        if (writableCol != null) {
+          lcv.vector[rowIndex] = ((ShortWritable) writableCol).get();
+          lcv.isNull[rowIndex] = false;
+        } else {
+          lcv.vector[rowIndex] = 1;
+          SetNullColIsNullValue(lcv, rowIndex);
+        }
+      }
+        break;
+      case INT: {
+        LongColumnVector lcv = (LongColumnVector) batch.cols[i];
+        if (writableCol != null) {
+          lcv.vector[rowIndex] = ((IntWritable) writableCol).get();
+          lcv.isNull[rowIndex] = false;
+        } else {
+          lcv.vector[rowIndex] = 1;
+          SetNullColIsNullValue(lcv, rowIndex);
+        }
+      }
+        break;
+      case LONG: {
+        LongColumnVector lcv = (LongColumnVector) batch.cols[i];
+        if (writableCol != null) {
+          lcv.vector[rowIndex] = ((LongWritable) writableCol).get();
+          lcv.isNull[rowIndex] = false;
+        } else {
+          lcv.vector[rowIndex] = 1;
+          SetNullColIsNullValue(lcv, rowIndex);
+        }
+      }
+        break;
+      case FLOAT: {
+        // Floats are widened and stored in a double vector.
+        DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[i];
+        if (writableCol != null) {
+          dcv.vector[rowIndex] = ((FloatWritable) writableCol).get();
+          dcv.isNull[rowIndex] = false;
+        } else {
+          dcv.vector[rowIndex] = Double.NaN;
+          SetNullColIsNullValue(dcv, rowIndex);
+        }
+      }
+        break;
+      case DOUBLE: {
+        DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[i];
+        if (writableCol != null) {
+          dcv.vector[rowIndex] = ((DoubleWritable) writableCol).get();
+          dcv.isNull[rowIndex] = false;
+        } else {
+          dcv.vector[rowIndex] = Double.NaN;
+          SetNullColIsNullValue(dcv, rowIndex);
+        }
+      }
+        break;
+      case STRING: {
+        BytesColumnVector bcv = (BytesColumnVector) batch.cols[i];
+        if (writableCol != null) {
+          bcv.isNull[rowIndex] = false;
+          Text colText = (Text) writableCol;
+          // The vector references the Text's backing array; only the first
+          // getLength() bytes are passed as the value.
+          bcv.setRef(rowIndex, colText.getBytes(), 0, colText.getLength());
+        } else {
+          SetNullColIsNullValue(bcv, rowIndex);
+        }
+      }
+        break;
+      default:
+        throw new HiveException("Vectorizaton is not supported for datatype:"
+            + poi.getPrimitiveCategory());
+      }
+    }
+  }
+
+}
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java?rev=1481626&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java Sun May 12 19:23:55 2013
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazyLong;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * VectorizedColumnarSerDe is used by Vectorized query execution engine
+ * for columnar based storage supported by RCFile.
+ */
+public class VectorizedColumnarSerDe extends ColumnarSerDe implements VectorizedSerde {
+
+  public VectorizedColumnarSerDe() throws SerDeException {
+  }
+
+  // One reusable serialized row per batch position; entries are created on first use.
+  private final BytesRefArrayWritable[] byteRefArray = new BytesRefArrayWritable[VectorizedRowBatch.DEFAULT_SIZE];
+  // Wrapper object returned by serializeVector; the same instance is reused on every call.
+  private final ObjectWritable ow = new ObjectWritable();
+  // Buffer that receives the serialized bytes of every field written by serializeVector.
+  private final ByteStream.Output serializeVectorStream = new ByteStream.Output();
+
+  /**
+   * Serialize a vectorized row batch
+   *
+   * @param vrg
+   *          Vectorized row batch to serialize
+   * @param objInspector
+   *          The ObjectInspector for the row object
+   * @return The serialized Writable object
+   * @throws SerDeException
+   *           Wraps any exception raised while serializing, including the
+   *           UnsupportedOperationException thrown for unsupported types
+   * @see SerDe#serialize(Object, ObjectInspector)
+   */
+  @Override
+  public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
+      throws SerDeException {
+    try {
+      // Validate that the OI is of struct type
+      if (objInspector.getCategory() != Category.STRUCT) {
+        throw new UnsupportedOperationException(getClass().toString()
+            + " can only serialize struct types, but we got: "
+            + objInspector.getTypeName());
+      }
+
+      VectorizedRowBatch batch = vrg;
+      StructObjectInspector soi = (StructObjectInspector) objInspector;
+      List<? extends StructField> fields = soi.getAllStructFieldRefs();
+      int numCols = fields.size();
+
+      // Reset the byte buffer
+      serializeVectorStream.reset();
+      // Offset in the stream at which the next field's bytes start
+      int count = 0;
+      for (int i = 0; i < batch.size; i++) {
+
+        // If selectedInUse is true then we need to serialize only
+        // the selected indexes
+        int rowIndex = batch.selectedInUse ? batch.selected[i] : i;
+
+        BytesRefArrayWritable byteRow = byteRefArray[i];
+        if (byteRow == null) {
+          byteRow = new BytesRefArrayWritable(numCols);
+          byteRefArray[i] = byteRow;
+        }
+
+        byteRow.resetValid(numCols);
+
+        for (int k = 0; k < numCols; k++) {
+          ObjectInspector foi = fields.get(k).getFieldObjectInspector();
+          ColumnVector currentColVector = batch.cols[k];
+
+          switch (foi.getCategory()) {
+          case PRIMITIVE: {
+            PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
+            // NOTE(review): a repeating column with noNulls == false is treated as
+            // null without consulting isNull[0]; behavior preserved, confirm this is
+            // the intended convention.
+            if (!currentColVector.noNulls
+                && (currentColVector.isRepeating || currentColVector.isNull[rowIndex])) {
+              // The column is null hence write null value
+              serializeVectorStream.write(new byte[0], 0, 0);
+            } else {
+              // If here then the vector value is not null.
+              // A repeating vector is read at slot 0. Use a per-column index so that
+              // rowIndex stays intact for the remaining columns of this row;
+              // overwriting rowIndex here made later non-repeating columns read row 0.
+              int valueIndex = currentColVector.isRepeating ? 0 : rowIndex;
+
+              switch (poi.getPrimitiveCategory()) {
+              case SHORT:
+              case INT:
+              case LONG:
+                LongColumnVector lcv = (LongColumnVector) batch.cols[k];
+                LazyLong.writeUTF8(serializeVectorStream, lcv.vector[valueIndex]);
+                break;
+              case FLOAT:
+              case DOUBLE:
+                DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[k];
+                ByteBuffer b = Text.encode(String.valueOf(dcv.vector[valueIndex]));
+                serializeVectorStream.write(b.array(), 0, b.limit());
+                break;
+              case STRING:
+                BytesColumnVector bcv = (BytesColumnVector) batch.cols[k];
+                LazyUtils.writeEscaped(serializeVectorStream, bcv.vector[valueIndex],
+                    bcv.start[valueIndex],
+                    bcv.length[valueIndex],
+                    serdeParams.isEscaped(), serdeParams.getEscapeChar(), serdeParams
+                        .getNeedsEscape());
+                break;
+              default:
+                throw new UnsupportedOperationException(
+                    "Vectorizaton is not supported for datatype:"
+                        + poi.getPrimitiveCategory());
+              }
+            }
+            break;
+          }
+          case LIST:
+          case MAP:
+          case STRUCT:
+          case UNION:
+            throw new UnsupportedOperationException("Vectorizaton is not supported for datatype:"
+                + foi.getCategory());
+          default:
+            throw new SerDeException("Unknown ObjectInspector category!");
+
+          }
+
+          // Point field k of this row at the bytes just appended to the stream.
+          byteRow.get(k).set(serializeVectorStream.getData(), count, serializeVectorStream
+              .getCount() - count);
+          count = serializeVectorStream.getCount();
+        }
+
+      }
+      ow.set(byteRefArray);
+    } catch (Exception e) {
+      throw new SerDeException(e);
+    }
+    return ow;
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    return null;
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return BytesRefArrayWritable.class;
+  }
+
+  @Override
+  public Object deserialize(Writable blob) throws SerDeException {
+
+    // Ideally this should throw UnsupportedOperationException as the serde is a
+    // vectorized serde. But since the RC file reader does not support vectorized reading,
+    // this is left as it is. This function is called to deserialize the rows one by one
+    // and populate the batch. Once the RC file reader supports vectorized reading, this
+    // serde can be a standalone serde with no dependency on ColumnarSerDe.
+    return super.deserialize(blob);
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return cachedObjectInspector;
+  }
+
+  /**
+   * Row-at-a-time serialization is not supported by this serde; use
+   * {@link #serializeVector(VectorizedRowBatch, ObjectInspector)} instead.
+   */
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Deserializes the rowBlob into Vectorized row batch
+   *
+   * @param rowBlob
+   *          Row batch to deserialize; must be a BytesRefArrayWritable[]
+   * @param rowsInBlob
+   *          Total number of rows in rowBlob to deserialize
+   * @param reuseBatch
+   *          VectorizedRowBatch into which the rows should be deserialized
+   * @throws SerDeException
+   */
+  @Override
+  public void deserializeVector(Object rowBlob, int rowsInBlob,
+      VectorizedRowBatch reuseBatch) throws SerDeException {
+
+    BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[]) rowBlob;
+    for (int i = 0; i < rowsInBlob; i++) {
+      // Deserialize row i and load it into slot i of the batch.
+      Object row = deserialize(refArray[i]);
+      try {
+        VectorizedBatchUtil.AddRowToBatch(row, (StructObjectInspector) cachedObjectInspector, i, reuseBatch);
+      } catch (HiveException e) {
+        throw new SerDeException(e);
+      }
+    }
+  }
+}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1481626&r1=1481625&r2=1481626&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Sun May 12 19:23:55 2013
@@ -33,19 +33,12 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
@@ -239,6 +232,9 @@ public class VectorizedRowBatchCtx {
return result;
}
+
+
+
/**
* Adds the row to the batch after deserializing the row
*
@@ -254,122 +250,25 @@ public class VectorizedRowBatchCtx {
public void AddRowToBatch(int rowIndex, Writable rowBlob, VectorizedRowBatch batch)
throws HiveException, SerDeException
{
- List<? extends StructField> fieldRefs = rawRowOI.getAllStructFieldRefs();
Object row = this.deserializer.deserialize(rowBlob);
- // Iterate thru the cols and load the batch
- for (int i = 0; i < fieldRefs.size(); i++) {
- Object fieldData = rawRowOI.getStructFieldData(row, fieldRefs.get(i));
- ObjectInspector foi = fieldRefs.get(i).getFieldObjectInspector();
-
- // Vectorization only supports PRIMITIVE data types. Assert the same
- assert (foi.getCategory() == Category.PRIMITIVE);
-
- // Get writable object
- PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
- Object writableCol = poi.getPrimitiveWritableObject(fieldData);
-
- // NOTE: The default value for null fields in vectorization is -1 for int types
- switch (poi.getPrimitiveCategory()) {
- case SHORT: {
- LongColumnVector lcv = (LongColumnVector) batch.cols[i];
- if (writableCol != null) {
- lcv.vector[rowIndex] = ((ShortWritable) writableCol).get();
- lcv.isNull[rowIndex] = false;
- } else {
- lcv.vector[rowIndex] = 1;
- SetNullColIsNullValue(lcv, rowIndex);
- }
- }
- break;
- case INT: {
- LongColumnVector lcv = (LongColumnVector) batch.cols[i];
- if (writableCol != null) {
- lcv.vector[rowIndex] = ((IntWritable) writableCol).get();
- lcv.isNull[rowIndex] = false;
- } else {
- lcv.vector[rowIndex] = 1;
- SetNullColIsNullValue(lcv, rowIndex);
- }
- }
- break;
- case LONG: {
- LongColumnVector lcv = (LongColumnVector) batch.cols[i];
- if (writableCol != null) {
- lcv.vector[rowIndex] = ((LongWritable) writableCol).get();
- lcv.isNull[rowIndex] = false;
- } else {
- lcv.vector[rowIndex] = 1;
- SetNullColIsNullValue(lcv, rowIndex);
- }
- }
- break;
- case FLOAT: {
- DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[i];
- if (writableCol != null) {
- dcv.vector[rowIndex] = ((FloatWritable) writableCol).get();
- dcv.isNull[rowIndex] = false;
- } else {
- dcv.vector[rowIndex] = Double.NaN;
- SetNullColIsNullValue(dcv, rowIndex);
- }
- }
- break;
- case DOUBLE: {
- DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[i];
- if (writableCol != null) {
- dcv.vector[rowIndex] = ((DoubleWritable) writableCol).get();
- dcv.isNull[rowIndex] = false;
- } else {
- dcv.vector[rowIndex] = Double.NaN;
- SetNullColIsNullValue(dcv, rowIndex);
- }
- }
- break;
- case STRING: {
- BytesColumnVector bcv = (BytesColumnVector) batch.cols[i];
- if (writableCol != null) {
- bcv.isNull[rowIndex] = false;
- Text colText = (Text) writableCol;
- bcv.setRef(rowIndex, colText.getBytes(), 0, colText.getLength());
- } else {
- SetNullColIsNullValue(bcv, rowIndex);
- }
- }
- break;
- default:
- throw new HiveException("Vectorizaton is not supported for datatype:"
- + poi.getPrimitiveCategory());
- }
- }
+ VectorizedBatchUtil.AddRowToBatch(row, this.rawRowOI, rowIndex, batch);
}
/**
- * Iterates thru all the column vectors and sets noNull to
- * specified value.
+ * Deserializes a set of rows and populates the batch
*
- * @param valueToSet
- * noNull value to set
+ * @param rowBlob
+ * to deserialize
* @param batch
- * Batch on which noNull is set
- */
- public void SetNoNullFields(boolean valueToSet, VectorizedRowBatch batch) {
- for (int i = 0; i < batch.numCols; i++) {
- batch.cols[i].noNulls = true;
- }
- }
-
- /**
- * Deserialized set of rows and populates the batch
- * @param rowBlob to deserialize
- * @param batch Vectorized row batch which contains deserialized data
+ * Vectorized row batch which contains deserialized data
* @throws SerDeException
*/
- public void ConvertRowBatchBlobToVectorizedBatch(Object rowBlob, VectorizedRowBatch batch)
+ public void ConvertRowBatchBlobToVectorizedBatch(Object rowBlob, int rowsInBlob,
+ VectorizedRowBatch batch)
throws SerDeException {
if (deserializer instanceof VectorizedSerde) {
- batch = ((VectorizedSerde) deserializer).deserializeVector(rowBlob,
- deserializer.getObjectInspector(), batch);
+ ((VectorizedSerde) deserializer).deserializeVector(rowBlob, rowsInBlob, batch);
} else {
throw new SerDeException(
"Not able to deserialize row batch. Serde does not implement VectorizedSerde");
@@ -410,12 +309,4 @@ public class VectorizedRowBatchCtx {
}
}
}
-
- private void SetNullColIsNullValue(ColumnVector cv, int rowIndex) {
- cv.isNull[rowIndex] = true;
- if (cv.noNulls) {
- cv.noNulls = false;
- }
- }
-
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java?rev=1481626&r1=1481625&r2=1481626&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java Sun May 12 19:23:55 2013
@@ -27,7 +27,6 @@ public interface VectorizedSerde {
Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
throws SerDeException;
- VectorizedRowBatch deserializeVector(Object rowBlob, ObjectInspector objInspector,
- VectorizedRowBatch reuseBatch)
- throws SerDeException;
+ void deserializeVector(Object rowBlob, int rowsInBlob, VectorizedRowBatch reuseBatch)
+ throws SerDeException;
}
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java?rev=1481626&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java Sun May 12 19:23:55 2013
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * CommonRCFileInputFormat.
+ * Wrapper class that calls the correct input format for RC file based on
+ * HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED parameter
+ */
+public class CommonRCFileInputFormat extends FileInputFormat<Writable, Writable>
+    implements InputFormatChecker, VectorizedInputFormatInterface{
+
+  // Row-at-a-time RC file input format, used when vectorization is disabled.
+  RCFileInputFormat<LongWritable, BytesRefArrayWritable> rcif =
+      new RCFileInputFormat<LongWritable, BytesRefArrayWritable>();
+  // Vectorized RC file input format, used when vectorization is enabled.
+  VectorizedRCFileInputFormat vrcif = new VectorizedRCFileInputFormat();
+
+  /**
+   * Record reader that forwards every call to whichever of the two wrapped
+   * readers is non-null: the vectorized reader if present, otherwise the
+   * row-mode reader.
+   */
+  private static class CommonRCFileRecordReader
+      implements RecordReader<Writable, Writable> {
+
+    final RecordReader<NullWritable, VectorizedRowBatch> vrcrr;
+    final RecordReader<LongWritable, BytesRefArrayWritable> rcrr;
+    // Scratch key handed to the row-mode reader; the caller's key is not used,
+    // so one instance is reused instead of allocating a new key per record.
+    private final LongWritable rowKey = new LongWritable();
+
+    public CommonRCFileRecordReader(RecordReader<NullWritable, VectorizedRowBatch> vrcrr,
+        RecordReader<LongWritable, BytesRefArrayWritable> rcrr) {
+      this.vrcrr = vrcrr;
+      this.rcrr = rcrr;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (vrcrr != null) {
+        vrcrr.close();
+      } else {
+        rcrr.close();
+      }
+
+    }
+
+    @Override
+    public Writable createKey() {
+      if (vrcrr != null) {
+        return vrcrr.createKey();
+      } else {
+        return rcrr.createKey();
+      }
+    }
+
+    @Override
+    public Writable createValue() {
+      if (vrcrr != null) {
+        return vrcrr.createValue();
+      } else {
+        return rcrr.createValue();
+      }
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      if (vrcrr != null) {
+        return vrcrr.getPos();
+      } else {
+        return rcrr.getPos();
+      }
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      if (vrcrr != null) {
+        return vrcrr.getProgress();
+      } else {
+        return rcrr.getProgress();
+      }
+    }
+
+    /**
+     * Reads the next value into arg1. The key argument arg0 is ignored in both
+     * modes: the vectorized reader is given NullWritable and the row-mode
+     * reader is given an internal scratch key.
+     */
+    @Override
+    public boolean next(Writable arg0, Writable arg1) throws IOException {
+      if (vrcrr != null) {
+        return vrcrr.next(NullWritable.get(), (VectorizedRowBatch) arg1);
+      } else {
+        return rcrr.next(rowKey, (BytesRefArrayWritable) arg1);
+      }
+    }
+
+  }
+
+  /**
+   * Validates the files with the vectorized or the row-mode RC file input
+   * format, depending on whether vectorization is enabled in conf.
+   */
+  @Override
+  public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files)
+      throws IOException {
+    boolean vectorPath =
+        conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(), true);
+    if (vectorPath) {
+      return vrcif.validateInput(fs, conf, files);
+    } else {
+      return rcif.validateInput(fs, conf, files);
+    }
+  }
+
+  /**
+   * Returns a reader wrapping either the vectorized or the row-mode RC file
+   * record reader, depending on whether vectorization is enabled in conf.
+   */
+  @Override
+  public RecordReader<Writable, Writable> getRecordReader(InputSplit split, JobConf conf,
+      Reporter reporter) throws IOException {
+    boolean vectorPath = conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(), true);
+    if (vectorPath) {
+      RecordReader<NullWritable, VectorizedRowBatch> vrcrr = vrcif.getRecordReader(split, conf, reporter);
+      return new CommonRCFileRecordReader(vrcrr, null);
+    } else {
+      RecordReader<LongWritable, BytesRefArrayWritable> rcrr = rcif.getRecordReader(split, conf, reporter);
+      return new CommonRCFileRecordReader(null, rcrr);
+    }
+  }
+}
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java?rev=1481626&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java Sun May 12 19:23:55 2013
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * Input format that reads RC files and hands them to MapReduce/Hive as
+ * {@link VectorizedRowBatch} values.
+ */
+public class VectorizedRCFileInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
+    implements InputFormatChecker {
+
+  public VectorizedRCFileInputFormat() {
+    setMinSplitSize(SequenceFile.SYNC_INTERVAL);
+  }
+
+  /**
+   * Reports the split as the current status and creates a vectorized RC file
+   * record reader for it.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(InputSplit split, JobConf job,
+      Reporter reporter) throws IOException {
+    reporter.setStatus(split.toString());
+    FileSplit fileSplit = (FileSplit) split;
+    return new VectorizedRCFileRecordReader(job, fileSplit);
+  }
+
+  /**
+   * Checks that every given file can be opened with an RC file reader.
+   *
+   * @return false when the list is empty or opening or closing any file raises
+   *         an IOException inside the try block; true otherwise
+   */
+  @Override
+  public boolean validateInput(FileSystem fs, HiveConf conf,
+      ArrayList<FileStatus> files) throws IOException {
+    if (files.isEmpty()) {
+      return false;
+    }
+    for (FileStatus status : files) {
+      RCFile.Reader probe = null;
+      try {
+        probe = new RCFile.Reader(fs, status.getPath(), conf);
+        probe.close();
+        // Cleared so the finally block does not close it a second time.
+        probe = null;
+      } catch (IOException e) {
+        return false;
+      } finally {
+        if (probe != null) {
+          probe.close();
+        }
+      }
+    }
+    return true;
+  }
+}
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java?rev=1481626&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java Sun May 12 19:23:55 2013
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.RCFile.KeyBuffer;
+import org.apache.hadoop.hive.ql.io.RCFile.Reader;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * Record reader that turns the rows of one RCFile split into
+ * {@link VectorizedRowBatch}es.
+ *
+ * <p>The underlying {@link RCFile.Reader} is read row by row, so every batch is
+ * assembled one row at a time, with at most
+ * {@link VectorizedRowBatch#DEFAULT_SIZE} rows per call to
+ * {@link #next(NullWritable, VectorizedRowBatch)}.
+ */
+public class VectorizedRCFileRecordReader implements RecordReader<NullWritable, VectorizedRowBatch> {
+
+  private final Reader in;
+  private final long start;
+  private final long end;
+  private boolean more = true;
+  protected Configuration conf;
+  private final FileSplit split;
+  private final boolean useCache;
+  private VectorizedRowBatchCtx rbCtx;
+  // Scratch objects reused for every row pulled from the RCFile reader.
+  private final LongWritable keyCache = new LongWritable();
+  private final BytesRefArrayWritable colsCache = new BytesRefArrayWritable();
+
+  // Shared by all readers in this JVM; never reassigned.
+  private static final RCFileSyncCache syncCache = new RCFileSyncCache();
+
+  /** Sync position recorded for the end of a split. */
+  private static final class RCFileSyncEntry {
+    long end;
+    long endSync;
+  }
+
+  /**
+   * Remembers, per file and split end offset, the sync position that the reader
+   * of that split saw at or past its end. A reader whose split starts at that
+   * same offset can then seek straight to it instead of scanning for a sync.
+   *
+   * <p>The map is a synchronized {@link WeakHashMap} keyed by a String built in
+   * {@link #put}; this class keeps no other reference to the key, so an entry
+   * may vanish at any garbage collection. Treat it as a best-effort cache.
+   */
+  private static final class RCFileSyncCache {
+
+    private final Map<String, RCFileSyncEntry> cache;
+
+    public RCFileSyncCache() {
+      cache = Collections.synchronizedMap(new WeakHashMap<String, RCFileSyncEntry>());
+    }
+
+    /** Builds the cache key; plain concatenation keeps it locale-independent. */
+    private static String keyFor(Path path, long offset) {
+      return path.toString() + "+" + offset;
+    }
+
+    /**
+     * Records {@code endSync} under the end offset of {@code split}. Nothing is
+     * stored when {@code endSync} lies before the end of the split.
+     */
+    public void put(FileSplit split, long endSync) {
+      long end = split.getStart() + split.getLength();
+      if (endSync < end) {
+        return;
+      }
+      RCFileSyncEntry entry = new RCFileSyncEntry();
+      entry.end = end;
+      entry.endSync = endSync;
+      cache.put(keyFor(split.getPath(), end), entry);
+    }
+
+    /**
+     * Looks up the sync position stored for the offset at which {@code split}
+     * starts.
+     *
+     * @return the cached sync position, or -1 if there is none
+     */
+    public long get(FileSplit split) {
+      RCFileSyncEntry entry = cache.get(keyFor(split.getPath(), split.getStart()));
+      return entry != null ? entry.endSync : -1;
+    }
+  }
+
+  /**
+   * Opens the split's file, positions the reader at the first sync point of the
+   * split and initialises the row-batch context.
+   *
+   * @param conf configuration used to open the file and to initialise the batch context
+   * @param split the file split to read
+   * @throws IOException if the file cannot be opened or positioned
+   * @throws RuntimeException if the row-batch context cannot be initialised
+   */
+  public VectorizedRCFileRecordReader(Configuration conf, FileSplit split)
+      throws IOException {
+
+    Path path = split.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    this.in = new RCFile.Reader(fs, path, conf);
+    this.end = split.getStart() + split.getLength();
+    this.conf = conf;
+    this.split = split;
+
+    boolean initialized = false;
+    try {
+      useCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEUSERCFILESYNCCACHE);
+
+      if (split.getStart() > in.getPosition()) {
+        long oldSync = useCache ? syncCache.get(split) : -1;
+        if (oldSync == -1) {
+          in.sync(split.getStart()); // sync to start
+        } else {
+          in.seek(oldSync);
+        }
+      }
+
+      this.start = in.getPosition();
+
+      more = start < end;
+      try {
+        rbCtx = new VectorizedRowBatchCtx();
+        rbCtx.Init(conf, split);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      initialized = true;
+    } finally {
+      // Do not leak the open file when construction fails part-way through.
+      if (!initialized) {
+        in.close();
+      }
+    }
+  }
+
+  /**
+   * NOTE(review): reports {@link LongWritable}, the type of the internal row-key
+   * scratch object, not the {@link NullWritable} key this reader hands out.
+   * Left unchanged for compatibility; confirm with callers.
+   */
+  public Class<?> getKeyClass() {
+    return LongWritable.class;
+  }
+
+  /**
+   * NOTE(review): reports {@link BytesRefArrayWritable}, the type of the internal
+   * row scratch object, not the {@link VectorizedRowBatch} value this reader
+   * hands out. Left unchanged for compatibility; confirm with callers.
+   */
+  public Class<?> getValueClass() {
+    return BytesRefArrayWritable.class;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  /**
+   * Creates an empty batch and fills in its partition columns.
+   *
+   * @return a new batch for use with {@link #next(NullWritable, VectorizedRowBatch)}
+   * @throws RuntimeException if the batch cannot be created
+   */
+  @Override
+  public VectorizedRowBatch createValue() {
+    try {
+      VectorizedRowBatch result = rbCtx.CreateVectorizedRowBatch();
+      // Since the record reader works only on one split and
+      // given a split the partition cannot change, we are setting the partition
+      // values only once during batch creation
+      rbCtx.AddPartitionColsToBatch(result);
+      return result;
+    } catch (HiveException e) {
+      throw new RuntimeException("Error creating a batch", e);
+    }
+  }
+
+  public boolean nextBlock() throws IOException {
+    return in.nextBlock();
+  }
+
+  /**
+   * Fills {@code value} with up to {@link VectorizedRowBatch#DEFAULT_SIZE} rows
+   * and sets {@code value.size} to the number of rows added.
+   *
+   * @param key unused; this reader has no meaningful key
+   * @param value the batch to populate
+   * @return {@code true} if at least one row was added, including a final,
+   *         partially filled batch; {@code false} once the split is exhausted
+   * @throws IOException if reading from the RCFile fails
+   * @throws RuntimeException if a row cannot be added to the batch
+   */
+  @Override
+  public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+
+    // Reset column fields noNull values to true
+    VectorizedBatchUtil.SetNoNullFields(true, value);
+    int rowsRead = 0;
+    try {
+      for (; rowsRead < VectorizedRowBatch.DEFAULT_SIZE; rowsRead++) {
+        more = next(keyCache);
+        if (!more) {
+          break;
+        }
+        in.getCurrentRow(colsCache);
+        // Currently RCFile reader does not support reading vectorized
+        // data. Populating the batch by adding one row at a time.
+        rbCtx.AddRowToBatch(rowsRead, (Writable) colsCache, value);
+      }
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new RuntimeException("Error while getting next row", e);
+    }
+    value.size = rowsRead;
+    // Report the rows actually read: returning 'more' here would signal EOF for a
+    // final, partially filled batch and make the caller discard it.
+    return rowsRead > 0;
+  }
+
+  /**
+   * Advances the underlying reader by one row.
+   *
+   * <p>Once the last sync position seen by the reader is at or beyond the end of
+   * this split, that position is stored in the sync cache (when enabled) and the
+   * reader is marked exhausted.
+   *
+   * @param key receives the row key from the underlying reader
+   * @return {@code true} if a row belonging to this split is available
+   * @throws IOException if the underlying read fails
+   */
+  protected boolean next(LongWritable key) throws IOException {
+    if (!more) {
+      return false;
+    }
+
+    more = in.next(key);
+
+    long lastSeenSyncPos = in.lastSeenSyncPos();
+
+    if (lastSeenSyncPos >= end) {
+      if (useCache) {
+        syncCache.put(split, lastSeenSyncPos);
+      }
+      more = false;
+    }
+    return more;
+  }
+
+  /**
+   * Return the progress within the input split.
+   *
+   * @return 0.0 to 1.0 of the input byte range
+   */
+  @Override
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
+    }
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return in.getPosition();
+  }
+
+  public KeyBuffer getKeyBuffer() {
+    return in.getCurrentKeyBufferObj();
+  }
+
+  protected void seek(long pos) throws IOException {
+    in.seek(pos);
+  }
+
+  public void sync(long pos) throws IOException {
+    in.sync(pos);
+  }
+
+  public void resetBuffer() {
+    in.resetBuffer();
+  }
+
+  /** @return the reader position recorded after the initial sync/seek in the constructor */
+  public long getStart() {
+    return start;
+  }
+
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java?rev=1481626&r1=1481625&r2=1481626&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java Sun May 12 19:23:55 2013
@@ -78,7 +78,7 @@ public class OrcSerde implements SerDe,
// Parse the configuration parameters
ArrayList<String> columnNames = new ArrayList<String>();
if (columnNameProperty != null && columnNameProperty.length() > 0) {
- for(String name: columnNameProperty.split(",")) {
+ for (String name : columnNameProperty.split(",")) {
columnNames.add(name);
}
}
@@ -95,7 +95,7 @@ public class OrcSerde implements SerDe,
}
ArrayList<TypeInfo> fieldTypes =
- TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+ TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
StructTypeInfo rootType = new StructTypeInfo();
rootType.setAllStructFieldNames(columnNames);
rootType.setAllStructFieldTypeInfos(fieldTypes);
@@ -127,6 +127,7 @@ public class OrcSerde implements SerDe,
/**
* Always returns null, since serialized size doesn't make sense in the
* context of ORC files.
+ *
* @return null
*/
@Override
@@ -144,8 +145,8 @@ public class OrcSerde implements SerDe,
}
@Override
- public VectorizedRowBatch deserializeVector(Object rowBlob, ObjectInspector objInspector,
- VectorizedRowBatch reuseBatch) throws SerDeException {
- return ((VectorizedRowBatch) rowBlob);
+ public void deserializeVector(Object rowBlob, int rowsInBatch, VectorizedRowBatch reuseBatch)
+ throws SerDeException {
+ // nothing to do here
}
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1481626&r1=1481625&r2=1481626&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Sun May 12 19:23:55 2013
@@ -80,7 +80,7 @@ public class VectorizedOrcInputFormat ex
}
reader.nextBatch(value);
try {
- rbCtx.ConvertRowBatchBlobToVectorizedBatch((Object)value, value);
+ rbCtx.ConvertRowBatchBlobToVectorizedBatch((Object)value, value.size, value);
} catch (SerDeException e) {
new RuntimeException(e);
}
Modified: hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java?rev=1481626&r1=1481625&r2=1481626&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java (original)
+++ hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java Sun May 12 19:23:55 2013
@@ -46,7 +46,9 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.junit.Before;
import org.junit.Test;
@@ -147,7 +149,7 @@ public class TestVectorizedRowBatchCtx {
bytes.set(4, cu);
cu = new BytesRefWritable(("Test string").getBytes("UTF-8"), 0,
- ("NULL").getBytes("UTF-8").length);
+ ("Test string").getBytes("UTF-8").length);
bytes.set(5, cu);
}
writer.append(bytes);
@@ -169,7 +171,7 @@ public class TestVectorizedRowBatchCtx {
// Create the context
VectorizedRowBatchCtx ctx = new VectorizedRowBatchCtx(oi, oi, serDe, null);
VectorizedRowBatch batch = ctx.CreateVectorizedRowBatch();
- ctx.SetNoNullFields(true, batch);
+ VectorizedBatchUtil.SetNoNullFields(true, batch);
// Iterate thru the rows and populate the batch
LongWritable rowID = new LongWritable();
@@ -272,9 +274,19 @@ public class TestVectorizedRowBatchCtx {
@Test
public void TestCtx() throws Exception {
+
InitSerde();
WriteRCFile(this.fs, this.testFilePath, this.conf);
VectorizedRowBatch batch = GetRowBatch();
ValidateRowBatch(batch);
+
+ // Test VectorizedColumnarSerDe
+ VectorizedColumnarSerDe vcs = new VectorizedColumnarSerDe();
+ vcs.initialize(this.conf, tbl);
+ Writable w = vcs.serializeVector(batch, (StructObjectInspector) serDe
+ .getObjectInspector());
+ BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[])((ObjectWritable)w).get();
+ vcs.deserializeVector(refArray, 10, batch);
+ ValidateRowBatch(batch);
}
}
Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java?rev=1481626&r1=1481625&r2=1481626&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java Sun May 12 19:23:55 2013
@@ -26,11 +26,9 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
@@ -72,13 +70,14 @@ public class ColumnarSerDe extends Colum
public ColumnarSerDe() throws SerDeException {
}
- SerDeParameters serdeParams = null;
+ protected SerDeParameters serdeParams = null;
/**
* Initialize the SerDe given the parameters.
*
* @see SerDe#initialize(Configuration, Properties)
*/
+ @Override
public void initialize(Configuration job, Properties tbl) throws SerDeException {
serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, getClass().getName());
@@ -114,6 +113,7 @@ public class ColumnarSerDe extends Colum
* @return The serialized Writable object
* @see SerDe#serialize(Object, ObjectInspector)
*/
+ @Override
public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
if (objInspector.getCategory() != Category.STRUCT) {
Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java?rev=1481626&r1=1481625&r2=1481626&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java Sun May 12 19:23:55 2013
@@ -27,18 +27,18 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;
-import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
@@ -141,7 +141,7 @@ public final class LazyUtils {
* if escaped, whether a specific character needs escaping. This
* array should have size of 128.
*/
- private static void writeEscaped(OutputStream out, byte[] bytes, int start,
+ public static void writeEscaped(OutputStream out, byte[] bytes, int start,
int len, boolean escaped, byte escapeChar, boolean[] needsEscape)
throws IOException {
if (escaped) {