Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/30 20:32:06 UTC

svn commit: r1477757 - in /hive/branches/vectorization/ql/src: java/org/apache/hadoop/hive/ql/exec/vector/ java/org/apache/hadoop/hive/ql/io/orc/ test/org/apache/hadoop/hive/ql/io/orc/

Author: hashutosh
Date: Tue Apr 30 18:32:06 2013
New Revision: 1477757

URL: http://svn.apache.org/r1477757
Log:
HIVE-4370 : Change ORC tree readers to return batches of rows instead of a row (Sarvesh Sakalanaga via Ashutosh Chauhan)

Added:
    hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
Modified:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java?rev=1477757&r1=1477756&r2=1477757&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java Tue Apr 30 18:32:06 2013
@@ -18,16 +18,17 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
 /**
- * This class supports string and binary data by value reference -- i.e. each field is 
+ * This class supports string and binary data by value reference -- i.e. each field is
  * explicitly present, as opposed to provided by a dictionary reference.
  * In some cases, all the values will be in the same byte array to begin with,
- * but this need not be the case. If each value is in a separate byte 
+ * but this need not be the case. If each value is in a separate byte
  * array to start with, or not all of the values are in the same original
  * byte array, you can still assign data by reference into this column vector.
- * This gives flexibility to use this in multiple situations. 
+ * This gives flexibility to use this in multiple situations.
  * <p>
  * When setting data by reference, the caller
  * is responsible for allocating the byte arrays used to hold the data.
@@ -36,23 +37,23 @@ import org.apache.hadoop.io.Writable;
  * though that use is probably not typical.
  */
 public class BytesColumnVector extends ColumnVector {
-  public byte[][] vector; 
+  public byte[][] vector;
   public int[] start;          // start offset of each field
-  
+
   /*
-   * The length of each field. If the value repeats for every entry, then it is stored 
+   * The length of each field. If the value repeats for every entry, then it is stored
    * in vector[0] and isRepeating from the superclass is set to true.
    */
-  public int[] length; 
+  public int[] length;
   private byte[] buffer;   // optional buffer to use when actually copying in data
   private int nextFree;    // next free position in buffer
-  
+
   // Estimate that there will be 16 bytes per entry
   static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE;
-  
-  // Proportion of extra space to provide when allocating more buffer space. 
+
+  // Proportion of extra space to provide when allocating more buffer space.
   static final float EXTRA_SPACE_FACTOR = (float) 1.2;
-  
+
   /**
    * Use this constructor for normal operation.
    * All column vectors should be the default size normally.
@@ -60,21 +61,21 @@ public class BytesColumnVector extends C
   public BytesColumnVector() {
     this(VectorizedRowBatch.DEFAULT_SIZE);
   }
-  
+
   /**
    * Don't call this constructor except for testing purposes.
-   * 
+   *
    * @param size  number of elements in the column vector
    */
   public BytesColumnVector(int size) {
     super(size);
     vector = new byte[size][];
     start = new int[size];
-    length = new int[size]; 
+    length = new int[size];
   }
-  
+
   /** Set a field by reference.
-   *  
+   *
    * @param elementNum index within column vector to set
    * @param sourceBuf container of source data
    * @param start start byte position within source
@@ -85,37 +86,37 @@ public class BytesColumnVector extends C
     this.start[elementNum] = start;
     this.length[elementNum] = length;
   }
-  
-  /** 
+
+  /**
    * You must call initBuffer first before using setVal().
    * Provide the estimated number of bytes needed to hold
    * a full column vector worth of byte string data.
-   * 
+   *
    * @param estimatedValueSize  Estimated size of buffer space needed
    */
   public void initBuffer(int estimatedValueSize) {
     nextFree = 0;
-    
+
     // if buffer is already allocated, keep using it, don't re-allocate
     if (buffer != null) {
       return;
     }
-    
+
     // allocate a little extra space to limit need to re-allocate
     int bufferSize = this.vector.length * (int)(estimatedValueSize * EXTRA_SPACE_FACTOR);
     if (bufferSize < DEFAULT_BUFFER_SIZE) {
       bufferSize = DEFAULT_BUFFER_SIZE;
     }
-    buffer = new byte[bufferSize]; 
+    buffer = new byte[bufferSize];
   }
-  
+
   /**
    * Initialize buffer to default size.
    */
   public void initBuffer() {
     initBuffer(0);
   }
-  
+
   /**
    * @return amount of buffer space currently allocated
    */
@@ -125,13 +126,13 @@ public class BytesColumnVector extends C
     }
     return buffer.length;
   }
-  
+
   /**
    * Set a field by actually copying in to a local buffer.
    * If you must actually copy data in to the array, use this method.
    * DO NOT USE this method unless it's not practical to set data by reference with setRef().
    * Setting data by reference tends to run a lot faster than copying data in.
-   * 
+   *
    * @param elementNum index within column vector to set
    * @param sourceBuf container of source data
    * @param start start byte position within source
@@ -147,24 +148,24 @@ public class BytesColumnVector extends C
     this.length[elementNum] = length;
     nextFree += length;
   }
-  
+
   /**
    * Increase buffer space enough to accommodate next element.
-   * This uses an exponential increase mechanism to rapidly 
+   * This uses an exponential increase mechanism to rapidly
    * increase buffer size to enough to hold all data.
    * As batches get re-loaded, buffer space allocated will quickly
    * stabilize.
-   * 
+   *
    * @param nextElemLength size of next element to be added
    */
   public void increaseBufferSpace(int nextElemLength) {
-    
+
     // Keep doubling buffer size until there will be enough space for next element.
-    int newLength = 2 * buffer.length; 
+    int newLength = 2 * buffer.length;
     while((nextFree + nextElemLength) > newLength) {
       newLength *= 2;
     }
-    
+
     // Allocate new buffer, copy data to it, and set buffer to new buffer.
     byte[] newBuffer = new byte[newLength];
     System.arraycopy(buffer, 0, newBuffer, 0, nextFree);
@@ -173,9 +174,11 @@ public class BytesColumnVector extends C
 
   @Override
   public Writable getWritableObject(int index) {
-    
-    // TODO finish this
-    throw new UnsupportedOperationException("unfinished");
+    Text result = null;
+    if (!isNull[index]) {
+      result = new Text();
+      result.append(vector[index], start[index], length[index]);
+    }
+    return result;
   }
-  
 }
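
The two write paths above have different ownership semantics: setRef() aliases
a caller-owned array with no copying, while setVal() copies bytes into the
vector's private buffer and requires a prior initBuffer() call. A minimal
sketch of the distinction, assuming the hive-exec classes are on the classpath
(the wrapper class name is hypothetical):

    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;

    public class BytesColumnVectorSketch {
      public static void main(String[] args) {
        BytesColumnVector col = new BytesColumnVector();
        byte[] shared = "hello world".getBytes();

        // Fast path: alias the caller's array; nothing is copied, so the
        // caller must keep the array stable while the batch is in use.
        col.setRef(0, shared, 0, 5);   // element 0 -> "hello"

        // Copy path: initBuffer() must be called before setVal(); the bytes
        // land in the vector's internal buffer, which grows on demand.
        col.initBuffer();
        col.setVal(1, shared, 6, 5);   // element 1 -> "world"
      }
    }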

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java?rev=1477757&r1=1477756&r2=1477757&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java Tue Apr 30 18:32:06 2013
@@ -17,13 +17,13 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
-import org.apache.hadoop.io.Text;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.io.Text;
+
 /**
  * A class that is a growable array of bytes. Growth is managed in terms of
  * chunks that are allocated when needed.
@@ -237,6 +237,7 @@ final class DynamicByteArray {
     }
   }
 
+  @Override
   public String toString() {
     int i;
     StringBuilder sb = new StringBuilder(length * 3);
@@ -266,5 +267,30 @@ final class DynamicByteArray {
       currentLength = Math.min(length, chunkSize - currentOffset);
     }
   }
-}
 
+  /**
+   * Gets all the bytes of the array.
+   *
+   * @return Bytes of the array
+   */
+  public byte[] get() {
+    byte[] result = null;
+    if (length > 0) {
+      int currentChunk = 0;
+      int currentOffset = 0;
+      int currentLength = Math.min(length, chunkSize);
+      int destOffset = 0;
+      result = new byte[length];
+      int totalLength = length;
+      while (totalLength > 0) {
+        System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength);
+        destOffset += currentLength;
+        totalLength -= currentLength;
+        currentChunk += 1;
+        currentOffset = 0;
+        currentLength = Math.min(totalLength, chunkSize - currentOffset);
+      }
+    }
+    return result;
+  }
+}
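
The new get() flattens the chunked backing storage into one contiguous array
by walking the chunk list and copying each chunk's live bytes. A standalone
sketch of that copy loop (the method and parameter names here are
hypothetical, and the empty-array case is simplified relative to the patch,
which returns null when length is zero):

    // Flatten fixed-size chunks into a single contiguous byte array.
    static byte[] flatten(byte[][] chunks, int chunkSize, int length) {
      byte[] result = new byte[length];
      int destOffset = 0;
      for (int chunk = 0; destOffset < length; chunk++) {
        int copyLen = Math.min(length - destOffset, chunkSize);
        System.arraycopy(chunks[chunk], 0, result, destOffset, copyLen);
        destOffset += copyLen;
      }
      return result;
    }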

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1477757&r1=1477756&r2=1477757&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java Tue Apr 30 18:32:06 2013
@@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.io.orc
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
 /**
  * A row-by-row iterator for ORC files.
  */
@@ -39,6 +41,16 @@ public interface RecordReader {
   Object next(Object previous) throws IOException;
 
   /**
+   * Read the next row batch. The size of the batch to read cannot be controlled
+   * by the callers. Callers need to look at VectorizedRowBatch.size of the returned
+   * object to know the batch size read.
+   * @param previousBatch a row batch object that can be reused by the reader
+   * @return the row batch that was read
+   * @throws java.io.IOException
+   */
+  VectorizedRowBatch nextBatch(VectorizedRowBatch previousBatch) throws IOException;
+
+  /**
    * Get the row number of the row that will be returned by the following
    * call to next().
    * @return the row number from 0 to the number of rows in the file
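
Because the reader, not the caller, picks the batch size, consuming code loops
on hasNext() and reads batch.size on every iteration. A sketch of the intended
call pattern, assuming a RecordReader obtained from Reader.rows() as in the
new test at the end of this commit (the wrapper class name is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.io.orc.RecordReader;

    public class NextBatchSketch {
      static void consume(RecordReader rows) throws IOException {
        VectorizedRowBatch batch = null;
        while (rows.hasNext()) {
          batch = rows.nextBatch(batch);   // reuse the batch object across calls
          for (int r = 0; r < batch.size; r++) {
            // batch.size, not the vector capacity, is the number of rows read;
            // process row r across batch.cols[0 .. batch.cols.length - 1]
          }
        }
      }
    }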

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1477757&r1=1477756&r2=1477757&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Apr 30 18:32:06 2013
@@ -30,6 +30,11 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -164,6 +169,31 @@ class RecordReaderImpl implements Record
       }
       return previous;
     }
+    /**
+     * Populates the isNull vector array in the previousVector object based on
+     * the present stream values. This function is called from all the child
+     * readers, and they all set values based on the isNull field.
+     * @param previousVector The columnVector object whose isNull value is populated
+     * @param batchSize Size of the column vector
+     * @return previousVector, with its isNull entries populated
+     * @throws IOException
+     */
+    Object nextVector(Object previousVector, long batchSize) throws IOException {
+      if (present != null) {
+
+        // Set noNulls and isNull vector of the ColumnVector based on
+        // present stream
+        ColumnVector result = (ColumnVector) previousVector;
+        result.noNulls = true;
+        for (int i = 0; i < batchSize; i++) {
+          result.isNull[i] = (present.next() != 1);
+          if (result.noNulls && result.isNull[i]) {
+            result.noNulls = false;
+          }
+        }
+      }
+      return previousVector;
+    }
   }
 
   private static class BooleanTreeReader extends TreeReader{
@@ -207,6 +237,12 @@ class RecordReaderImpl implements Record
       }
       return result;
     }
+
+    @Override
+    Object nextVector(Object previousVector, long batchSize) throws IOException {
+      throw new UnsupportedOperationException(
+          "NextVector is not supported operation on Boolean type");
+    }
   }
 
   private static class ByteTreeReader extends TreeReader{
@@ -247,6 +283,12 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    Object nextVector(Object previousVector, long batchSize) throws IOException {
+      throw new UnsupportedOperationException(
+          "NextVector is not supported operation for Byte type");
+    }
+
+    @Override
     void skipRows(long items) throws IOException {
       reader.skip(countNonNulls(items));
     }
@@ -291,6 +333,23 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    Object nextVector(Object previousVector, long batchSize) throws IOException {
+      LongColumnVector result = null;
+      if (previousVector == null) {
+        result = new LongColumnVector();
+      } else {
+        result = (LongColumnVector) previousVector;
+      }
+
+      // Read present/isNull stream
+      super.nextVector(result, batchSize);
+
+      // Read value entries based on isNull entries
+      reader.nextVector(result, batchSize);
+      return result;
+    }
+
+    @Override
     void skipRows(long items) throws IOException {
       reader.skip(countNonNulls(items));
     }
@@ -335,6 +394,23 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    Object nextVector(Object previousVector, long batchSize) throws IOException {
+      LongColumnVector result = null;
+      if (previousVector == null) {
+        result = new LongColumnVector();
+      } else {
+        result = (LongColumnVector) previousVector;
+      }
+
+      // Read present/isNull stream
+      super.nextVector(result, batchSize);
+
+      // Read value entries based on isNull entries
+      reader.nextVector(result, batchSize);
+      return result;
+    }
+
+    @Override
     void skipRows(long items) throws IOException {
       reader.skip(countNonNulls(items));
     }
@@ -379,6 +455,23 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    Object nextVector(Object previousVector, long batchSize) throws IOException {
+      LongColumnVector result = null;
+      if (previousVector == null) {
+        result = new LongColumnVector();
+      } else {
+        result = (LongColumnVector) previousVector;
+      }
+
+      // Read present/isNull stream
+      super.nextVector(result, batchSize);
+
+      // Read value entries based on isNull entries
+      reader.nextVector(result, batchSize);
+      return result;
+    }
+
+    @Override
     void skipRows(long items) throws IOException {
       reader.skip(countNonNulls(items));
     }
@@ -423,6 +516,39 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    Object nextVector(Object previousVector, long batchSize) throws IOException {
+      DoubleColumnVector result = null;
+      if (previousVector == null) {
+        result = new DoubleColumnVector();
+      } else {
+        result = (DoubleColumnVector) previousVector;
+      }
+
+      // Read present/isNull stream
+      super.nextVector(result, batchSize);
+
+      // Read value entries based on isNull entries
+      for (int i = 0; i < batchSize; i++) {
+        if (!result.isNull[i]) {
+          result.vector[i] = SerializationUtils.readDouble(stream);
+        } else {
+
+          // If the value is not present then set NaN
+          result.vector[i] = Double.NaN;
+        }
+      }
+
+      // Set isRepeating flag
+      result.isRepeating = true;
+      for (int i = 0; (i < batchSize - 1 && result.isRepeating); i++) {
+        if (result.vector[i] != result.vector[i + 1]) {
+          result.isRepeating = false;
+        }
+      }
+      return result;
+    }
+
+    @Override
     void skipRows(long items) throws IOException {
       items = countNonNulls(items);
       for(int i=0; i < items; ++i) {
@@ -471,6 +597,38 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    Object nextVector(Object previousVector, long batchSize) throws IOException {
+      DoubleColumnVector result = null;
+      if (previousVector == null) {
+        result = new DoubleColumnVector();
+      } else {
+        result = (DoubleColumnVector) previousVector;
+      }
+
+      // Read present/isNull stream
+      super.nextVector(result, batchSize);
+
+      // Read value entries based on isNull entries
+      for (int i = 0; i < batchSize; i++) {
+        if (!result.isNull[i]) {
+          result.vector[i] = SerializationUtils.readDouble(stream);
+        } else {
+          // If the value is not present then set NaN
+          result.vector[i] = Double.NaN;
+        }
+      }
+
+      // Set isRepeating flag
+      result.isRepeating = true;
+      for (int i = 0; (i < batchSize - 1 && result.isRepeating); i++) {
+        if (result.vector[i] != result.vector[i + 1]) {
+          result.isRepeating = false;
+        }
+      }
+      return result;
+    }
+
+    @Override
     void skipRows(long items) throws IOException {
       items = countNonNulls(items);
       stream.skip(items * 8);
@@ -531,6 +689,12 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    Object nextVector(Object previousVector, long batchSize) throws IOException {
+      throw new UnsupportedOperationException(
+          "NextBatch is not supported operation for Binary type");
+    }
+
+    @Override
     void skipRows(long items) throws IOException {
       items = countNonNulls(items);
       long lengthToSkip = 0;
@@ -592,6 +756,12 @@ class RecordReaderImpl implements Record
       return result;
     }
 
+    @Override
+    Object nextVector(Object previousVector, long batchSize) throws IOException {
+      throw new UnsupportedOperationException(
+          "NextVector is not supported operation for TimeStamp type");
+    }
+
     private static int parseNanos(long serialized) {
       int zeros = 7 & (int) serialized;
       int result = (int) serialized >>> 3;
@@ -648,6 +818,12 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    Object nextVector(Object previousVector, long batchSize) throws IOException {
+      throw new UnsupportedOperationException(
+          "NextVector is not supported operation for Decimal type");
+    }
+
+    @Override
     void skipRows(long items) throws IOException {
       items = countNonNulls(items);
       for(int i=0; i < items; i++) {
@@ -663,8 +839,11 @@ class RecordReaderImpl implements Record
     private int[] dictionaryOffsets;
     private RunLengthIntegerReader reader;
 
+    private final LongColumnVector scratchlcv;
+
     StringTreeReader(int columnId) {
       super(columnId);
+      scratchlcv = new LongColumnVector();
     }
 
     @Override
@@ -725,14 +904,7 @@ class RecordReaderImpl implements Record
           result = (Text) previous;
         }
         int offset = dictionaryOffsets[entry];
-        int length;
-        // if it isn't the last entry, subtract the offsets otherwise use
-        // the buffer length.
-        if (entry < dictionaryOffsets.length - 1) {
-          length = dictionaryOffsets[entry + 1] - offset;
-        } else {
-          length = dictionaryBuffer.size() - offset;
-        }
+        int length = getDictionaryEntryLength(entry, offset);
         // If the column is just empty strings, the size will be zero,
         // so the buffer will be null, in that case just return result
         // as it will default to empty
@@ -746,6 +918,62 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    Object nextVector(Object previousVector, long batchSize) throws IOException {
+      BytesColumnVector result = null;
+      int offset = 0, length = 0;
+      if (previousVector == null) {
+        result = new BytesColumnVector();
+      } else {
+        result = (BytesColumnVector) previousVector;
+      }
+
+      // Read present/isNull stream
+      super.nextVector(result, batchSize);
+
+      byte[] dictionaryBytes = dictionaryBuffer.get();
+
+      // Read string offsets
+      scratchlcv.isNull = result.isNull;
+      reader.nextVector(scratchlcv, batchSize);
+      if (!scratchlcv.isRepeating) {
+
+        // The vector has non-repeating strings. Iterate thru the batch
+        // and set strings one by one
+        for (int i = 0; i < batchSize; i++) {
+          if (!scratchlcv.isNull[i]) {
+            offset = dictionaryOffsets[(int) scratchlcv.vector[i]];
+            length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset);
+            result.setRef(i, dictionaryBytes, offset, length);
+          } else {
+            // If the value is null then set offset and length to zero (null string)
+            result.setRef(i, dictionaryBytes, 0, 0);
+          }
+        }
+      } else {
+        // If the value is repeating then just set the first value in the
+        // vector and set the isRepeating flag to true. No need to iterate
+        // through and set all the elements to the same value.
+        offset = dictionaryOffsets[(int) scratchlcv.vector[0]];
+        length = getDictionaryEntryLength((int) scratchlcv.vector[0], offset);
+        result.setRef(0, dictionaryBytes, offset, length);
+      }
+      result.isRepeating = scratchlcv.isRepeating;
+      return result;
+    }
+
+    int getDictionaryEntryLength(int entry, int offset) {
+      int length = 0;
+      // if it isn't the last entry, subtract the offsets otherwise use
+      // the buffer length.
+      if (entry < dictionaryOffsets.length - 1) {
+        length = dictionaryOffsets[entry + 1] - offset;
+      } else {
+        length = dictionaryBuffer.size() - offset;
+      }
+      return length;
+    }
+
+    @Override
     void skipRows(long items) throws IOException {
       reader.skip(countNonNulls(items));
     }
@@ -807,6 +1035,28 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    Object nextVector(Object previousVector, long batchSize) throws IOException {
+      ColumnVector[] result = null;
+      if (previousVector == null) {
+        result = new ColumnVector[fields.length];
+      } else {
+        result = (ColumnVector[]) previousVector;
+      }
+
+      // Read all the members of struct as column vectors
+      for (int i = 0; i < fields.length; i++) {
+        if (fields[i] != null) {
+          if (result[i] == null) {
+            result[i] = (ColumnVector) fields[i].nextVector(null, batchSize);
+          } else {
+            fields[i].nextVector(result[i], batchSize);
+          }
+        }
+      }
+      return result;
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
@@ -874,6 +1124,12 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    Object nextVector(Object previousVector, long batchSize) throws IOException {
+      throw new UnsupportedOperationException(
+          "NextVector is not supported operation for Union type");
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                      ) throws IOException {
@@ -950,6 +1206,12 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    Object nextVector(Object previous, long batchSize) throws IOException {
+      throw new UnsupportedOperationException(
+          "NextVector is not supported operation for List type");
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
@@ -1027,6 +1289,12 @@ class RecordReaderImpl implements Record
     }
 
     @Override
+    Object nextVector(Object previous, long batchSize) throws IOException {
+      throw new UnsupportedOperationException(
+          "NextVector is not supported operation for Map type");
+    }
+
+    @Override
     void startStripe(Map<StreamName, InStream> streams,
                      List<OrcProto.ColumnEncoding> encodings
                     ) throws IOException {
@@ -1215,6 +1483,29 @@ class RecordReaderImpl implements Record
   }
 
   @Override
+  public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOException {
+    VectorizedRowBatch result = null;
+    if (rowInStripe >= rowCountInStripe) {
+      currentStripe += 1;
+      readStripe();
+    }
+
+    long batchSize = Math.min(VectorizedRowBatch.DEFAULT_SIZE, (rowCountInStripe - rowInStripe));
+    rowInStripe += batchSize;
+    if (previous == null) {
+      ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize);
+      result = new VectorizedRowBatch(cols.length);
+      result.cols = cols;
+    } else {
+      result = (VectorizedRowBatch) previous;
+      reader.nextVector(result.cols, (int) batchSize);
+    }
+
+    result.size = (int) batchSize;
+    return result;
+  }
+
+  @Override
   public void close() throws IOException {
     file.close();
   }
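
One detail worth noting in the float/double readers above: null entries are
materialized as Double.NaN, and since NaN != NaN in Java, the adjacent-pair
scan will never flag a multi-row batch containing nulls as isRepeating. The
scan, distilled into a standalone helper (the helper name is hypothetical):

    // A vector repeats only when every adjacent pair compares equal; a
    // Double.NaN entry fails this test even against itself, so null (NaN)
    // runs do not compare as repeating.
    static boolean computeIsRepeating(double[] vector, int batchSize) {
      for (int i = 0; i < batchSize - 1; i++) {
        if (vector[i] != vector[i + 1]) {
          return false;
        }
      }
      return true;
    }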

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java?rev=1477757&r1=1477756&r2=1477757&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java Tue Apr 30 18:32:06 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.io.orc
 import java.io.EOFException;
 import java.io.IOException;
 
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
 /**
  * A reader that reads a sequence of integers.
  * */
@@ -88,6 +90,24 @@ class RunLengthIntegerReader {
     return result;
   }
 
+  void nextVector(LongColumnVector previous, long previousLen)
+      throws IOException {
+    previous.isRepeating = true;
+    for (int i = 0; i < previousLen; i++) {
+      if (!previous.isNull[i]) {
+        previous.vector[i] = next();
+      } else {
+        // The default value of null for int type in vectorized
+        // processing is 1, so set that if the value is null
+        previous.vector[i] = 1;
+      }
+      if (previous.isRepeating && (delta != 0 || !repeat)) {
+        previous.isRepeating = false;
+      }
+    }
+  }
+
+
   void seek(PositionProvider index) throws IOException {
     input.seek(index);
     int consumed = (int) index.getNext();
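
The contract this adds to RunLengthIntegerReader.nextVector() is that null
slots receive the placeholder value 1 while non-null slots consume the next
run-length-decoded integer, and isRepeating survives only while the current
run is a zero-delta repeat. The null-handling half in isolation (the helper
name and the Iterator-based decode source are hypothetical):

    import java.util.Iterator;

    // Mirrors the null-fill rule in nextVector(): nulls get the vectorized
    // placeholder 1, non-null slots take the next decoded value in order.
    static void fillVector(long[] vector, boolean[] isNull,
        Iterator<Long> decoded, int batchSize) {
      for (int i = 0; i < batchSize; i++) {
        vector[i] = isNull[i] ? 1L : decoded.next();
      }
    }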

Added: hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java?rev=1477757&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java (added)
+++ hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java Tue Apr 30 18:32:06 2013
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.File;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.Writable;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+*
+* Class that tests ORC reader vectorization by comparing the records returned
+* by the row-by-row reader with those returned by the batch reader.
+*
+*/
+public class TestVectorizedORCReader {
+
+  private Configuration conf;
+  private FileSystem fs;
+  private Path testFilePath;
+
+  @Before
+  public void openFileSystem() throws Exception {
+    conf = new Configuration();
+    fs = FileSystem.getLocal(conf);
+    Path workDir = new Path(System.getProperty("test.tmp.dir",
+        "target" + File.separator + "test" + File.separator + "tmp"));
+    fs.setWorkingDirectory(workDir);
+    testFilePath = new Path("TestVectorizedORCReader.testDump.orc");
+    fs.delete(testFilePath, false);
+  }
+
+  static class MyRecord {
+    private final Integer i;
+    private final Long l;
+    private final Short s;
+    private final Double d;
+    private final String k;
+
+    MyRecord(Integer i, Long l, Short s, Double d, String k) {
+      this.i = i;
+      this.l = l;
+      this.s = s;
+      this.d = d;
+      this.k = k;
+    }
+  }
+
+  @Test
+  public void createFile() throws Exception {
+    ObjectInspector inspector;
+    synchronized (TestVectorizedORCReader.class) {
+      inspector = ObjectInspectorFactory.getReflectionObjectInspector
+          (MyRecord.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+    }
+
+    Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+        100000, CompressionKind.ZLIB, 10000, 10000);
+    Random r1 = new Random(1);
+    String[] words = new String[] {"It", "was", "the", "best", "of", "times,",
+        "it", "was", "the", "worst", "of", "times,", "it", "was", "the", "age",
+        "of", "wisdom,", "it", "was", "the", "age", "of", "foolishness,", "it",
+        "was", "the", "epoch", "of", "belief,", "it", "was", "the", "epoch",
+        "of", "incredulity,", "it", "was", "the", "season", "of", "Light,",
+        "it", "was", "the", "season", "of", "Darkness,", "it", "was", "the",
+        "spring", "of", "hope,", "it", "was", "the", "winter", "of", "despair,",
+        "we", "had", "everything", "before", "us,", "we", "had", "nothing",
+        "before", "us,", "we", "were", "all", "going", "direct", "to",
+        "Heaven,", "we", "were", "all", "going", "direct", "the", "other",
+        "way"};
+    for (int i = 0; i < 21000; ++i) {
+      if ((i % 3) != 0) {
+        writer.addRow(new MyRecord(i, (long) 200, (short) (300 + i), (double) (400 + i),
+            words[r1.nextInt(words.length)]));
+      } else {
+        writer.addRow(new MyRecord(i, (long) 200, null, null, null));
+      }
+    }
+    writer.close();
+    checkVectorizedReader();
+  }
+
+  private void checkVectorizedReader() throws Exception {
+
+    Reader vreader = OrcFile.createReader(testFilePath.getFileSystem(conf), testFilePath);
+    Reader reader = OrcFile.createReader(testFilePath.getFileSystem(conf), testFilePath);
+    RecordReaderImpl vrr = (RecordReaderImpl) vreader.rows(null);
+    RecordReaderImpl rr = (RecordReaderImpl) reader.rows(null);
+    VectorizedRowBatch batch = null;
+    OrcStruct row = null;
+
+    // Check Vectorized ORC reader against ORC row reader
+    while (vrr.hasNext()) {
+      batch = vrr.nextBatch(batch);
+      for (int i = 0; i < batch.size; i++) {
+        row = (OrcStruct) rr.next((Object) row);
+        for (int j = 0; j < batch.cols.length; j++) {
+          Object a = ((Writable) row.getFieldValue(j));
+          Object b = batch.cols[j].getWritableObject(i);
+          if (null == a) {
+            Assert.assertEquals(true, (b == null));
+          } else {
+            Assert.assertEquals(true, b.toString().equals(a.toString()));
+          }
+        }
+      }
+
+      // Check repeating
+      Assert.assertEquals(false, batch.cols[0].isRepeating);
+      Assert.assertEquals(true, batch.cols[1].isRepeating);
+      Assert.assertEquals(false, batch.cols[2].isRepeating);
+      Assert.assertEquals(false, batch.cols[3].isRepeating);
+      Assert.assertEquals(false, batch.cols[4].isRepeating);
+
+      // Check non null
+      Assert.assertEquals(true, batch.cols[0].noNulls);
+      Assert.assertEquals(true, batch.cols[1].noNulls);
+      Assert.assertEquals(false, batch.cols[2].noNulls);
+      Assert.assertEquals(false, batch.cols[3].noNulls);
+      Assert.assertEquals(false, batch.cols[4].noNulls);
+    }
+    Assert.assertEquals(false, rr.hasNext());
+  }
+}