Posted to commits@hive.apache.org by ha...@apache.org on 2013/05/12 21:23:56 UTC

svn commit: r1481626 - in /hive/branches/vectorization: ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ ql/src/java/org/apache/hadoop/hive/ql/io/ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/ serde/...

Author: hashutosh
Date: Sun May 12 19:23:55 2013
New Revision: 1481626

URL: http://svn.apache.org/r1481626
Log:
HIVE-4483 : Input format to read vector data from RC file (Sarvesh Sakalanaga via Ashutosh Chauhan)

Added:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
Modified:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
    hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java
    hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java
    hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java
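
Taken together, the new classes form the vectorized RCFile read path:
CommonRCFileInputFormat picks the row-mode or vectorized reader based on
configuration, VectorizedRCFileRecordReader fills a VectorizedRowBatch one
row at a time, and VectorizedBatchUtil plus VectorizedColumnarSerDe handle
the per-row population. A minimal sketch of a consuming loop (illustrative
driver code, not part of this commit; assumes a configured JobConf and an
RCFile split):

    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.io.VectorizedRCFileRecordReader;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.FileSplit;
    import org.apache.hadoop.mapred.JobConf;

    public class VectorizedReadSketch {
      public static void readAll(JobConf conf, FileSplit split) throws Exception {
        VectorizedRCFileRecordReader reader =
            new VectorizedRCFileRecordReader(conf, split);
        NullWritable key = reader.createKey();
        VectorizedRowBatch batch = reader.createValue();
        boolean more = true;
        while (more) {
          more = reader.next(key, batch);
          // next() may return false while still delivering a final partial batch
          if (batch.size > 0) {
            process(batch);
          }
        }
        reader.close();
      }

      private static void process(VectorizedRowBatch batch) {
        // consume batch.cols[...] column vectors here
      }
    }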

Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java?rev=1481626&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java Sun May 12 19:23:55 2013
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+public class VectorizedBatchUtil {
+
+  /**
+   * Marks the given row of a ColumnVector as null and clears the vector's
+   * noNulls flag.
+   * @param cv column vector to update
+   * @param rowIndex row index to mark as null
+   */
+  public static void SetNullColIsNullValue(ColumnVector cv, int rowIndex) {
+    cv.isNull[rowIndex] = true;
+    if (cv.noNulls) {
+      cv.noNulls = false;
+    }
+  }
+
+  /**
+   * Iterates through all the column vectors and sets noNulls to the
+   * specified value.
+   *
+   * @param valueToSet
+   *          noNull value to set
+   * @param batch
+   *          Batch on which noNull is set
+   */
+  public static void SetNoNullFields(boolean valueToSet, VectorizedRowBatch batch) {
+    for (int i = 0; i < batch.numCols; i++) {
+      batch.cols[i].noNulls = valueToSet;
+    }
+  }
+
+  /**
+   * Iterates through all the columns in a given row and populates the batch
+   * @param row Deserialized row object
+   * @param oi Object inspector for that row
+   * @param rowIndex Index at which the row is added to the batch
+   * @param batch Vectorized batch to which the row is added at rowIndex
+   * @throws HiveException
+   */
+  public static void AddRowToBatch(Object row, StructObjectInspector oi, int rowIndex,
+      VectorizedRowBatch batch) throws HiveException {
+    List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
+    // Iterate through the columns and load the batch
+    for (int i = 0; i < fieldRefs.size(); i++) {
+      Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
+      ObjectInspector foi = fieldRefs.get(i).getFieldObjectInspector();
+
+      // Vectorization only supports PRIMITIVE data types. Assert the same
+      assert (foi.getCategory() == Category.PRIMITIVE);
+
+      // Get writable object
+      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
+      Object writableCol = poi.getPrimitiveWritableObject(fieldData);
+
+      // NOTE: The default value for null fields in vectorization is 1 for int types, NaN for
+      // float/double. String types have no default value for null.
+      switch (poi.getPrimitiveCategory()) {
+      case SHORT: {
+        LongColumnVector lcv = (LongColumnVector) batch.cols[i];
+        if (writableCol != null) {
+          lcv.vector[rowIndex] = ((ShortWritable) writableCol).get();
+          lcv.isNull[rowIndex] = false;
+        } else {
+          lcv.vector[rowIndex] = 1;
+          SetNullColIsNullValue(lcv, rowIndex);
+        }
+      }
+        break;
+      case INT: {
+        LongColumnVector lcv = (LongColumnVector) batch.cols[i];
+        if (writableCol != null) {
+          lcv.vector[rowIndex] = ((IntWritable) writableCol).get();
+          lcv.isNull[rowIndex] = false;
+        } else {
+          lcv.vector[rowIndex] = 1;
+          SetNullColIsNullValue(lcv, rowIndex);
+        }
+      }
+        break;
+      case LONG: {
+        LongColumnVector lcv = (LongColumnVector) batch.cols[i];
+        if (writableCol != null) {
+          lcv.vector[rowIndex] = ((LongWritable) writableCol).get();
+          lcv.isNull[rowIndex] = false;
+        } else {
+          lcv.vector[rowIndex] = 1;
+          SetNullColIsNullValue(lcv, rowIndex);
+        }
+      }
+        break;
+      case FLOAT: {
+        DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[i];
+        if (writableCol != null) {
+          dcv.vector[rowIndex] = ((FloatWritable) writableCol).get();
+          dcv.isNull[rowIndex] = false;
+        } else {
+          dcv.vector[rowIndex] = Double.NaN;
+          SetNullColIsNullValue(dcv, rowIndex);
+        }
+      }
+        break;
+      case DOUBLE: {
+        DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[i];
+        if (writableCol != null) {
+          dcv.vector[rowIndex] = ((DoubleWritable) writableCol).get();
+          dcv.isNull[rowIndex] = false;
+        } else {
+          dcv.vector[rowIndex] = Double.NaN;
+          SetNullColIsNullValue(dcv, rowIndex);
+        }
+      }
+        break;
+      case STRING: {
+        BytesColumnVector bcv = (BytesColumnVector) batch.cols[i];
+        if (writableCol != null) {
+          bcv.isNull[rowIndex] = false;
+          Text colText = (Text) writableCol;
+          bcv.setRef(rowIndex, colText.getBytes(), 0, colText.getLength());
+        } else {
+          SetNullColIsNullValue(bcv, rowIndex);
+        }
+      }
+        break;
+      default:
+        throw new HiveException("Vectorizaton is not supported for datatype:"
+            + poi.getPrimitiveCategory());
+      }
+    }
+  }
+
+}
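
VectorizedBatchUtil.AddRowToBatch is the single-row fallback used wherever a
reader cannot yet produce whole batches. A sketch of the calling pattern
(illustrative; the helper name and its arguments are hypothetical):

    // Deserialize one row and place it at slot rowIndex of the batch.
    static void addOne(Deserializer deserializer, Writable rowBlob,
        int rowIndex, VectorizedRowBatch batch) throws Exception {
      Object row = deserializer.deserialize(rowBlob);
      StructObjectInspector soi =
          (StructObjectInspector) deserializer.getObjectInspector();
      VectorizedBatchUtil.AddRowToBatch(row, soi, rowIndex, batch);
      // Null fields land as 1 (integer types) or NaN (float/double), with
      // isNull[rowIndex] set and noNulls cleared.
    }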

Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java?rev=1481626&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java Sun May 12 19:23:55 2013
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazyLong;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * VectorizedColumnarSerDe is used by Vectorized query execution engine
+ * for columnar based storage supported by RCFile.
+ */
+public class VectorizedColumnarSerDe extends ColumnarSerDe implements VectorizedSerde {
+
+  public VectorizedColumnarSerDe() throws SerDeException {
+  }
+
+  private final BytesRefArrayWritable[] byteRefArray = new BytesRefArrayWritable[VectorizedRowBatch.DEFAULT_SIZE];
+  private final ObjectWritable ow = new ObjectWritable();
+  private final ByteStream.Output serializeVectorStream = new ByteStream.Output();
+
+  /**
+   * Serialize a vectorized row batch
+   *
+   * @param obj
+   *          Vectorized row batch to serialize
+   * @param objInspector
+   *          The ObjectInspector for the row object
+   * @return The serialized Writable object
+   * @throws SerDeException
+   * @see SerDe#serialize(Object, ObjectInspector)
+   */
+  @Override
+  public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
+      throws SerDeException {
+    try {
+      // Validate that the OI is of struct type
+      if (objInspector.getCategory() != Category.STRUCT) {
+        throw new UnsupportedOperationException(getClass().toString()
+            + " can only serialize struct types, but we got: "
+            + objInspector.getTypeName());
+      }
+
+      VectorizedRowBatch batch = (VectorizedRowBatch) vrg;
+      StructObjectInspector soi = (StructObjectInspector) objInspector;
+      List<? extends StructField> fields = soi.getAllStructFieldRefs();
+
+      // Reset the byte buffer
+      serializeVectorStream.reset();
+      int count = 0;
+      int rowIndex = 0;
+      for (int i = 0; i < batch.size; i++) {
+
+        // If selectedInUse is true then we need to serialize only
+        // the selected indexes
+        if (batch.selectedInUse) {
+          rowIndex = batch.selected[i];
+        } else {
+          rowIndex = i;
+        }
+
+        BytesRefArrayWritable byteRow = byteRefArray[i];
+        int numCols = fields.size();
+
+        if (byteRow == null) {
+          byteRow = new BytesRefArrayWritable(numCols);
+          byteRefArray[i] = byteRow;
+        }
+
+        byteRow.resetValid(numCols);
+
+        for (int k = 0; k < numCols; k++) {
+          ObjectInspector foi = fields.get(k).getFieldObjectInspector();
+          ColumnVector currentColVector = batch.cols[k];
+
+          switch (foi.getCategory()) {
+          case PRIMITIVE: {
+            PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
+            if (!currentColVector.noNulls
+                && (currentColVector.isRepeating || currentColVector.isNull[rowIndex])) {
+              // The column value is null, so write a zero-length value
+              serializeVectorStream.write(new byte[0], 0, 0);
+            } else {
+              // If here then the vector value is not null.
+              if (currentColVector.isRepeating) {
+                // If the vector has repeating values then set rowIndex to zero
+                rowIndex = 0;
+              }
+
+              switch (poi.getPrimitiveCategory()) {
+              case SHORT:
+              case INT:
+              case LONG:
+                LongColumnVector lcv = (LongColumnVector) batch.cols[k];
+                LazyLong.writeUTF8(serializeVectorStream, lcv.vector[rowIndex]);
+                break;
+              case FLOAT:
+              case DOUBLE:
+                DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[k];
+                ByteBuffer b = Text.encode(String.valueOf(dcv.vector[rowIndex]));
+                serializeVectorStream.write(b.array(), 0, b.limit());
+                break;
+              case STRING:
+                BytesColumnVector bcv = (BytesColumnVector) batch.cols[k];
+                LazyUtils.writeEscaped(serializeVectorStream, bcv.vector[rowIndex],
+                    bcv.start[rowIndex],
+                    bcv.length[rowIndex],
+                    serdeParams.isEscaped(), serdeParams.getEscapeChar(), serdeParams
+                        .getNeedsEscape());
+                break;
+              default:
+                throw new UnsupportedOperationException(
+                    "Vectorizaton is not supported for datatype:"
+                        + poi.getPrimitiveCategory());
+              }
+            }
+            break;
+          }
+          case LIST:
+          case MAP:
+          case STRUCT:
+          case UNION:
+            throw new UnsupportedOperationException("Vectorizaton is not supported for datatype:"
+                + foi.getCategory());
+          default:
+            throw new SerDeException("Unknown ObjectInspector category!");
+
+          }
+
+          byteRow.get(k).set(serializeVectorStream.getData(), count, serializeVectorStream
+              .getCount() - count);
+          count = serializeVectorStream.getCount();
+        }
+
+      }
+      ow.set(byteRefArray);
+    } catch (Exception e) {
+      throw new SerDeException(e);
+    }
+    return ow;
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    return null;
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return BytesRefArrayWritable.class;
+  }
+
+  @Override
+  public Object deserialize(Writable blob) throws SerDeException {
+
+    // Ideally this should throw UnsupportedOperationException, since this is a
+    // vectorized serde. But because the RCFile reader does not yet support
+    // vectorized reading, it is left as is. This method is called from
+    // VectorizedRowBatchCtx.AddRowToBatch to deserialize rows one by one and
+    // populate the batch. Once the RCFile reader supports vectorized reading,
+    // this serde can become standalone, with no dependency on ColumnarSerDe.
+    return super.deserialize(blob);
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return cachedObjectInspector;
+  }
+
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Deserializes rowBlob into a vectorized row batch.
+   * @param rowBlob
+   *          row batch blob to deserialize
+   * @param rowsInBlob
+   *          Total number of rows in rowBlob to deserialize
+   * @param reuseBatch
+   *          VectorizedRowBatch into which the rows are deserialized
+   * @throws SerDeException
+   */
+  @Override
+  public void deserializeVector(Object rowBlob, int rowsInBlob,
+      VectorizedRowBatch reuseBatch) throws SerDeException {
+
+    BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[]) rowBlob;
+    for (int i = 0; i < rowsInBlob; i++) {
+      Object row = deserialize(refArray[i]);
+      try {
+        VectorizedBatchUtil.AddRowToBatch(row, (StructObjectInspector) cachedObjectInspector, i, reuseBatch);
+      } catch (HiveException e) {
+        throw new SerDeException(e);
+      }
+    }
+  }
+}
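
The serializeVector/deserializeVector pair round-trips a batch through
RCFile's columnar representation: serializeVector returns an ObjectWritable
wrapping one BytesRefArrayWritable per row, and deserializeVector reads such
an array back into a reusable batch. The new test case at the bottom of this
commit exercises exactly this; the core of it looks like (vcs is an
initialized VectorizedColumnarSerDe, structOI its struct object inspector):

    Writable w = vcs.serializeVector(batch, structOI);
    BytesRefArrayWritable[] rows =
        (BytesRefArrayWritable[]) ((ObjectWritable) w).get();
    vcs.deserializeVector(rows, batch.size, batch);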

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1481626&r1=1481625&r2=1481626&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Sun May 12 19:23:55 2013
@@ -33,19 +33,12 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileSplit;
 
@@ -239,6 +232,9 @@ public class VectorizedRowBatchCtx {
     return result;
   }
 
+
+
+
   /**
    * Adds the row to the batch after deserializing the row
    *
@@ -254,122 +250,25 @@ public class VectorizedRowBatchCtx {
   public void AddRowToBatch(int rowIndex, Writable rowBlob, VectorizedRowBatch batch)
       throws HiveException, SerDeException
   {
-    List<? extends StructField> fieldRefs = rawRowOI.getAllStructFieldRefs();
     Object row = this.deserializer.deserialize(rowBlob);
-    // Iterate thru the cols and load the batch
-    for (int i = 0; i < fieldRefs.size(); i++) {
-      Object fieldData = rawRowOI.getStructFieldData(row, fieldRefs.get(i));
-      ObjectInspector foi = fieldRefs.get(i).getFieldObjectInspector();
-
-      // Vectorization only supports PRIMITIVE data types. Assert the same
-      assert (foi.getCategory() == Category.PRIMITIVE);
-
-      // Get writable object
-      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi;
-      Object writableCol = poi.getPrimitiveWritableObject(fieldData);
-
-      // NOTE: The default value for null fields in vectorization is -1 for int types
-      switch (poi.getPrimitiveCategory()) {
-      case SHORT: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[i];
-        if (writableCol != null) {
-          lcv.vector[rowIndex] = ((ShortWritable) writableCol).get();
-          lcv.isNull[rowIndex] = false;
-        } else {
-          lcv.vector[rowIndex] = 1;
-          SetNullColIsNullValue(lcv, rowIndex);
-        }
-      }
-        break;
-      case INT: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[i];
-        if (writableCol != null) {
-          lcv.vector[rowIndex] = ((IntWritable) writableCol).get();
-          lcv.isNull[rowIndex] = false;
-        } else {
-          lcv.vector[rowIndex] = 1;
-          SetNullColIsNullValue(lcv, rowIndex);
-        }
-      }
-        break;
-      case LONG: {
-        LongColumnVector lcv = (LongColumnVector) batch.cols[i];
-        if (writableCol != null) {
-          lcv.vector[rowIndex] = ((LongWritable) writableCol).get();
-          lcv.isNull[rowIndex] = false;
-        } else {
-          lcv.vector[rowIndex] = 1;
-          SetNullColIsNullValue(lcv, rowIndex);
-        }
-      }
-        break;
-      case FLOAT: {
-        DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[i];
-        if (writableCol != null) {
-          dcv.vector[rowIndex] = ((FloatWritable) writableCol).get();
-          dcv.isNull[rowIndex] = false;
-        } else {
-          dcv.vector[rowIndex] = Double.NaN;
-          SetNullColIsNullValue(dcv, rowIndex);
-        }
-      }
-        break;
-      case DOUBLE: {
-        DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[i];
-        if (writableCol != null) {
-          dcv.vector[rowIndex] = ((DoubleWritable) writableCol).get();
-          dcv.isNull[rowIndex] = false;
-        } else {
-          dcv.vector[rowIndex] = Double.NaN;
-          SetNullColIsNullValue(dcv, rowIndex);
-        }
-      }
-        break;
-      case STRING: {
-        BytesColumnVector bcv = (BytesColumnVector) batch.cols[i];
-        if (writableCol != null) {
-          bcv.isNull[rowIndex] = false;
-          Text colText = (Text) writableCol;
-          bcv.setRef(rowIndex, colText.getBytes(), 0, colText.getLength());
-        } else {
-          SetNullColIsNullValue(bcv, rowIndex);
-        }
-      }
-        break;
-      default:
-        throw new HiveException("Vectorizaton is not supported for datatype:"
-            + poi.getPrimitiveCategory());
-      }
-    }
+    VectorizedBatchUtil.AddRowToBatch(row, this.rawRowOI, rowIndex, batch);
   }
 
   /**
-   * Iterates thru all the column vectors and sets noNull to
-   * specified value.
+   * Deserializes a set of rows and populates the batch.
    *
-   * @param valueToSet
-   *          noNull value to set
+   * @param rowBlob
+   *          row batch blob to deserialize
    * @param batch
-   *          Batch on which noNull is set
-   */
-  public void SetNoNullFields(boolean valueToSet, VectorizedRowBatch batch) {
-    for (int i = 0; i < batch.numCols; i++) {
-      batch.cols[i].noNulls = true;
-    }
-  }
-
-  /**
-   * Deserialized set of rows and populates the batch
-   * @param rowBlob to deserialize
-   * @param batch Vectorized row batch which contains deserialized data
+   *          Vectorized row batch which contains deserialized data
    * @throws SerDeException
    */
-  public void ConvertRowBatchBlobToVectorizedBatch(Object rowBlob, VectorizedRowBatch batch)
+  public void ConvertRowBatchBlobToVectorizedBatch(Object rowBlob, int rowsInBlob,
+      VectorizedRowBatch batch)
       throws SerDeException {
 
     if (deserializer instanceof VectorizedSerde) {
-      batch = ((VectorizedSerde) deserializer).deserializeVector(rowBlob,
-          deserializer.getObjectInspector(), batch);
+      ((VectorizedSerde) deserializer).deserializeVector(rowBlob, rowsInBlob, batch);
     } else {
       throw new SerDeException(
           "Not able to deserialize row batch. Serde does not implement VectorizedSerde");
@@ -410,12 +309,4 @@ public class VectorizedRowBatchCtx {
       }
     }
   }
-
-  private void SetNullColIsNullValue(ColumnVector cv, int rowIndex) {
-    cv.isNull[rowIndex] = true;
-    if (cv.noNulls) {
-      cv.noNulls = false;
-    }
-  }
-
 }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java?rev=1481626&r1=1481625&r2=1481626&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java Sun May 12 19:23:55 2013
@@ -27,7 +27,6 @@ public interface VectorizedSerde {
   Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
       throws SerDeException;
 
-  VectorizedRowBatch deserializeVector(Object rowBlob, ObjectInspector objInspector,
-      VectorizedRowBatch reuseBatch)
-      throws SerDeException;
+  void deserializeVector(Object rowBlob, int rowsInBlob, VectorizedRowBatch reuseBatch)
+      throws SerDeException;
 }
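
The interface change makes deserializeVector populate the caller's reusable
batch in place instead of returning a new one, and adds rowsInBlob so the
serde knows how many rows the blob contains. A skeleton of the new contract
(illustrative only; MyVectorizedSerde is hypothetical):

    public class MyVectorizedSerde implements VectorizedSerde {
      @Override
      public Writable serializeVector(VectorizedRowBatch vrg,
          ObjectInspector objInspector) throws SerDeException {
        return null; // produce a Writable holding the serialized rows
      }

      @Override
      public void deserializeVector(Object rowBlob, int rowsInBlob,
          VectorizedRowBatch reuseBatch) throws SerDeException {
        // fill reuseBatch in place from rowBlob; no new batch is allocated
      }
    }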

Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java?rev=1481626&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java Sun May 12 19:23:55 2013
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * CommonRCFileInputFormat.
+ * Wrapper class that calls the correct input format for RCFile based on
+ * the HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED parameter.
+ */
+public class CommonRCFileInputFormat extends FileInputFormat<Writable, Writable>
+    implements InputFormatChecker, VectorizedInputFormatInterface {
+
+  RCFileInputFormat<LongWritable, BytesRefArrayWritable> rcif =
+      new RCFileInputFormat<LongWritable, BytesRefArrayWritable>();
+  VectorizedRCFileInputFormat vrcif = new VectorizedRCFileInputFormat();
+
+  private static class CommonRCFileRecordReader
+      implements RecordReader<Writable, Writable> {
+
+    final RecordReader<NullWritable, VectorizedRowBatch> vrcrr;
+    final RecordReader<LongWritable, BytesRefArrayWritable> rcrr;
+
+    public CommonRCFileRecordReader(RecordReader<NullWritable, VectorizedRowBatch> vrcrr,
+        RecordReader<LongWritable, BytesRefArrayWritable> rcrr) {
+      this.vrcrr = vrcrr;
+      this.rcrr = rcrr;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (vrcrr != null) {
+        vrcrr.close();
+      } else {
+        rcrr.close();
+      }
+
+    }
+
+    @Override
+    public Writable createKey() {
+      if (vrcrr != null) {
+        return vrcrr.createKey();
+      } else {
+        return rcrr.createKey();
+      }
+    }
+
+    @Override
+    public Writable createValue() {
+      if (vrcrr != null) {
+        return vrcrr.createValue();
+      } else {
+        return rcrr.createValue();
+      }
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      if (vrcrr != null) {
+        return vrcrr.getPos();
+      } else {
+        return rcrr.getPos();
+      }
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      if (vrcrr != null) {
+        return vrcrr.getProgress();
+      } else {
+        return rcrr.getProgress();
+      }
+    }
+
+    @Override
+    public boolean next(Writable key, Writable value) throws IOException {
+      if (vrcrr != null) {
+        return vrcrr.next(NullWritable.get(), (VectorizedRowBatch) value);
+      } else {
+        LongWritable dummyKey = new LongWritable();
+        return rcrr.next(dummyKey, (BytesRefArrayWritable) value);
+      }
+    }
+
+  }
+
+  @Override
+  public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files)
+      throws IOException {
+    boolean vectorPath =
+        conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(), true);
+    if (vectorPath) {
+      return vrcif.validateInput(fs, conf, files);
+    } else {
+      return rcif.validateInput(fs, conf, files);
+    }
+  }
+
+  @Override
+  public RecordReader<Writable, Writable> getRecordReader(InputSplit split, JobConf conf,
+      Reporter reporter) throws IOException {
+    boolean vectorPath = conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(), true);
+    if (vectorPath) {
+      RecordReader<NullWritable, VectorizedRowBatch> vrcrr = vrcif.getRecordReader(split, conf, reporter);
+      return new CommonRCFileRecordReader(vrcrr, null);
+    } else {
+      RecordReader<LongWritable, BytesRefArrayWritable> rcrr = rcif.getRecordReader(split, conf, reporter);
+      return new CommonRCFileRecordReader(null, rcrr);
+    }
+  }
+}
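
Which path CommonRCFileInputFormat takes is controlled entirely by the
vectorization flag, so the same table definition can serve both engines.
For example (illustrative):

    JobConf conf = new JobConf();
    conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(), true);
    // getRecordReader(...) now wraps VectorizedRCFileInputFormat and yields
    // VectorizedRowBatch values; with false, it wraps the row-mode
    // RCFileInputFormat and yields BytesRefArrayWritable rows.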

Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java?rev=1481626&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java Sun May 12 19:23:55 2013
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * A MapReduce/Hive vectorized input format for RCFiles.
+ */
+public class VectorizedRCFileInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
+  implements InputFormatChecker {
+
+  public VectorizedRCFileInputFormat() {
+    setMinSplitSize(SequenceFile.SYNC_INTERVAL);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(InputSplit split, JobConf job,
+      Reporter reporter) throws IOException {
+
+    reporter.setStatus(split.toString());
+
+    return new VectorizedRCFileRecordReader(job, (FileSplit) split);
+  }
+
+  @Override
+  public boolean validateInput(FileSystem fs, HiveConf conf,
+      ArrayList<FileStatus> files) throws IOException {
+    if (files.size() <= 0) {
+      return false;
+    }
+    for (int fileId = 0; fileId < files.size(); fileId++) {
+      RCFile.Reader reader = null;
+      try {
+        reader = new RCFile.Reader(fs, files.get(fileId)
+            .getPath(), conf);
+        reader.close();
+        reader = null;
+      } catch (IOException e) {
+        return false;
+      } finally {
+        if (null != reader) {
+          reader.close();
+        }
+      }
+    }
+    return true;
+  }
+}

Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java?rev=1481626&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java Sun May 12 19:23:55 2013
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.WeakHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.RCFile.KeyBuffer;
+import org.apache.hadoop.hive.ql.io.RCFile.Reader;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * VectorizedRCFileRecordReader.
+ *
+ * Reads rows from an RCFile and returns them as VectorizedRowBatch values,
+ * populating the batch one row at a time.
+ */
+public class VectorizedRCFileRecordReader implements RecordReader<NullWritable, VectorizedRowBatch> {
+
+  private final Reader in;
+  private final long start;
+  private final long end;
+  private boolean more = true;
+  protected Configuration conf;
+  private final FileSplit split;
+  private final boolean useCache;
+  private VectorizedRowBatchCtx rbCtx;
+  private final LongWritable keyCache = new LongWritable();
+  private final BytesRefArrayWritable colsCache = new BytesRefArrayWritable();
+
+  private static RCFileSyncCache syncCache = new RCFileSyncCache();
+
+  private static final class RCFileSyncEntry {
+    long end;
+    long endSync;
+  }
+
+  private static final class RCFileSyncCache {
+
+    private final Map<String, RCFileSyncEntry> cache;
+
+    public RCFileSyncCache() {
+      cache = Collections.synchronizedMap(new WeakHashMap<String, RCFileSyncEntry>());
+    }
+
+    public void put(FileSplit split, long endSync) {
+      Path path = split.getPath();
+      long end = split.getStart() + split.getLength();
+      String key = path.toString() + "+" + String.format("%d", end);
+
+      RCFileSyncEntry entry = new RCFileSyncEntry();
+      entry.end = end;
+      entry.endSync = endSync;
+      if (entry.endSync >= entry.end) {
+        cache.put(key, entry);
+      }
+    }
+
+    public long get(FileSplit split) {
+      Path path = split.getPath();
+      long start = split.getStart();
+      String key = path.toString() + "+" + String.format("%d", start);
+      RCFileSyncEntry entry = cache.get(key);
+      if (entry != null) {
+        return entry.endSync;
+      }
+      return -1;
+    }
+  }
+
+  public VectorizedRCFileRecordReader(Configuration conf, FileSplit split)
+      throws IOException {
+
+    Path path = split.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    this.in = new RCFile.Reader(fs, path, conf);
+    this.end = split.getStart() + split.getLength();
+    this.conf = conf;
+    this.split = split;
+
+    useCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEUSERCFILESYNCCACHE);
+
+    if (split.getStart() > in.getPosition()) {
+      long oldSync = useCache ? syncCache.get(split) : -1;
+      if (oldSync == -1) {
+        in.sync(split.getStart()); // sync to start
+      } else {
+        in.seek(oldSync);
+      }
+    }
+
+    this.start = in.getPosition();
+
+    more = start < end;
+    try {
+      rbCtx = new VectorizedRowBatchCtx();
+      rbCtx.Init(conf, split);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public Class<?> getKeyClass() {
+    return LongWritable.class;
+  }
+
+  public Class<?> getValueClass() {
+    return BytesRefArrayWritable.class;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public VectorizedRowBatch createValue() {
+    VectorizedRowBatch result = null;
+    try {
+      result = rbCtx.CreateVectorizedRowBatch();
+      // Since the record reader works on a single split, and the partition
+      // cannot change within a split, the partition values are set only
+      // once, during batch creation.
+      rbCtx.AddPartitionColsToBatch(result);
+    } catch (HiveException e) {
+      new RuntimeException("Error creating a batch", e);
+    }
+    return result;
+  }
+
+  public boolean nextBlock() throws IOException {
+    return in.nextBlock();
+  }
+
+  @Override
+  public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+
+    // Reset the noNulls flag on all column vectors to true
+    VectorizedBatchUtil.SetNoNullFields(true, value);
+    int i = 0;
+    try {
+      for (; i < VectorizedRowBatch.DEFAULT_SIZE; i++) {
+        more = next(keyCache);
+        if (more) {
+          in.getCurrentRow(colsCache);
+          // Currently RCFile reader does not support reading vectorized
+          // data. Populating the batch by adding one row at a time.
+          rbCtx.AddRowToBatch(i, (Writable) colsCache, value);
+        } else {
+          break;
+        }
+      }
+    } catch (Exception e) {
+      new RuntimeException("Error while getting next row", e);
+    }
+    value.size = i;
+    return more;
+  }
+
+  protected boolean next(LongWritable key) throws IOException {
+    if (!more) {
+      return false;
+    }
+
+    more = in.next(key);
+
+    long lastSeenSyncPos = in.lastSeenSyncPos();
+
+    if (lastSeenSyncPos >= end) {
+      if (useCache) {
+        syncCache.put(split, lastSeenSyncPos);
+      }
+      more = false;
+      return more;
+    }
+    return more;
+  }
+
+  /**
+   * Return the progress within the input split.
+   *
+   * @return 0.0 to 1.0 of the input byte range
+   */
+  public float getProgress() throws IOException {
+    if (end == start) {
+      return 0.0f;
+    } else {
+      return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
+    }
+  }
+
+  public long getPos() throws IOException {
+    return in.getPosition();
+  }
+
+  public KeyBuffer getKeyBuffer() {
+    return in.getCurrentKeyBufferObj();
+  }
+
+  protected void seek(long pos) throws IOException {
+    in.seek(pos);
+  }
+
+  public void sync(long pos) throws IOException {
+    in.sync(pos);
+  }
+
+  public void resetBuffer() {
+    in.resetBuffer();
+  }
+
+  public long getStart() {
+    return start;
+  }
+
+  public void close() throws IOException {
+    in.close();
+  }
+}
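
The RCFileSyncCache avoids a second scan for sync markers at split
boundaries: when a reader for split [start, end) runs past its end, it
records the first sync position at or beyond end under the key
path + "+" + end. The reader for the adjacent split, whose start equals that
end, looks up path + "+" + start, finds the cached entry, and seeks straight
to the sync point instead of calling in.sync(start). A sketch of the handoff
(file name and split sizes hypothetical):

    // reader A, split [0, 64MB):   put("file.rc+67108864", firstSyncAfterEnd)
    // reader B, split [64MB, ...): get("file.rc+67108864") -> seek(cachedSync)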

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java?rev=1481626&r1=1481625&r2=1481626&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java Sun May 12 19:23:55 2013
@@ -78,7 +78,7 @@ public class OrcSerde implements SerDe, 
     // Parse the configuration parameters
     ArrayList<String> columnNames = new ArrayList<String>();
     if (columnNameProperty != null && columnNameProperty.length() > 0) {
-      for(String name: columnNameProperty.split(",")) {
+      for (String name : columnNameProperty.split(",")) {
         columnNames.add(name);
       }
     }
@@ -95,7 +95,7 @@ public class OrcSerde implements SerDe, 
     }
 
     ArrayList<TypeInfo> fieldTypes =
-      TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+        TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
     StructTypeInfo rootType = new StructTypeInfo();
     rootType.setAllStructFieldNames(columnNames);
     rootType.setAllStructFieldTypeInfos(fieldTypes);
@@ -127,6 +127,7 @@ public class OrcSerde implements SerDe, 
   /**
    * Always returns null, since serialized size doesn't make sense in the
    * context of ORC files.
+   *
    * @return null
    */
   @Override
@@ -144,8 +145,8 @@ public class OrcSerde implements SerDe, 
   }
 
   @Override
-  public VectorizedRowBatch deserializeVector(Object rowBlob, ObjectInspector objInspector,
-      VectorizedRowBatch reuseBatch) throws SerDeException {
-    return ((VectorizedRowBatch) rowBlob);
+  public void deserializeVector(Object rowBlob, int rowsInBatch, VectorizedRowBatch reuseBatch)
+      throws SerDeException {
+    // nothing to do here
   }
 }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1481626&r1=1481625&r2=1481626&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Sun May 12 19:23:55 2013
@@ -80,7 +80,7 @@ public class VectorizedOrcInputFormat ex
       }
       reader.nextBatch(value);
       try {
-        rbCtx.ConvertRowBatchBlobToVectorizedBatch((Object)value, value);
+        rbCtx.ConvertRowBatchBlobToVectorizedBatch((Object)value, value.size, value);
       } catch (SerDeException e) {
         new RuntimeException(e);
       }

Modified: hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java?rev=1481626&r1=1481625&r2=1481626&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java (original)
+++ hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java Sun May 12 19:23:55 2013
@@ -46,7 +46,9 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.junit.Before;
 import org.junit.Test;
@@ -147,7 +149,7 @@ public class TestVectorizedRowBatchCtx {
         bytes.set(4, cu);
 
         cu = new BytesRefWritable(("Test string").getBytes("UTF-8"), 0,
-            ("NULL").getBytes("UTF-8").length);
+            ("Test string").getBytes("UTF-8").length);
         bytes.set(5, cu);
       }
       writer.append(bytes);
@@ -169,7 +171,7 @@ public class TestVectorizedRowBatchCtx {
     // Create the context
     VectorizedRowBatchCtx ctx = new VectorizedRowBatchCtx(oi, oi, serDe, null);
     VectorizedRowBatch batch = ctx.CreateVectorizedRowBatch();
-    ctx.SetNoNullFields(true, batch);
+    VectorizedBatchUtil.SetNoNullFields(true, batch);
 
     // Iterate thru the rows and populate the batch
     LongWritable rowID = new LongWritable();
@@ -272,9 +274,19 @@ public class TestVectorizedRowBatchCtx {
 
   @Test
   public void TestCtx() throws Exception {
+
       InitSerde();
       WriteRCFile(this.fs, this.testFilePath, this.conf);
       VectorizedRowBatch batch = GetRowBatch();
       ValidateRowBatch(batch);
+
+      // Test VectorizedColumnarSerDe
+      VectorizedColumnarSerDe vcs = new VectorizedColumnarSerDe();
+      vcs.initialize(this.conf, tbl);
+      Writable w = vcs.serializeVector(batch, (StructObjectInspector) serDe
+          .getObjectInspector());
+      BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[])((ObjectWritable)w).get();
+      vcs.deserializeVector(refArray, 10, batch);
+      ValidateRowBatch(batch);
   }
 }

Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java?rev=1481626&r1=1481625&r2=1481626&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java Sun May 12 19:23:55 2013
@@ -26,11 +26,9 @@ import java.util.Properties;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.serde2.ByteStream;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
@@ -72,13 +70,14 @@ public class ColumnarSerDe extends Colum
   public ColumnarSerDe() throws SerDeException {
   }
 
-  SerDeParameters serdeParams = null;
+  protected SerDeParameters serdeParams = null;
 
   /**
    * Initialize the SerDe given the parameters.
    *
    * @see SerDe#initialize(Configuration, Properties)
    */
+  @Override
   public void initialize(Configuration job, Properties tbl) throws SerDeException {
 
     serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, getClass().getName());
@@ -114,6 +113,7 @@ public class ColumnarSerDe extends Colum
    * @return The serialized Writable object
    * @see SerDe#serialize(Object, ObjectInspector)
    */
+  @Override
   public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
 
     if (objInspector.getCategory() != Category.STRUCT) {

Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java?rev=1481626&r1=1481625&r2=1481626&view=diff
==============================================================================
--- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java (original)
+++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java Sun May 12 19:23:55 2013
@@ -27,18 +27,18 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Properties;
 
-import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
@@ -141,7 +141,7 @@ public final class LazyUtils {
    *          if escaped, whether a specific character needs escaping. This
    *          array should have size of 128.
    */
-  private static void writeEscaped(OutputStream out, byte[] bytes, int start,
+  public static void writeEscaped(OutputStream out, byte[] bytes, int start,
       int len, boolean escaped, byte escapeChar, boolean[] needsEscape)
       throws IOException {
     if (escaped) {