You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/05/10 10:24:20 UTC
svn commit: r1480929 - in
/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql:
exec/vector/VectorizedRowBatchCtx.java exec/vector/VectorizedSerde.java
io/orc/OrcSerde.java io/orc/VectorizedOrcInputFormat.java
Author: hashutosh
Date: Fri May 10 08:24:19 2013
New Revision: 1480929
URL: http://svn.apache.org/r1480929
Log:
HIVE-4529 : Add partition support for vectorized ORC Input format (Sarvesh Sakalanaga via Ashutosh Chauhan)
Modified:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1480929&r1=1480928&r2=1480929&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Fri May 10 08:24:19 2013
@@ -50,8 +50,10 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
/**
- * Context for Vectorized row batch. this calss does eager deserialization of row data using serde in the RecordReader layer.
- * It has supports partitions in this layer so that the vectorized batch is populated correctly with the partition column.
+ * Context for Vectorized row batch. This class does eager deserialization of row data using serde
+ * in the RecordReader layer.
+ * It supports partitions in this layer so that the vectorized batch is populated correctly with
+ * the partition columns.
* VectorizedRowBatchCtx.
*
*/
@@ -356,10 +358,22 @@ public class VectorizedRowBatchCtx {
}
}
- public void ConvertRowBatchBlobToVectorizedBatch(Writable[] rowBlobs, VectorizedRowBatch batch) {
- // No reader supports this operation. If a reader returns a set of rows then
- // this function can be used to converts that row blob batch into vectorized batch.
- throw new UnsupportedOperationException();
+ /**
+ * Deserializes a blob of rows and populates the given batch with the result.
+ * The configured deserializer must implement {@link VectorizedSerde} and must fill
+ * {@code batch} in place: this method returns nothing, so the caller only ever sees
+ * the batch instance it passed in.
+ * @param rowBlob serialized set of rows to deserialize
+ * @param batch vectorized row batch that receives the deserialized data
+ * @throws SerDeException if the deserializer is not a {@link VectorizedSerde}, if
+ * deserialization fails, or if the serde returns a batch other than {@code batch}
+ */
+ public void ConvertRowBatchBlobToVectorizedBatch(Object rowBlob, VectorizedRowBatch batch)
+ throws SerDeException {
+
+ if (!(deserializer instanceof VectorizedSerde)) {
+ throw new SerDeException(
+ "Not able to deserialize row batch. Serde does not implement VectorizedSerde");
+ }
+ VectorizedRowBatch result = ((VectorizedSerde) deserializer).deserializeVector(rowBlob,
+ deserializer.getObjectInspector(), batch);
+ if (result != batch) {
+ // Assigning to the parameter would not reach the caller, so a different returned
+ // batch would be silently dropped together with its data; fail loudly instead.
+ throw new SerDeException("VectorizedSerde did not deserialize into the supplied batch");
+ }
}
private int GetColIndexBasedOnColName(String colName) throws HiveException
@@ -384,14 +398,16 @@ public class VectorizedRowBatchCtx {
int colIndex;
String value;
BytesColumnVector bcv;
- for (String key : partitionValues.keySet()) {
- colIndex = GetColIndexBasedOnColName(key);
- value = partitionValues.get(key);
- bcv = (BytesColumnVector) batch.cols[colIndex];
- bcv.setRef(0, value.getBytes(), 0, value.length());
- bcv.isRepeating = true;
- bcv.isNull[0] = false;
- bcv.noNulls = true;
+ // Nothing to add when no partition values are known for this split.
+ if (partitionValues != null) {
+ for (String key : partitionValues.keySet()) {
+ colIndex = GetColIndexBasedOnColName(key);
+ value = partitionValues.get(key);
+ bcv = (BytesColumnVector) batch.cols[colIndex];
+ // Use the encoded byte length, not String.length() (a char count): the two differ
+ // for multi-byte characters, and the char count would truncate the value.
+ // NOTE(review): getBytes() uses the platform default charset; UTF-8 is presumably
+ // intended for string columns - confirm.
+ byte[] valueBytes = value.getBytes();
+ bcv.setRef(0, valueBytes, 0, valueBytes.length);
+ // A partition value is constant for the whole batch, so store it once as repeating.
+ bcv.isRepeating = true;
+ bcv.isNull[0] = false;
+ bcv.noNulls = true;
+ }
}
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java?rev=1480929&r1=1480928&r2=1480929&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java Fri May 10 08:24:19 2013
@@ -27,4 +27,7 @@ public interface VectorizedSerde {
Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
throws SerDeException;
+ /**
+ * Deserializes a blob of rows into a vectorized row batch.
+ * @param rowBlob serialized rows to deserialize
+ * @param objInspector object inspector for the rows being deserialized
+ * @param reuseBatch batch the implementation may populate and return instead of allocating
+ * a new one (NOTE(review): intended reuse contract not specified - confirm)
+ * @return the batch holding the deserialized rows
+ * @throws SerDeException if the rows cannot be deserialized
+ */
+ VectorizedRowBatch deserializeVector(Object rowBlob, ObjectInspector objInspector,
+ VectorizedRowBatch reuseBatch)
+ throws SerDeException;
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java?rev=1480929&r1=1480928&r2=1480929&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java Fri May 10 08:24:19 2013
@@ -142,4 +142,10 @@ public class OrcSerde implements SerDe,
}
return vos.serialize(vrg, objInspector);
}
+
+ /**
+ * Returns the row blob itself as a vectorized row batch. The ORC vectorized record reader
+ * hands over a batch it has already filled via nextBatch, so no conversion is performed;
+ * objInspector and reuseBatch are ignored.
+ * @param rowBlob row blob that must already be a VectorizedRowBatch
+ * @param objInspector ignored
+ * @param reuseBatch ignored
+ * @return rowBlob cast to VectorizedRowBatch
+ * @throws SerDeException if rowBlob is null or not a VectorizedRowBatch
+ */
+ @Override
+ public VectorizedRowBatch deserializeVector(Object rowBlob, ObjectInspector objInspector,
+ VectorizedRowBatch reuseBatch) throws SerDeException {
+ if (!(rowBlob instanceof VectorizedRowBatch)) {
+ // Report the declared SerDeException instead of an unchecked ClassCastException.
+ String actual = (rowBlob == null) ? "null" : rowBlob.getClass().getName();
+ throw new SerDeException("Expected a VectorizedRowBatch but got " + actual);
+ }
+ return (VectorizedRowBatch) rowBlob;
+ }
}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1480929&r1=1480928&r2=1480929&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Fri May 10 08:24:19 2013
@@ -29,8 +29,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
@@ -50,24 +53,23 @@ public class VectorizedOrcInputFormat ex
private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
private final long offset;
private final long length;
- private final int numColumns;
private float progress = 0.0f;
- private final OrcStruct rowObj;
- private final List<OrcProto.Type> types;
+ private VectorizedRowBatchCtx rbCtx;
VectorizedOrcRecordReader(Reader file, Configuration conf,
- long offset, long length) throws IOException {
+ FileSplit fileSplit) throws IOException {
+
+ // offset/length must be set before file.rows(...) below reads them.
+ this.offset = fileSplit.getStart();
+ this.length = fileSplit.getLength();
this.reader = file.rows(offset, length,
findIncludedColumns(file.getTypes(), conf));
- types = file.getTypes();
- if (types.size() == 0) {
- numColumns = 0;
- } else {
- numColumns = types.get(0).getSubtypesCount();
- }
- this.offset = offset;
- this.length = length;
- rowObj = new OrcStruct(numColumns);
+
+ try {
+ rbCtx = new VectorizedRowBatchCtx();
+ rbCtx.Init(conf, fileSplit);
+ } catch (Exception e) {
+ // Report setup failures through the declared IOException (keeping the cause) rather
+ // than an unchecked exception that bypasses the constructor's contract.
+ throw new IOException("Failed to initialize vectorized row batch context for " + fileSplit, e);
+ }
}
@Override
@@ -77,6 +79,11 @@ public class VectorizedOrcInputFormat ex
return false;
}
reader.nextBatch(value);
+ try {
+ // The ORC reader fills 'value' directly, so it is both the blob and the target batch.
+ rbCtx.ConvertRowBatchBlobToVectorizedBatch(value, value);
+ } catch (SerDeException e) {
+ // Propagate the failure; merely constructing the exception would silently ignore it.
+ throw new IOException("Error converting ORC row batch", e);
+ }
progress = reader.getProgress();
return true;
}
@@ -88,8 +95,17 @@ public class VectorizedOrcInputFormat ex
@Override
public VectorizedRowBatch createValue() {
- return new VectorizedRowBatch(numColumns,
- VectorizedRowBatch.DEFAULT_SIZE);
+ try {
+ VectorizedRowBatch result = rbCtx.CreateVectorizedRowBatch();
+ // Since the record reader works only on one split and
+ // given a split the partition cannot change, we are setting the partition
+ // values only once during batch creation
+ rbCtx.AddPartitionColsToBatch(result);
+ return result;
+ } catch (HiveException e) {
+ // createValue() declares no checked exceptions, so wrap and throw the failure
+ // instead of returning a null batch to the caller.
+ throw new RuntimeException("Error creating a batch", e);
+ }
}
@Override
@@ -171,8 +187,7 @@ public class VectorizedOrcInputFormat ex
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem(conf);
reporter.setStatus(fileSplit.toString());
- return new VectorizedOrcRecordReader(OrcFile.createReader(fs, path), conf,
- fileSplit.getStart(), fileSplit.getLength());
+ // Pass the whole split: the reader takes offset/length from it and also uses it to
+ // initialize its VectorizedRowBatchCtx (presumably the source of partition values).
+ return new VectorizedOrcRecordReader(OrcFile.createReader(fs, path), conf, fileSplit);
}
@Override