You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/30 20:32:06 UTC
svn commit: r1477757 - in /hive/branches/vectorization/ql/src:
java/org/apache/hadoop/hive/ql/exec/vector/
java/org/apache/hadoop/hive/ql/io/orc/ test/org/apache/hadoop/hive/ql/io/orc/
Author: hashutosh
Date: Tue Apr 30 18:32:06 2013
New Revision: 1477757
URL: http://svn.apache.org/r1477757
Log:
HIVE-4370 : Change ORC tree readers to return batches of rows instead of a row (Sarvesh Sakalanaga via Ashutosh Chauhan)
Added:
hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
Modified:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java?rev=1477757&r1=1477756&r2=1477757&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java Tue Apr 30 18:32:06 2013
@@ -18,16 +18,17 @@
package org.apache.hadoop.hive.ql.exec.vector;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
/**
- * This class supports string and binary data by value reference -- i.e. each field is
+ * This class supports string and binary data by value reference -- i.e. each field is
* explicitly present, as opposed to provided by a dictionary reference.
* In some cases, all the values will be in the same byte array to begin with,
- * but this need not be the case. If each value is in a separate byte
+ * but this need not be the case. If each value is in a separate byte
* array to start with, or not all of the values are in the same original
* byte array, you can still assign data by reference into this column vector.
- * This gives flexibility to use this in multiple situations.
+ * This gives flexibility to use this in multiple situations.
* <p>
* When setting data by reference, the caller
* is responsible for allocating the byte arrays used to hold the data.
@@ -36,23 +37,23 @@ import org.apache.hadoop.io.Writable;
* though that use is probably not typical.
*/
public class BytesColumnVector extends ColumnVector {
- public byte[][] vector;
+ public byte[][] vector;
public int[] start; // start offset of each field
-
+
/*
- * The length of each field. If the value repeats for every entry, then it is stored
+ * The length of each field. If the value repeats for every entry, then it is stored
* in vector[0] and isRepeating from the superclass is set to true.
*/
- public int[] length;
+ public int[] length;
private byte[] buffer; // optional buffer to use when actually copying in data
private int nextFree; // next free position in buffer
-
+
// Estimate that there will be 16 bytes per entry
static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE;
-
- // Proportion of extra space to provide when allocating more buffer space.
+
+ // Proportion of extra space to provide when allocating more buffer space.
static final float EXTRA_SPACE_FACTOR = (float) 1.2;
-
+
/**
* Use this constructor for normal operation.
* All column vectors should be the default size normally.
@@ -60,21 +61,21 @@ public class BytesColumnVector extends C
public BytesColumnVector() {
this(VectorizedRowBatch.DEFAULT_SIZE);
}
-
+
/**
* Don't call this constructor except for testing purposes.
- *
+ *
* @param size number of elements in the column vector
*/
public BytesColumnVector(int size) {
super(size);
vector = new byte[size][];
start = new int[size];
- length = new int[size];
+ length = new int[size];
}
-
+
/** Set a field by reference.
- *
+ *
* @param elementNum index within column vector to set
* @param sourceBuf container of source data
* @param start start byte position within source
@@ -85,37 +86,37 @@ public class BytesColumnVector extends C
this.start[elementNum] = start;
this.length[elementNum] = length;
}
-
- /**
+
+ /**
* You must call initBuffer first before using setVal().
* Provide the estimated number of bytes needed to hold
* a full column vector worth of byte string data.
- *
+ *
* @param estimatedValueSize Estimated size of buffer space needed
*/
public void initBuffer(int estimatedValueSize) {
nextFree = 0;
-
+
// if buffer is already allocated, keep using it, don't re-allocate
if (buffer != null) {
return;
}
-
+
// allocate a little extra space to limit need to re-allocate
int bufferSize = this.vector.length * (int)(estimatedValueSize * EXTRA_SPACE_FACTOR);
if (bufferSize < DEFAULT_BUFFER_SIZE) {
bufferSize = DEFAULT_BUFFER_SIZE;
}
- buffer = new byte[bufferSize];
+ buffer = new byte[bufferSize];
}
-
+
/**
* Initialize buffer to default size.
*/
public void initBuffer() {
initBuffer(0);
}
-
+
/**
* @return amount of buffer space currently allocated
*/
@@ -125,13 +126,13 @@ public class BytesColumnVector extends C
}
return buffer.length;
}
-
+
/**
* Set a field by actually copying in to a local buffer.
* If you must actually copy data in to the array, use this method.
* DO NOT USE this method unless it's not practical to set data by reference with setRef().
* Setting data by reference tends to run a lot faster than copying data in.
- *
+ *
* @param elementNum index within column vector to set
* @param sourceBuf container of source data
* @param start start byte position within source
@@ -147,24 +148,24 @@ public class BytesColumnVector extends C
this.length[elementNum] = length;
nextFree += length;
}
-
+
/**
* Increase buffer space enough to accommodate next element.
- * This uses an exponential increase mechanism to rapidly
+ * This uses an exponential increase mechanism to rapidly
* increase buffer size to enough to hold all data.
* As batches get re-loaded, buffer space allocated will quickly
* stabilize.
- *
+ *
* @param nextElemLength size of next element to be added
*/
public void increaseBufferSpace(int nextElemLength) {
-
+
// Keep doubling buffer size until there will be enough space for next element.
- int newLength = 2 * buffer.length;
+ int newLength = 2 * buffer.length;
while((nextFree + nextElemLength) > newLength) {
newLength *= 2;
}
-
+
// Allocate new buffer, copy data to it, and set buffer to new buffer.
byte[] newBuffer = new byte[newLength];
System.arraycopy(buffer, 0, newBuffer, 0, nextFree);
@@ -173,9 +174,11 @@ public class BytesColumnVector extends C
@Override
public Writable getWritableObject(int index) {
-
- // TODO finish this
- throw new UnsupportedOperationException("unfinished");
+ Text result = null;
+ if (!isNull[index]) {
+ result = new Text();
+ result.append(vector[index], start[index], length[index]);
+ }
+ return result;
}
-
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java?rev=1477757&r1=1477756&r2=1477757&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java Tue Apr 30 18:32:06 2013
@@ -17,13 +17,13 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
-import org.apache.hadoop.io.Text;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import org.apache.hadoop.io.Text;
+
/**
* A class that is a growable array of bytes. Growth is managed in terms of
* chunks that are allocated when needed.
@@ -237,6 +237,7 @@ final class DynamicByteArray {
}
}
+ @Override
public String toString() {
int i;
StringBuilder sb = new StringBuilder(length * 3);
@@ -266,5 +267,30 @@ final class DynamicByteArray {
currentLength = Math.min(length, chunkSize - currentOffset);
}
}
-}
+ /**
+ * Gets all the bytes of the array.
+ *
+ * @return Bytes of the array
+ */
+ public byte[] get() {
+ byte[] result = null;
+ if (length > 0) {
+ int currentChunk = 0;
+ int currentOffset = 0;
+ int currentLength = Math.min(length, chunkSize);
+ int destOffset = 0;
+ result = new byte[length];
+ int totalLength = length;
+ while (totalLength > 0) {
+ System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength);
+ destOffset += currentLength;
+ totalLength -= currentLength;
+ currentChunk += 1;
+ currentOffset = 0;
+ currentLength = Math.min(totalLength, chunkSize - currentOffset);
+ }
+ }
+ return result;
+ }
+}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1477757&r1=1477756&r2=1477757&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java Tue Apr 30 18:32:06 2013
@@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
/**
* A row-by-row iterator for ORC files.
*/
@@ -39,6 +41,16 @@ public interface RecordReader {
Object next(Object previous) throws IOException;
/**
+ * Read the next row batch. The size of the batch to read cannot be controlled
+ * by the callers. Callers need to look at VectorizedRowBatch.size of the returned
+ * object to know the batch size read.
+ * @param previousBatch a row batch object that can be reused by the reader
+ * @return the row batch that was read
+ * @throws java.io.IOException
+ */
+ VectorizedRowBatch nextBatch(VectorizedRowBatch previousBatch) throws IOException;
+
+ /**
* Get the row number of the row that will be returned by the following
* call to next().
* @return the row number from 0 to the number of rows in the file
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1477757&r1=1477756&r2=1477757&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Apr 30 18:32:06 2013
@@ -30,6 +30,11 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -164,6 +169,31 @@ class RecordReaderImpl implements Record
}
return previous;
}
+ /**
+ * Populates the isNull vector array in the previousVector object based on
+ * the present stream values. This function is called from all the child
+ * readers, and they all set the values based on isNull field value.
+ * @param previousVector The columnVector object whose isNull value is populated
+ * @param batchSize Size of the column vector
+ * @return
+ * @throws IOException
+ */
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ if (present != null) {
+
+ // Set noNulls and isNull vector of the ColumnVector based on
+ // present stream
+ ColumnVector result = (ColumnVector) previousVector;
+ result.noNulls = true;
+ for (int i = 0; i < batchSize; i++) {
+ result.isNull[i] = (present.next() != 1);
+ if (result.noNulls && result.isNull[i]) {
+ result.noNulls = false;
+ }
+ }
+ }
+ return previousVector;
+ }
}
private static class BooleanTreeReader extends TreeReader{
@@ -207,6 +237,12 @@ class RecordReaderImpl implements Record
}
return result;
}
+
+ @Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextVector is not supported operation on Boolean type");
+ }
}
private static class ByteTreeReader extends TreeReader{
@@ -247,6 +283,12 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextVector is not supported operation for Byte type");
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -291,6 +333,23 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ LongColumnVector result = null;
+ if (previousVector == null) {
+ result = new LongColumnVector();
+ } else {
+ result = (LongColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ reader.nextVector(result, batchSize);
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -335,6 +394,23 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ LongColumnVector result = null;
+ if (previousVector == null) {
+ result = new LongColumnVector();
+ } else {
+ result = (LongColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ reader.nextVector(result, batchSize);
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -379,6 +455,23 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ LongColumnVector result = null;
+ if (previousVector == null) {
+ result = new LongColumnVector();
+ } else {
+ result = (LongColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ reader.nextVector(result, batchSize);
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -423,6 +516,39 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ DoubleColumnVector result = null;
+ if (previousVector == null) {
+ result = new DoubleColumnVector();
+ } else {
+ result = (DoubleColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ for (int i = 0; i < batchSize; i++) {
+ if (!result.isNull[i]) {
+ result.vector[i] = SerializationUtils.readDouble(stream);
+ } else {
+
+ // If the value is not present then set NaN
+ result.vector[i] = Double.NaN;
+ }
+ }
+
+ // Set isRepeating flag
+ result.isRepeating = true;
+ for (int i = 0; (i < batchSize - 1 && result.isRepeating); i++) {
+ if (result.vector[i] != result.vector[i + 1]) {
+ result.isRepeating = false;
+ }
+ }
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
for(int i=0; i < items; ++i) {
@@ -471,6 +597,38 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ DoubleColumnVector result = null;
+ if (previousVector == null) {
+ result = new DoubleColumnVector();
+ } else {
+ result = (DoubleColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ for (int i = 0; i < batchSize; i++) {
+ if (!result.isNull[i]) {
+ result.vector[i] = SerializationUtils.readDouble(stream);
+ } else {
+ // If the value is not present then set NaN
+ result.vector[i] = Double.NaN;
+ }
+ }
+
+ // Set isRepeating flag
+ result.isRepeating = true;
+ for (int i = 0; (i < batchSize - 1 && result.isRepeating); i++) {
+ if (result.vector[i] != result.vector[i + 1]) {
+ result.isRepeating = false;
+ }
+ }
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
stream.skip(items * 8);
@@ -531,6 +689,12 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextBatch is not supported operation for Binary type");
+ }
+
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
long lengthToSkip = 0;
@@ -592,6 +756,12 @@ class RecordReaderImpl implements Record
return result;
}
+ @Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextVector is not supported operation for TimeStamp type");
+ }
+
private static int parseNanos(long serialized) {
int zeros = 7 & (int) serialized;
int result = (int) serialized >>> 3;
@@ -648,6 +818,12 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextVector is not supported operation for Decimal type");
+ }
+
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
for(int i=0; i < items; i++) {
@@ -663,8 +839,11 @@ class RecordReaderImpl implements Record
private int[] dictionaryOffsets;
private RunLengthIntegerReader reader;
+ private final LongColumnVector scratchlcv;
+
StringTreeReader(int columnId) {
super(columnId);
+ scratchlcv = new LongColumnVector();
}
@Override
@@ -725,14 +904,7 @@ class RecordReaderImpl implements Record
result = (Text) previous;
}
int offset = dictionaryOffsets[entry];
- int length;
- // if it isn't the last entry, subtract the offsets otherwise use
- // the buffer length.
- if (entry < dictionaryOffsets.length - 1) {
- length = dictionaryOffsets[entry + 1] - offset;
- } else {
- length = dictionaryBuffer.size() - offset;
- }
+ int length = getDictionaryEntryLength(entry, offset);
// If the column is just empty strings, the size will be zero,
// so the buffer will be null, in that case just return result
// as it will default to empty
@@ -746,6 +918,62 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ BytesColumnVector result = null;
+ int offset = 0, length = 0;
+ if (previousVector == null) {
+ result = new BytesColumnVector();
+ } else {
+ result = (BytesColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ byte[] dictionaryBytes = dictionaryBuffer.get();
+
+ // Read string offsets
+ scratchlcv.isNull = result.isNull;
+ reader.nextVector(scratchlcv, batchSize);
+ if (!scratchlcv.isRepeating) {
+
+ // The vector has non-repeating strings. Iterate thru the batch
+ // and set strings one by one
+ for (int i = 0; i < batchSize; i++) {
+ if (!scratchlcv.isNull[i]) {
+ offset = dictionaryOffsets[(int) scratchlcv.vector[i]];
+ length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset);
+ result.setRef(i, dictionaryBytes, offset, length);
+ } else {
+ // If the value is null then set offset and length to zero (null string)
+ result.setRef(i, dictionaryBytes, 0, 0);
+ }
+ }
+ } else {
+ // If the value is repeating then just set the first value in the
+ // vector and set the isRepeating flag to true. No need to iterate thru and
+ // set all the elements to the same value
+ offset = dictionaryOffsets[(int) scratchlcv.vector[0]];
+ length = getDictionaryEntryLength((int) scratchlcv.vector[0], offset);
+ result.setRef(0, dictionaryBytes, offset, length);
+ }
+ result.isRepeating = scratchlcv.isRepeating;
+ return result;
+ }
+
+ int getDictionaryEntryLength(int entry, int offset) {
+ int length = 0;
+ // if it isn't the last entry, subtract the offsets otherwise use
+ // the buffer length.
+ if (entry < dictionaryOffsets.length - 1) {
+ length = dictionaryOffsets[entry + 1] - offset;
+ } else {
+ length = dictionaryBuffer.size() - offset;
+ }
+ return length;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -807,6 +1035,28 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ ColumnVector[] result = null;
+ if (previousVector == null) {
+ result = new ColumnVector[fields.length];
+ } else {
+ result = (ColumnVector[]) previousVector;
+ }
+
+ // Read all the members of struct as column vectors
+ for (int i = 0; i < fields.length; i++) {
+ if (fields[i] != null) {
+ if (result[i] == null) {
+ result[i] = (ColumnVector) fields[i].nextVector(null, batchSize);
+ } else {
+ fields[i].nextVector(result[i], batchSize);
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
@@ -874,6 +1124,12 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextVector is not supported operation for Union type");
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
@@ -950,6 +1206,12 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previous, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextVector is not supported operation for List type");
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
@@ -1027,6 +1289,12 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previous, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextVector is not supported operation for Map type");
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
@@ -1215,6 +1483,29 @@ class RecordReaderImpl implements Record
}
@Override
+ public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOException {
+ VectorizedRowBatch result = null;
+ if (rowInStripe >= rowCountInStripe) {
+ currentStripe += 1;
+ readStripe();
+ }
+
+ long batchSize = Math.min(VectorizedRowBatch.DEFAULT_SIZE, (rowCountInStripe - rowInStripe));
+ rowInStripe += batchSize;
+ if (previous == null) {
+ ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize);
+ result = new VectorizedRowBatch(cols.length);
+ result.cols = cols;
+ } else {
+ result = (VectorizedRowBatch) previous;
+ reader.nextVector(result.cols, (int) batchSize);
+ }
+
+ result.size = (int) batchSize;
+ return result;
+ }
+
+ @Override
public void close() throws IOException {
file.close();
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java?rev=1477757&r1=1477756&r2=1477757&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java Tue Apr 30 18:32:06 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.EOFException;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
/**
* A reader that reads a sequence of integers.
* */
@@ -88,6 +90,24 @@ class RunLengthIntegerReader {
return result;
}
+ void nextVector(LongColumnVector previous, long previousLen)
+ throws IOException {
+ previous.isRepeating = true;
+ for (int i = 0; i < previousLen; i++) {
+ if (!previous.isNull[i]) {
+ previous.vector[i] = next();
+ } else {
+ // The default value of null for int type in vectorized
+ // processing is 1, so set that if the value is null
+ previous.vector[i] = 1;
+ }
+ if (previous.isRepeating && (delta != 0 || !repeat)) {
+ previous.isRepeating = false;
+ }
+ }
+ }
+
+
void seek(PositionProvider index) throws IOException {
input.seek(index);
int consumed = (int) index.getNext();
Added: hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java?rev=1477757&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java (added)
+++ hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java Tue Apr 30 18:32:06 2013
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.File;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.Writable;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+*
+* Class that tests ORC reader vectorization by comparing records that are
+* returned by "row by row" reader with batch reader.
+*
+*/
+public class TestVectorizedORCReader {
+
+ private Configuration conf;
+ private FileSystem fs;
+ private Path testFilePath;
+
+ @Before
+ public void openFileSystem() throws Exception {
+ conf = new Configuration();
+ fs = FileSystem.getLocal(conf);
+ Path workDir = new Path(System.getProperty("test.tmp.dir",
+ "target" + File.separator + "test" + File.separator + "tmp"));
+ fs.setWorkingDirectory(workDir);
+ testFilePath = new Path("TestVectorizedORCReader.testDump.orc");
+ fs.delete(testFilePath, false);
+ }
+
+ static class MyRecord {
+ private final Integer i;
+ private final Long l;
+ private final Short s;
+ private final Double d;
+ private final String k;
+
+ MyRecord(Integer i, Long l, Short s, Double d, String k) {
+ this.i = i;
+ this.l = l;
+ this.s = s;
+ this.d = d;
+ this.k = k;
+ }
+ }
+
+ @Test
+ public void createFile() throws Exception {
+ ObjectInspector inspector;
+ synchronized (TestVectorizedORCReader.class) {
+ inspector = ObjectInspectorFactory.getReflectionObjectInspector
+ (MyRecord.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+ }
+
+ Writer writer = OrcFile.createWriter(fs, testFilePath, conf, inspector,
+ 100000, CompressionKind.ZLIB, 10000, 10000);
+ Random r1 = new Random(1);
+ String[] words = new String[] {"It", "was", "the", "best", "of", "times,",
+ "it", "was", "the", "worst", "of", "times,", "it", "was", "the", "age",
+ "of", "wisdom,", "it", "was", "the", "age", "of", "foolishness,", "it",
+ "was", "the", "epoch", "of", "belief,", "it", "was", "the", "epoch",
+ "of", "incredulity,", "it", "was", "the", "season", "of", "Light,",
+ "it", "was", "the", "season", "of", "Darkness,", "it", "was", "the",
+ "spring", "of", "hope,", "it", "was", "the", "winter", "of", "despair,",
+ "we", "had", "everything", "before", "us,", "we", "had", "nothing",
+ "before", "us,", "we", "were", "all", "going", "direct", "to",
+ "Heaven,", "we", "were", "all", "going", "direct", "the", "other",
+ "way"};
+ for (int i = 0; i < 21000; ++i) {
+ if ((i % 3) != 0) {
+ writer.addRow(new MyRecord(i, (long) 200, (short) (300 + i), (double) (400 + i),
+ words[r1.nextInt(words.length)]));
+ } else {
+ writer.addRow(new MyRecord(i, (long) 200, null, null, null));
+ }
+ }
+ writer.close();
+ checkVectorizedReader();
+ }
+
+ private void checkVectorizedReader() throws Exception {
+
+ Reader vreader = OrcFile.createReader(testFilePath.getFileSystem(conf), testFilePath);
+ Reader reader = OrcFile.createReader(testFilePath.getFileSystem(conf), testFilePath);
+ RecordReaderImpl vrr = (RecordReaderImpl) vreader.rows(null);
+ RecordReaderImpl rr = (RecordReaderImpl) reader.rows(null);
+ VectorizedRowBatch batch = null;
+ OrcStruct row = null;
+
+ // Check Vectorized ORC reader against ORC row reader
+ while (vrr.hasNext()) {
+ batch = vrr.nextBatch(batch);
+ for (int i = 0; i < batch.size; i++) {
+ row = (OrcStruct) rr.next((Object) row);
+ for (int j = 0; j < batch.cols.length; j++) {
+ Object a = ((Writable) row.getFieldValue(j));
+ Object b = batch.cols[j].getWritableObject(i);
+ if (null == a) {
+ Assert.assertEquals(true, (b == null));
+ } else {
+ Assert.assertEquals(true, b.toString().equals(a.toString()));
+ }
+ }
+ }
+
+ // Check repeating
+ Assert.assertEquals(false, batch.cols[0].isRepeating);
+ Assert.assertEquals(true, batch.cols[1].isRepeating);
+ Assert.assertEquals(false, batch.cols[2].isRepeating);
+ Assert.assertEquals(false, batch.cols[3].isRepeating);
+ Assert.assertEquals(false, batch.cols[4].isRepeating);
+
+ // Check non null
+ Assert.assertEquals(true, batch.cols[0].noNulls);
+ Assert.assertEquals(true, batch.cols[1].noNulls);
+ Assert.assertEquals(false, batch.cols[2].noNulls);
+ Assert.assertEquals(false, batch.cols[3].noNulls);
+ Assert.assertEquals(false, batch.cols[4].noNulls);
+ }
+ Assert.assertEquals(false, rr.hasNext());
+ }
+}