Posted to commits@hive.apache.org by ha...@apache.org on 2013/05/10 10:24:20 UTC

svn commit: r1480929 - in /hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql: exec/vector/VectorizedRowBatchCtx.java exec/vector/VectorizedSerde.java io/orc/OrcSerde.java io/orc/VectorizedOrcInputFormat.java

Author: hashutosh
Date: Fri May 10 08:24:19 2013
New Revision: 1480929

URL: http://svn.apache.org/r1480929
Log:
HIVE-4529 : Add partition support for vectorized ORC Input format (Sarvesh Sakalanaga via Ashutosh Chauhan)

Modified:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1480929&r1=1480928&r2=1480929&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Fri May 10 08:24:19 2013
@@ -50,8 +50,10 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileSplit;
 
 /**
- * Context for Vectorized row batch. this calss does eager deserialization of row data using serde in the RecordReader layer.
- * It has supports partitions in this layer so that the vectorized batch is populated correctly with the partition column.
+ * Context for vectorized row batch. This class does eager deserialization of row data using a
+ * serde in the RecordReader layer.
+ * It supports partitions in this layer so that the vectorized batch is populated correctly with
+ * the partition column.
  * VectorizedRowBatchCtx.
  *
  */
@@ -356,10 +358,22 @@ public class VectorizedRowBatchCtx {
     }
   }
 
-  public void ConvertRowBatchBlobToVectorizedBatch(Writable[] rowBlobs, VectorizedRowBatch batch) {
-    // No reader supports this operation. If a reader returns a set of rows then
-    // this function can be used to converts that row blob batch into vectorized batch.
-    throw new UnsupportedOperationException();
+  /**
+   * Deserializes a set of rows and populates the batch.
+   * @param rowBlob the row blob to deserialize
+   * @param batch Vectorized row batch which contains deserialized data
+   * @throws SerDeException
+   */
+  public void ConvertRowBatchBlobToVectorizedBatch(Object rowBlob, VectorizedRowBatch batch)
+      throws SerDeException {
+
+    if (deserializer instanceof VectorizedSerde) {
+      batch = ((VectorizedSerde) deserializer).deserializeVector(rowBlob,
+          deserializer.getObjectInspector(), batch);
+    } else {
+      throw new SerDeException(
+          "Not able to deserialize row batch. Serde does not implement VectorizedSerde");
+    }
   }
 
   private int GetColIndexBasedOnColName(String colName) throws HiveException
@@ -384,14 +398,16 @@ public class VectorizedRowBatchCtx {
     int colIndex;
     String value;
     BytesColumnVector bcv;
-    for (String key : partitionValues.keySet()) {
-      colIndex = GetColIndexBasedOnColName(key);
-      value = partitionValues.get(key);
-      bcv = (BytesColumnVector) batch.cols[colIndex];
-      bcv.setRef(0, value.getBytes(), 0, value.length());
-      bcv.isRepeating = true;
-      bcv.isNull[0] = false;
-      bcv.noNulls = true;
+    if (partitionValues != null) {
+      for (String key : partitionValues.keySet()) {
+        colIndex = GetColIndexBasedOnColName(key);
+        value = partitionValues.get(key);
+        bcv = (BytesColumnVector) batch.cols[colIndex];
+        bcv.setRef(0, value.getBytes(), 0, value.length());
+        bcv.isRepeating = true;
+        bcv.isNull[0] = false;
+        bcv.noNulls = true;
+      }
     }
   }
 

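The AddPartitionColsToBatch change above leans on the column vector's repeating-value
representation: one entry at index 0, with isRepeating set, stands in for the partition
value of every row in the batch. Below is a minimal standalone sketch of that trick, with a
made-up column layout and partition value (the demo class is illustrative, not part of the
commit):

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class RepeatingPartitionColumnDemo {
  public static void main(String[] args) {
    // Two columns; column 1 plays the role of the string partition column.
    VectorizedRowBatch batch = new VectorizedRowBatch(2, VectorizedRowBatch.DEFAULT_SIZE);
    batch.cols[1] = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);

    String value = "2013-05-10";  // e.g. the value of a ds= partition key
    BytesColumnVector bcv = (BytesColumnVector) batch.cols[1];
    bcv.setRef(0, value.getBytes(), 0, value.length());  // reference, not a copy
    bcv.isRepeating = true;   // entry 0 now applies to every row in the batch
    bcv.isNull[0] = false;
    bcv.noNulls = true;
  }
}

Because setRef only points the vector at the value's existing bytes and isRepeating makes
that single entry cover the whole batch, stamping the partition columns once per split is
enough; no per-row work is needed.
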
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java?rev=1480929&r1=1480928&r2=1480929&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java Fri May 10 08:24:19 2013
@@ -27,4 +27,7 @@ public interface VectorizedSerde {
   Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
       throws SerDeException;
 
+  VectorizedRowBatch deserializeVector(Object rowBlob, ObjectInspector objInspector,
+      VectorizedRowBatch reuseBatch)
+      throws SerDeException;
 }

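The new deserializeVector method lets a reader hand an opaque row blob to the serde and get
back a populated batch while reusing the caller's allocation. A hedged sketch of what a
row-oriented implementation might look like follows; ExampleRowSerde, the long[] blob
layout, and the single LongColumnVector column are all assumptions made for illustration:

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

public class ExampleRowSerde {
  public VectorizedRowBatch deserializeVector(Object rowBlob, ObjectInspector objInspector,
      VectorizedRowBatch reuseBatch) throws SerDeException {
    // Assumed blob layout for this sketch: one long per row, a single column.
    long[] rows = (long[]) rowBlob;
    LongColumnVector col = (LongColumnVector) reuseBatch.cols[0];
    for (int i = 0; i < rows.length; i++) {
      col.vector[i] = rows[i];      // copy each row value into column 0
    }
    reuseBatch.size = rows.length;  // number of valid rows in the batch
    return reuseBatch;              // the caller's batch, now populated
  }
}

Returning the reuse batch keeps the contract allocation-free in the steady state: the
record reader creates one batch up front and the serde refills it on every call.
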
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java?rev=1480929&r1=1480928&r2=1480929&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java Fri May 10 08:24:19 2013
@@ -142,4 +142,10 @@ public class OrcSerde implements SerDe, 
     }
     return vos.serialize(vrg, objInspector);
   }
+
+  @Override
+  public VectorizedRowBatch deserializeVector(Object rowBlob, ObjectInspector objInspector,
+      VectorizedRowBatch reuseBatch) throws SerDeException {
+    return (VectorizedRowBatch) rowBlob;
+  }
 }

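OrcSerde's implementation is deliberately a pass-through: the vectorized ORC reader already
produces a VectorizedRowBatch, so the blob it hands to deserializeVector is the batch
itself. A minimal sketch of the resulting call, assuming a serde instance and a batch the
ORC RecordReader has already filled (the demo class is illustrative):

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.serde2.SerDeException;

public class OrcPassThroughDemo {
  static void convert(OrcSerde serde, VectorizedRowBatch batch) throws SerDeException {
    // rowBlob and reuseBatch are the same object in the ORC path.
    VectorizedRowBatch result =
        serde.deserializeVector(batch, serde.getObjectInspector(), batch);
    assert result == batch;  // pass-through: no copying, no new allocation
  }
}
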
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1480929&r1=1480928&r2=1480929&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Fri May 10 08:24:19 2013
@@ -29,8 +29,11 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -50,24 +53,23 @@ public class VectorizedOrcInputFormat ex
     private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
     private final long offset;
     private final long length;
-    private final int numColumns;
     private float progress = 0.0f;
-    private final OrcStruct rowObj;
-    private final List<OrcProto.Type> types;
+    private VectorizedRowBatchCtx rbCtx;
 
     VectorizedOrcRecordReader(Reader file, Configuration conf,
-                    long offset, long length) throws IOException {
+        FileSplit fileSplit) throws IOException {
+
+      this.offset = fileSplit.getStart();
+      this.length = fileSplit.getLength();
       this.reader = file.rows(offset, length,
           findIncludedColumns(file.getTypes(), conf));
-      types = file.getTypes();
-      if (types.size() == 0) {
-        numColumns = 0;
-      } else {
-        numColumns = types.get(0).getSubtypesCount();
-      }
-      this.offset = offset;
-      this.length = length;
-      rowObj = new OrcStruct(numColumns);
+
+      try {
+        rbCtx = new VectorizedRowBatchCtx();
+        rbCtx.Init(conf, fileSplit);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
     }
 
     @Override
@@ -77,6 +79,11 @@ public class VectorizedOrcInputFormat ex
         return false;
       }
       reader.nextBatch(value);
+      try {
+        rbCtx.ConvertRowBatchBlobToVectorizedBatch(value, value);
+      } catch (SerDeException e) {
+        throw new RuntimeException(e);
+      }
       progress = reader.getProgress();
       return true;
     }
@@ -88,8 +95,17 @@ public class VectorizedOrcInputFormat ex
 
     @Override
     public VectorizedRowBatch createValue() {
-      return new VectorizedRowBatch(numColumns,
-          VectorizedRowBatch.DEFAULT_SIZE);
+      VectorizedRowBatch result = null;
+      try {
+        result = rbCtx.CreateVectorizedRowBatch();
+        // The record reader works on a single split, and the partition
+        // cannot change within a split, so the partition values are set
+        // only once, at batch creation.
+        rbCtx.AddPartitionColsToBatch(result);
+      } catch (HiveException e) {
+        throw new RuntimeException("Error creating a batch", e);
+      }
+      return result;
     }
 
     @Override
@@ -171,8 +187,7 @@ public class VectorizedOrcInputFormat ex
     Path path = fileSplit.getPath();
     FileSystem fs = path.getFileSystem(conf);
     reporter.setStatus(fileSplit.toString());
-    return new VectorizedOrcRecordReader(OrcFile.createReader(fs, path), conf,
-                               fileSplit.getStart(), fileSplit.getLength());
+    return new VectorizedOrcRecordReader(OrcFile.createReader(fs, path), conf, fileSplit);
   }
 
   @Override
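
Taken together, the VectorizedOrcInputFormat changes give the reader this life cycle:
createValue() builds a batch from the split's metadata and stamps the partition columns
once, and each next() call fills the batch and runs it through the serde conversion. Below
is a hypothetical driver using the standard mapred RecordReader protocol; readSplit and
processBatch are illustrative names, not code from the commit:

import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcInputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class VectorizedOrcReadDemo {
  static void readSplit(InputSplit split, JobConf conf, Reporter reporter) throws IOException {
    RecordReader<NullWritable, VectorizedRowBatch> reader =
        new VectorizedOrcInputFormat().getRecordReader(split, conf, reporter);
    try {
      NullWritable key = reader.createKey();
      // createValue() sizes the batch and sets the partition columns once;
      // they repeat unchanged for every batch read from this split.
      VectorizedRowBatch batch = reader.createValue();
      while (reader.next(key, batch)) {
        processBatch(batch);  // batch.size rows are now populated
      }
    } finally {
      reader.close();
    }
  }

  static void processBatch(VectorizedRowBatch batch) {
    System.out.println("rows in batch: " + batch.size);
  }
}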